Chapter 2. JEUS MQ Client Programming

Table of Contents

2.1. Overview
2.2. JMS Administered Objects
2.2.1. Defining JNDI Services
2.2.2. Connection Factories
2.2.3. Destinations
2.3. Connections and Sessions
2.3.1. Creating Connections
2.3.2. Sharing Physical Connections
2.3.3. Connection Ping
2.3.4. Creating Sessions
2.3.5. Client Facility Pooling
2.3.6. NONE_ACKNOWLEDGE Mode
2.4. Messages
2.4.1. Message Header Field
2.4.2. Message Properties
2.4.3. Message Body
2.4.4. FileMessage
2.5. Transactions
2.5.1. Local Transactions
2.5.2. Distributed Transactions

This chapter describes JEUS MQ client types, procedures for creating a client, and administered objects.

This section discusses JEUS MQ client types and procedures for creating a client.

Client Types

There are two types of JMS clients, message producers and message consumers, and a single client can play both roles.

JEUS MQ clients can be divided into the following categories according to the methods used for Java application deployment and execution.

  • Independent applications

    Independently executed in the Java Standard Edition (SE) environment.

  • Message-driven beans

    A message-driven bean is a type of EJB. The EJB container can run multiple bean instances concurrently, so message-handling logic written as single-threaded code is executed on multiple threads. For more information about message-driven beans (hereafter, MDB), refer to EJB related books or Java EE Tutorials.

    For information about how to use MDB in JEUS, refer to "Chapter 9. Message Driven Bean (MDB)" in JEUS EJB Guide.

Note

Currently, JEUS MQ does not support clients that are written in a language other than Java.

Steps for Creating a Client

The following are the steps for creating a JEUS MQ client as an independent application.

When the client program that runs in the Java EE environment is specified in the XML Deployment Descriptor, the lookup process of the initial Java Naming and Directory Interface (hereafter JNDI) is not required.

  1. Create a JNDI InitialContext.

    Context context = new InitialContext();

    The JEUS JNDI service configuration will be discussed in "2.2.1. Defining JNDI Services".

  2. Obtain a connection factory through JNDI.

    ConnectionFactory connectionFactory =
        (ConnectionFactory) context.lookup("jms/ConnectionFactory");

    A connection factory can be obtained by using the JEUS MQ API, without using JNDI lookup. For more information, refer to "2.2.2. Connection Factories".

  3. Create a connection through the obtained connection factory.

    Connection connection = connectionFactory.createConnection();
  4. Create a session from the connection.

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  5. Look up a destination through JNDI.

    Destination destination = (Destination) context.lookup("ExamplesQueue");

    Like the connection factory, a destination can be obtained using JEUS MQ API, without using JNDI lookup. For more information, refer to "2.2.3. Destinations".

  6. To send a message, create a message producer using the session object.

    MessageProducer producer = session.createProducer(destination);

    To receive a message, create a message consumer using the session object.

    MessageConsumer consumer = session.createConsumer(destination);

    To asynchronously receive the message, register the object that implements the MessageListener interface in the message consumer.

    MessageListener listener = new MyMessageListener();
    consumer.setMessageListener(listener);
  7. Initiate the connection.

    connection.start();
  8. Send or receive messages to execute the business logic.

    Send a message through the message producer.

    TextMessage message = session.createTextMessage("Hello, World");
    producer.send(message);

    To synchronously receive messages, invoke the receive() method of the message consumer.

    Message message = consumer.receive();

    When messages are received asynchronously, they are processed by the onMessage() method of the MessageListener object registered in Step 6.

  9. After all messages have been sent and received, close the connection.

    connection.close();

There are two types of JMS administered objects, the connection factory and destination.

In general, they are created on the server through JMS server settings, and managed by the administrator. The JMS client obtains the objects or object references from the server in order to use the JMS Service. JNDI lookup is a typical way for the client to obtain the JMS administered objects from the server.

This section explains how to obtain a connection factory or destination reference from the JEUS MQ server, and how to implement JEUS JNDI service in the client programs to use JNDI lookup. In addition, it discusses how to dynamically create a destination in the JEUS MQ server by using the API, and describes the dead message destination of JEUS MQ server.

JNDI service should be defined first to obtain JMS administered objects through JNDI lookup.

JNDI service can be defined using the following three properties.

  • java.naming.factory.initial

  • java.naming.factory.url.pkgs

  • java.naming.provider.url

The three defined properties can be applied in the following ways.
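
One way to apply the properties, sketched here under assumptions, is to pass them as an environment table to the InitialContext constructor. The class and method names below are hypothetical, and the concrete property values must be taken from the JEUS JNDI configuration in use.

```java
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper: collects the three JNDI properties into one table.
final class JndiEnvSketch {

    // Builds the environment from caller-supplied values; nothing is hard-coded here.
    static Hashtable<String, String> buildEnv(String factory, String urlPkgs, String providerUrl) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, factory); // java.naming.factory.initial
        env.put(Context.URL_PKG_PREFIXES, urlPkgs);        // java.naming.factory.url.pkgs
        env.put(Context.PROVIDER_URL, providerUrl);        // java.naming.provider.url
        return env;
    }

    // Creates the context; throws NamingException if the factory class cannot be loaded.
    static Context createContext(Hashtable<String, String> env) throws NamingException {
        return new InitialContext(env);
    }
}
```

A call might look like JndiEnvSketch.buildEnv("jeus.jndi.JNSContextFactory", "jeus.jndi.jns.url", "127.0.0.1:9736"). These three values are assumptions for illustration only and should be checked against the JEUS JNDI documentation.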

A connection factory contains the information needed to create a connection to the JEUS MQ server.

In general, the JMS client obtains a connection factory through JNDI lookup, and uses it to create a connection with the JMS server.

ConnectionFactory connectionFactory =
    (ConnectionFactory) context.lookup("jms/ConnectionFactory");
QueueConnectionFactory queueConnectionFactory =
    (QueueConnectionFactory) context.lookup("jms/QueueConnectionFactory");
TopicConnectionFactory topicConnectionFactory =
    (TopicConnectionFactory) context.lookup("jms/TopicConnectionFactory");

To use distributed transactions in JEUS MQ, look up XAConnectionFactory, XAQueueConnectionFactory, and XATopicConnectionFactory classes as in the following.

The Java EE client can obtain a connection factory by using the Resource annotation, instead of using the InitialContext.lookup() method.

@Resource(mappedName="jms/ConnectionFactory")
private static ConnectionFactory connectionFactory;

If the JNDI Service is not available, a connection factory can be obtained by using the API provided by JEUS MQ.

jeus.jms.client.util.JeusConnectionFactoryCreator connectionFactoryCreator =
    new jeus.jms.client.util.JeusConnectionFactoryCreator();
connectionFactoryCreator.setFactoryName("ConnectionFactory");
connectionFactoryCreator.addBrokerAddress("127.0.0.1", 9741, "internal");
ConnectionFactory connectionFactory =
    (ConnectionFactory) connectionFactoryCreator.createConnectionFactory();

There are two types of destinations, queue and topic, which are used as storage for messages managed by the server. The JMS client obtains a reference to the destination object managed by the server in order to send a message to or receive a message from a destination.

In general, the JEUS MQ client obtains a destination by looking up the JNDI name on the JNDI server where the destination references are registered.

Queue queue = (Queue) context.lookup("jms/ExamplesQueue");
Topic topic = (Topic) context.lookup("jms/ExamplesTopic");

The Java EE client can obtain a destination reference by using the Resource annotation, instead of using the InitialContext.lookup() method as in the following.

@Resource(mappedName="jms/ExamplesQueue")
private static Queue queue;

As with connection factories, the JEUS MQ API can be used to obtain a destination reference without JNDI lookup. To refer to the destination, use the destination name that is internally used by the JEUS MQ server.

jeus.jms.client.util.JeusDestinationCreator destinationCreator =
    new jeus.jms.client.util.JeusDestinationCreator();
destinationCreator.setDestinationName("ExamplesQueue");
destinationCreator.setDestinationClass(Destination.class);
Destination destination = (Destination) destinationCreator.createDestination();

It is also possible to use the Session.createQueue(String) and Session.createTopic(String) methods. Their behavior differs for each JMS implementation. As in the previous example, the actual destination name must be passed as a parameter.

Queue queue = session.createQueue("ExamplesQueue");
Topic topic = session.createTopic("ExamplesTopic");

Dynamic Creation of Destinations

JEUS MQ enables a client to dynamically create a destination on the server by using the Session.createQueue(String) and Session.createTopic(String) methods.

When specifying a destination string, append a "?" after the destination name. To use options, append each option using the form "param=value" after the "?". To set two or more options, use "&" as a delimiter. The destination parameters and values can be specified like those set in the domain.xml configuration file.
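
The string format described above can be assembled with a small helper. This is a minimal sketch; the class and method names are hypothetical, option values are used verbatim without any escaping, and the "?" is always appended, following the wording of this section.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that builds "<name>?<param>=<value>&<param>=<value>".
final class DynamicDestinationName {

    // Options are appended in the iteration order of the given map.
    static String build(String destinationName, Map<String, String> options) {
        StringJoiner params = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            params.add(option.getKey() + "=" + option.getValue());
        }
        // With no options the result is just the name followed by "?".
        return destinationName + "?" + params;
    }
}
```

Passing a LinkedHashMap keeps the options in insertion order, and the resulting string can then be given to Session.createQueue(String) or Session.createTopic(String).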

Note

Currently, the following options can be used to dynamically create a destination: export-name and multiple-receiver for a queue, and export-name for a topic.

The following example shows how to dynamically create a queue by using the destination name "DynamicQueue" and the JNDI name "jms/DynamicQueue". Because multiple-receiver is set to true, two or more message consumers can receive messages from the queue simultaneously.

QueueConnectionFactory queueConnectionFactory =
    (QueueConnectionFactory) context.lookup("jms/QueueConnectionFactory");
QueueConnection queueConnection = queueConnectionFactory.createQueueConnection();
QueueSession queueSession =
    queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = queueSession.createQueue(
    "DynamicQueue?export-name=jms/DynamicQueue&multiple-receiver=true");

The API has the following features.

  • If the specified destination name already exists on the server, or another object is already registered under the specified JNDI name, a JMSException is thrown. If a JNDI name is not provided, the destination name is used as the JNDI name.

  • A connection needs to be created in order to enable the JEUS MQ server security function. If the subject used for creating a connection does not have "createDestination" permission on the "jeus.jms.destination.creation" resource, a JMSException is thrown.

  • Other settings follow the basic settings of the destination.

  • The dynamically created destinations are deleted when the JEUS MQ server shuts down.

Destinations can also be created using WebAdmin as in the following.


Note

A destination cannot be dynamically created when JEUS MQ clustering is enabled.

Dead Message Destination

Dead message destination is a system destination where failed messages are saved. A message may fail if the destination of the message could not be found, or the message has been recovered more than the allowed number of times.

The following are the cases when a message that is received by the consumer is recovered to the server-side destination.

  • The message consumer calls Session.rollback() or Session.recover().

  • The message is rolled back for an external reason, such as an XA transaction rollback.

  • An exception occurs while executing the onMessage() method of the message listener.

The recovered message gets re-sent to the message consumer. However, if message processing fails repeatedly due to a business logic or client program error, messages will accumulate at the destination and the client cannot receive them.

To resolve this problem, JEUS MQ limits the number of times that a message can be recovered.

Set a value in the "JMS_JEUS_RedeliveryLimit" message property variable, and send the message.

message.setIntProperty("JMS_JEUS_RedeliveryLimit", <integer value>);

The default message property value can be modified by setting the "jeus.jms.client.default-redelivery-limit" system property when executing the message producer client.

-Djeus.jms.client.default-redelivery-limit=<integer value>

If not set, 3 is used as the default value. When booting, the JEUS MQ server creates a dead message destination with the name "JEUSMQ_DLQ". In general, the messages saved in the dead message destination are processed using the system administration tool. General clients can also access and process the dead message destination by using this name.
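
The order in which the redelivery limit is determined (the per-message property first, then the system property, then the default of 3) can be sketched as follows. This only illustrates the precedence as this section describes it and is not the JEUS MQ client's own code; the class and method names are hypothetical.

```java
// Hypothetical illustration of how the effective redelivery limit is chosen.
final class RedeliveryLimitSketch {

    static final String PROPERTY = "jeus.jms.client.default-redelivery-limit";
    static final int BUILT_IN_DEFAULT = 3;

    // perMessageLimit is the value set through "JMS_JEUS_RedeliveryLimit",
    // or null when the producer did not set the property on the message.
    static int effectiveLimit(Integer perMessageLimit) {
        if (perMessageLimit != null) {
            return perMessageLimit;
        }
        // Falls back to the system property, then to the default of 3.
        return Integer.getInteger(PROPERTY, BUILT_IN_DEFAULT);
    }
}
```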

When a JMS client exchanges messages with a server, it must connect to the server and create a session. The connections and the sessions are very important client resources in JMS that may affect the system performance and determine error response methods. JEUS MQ provides a number of options to enhance connection performance.

This section describes how to set the connection options and how the JEUS MQ client runs based on the specified options.

An application sends a message using the following steps.

  1. Create a connection

  2. Create a session

  3. Create a message producer

  4. Send a message

  5. Close a connection

If each JMS connection is bound to its own physical connection (socket), performance is reduced because a new physical connection has to be created whenever a message is sent. Creating a physical connection takes longer than sending a message. In addition, the more physical connections are created, the more file descriptors are used, which may cause IOExceptions and disrupt system stability.

In JEUS6 fix6 and later, physical connections can be shared to resolve the performance and stability problems mentioned previously. However, because some environments are better served by creating a new physical connection each time, an option is provided to turn sharing on or off.

In general, one physical connection is connected to each ConnectionFactory, but if JEUS MQ Failover is configured or "jeus.jms.client.use-single-server-entry" is set to "false", one physical connection is used for each connection. For more information about JEUS MQ Failover, refer to "Chapter 5. JEUS MQ Failover".

The following are JVM options related to physical connection sharing.

  • -Djeus.jms.client.use-single-server-entry=<boolean value>

    This option determines whether to create one physical connection for each ConnectionFactory and share it with other connections. The default value is true. If set to false, the physical connection is used for only one connection.

    To modify the setting at runtime, add either of the following statements to the client code.

    System.setProperty("jeus.jms.client.use-single-server-entry", "<boolean value>");

    or

    System.setProperty(jeus.jms.common.JMSProperties.USE_SINGLE_SERVER_ENTRY,
                       "<boolean value>");

  • -Djeus.jms.client.single-server-entry.shutdown-delay=<long value>

    This option determines when to disconnect the physical connection if it is not in use. Idle physical connections are not maintained but returned to the system. (Unit: milliseconds, default value: 600,000 (10 minutes))

    To modify the setting at runtime, add either of the following statements to the client code.

    System.setProperty("jeus.jms.client.single-server-entry.shutdown-delay",
                       "<long value>");

    or

    System.setProperty(jeus.jms.common.JMSProperties.SINGLE_SERVER_ENTRY_SHUTDOWN_DELAY,
                       "<long value>");

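Because System.setProperty() accepts only String values, the boolean and long settings have to be converted before they are set. The following is a minimal sketch of setting both sharing options from client code. The class and method names are hypothetical, and it is assumed that the properties are set before the first connection is created.

```java
// Hypothetical helper that sets the two connection-sharing options at runtime.
final class ConnectionSharingOptions {

    static void apply(boolean shareConnection, long shutdownDelayMillis) {
        // true: one physical connection per ConnectionFactory, shared by its connections.
        System.setProperty("jeus.jms.client.use-single-server-entry",
                           String.valueOf(shareConnection));
        // Idle time in milliseconds before an unused physical connection is closed.
        System.setProperty("jeus.jms.client.single-server-entry.shutdown-delay",
                           Long.toString(shutdownDelayMillis));
    }
}
```

For example, ConnectionSharingOptions.apply(true, 300000L) keeps sharing enabled and shortens the idle delay to five minutes.
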
Except for transacted sessions, JMS message acknowledge modes are meaningful only when receiving messages, but the NONE_ACKNOWLEDGE mode also affects the sending of messages.

This acknowledge mode improves performance, but makes reliable messaging difficult. Therefore, when deciding whether to use the NONE_ACKNOWLEDGE mode, consider the characteristics of the messages and the performance and reliability requirements.

Note

When exchanging FileMessages or in transacted sessions, the NONE_ACKNOWLEDGE mode works like the AUTO_ACKNOWLEDGE mode.

Sending Messages

In general, when a client sends a message to the JMS server, the client thread that calls the MessageProducer.send() method is blocked until it receives a reply from the server.

In the NONE_ACKNOWLEDGE mode, MessageProducer.send() returns immediately after the client sends a JMS message to the server. This can improve message transmission performance by reducing the client waiting time.

The following figure shows the difference in how a message is sent in general and in the NONE_ACKNOWLEDGE mode. The grey boxes indicate that the messages are logged when they arrive at the server.


A message may be lost in the following cases.

  • When a network failure occurs while a message is being transmitted from the client to the server.

  • When an error occurs on the JEUS MQ server before a message arrives at the JEUS MQ server and is added to the destination.

The lost message is not recovered even if the message transmission method is set to DeliveryMode.PERSISTENT.

Receiving Messages

A JMS server does not delete the message information until it receives an acknowledgement from the client that received the message. This can help improve the reliability of messaging, but is not good for performance because each acknowledgment message incurs network overhead.

In the NONE_ACKNOWLEDGE mode, a user can expect faster message delivery. This is because the JEUS MQ server sends a JMS message to the client and then immediately deletes the message information from the server without waiting for an acknowledgement message from the client.

The following figure shows how a message is received in the AUTO_ACKNOWLEDGE and NONE_ACKNOWLEDGE modes. The gray box indicates that a message is deleted from the server.


In the following cases, a message may be lost.

  • When a network error occurs while a message is being transmitted from the server to the client.

  • When an error occurs while the message is being processed by the JEUS MQ client library.

  • When an exception occurs in the onMessage() method of the MessageListener object registered by the client.

The lost message cannot be recovered by calling the Session.recover() method.

This section discusses the extended features of JMS messaging supported by JEUS MQ.

JEUS MQ supports the FileMessage type in addition to the basic JMS message types.

Since JMS handles each message as a whole, the entire message content must be held in memory when sending and receiving messages. This may exhaust the memory of the client or server if the message is too large.

To avoid this problem, JEUS MQ supports the FileMessage type that sends message contents in block units.

Creating Messages

Create a FileMessage using the following methods defined in the jeus.jms.JeusSession class.

public jeus.jms.FileMessage createFileMessage()
                                       throws javax.jms.JMSException;
public jeus.jms.FileMessage createFileMessage(java.net.URL url)
                                       throws javax.jms.JMSException;

The Session, QueueSession, and TopicSession objects created by the JEUS MQ client library can be cast to jeus.jms.JeusSession, jeus.jms.JeusQueueSession, and jeus.jms.JeusTopicSession objects respectively.

FileMessage Interface

The following is the definition of the jeus.jms.FileMessage interface.

public interface FileMessage extends javax.jms.Message {
    public java.net.URL getURL();
    public void setURL(java.net.URL url)
                throws javax.jms.MessageNotWriteableException;

    public boolean isURLOnly();
    public void setURLOnly(boolean urlOnly);    
}

To send a message, the URL of the message file can be set using the setURL() method. The URL can also be passed to the JeusSession.createFileMessage() method as a parameter when the message is created.

The urlOnly property determines whether to only send the URL of the file on the server to the message consumer. This property determines the return value of getURL() that is called on the FileMessage object.


When sending a FileMessage, the MessageProducer.send() method always returns only after all file contents have been sent to the server. This also applies in the NONE_ACKNOWLEDGE mode.

Note

A file included in a FileMessage is divided into 4 KB blocks. The block size can be modified by specifying a system property such as "-Djeus.jms.file.blocksize=<integer value>" when executing the JEUS MQ client.
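
As a rough illustration of the block division, the number of blocks needed for a file is the file size divided by the block size, rounded up. The sketch below assumes that 4 KB means 4096 bytes and that the block size is given in bytes; the unit of jeus.jms.file.blocksize is not stated in this section, and the class and method names are hypothetical.

```java
// Hypothetical arithmetic helper; not part of the JEUS MQ API.
final class FileBlockMath {

    // Returns how many blocks are needed to carry fileSizeBytes of content.
    static long blockCount(long fileSizeBytes, int blockSizeBytes) {
        if (fileSizeBytes < 0 || blockSizeBytes <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative and block size positive");
        }
        // Integer ceiling division: a partially filled last block still counts.
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
    }
}
```

Under these assumptions, a 10,000-byte file is sent as three blocks: two full blocks and one partial block.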

Temporary File Path

If a message consumer receives a file in a FileMessage, the file is saved in the temporary file path. The path is specified according to the following conditions.

  • If JEUS_HOME is set:

    JEUS_HOME/logs/jms/<broker-name>/client/
  • If the jeus.jms.client.workdir system property is set:

    A path specified as the system property
  • Otherwise:

    USER_HOME/jeusmq_client_work/
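
The conditions above can be sketched as a simple resolution function. It is assumed here that the conditions are checked in the order listed; the class and method names are hypothetical, and the values are passed in as parameters rather than read from the environment so that the logic is easy to follow.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical illustration of how the temporary file path is chosen.
final class ClientWorkDirSketch {

    static Path resolve(String jeusHome, String brokerName,
                        String workDirProperty, String userHome) {
        // 1. JEUS_HOME is set: JEUS_HOME/logs/jms/<broker-name>/client/
        if (jeusHome != null && !jeusHome.isEmpty()) {
            return Paths.get(jeusHome, "logs", "jms", brokerName, "client");
        }
        // 2. jeus.jms.client.workdir is set: use it as given.
        if (workDirProperty != null && !workDirProperty.isEmpty()) {
            return Paths.get(workDirProperty);
        }
        // 3. Otherwise: USER_HOME/jeusmq_client_work/
        return Paths.get(userHome, "jeusmq_client_work");
    }
}
```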

FileMessage Transmission Example

This section shows how to use the FileMessage API to send a FileMessage.


The following Java code shows how to send the "/home/jeus/send_test/send.file" file using a FileMessage object.


The following example shows how to obtain an InputStream from the file's URL and write the file contents in the "D:/recv_test/recv.file" file.
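
A minimal sketch of that copy step, using only standard Java I/O, might look as follows. It assumes the URL has already been obtained by calling getURL() on the received FileMessage; the class and method names are hypothetical, and the JEUS MQ receive call itself is omitted.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: writes the content behind a URL to a local file.
final class FileMessageReceiveSketch {

    // Returns the number of bytes written; the target's parent directory must exist.
    static long saveTo(URL source, Path target) throws IOException {
        // try-with-resources closes the stream even if the copy fails.
        try (InputStream in = source.openStream()) {
            return Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

With a received message, the call would be along the lines of saveTo(fileMessage.getURL(), Paths.get("D:/recv_test/recv.file")).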


This section discusses the features and scope of local transactions and distributed (XA) transactions in JEUS MQ. It also explains what a client developer needs to know to process a transaction.

A JMS transaction groups the sending and receiving of messages inside a session. The JMS specification defines the local transaction, which starts and ends inside a single session. It also defines the distributed transaction, which can involve other resources such as one or more JMS sessions, EJBs, and JDBC connections.

A message sent through a session that is involved in a transaction is not regarded as having arrived at the server until the transaction is committed. Therefore, the message is not delivered to a message consumer created in the same session, and it does not appear in a queue browser created in the same session.

In the JMS specification, XAResource provided by XASession can be registered to participate in an XA transaction. The actual implementation may vary for each vendor.

The following points must be taken into consideration to use XASession in the JEUS MQ client.

  • When invoking Session.getAcknowledgeMode() on an XASession, if the XASession is participating in a global transaction it returns Session.SESSION_TRANSACTED, and otherwise it returns Session.AUTO_ACKNOWLEDGE.

  • If Session.commit() or Session.rollback() is called on an XASession participating in a global transaction, a TransactionInProgressException or IllegalStateException is thrown.

Propagation of Distributed Transactions

The JEUS MQ client library registers an XAResource in a global transaction of the thread that uses a JMS API, regardless of when the XASession was created. To propagate a transaction associated with the client thread, a specific API invocation is required. JEUS MQ's participation in a distributed transaction is made only through a synchronous API invocation, which includes the transmission or synchronous reception of messages. The messages asynchronously received through a MessageListener must not be included in a distributed transaction.

Recovery of Distributed Transactions

The JEUS MQ server stores the ongoing session tasks of a transaction in a storage in order to recover them when the server has to restart due to an unexpected error. The transaction manager can obtain the IDs of ongoing transactions in JEUS MQ through the XAResource obtained from the XASession, and use them to commit or roll back the transactions.

To quickly recover from a failure, it is recommended to use the JEUS MQ failover function. For more information, refer to "Chapter 5. JEUS MQ Failover".

When restarting, JEUS MQ server automatically rolls back the tasks of transactions that are not in an in-doubt state.