본 장에서는 JEUS MQ 클라이언트의 유형, 작성 절차 및 관리 객체에 대해서 설명한다.
JEUS MQ 클라이언트의 유형과 작성 절차에 대해 설명한다.
JMS 클라이언트는 메시지를 주고받는 역할에 따라 메시지 송신자와 메시지 수신자로 구분할 수 있으며 하나의 클라이언트가 2가지 역할을 모두 수행하기도 한다.
이와는 별도로 Java 애플리케이션을 배치하고 실행하는 방식에 따라 JEUS MQ 클라이언트를 다음과 같이 분류할 수 있다.
독립 애플리케이션
Java SE 환경에서 독립적으로 실행되는 클라이언트이다.
Java EE 애플리케이션
EJB(Enterprise JavaBeans)나 서블릿처럼 Java EE 서버에 deploy되어 실행되는 형태의 클라이언트다. JEUS에 Java EE 애플리케이션을 deploy하고 실행하는 방법에 대해서는 "JEUS Applications & Deployment 안내서"를 참고한다.
Message Driven Bean
EJB의 한 종류로 Java EE 애플리케이션의 특수한 경우라고 볼 수 있지만, 동작 방식에 있어서는 싱글 스레드 방식으로 작성된 Java SE 애플리케이션을 멀티 스레드 방식으로 확장해준다. Message Driven Bean(이하 MDB)에 대한 자세한 내용은 EJB 관련 서적이나 "Java EE Tutorials"를 참고한다.
JEUS에서 MDB를 사용하기 위한 설정 방법은 “JEUS EJB 안내서”의 “제9장 Message Driven Bean(MDB)”를 참고한다.
현재 JEUS MQ에서는 Java 이외의 언어로 작성된 클라이언트는 지원하지 않는다.
JEUS MQ 클라이언트의 유형 중 독립 애플리케이션의 경우를 기준으로 JEUS MQ 클라이언트를 작성하는 전형적인 과정에 대해 설명한다.
Java EE 환경에서 실행되는 클라이언트의 경우에는 XML Deployment Descriptor에 설정해서 초기 JNDI(Java Naming and Directory Interface) Lookup 과정을 대체할 수 있다.
Context context = new InitialContext();
JEUS의 JNDI 서비스를 사용하기 위한 환경설정 방법은 “2.2.1. JNDI 서비스 정의”를 참고한다.
JNDI Lookup을 통해 Connection Factory를 얻는다.
ConnectionFactory connectionFactory =
(ConnectionFactory) context.lookup("jms/ConnectionFactory");
JEUS MQ 전용 API를 사용하여 JNDI Lookup 없이 Connection Factory를 얻는 방법도 있다. 자세한 내용은 “2.2.2. Connection Factory”를 참고한다.
Connection Factory를 이용해 커넥션을 맺는다.
Connection connection = connectionFactory.createConnection();
커넥션으로부터 세션을 생성한다.
Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
JNDI Lookup을 통해 Destination을 얻는다.
Destination destination = (Destination) context.lookup("ExamplesQueue");
Connection Factory의 경우와 마찬가지로 JEUS MQ 전용 API를 사용하면 JNDI Lookup 없이 Destination을 얻을 수 있다. 이 내용은 “2.2.3. Destination”에서 설명한다.
메시지를 보내는 클라이언트라면 세션으로부터 메시지 송신자를 생성한다.
MessageProducer producer = session.createProducer(destination);
또는 메시지를 받길 원하는 경우 세션으로부터 메시지 수신자를 생성한다.
MessageConsumer consumer = session.createConsumer(destination);
메시지를 비동기적으로 수신하여 처리하려면 추가로 메시지 수신자에 MessageListener 인터페이스를 구현한 객체를 등록한다.
MessageListener listener = new MyMessageListener();
consumer.setMessageListener(listener);
커넥션을 시작한다.
connection.start();
비즈니스 로직을 수행하면서 메시지를 보내거나 받는다.
TextMessage message = session.createTextMessage("Hello, World"); producer.send(msg);
또는 동기적으로 메시지를 수신하는 경우 메시지 수신자의 receive() 메소드를 호출한다.
Message message = consumer.receive();
반면에 비동기적으로 메시지를 수신하는 경우 수신된 메시지는 단계 6에서 등록한 MessageListener 객체의 onMessage() 메소드에서 처리하게 된다.
모든 메시지 송수신이 완료되면 커넥션을 종료한다.
connection.close();
JMS 관리 객체(Administered Object)에는 Connection Factory와 Destination의 2가지 종류가 있다.
주로 JMS 서버의 설정에 의해 서버 내에서 생성되고 서버 관리자의 관리를 받는다. JMS 클라이언트는 JMS 서비스를 이용하기 위해 이 관리 객체(또는 그에 대한 Reference)들을 서버로부터 가져와 사용한다. 클라이언트가 JMS 관리 객체를 서버로부터 얻는 방법으로는 JNDI Lookup 방식이 가장 일반적이다.
본 절에서는 JEUS MQ 서버로부터 Connection Factory나 Destination의 Reference를 얻는 방법과 JNDI Lookup을 위해 클라이언트에서 JEUS JNDI 서비스를 정의하는 방법을 설명한다. 또한 클라이언트에서 API를 사용하여 JEUS MQ 서버에 동적으로 Destination을 생성하는 방법과 JEUS MQ 서버의 비정상 메시지 Destination(Dead Message Destination)에 대해서 설명한다.
JMS 관리 객체를 JNDI Lookup을 통해 얻기 위해서는 먼저 JNDI 서비스가 정의되어 있어야 한다.
JNDI 서비스는 다음의 3가지 프로퍼티로 정의할 수 있다.
java.naming.factory.initial
java.naming.factory.url.pkgs
java.naming.provider.url
정의된 3가지 프로퍼티는 다음의 방법으로 적용할 수 있다.
JNDI 프로퍼티 파일 작성
JNDI 서비스를 정의하는 가장 편리한 방법은 프로퍼티들의 값을 지정한 jndi.properties 파일을 작성하는 것이다.
다음은 JEUS의 JNDI 서비스를 사용하기 위한 환경을 기술한 jndi.properties 파일의 예제이다.
[예 2.1] <<jndi.properties>>
java.naming.factory.initial=jeus.jndi.JEUSContextFactory
java.naming.factory.url.pkgs=jeus.jndi.jns.url
java.naming.provider.url=127.0.0.1:9736
이와 같은 jndi.properties 파일을 작성하여 클래스 패스에 위치시키거나 클라이언트 프로그램을 배포할 때 JAR 파일에 클래스 파일들과 함께 포함시키면 InitialContext 객체가 jndi.properties 파일의 설정을 읽을 수 있다.
시스템 프로퍼티로 전달
클라이언트를 실행하는 시점에 Customizing이 필요한 상황에서는 JAR 파일에 포함된 jndi.properties 파일을 수정하기가 어렵다. 이때에는 클라이언트를 실행할 때 다음과 같이 시스템 프로퍼티로 설정값을 전달하는 방법을 사용할 수 있다.
java -Djava.naming.factory.initial=jeus.jndi.JEUSContextFactory \
-Djava.naming.factory.url.pkgs=jeus.jndi.jns.url \
-Djava.naming.provider.url=127.0.0.1:9736 \
. . .
애플릿 프로퍼티로 전달
JEUS MQ 클라이언트가 애플릿 형태로 되어 있을 때는 jndi.properties 파일을 작성하는 것보다 다음과 같이 HTML 파일에서 <applet> 태그에 <param> 태그를 사용하여 넘겨주는 방법이 Customizing에 유리하다.
<applet code="JeusMqApplet" width="640" height="480">
<param name="java.naming.factory.initial"
value="jeus.jndi.JEUSContextFactory"/>
<param name="java.naming.factory.url.pkgs" value="jeus.jndi.jns.url"/>
<param name="java.naming.provider.url" value="127.0.0.1:9736"/>
</applet>
애플릿 내에는 Applet.getParameter() 메소드를 사용하여 InitialContext 생성에 필요한 환경을 설정하는 코드를 다음과 같이 작성해야 한다.
Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, getParameter("java.naming.factory.initial")); env.put(Context.URL_PKG_PREFIXES, getParameter("java.naming.factory.url.pkgs")); env.put(Context.PROVIDER_URL, getParameter("java.naming.provider.url")); Context context = new InitialContext(env);
코드 삽입
클라이언트 코드 내에서 InitialContext 생성에 필요한 환경을 직접 설정하는 방법이다.
다음과 같이 프로퍼티들을 Hashtable 객체에 담아 InitialContext를 생성할 때 생성자에게 파라미터로 전달한다.
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, "jeus.jndi.JEUSContextFactory");
env.put(Context.URL_PKG_PREFIXES, "jeus.jndi.jns.url");
env.put(Context.PROVIDER_URL, "127.0.0.1:9736");
Context context = new InitialContext(env);
이 방법을 사용하면 다른 JNDI 서비스를 사용하거나 서비스 정의가 바뀔 때 소스 코드도 수정해야 하므로 유지보수가 어려워진다.
Connection Factory는 JEUS MQ 서버에 커넥션(Connection)을 맺는 데 필요한 정보를 가지고 있다.
일반적으로 JMS 클라이언트는 JNDI Lookup을 통해 Connection Factory를 얻고 이를 사용해서 JMS 서버로의 커넥션을 생성한다.
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("jms/ConnectionFactory"); QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context.lookup("jms/QueueConnectionFactory"); TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory) context.lookup("jms/TopicConnectionFactory");
JEUS MQ의 분산 트랜잭션을 사용하려면 XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory도 위와 비슷하게 Lookup하여 사용할 수 있다.
Java EE 클라이언트에서는 InitialContext.lookup() 메소드를 호출하는 대신 @Resource Annotation을 사용해서 다음과 같이 Connection Factory를 가져오는 방법도 있다.
@Resource(mappedName="jms/ConnectionFactory")
private static ConnectionFactory connectionFactory;
JNDI 서비스를 이용할 수 없는 상황에서는 JEUS MQ가 제공하는 전용 API를 사용하여 JNDI Lookup 없이 Connection Factory를 얻을 수 있다.
jeus.jms.client.util.JeusConnectionFactoryCreator connectionFactoryCreator = new jeus.jms.client.util.JeusConnectionFactoryCreator(); connectionFactoryCreator.setFactoryName("ConnectionFactory"); connectionFactoryCreator.addBrokerAddress("127.0.0.1", 9741, "internal"); ConnectionFactory connectionFactory = (ConnectionFactory) connectionFactoryCreator.createConnectionFactory();
JNDI Lookup이나 @Resource Annotation을 사용할 때는 Connection Factory를 얻기 위해 Connection Factory의 JNDI 이름(본 절의 예에서는 "jms/ConnectionFactory")을 참조하지만 JeusConnectionFactoryCreator를 사용할 때에는 JEUS MQ 서버가 내부적으로 사용하는 Connection Factory 이름(마지막 예에서 "ConnectionFactory")으로 참조하고 있는 점에 유의한다. Connection Factory 이름과 JNDI 이름의 차이점에 대해서는 “3.3.3. Connection Factory 설정”의 예를 참고한다.
Destination은 서버에 의해 관리되는 메시지를 저장하는 공간으로 Queue와 Topic의 2가지 종류가 있다. JMS 클라이언트는 서버에서 관리하는 Destination 객체의 Reference를 얻어서 Destination에 메시지를 송신하거나 Destination으로부터 메시지를 수신한다.
JEUS MQ 클라이언트에서 Destination을 얻는 가장 일반적인 방법은 JEUS MQ 서버가 Destination Reference를 등록한 JNDI 서버를 통해 JNDI 이름으로 Lookup하는 것이다.
Queue queue = (Queue) context.lookup("jms/ExamplesQueue"); Topic topic = (Topic) context.lookup("jms/ExamplesTopic");
Java EE 클라이언트에서는 InitialContext.lookup() 메소드를 호출하는 대신 @Resource Annotation을 사용하여 다음과 같이 Destination Reference를 가져오는 방법도 있다.
@Resource(mappedName="jms/ExamplesQueue")
private static Queue queue;
Connection Factory와 마찬가지로 JEUS MQ 전용 API를 사용하면 JNDI Lookup 없이 Destination Reference를 얻을 수 있다. 이때 Destination을 참조하기 위해서는 JEUS MQ 서버가 내부적으로 사용하는 Destination 이름을 사용해야 한다.
jeus.jms.client.util.JeusDestinationCreator destinationCreator =
new jeus.jms.client.util.JeusDestinationCreator();
destinationCreator.setDestinationName("ExamplesQueue");
destinationCreator.setDestinationClass(Destination.class);
Destination destination = (Destination) destinationCreator.createDestination();
또는 JMS 구현체마다 다르게 구현하도록 되어 있는 Session.createQueue(String), Session.createTopic(String) 메소드를 사용할 수 있다. 앞의 예에서와 마찬가지로 실제 Destination 이름을 파라미터로 넘겨주어야 한다.
Queue queue = session.createQueue("ExamplesQueue"); Topic topic = session.createTopic("ExamplesTopic");
JEUS MQ는 클라이언트가 Session.createQueue(String), Session.createTopic(String) 메소드를 사용해서 서버에 동적으로 Destination을 생성할 수 있는 방법을 제공한다.
이때 파라미터로 전달하는 문자열은 생성하려는 Destination 이름 뒤에 물음표(?)를 붙이고, 설정하려는 옵션이 있는 경우 그 뒤에 "param=value" 형태로 원하는 설정을 나열한다. 옵션을 2개 이상 설정할 때에는 앰퍼샌드(&)를 구분자로 사용한다. param 및 value 값으로 domain.xml 환경 파일에 Destination을 설정할 때와 동일한 이름 및 값을 사용할 수 있다.
현재 Destination을 동적으로 생성할 때 설정할 수 있는 옵션은 Queue의 경우 export-name과 multiple-receiver, Topic의 경우 export-name이 있다.
다음의 예에서는 Destination 이름이 "DynamicQueue", JNDI 이름이 "jms/DynamicQueue"이고 동시에 2개 이상의 메시지 수신자를 허용하는 Queue를 동적으로 생성하고 있다.
QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) context.lookup("jms/QueueConnectionFactory"); QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = queueSession.createQueue( "DynamicQueue?export-name=jms/DynamicQueue&multiple-receiver=true");
이 API에는 다음과 같은 특징이 있다.
지정한 이름의 Destination이 서버에 이미 존재하거나 지정한 JNDI 이름으로 이미 다른 객체가 등록되어 있으면 JMSException이 발생한다. JNDI 이름을 설정하지 않은 경우 Destination 이름이 JNDI 이름으로 사용된다.
JEUS MQ 서버가 보안 기능을 사용하는 경우 커넥션을 생성할 때 사용한 Subject에게 "jeus.jms.destination.creation" 리소스에 대한 "createDestination" 권한이 없으면 JMSException이 발생한다.
그 밖의 설정은 Destination의 기본 설정을 따른다.
동적으로 생성한 Destination은 JEUS MQ 서버가 종료될 때 삭제된다.
위의 예에서 생성한 Destination은 WebAdmin에서 다음과 같이 설정한 Destination과 JEUS MQ 서버가 종료되기 전까지 동일하게 동작한다.
JEUS MQ 클러스터링을 사용하고 있을 때에는 Destination 동적 생성을 사용할 수 없다.
비정상 메시지 Destination은 특별한 이유로 정상적으로 처리되지 못한 메시지들을 보관하는 시스템 Destination이다. 메시지에 설정된 Destination을 찾을 수 없거나 지정한 횟수 이상 반복해서 메시지가 복구되는 경우가 여기에 포함된다.
메시지 수신자에게 전달된 메시지가 서버 측 Destination으로 복구되는 상황은 다음과 같다.
메시지 수신자에서 Session.rollback(), Session.recover() 등을 호출한 경우
XA 트랜잭션의 rollback과 같은 외부적인 요인에 의한 경우
Message Listener의 onMessage() 메소드를 수행하는 중에 예외가 발생한 경우 등
복구된 메시지는 메시지 수신자에게 다시 전달되는데 만일 비즈니스 로직이나 클라이언트의 오류로 인해 반복적으로 특정 메시지 처리에 실패한다면 계속해서 Destination에는 메시지가 쌓이고 클라이언트는 메시지를 수신하지 못하게 된다.
이런 문제를 피하기 위해 JEUS MQ는 메시지가 복구되는 최대 횟수를 제한할 수 있는 기능을 제공한다.
"JMS_JEUS_RedeliveryLimit" 메시지 프로퍼티 값을 설정하고 메시지를 송신한다.
>message.setIntProperty("JMS_JEUS_RedeliveryLimit", <integer value>)
메시지 프로퍼티의 기본값은 메시지 송신자 클라이언트를 실행할 때 다음과 같이 "jeus.jms.client.default-redelivery-limit" 시스템 프로퍼티를 설정하여 변경할 수 있다.
-Djeus.jms.client.default-redelivery-limit=<integer value>
별도로 설정하지 않으면 3이 사용된다. JEUS MQ 서버는 기동할 때 "JEUSMQ_DLQ"라는 이름의 비정상 메시지 Destination을 생성한다. 비정상 메시지 Destination에 쌓인 메시지는 보통 시스템 관리 툴에서 처리하지만 일반 클라이언트도 이 이름으로 비정상 메시지 Destination에 접근하여 원하는 작업을 할 수 있다.
JMS 클라이언트가 서버와 메시지를 주고받기 위해서는 서버와 커넥션을 맺고 세션을 생성해야 한다. 커넥션과 세션은 JMS에서 매우 중요한 클라이언트 리소스로 시스템의 성능 및 클라이언트가 장애 상황에 대처하는 방식에 영향을 미친다. JEUS MQ는 커넥션의 성능에 대한 몇 가지 옵션을 제공한다.
본 절에서는 이런 옵션들을 설정하는 방법과 옵션을 설정했을 때 JEUS MQ 클라이언트가 어떻게 동작하는지를 설명한다.
커넥션은 클라이언트가 JMS 서버와 작업하기 위해 최초로 생성하는 객체이며 JMS 서버와의 물리적 혹은 논리적 연결을 나타낸다. 커넥션은 Connection Factory의 createConnection() 메소드를 호출하여 생성한다.
public Connection createConnection() throws JMSException; public Connection createConnection(String userName, String password) throws JMSException;
JEUS MQ 서버가 보안 기능을 사용하는 경우 userName/password에 해당하는 Subject에게 "jeus.jms.client.connectionFactory" 리소스에 대한 "createConnection" 권한이 없으면 JMSException이 발생한다.
JEUS MQ 클라이언트를 실행할 때 다음과 같은 옵션을 시스템 프로퍼티로 설정할 수 있다.
-Djeus.jms.client.connect.timeout=<long value>
커넥션이 맺어질 때까지 ConnectionFactory.createConnection() 메소드가 대기하는 시간이다. 이 시간 동안 연결되지 않으면 ConnectionFactory.createConnection() 메소드에서 JMSException이 발생한다. 0으로 설정하면 연결에 성공할 때까지 무한 대기한다. (기본값: 5초, 단위: ms)
클라이언트 코드에 다음과 같은 문장을 추가하면 런타임에도 설정 변경이 가능하다.
System.setProperty("jeus.jms.client.connect.timeout", <long value>);
또는
System.setProperty(jeus.jms.common.JMSProperties.KEY_CLIENT_CONNECT_TIMEOUT,
<long value>);
일반적으로 애플리케이션이 메시지를 전송하는 과정은 다음과 같다.
커넥션 생성
세션 생성
Message Producer 생성
메시지 전송
커넥션 종료
만약 JMS 커넥션이 물리적 커넥션(Socket)과 1대 1의 관계를 갖고 있다면 위와 같은 사용 패턴에서는 메시지를 한 번 보낼 때마다 매번 새로운 물리적 연결을 맺어야 하므로 성능이 많이 저하된다. 실제 메시지를 전송하는 시간보다 물리적 연결을 맺기 위한 시간이 더 많이 걸리게 된다. 뿐만 아니라 물리적 연결이 증가하면 file descriptor의 사용이 많아져 IOException을 발생시키기도 하므로 안정성도 떨어진다.
JEUS v6.0 Fix#6 이후부터는 물리적 커넥션을 공유함으로서 1대 1 관계에서 성능, 안전성 상에 발생할 수 있는 문제들을 해결하였다. 그러나 일부 환경에서는 물리적 연결을 공유하는 것보다 매번 연결을 새로 생성하는 것이 나을 수 있으므로 이 기능은 클라이언트 옵션을 설정하여 적용할 수 있도록 하였다.
기본적으로 물리적 연결은 ConnectionFactory와 1대 1 관계를 갖으나 JEUS MQ 장애 극복을 설정하거나 "jeus.jms.client.use-single-server-entry" 시스템 속성을 false로 설정하는 경우 커넥션과 물리적 연결이 1대 1 관계가 된다. JEUS MQ 장애 극복에 관한 자세한 사항은 “제5장 JEUS MQ 장애 극복”을 참고한다.
다음은 물리적 연결 공유와 관련된 JVM 옵션들이다.
-Djeus.jms.client.use-single-server-entry=<boolean value>
ConnectionFactory 당 하나의 물리적 연결을 생성하고 커넥션끼리 공유할지 여부를 설정한다. 이 값을 false로 할 경우 커넥션과 물리적 연결은 1대 1 관계를 갖는다. (기본값: true)
클라이언트 코드에 다음과 같은 문장을 추가하면 런타임에도 설정 변경이 가능하다.
System.setProperty("jeus.jms.client.use-single-server-entry", <boolean value>);
또는
System.setProperty(jeus.jms.common.JMSProperties.USE_SINGLE_SERVER_ENTRY,
<boolean value>);
-Djeus.jms.client.single-server-entry.shutdown-delay=<long value>
이 값은 공유하고 있는 물리적 연결을 사용하는 커넥션들이 0개일 때 물리적 연결을 언제 끊을지를 결정한다. 물리적 연결을 JVM이 종료할 때까지 유지하지 않고 idle 시간에는 리소스를 시스템에 반환할 수 있도록 한다. (기본값: 600000(10분), 단위: ms)
클라이언트 코드에 다음과 같은 문장을 추가하면 런타임에도 설정 변경이 가능하다.
System.setProperty("jeus.jms.client.single-server-entry.shutdown-delay",
<long value>);
또는
System.setProperty(jeus.jms.common.JMSProperties.SINGLE_SERVER_ENTRY_SHUTDOWN_DELAY,
<long value>);
JEUS MQ 커넥션은 기반이 되는 JEUS 네트워크 라이브러리의 Ping 기능을 사용하여 자체적으로 일정 시간마다 커넥션의 상태를 확인하게 되어 있다. 만일 서버로 Ping 메시지를 보냈는데 정해진 제한시간 동안 서버로부터 응답이 오지 않거나 네트워크 오류가 발생하는 경우 연결이 끊어진 것으로 판단하여 커넥션 장애를 극복하기 위한 시도를 하게 된다.
JEUS 네트워크 라이브러리의 Ping 기능은 다음의 2가지 시스템 프로퍼티로 제어할 수 있다.
-Djeus.net.ping.period=<long value>
Ping 메시지를 보내는 주기를 0으로 설정하면 Ping 기능을 사용하지 않는다.
(기본값: 600000(10분), 단위: ms)
-Djeus.net.ping.timeout=<long value>
Ping 메시지의 응답을 기다리는 시간이다. (기본값: 60000(1분), 단위: ms)
JMS 세션은 메시지를 생성하여 Destination에 송신하거나 Destination으로부터 메시지를 수신하는 것과 같은 모든 메시징 작업의 기본 단위가 된다. 또한 세션은 JMS 클라이언트가 로컬 및 XA 트랜잭션에 참여하는 단위이기도 하다.
세션과 세션을 거치는 모든 작업들은 기본적으로 단일 스레드 컨텍스트에 의해 처리되어야 하며 이는 세션 객체 자체가 멀티 스레드에 안전하지 않다는 것을 의미한다.
JEUS MQ 클라이언트 라이브러리는 동시에 여러 스레드가 하나의 세션을 사용하는 경우에 대해 안전한 동작을 보장하지 않는다. 세션을 사용하여 동시에 실행되어야 하는 스레드의 수만큼 세션을 생성하여 사용할 것을 권장한다.
세션은 커넥션 객체로부터 다음 API를 사용하여 생성한다.
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException;
JMS 스펙은 다음의 4가지 ACKNOWLEDGE 모드를 정의하고 있다.
Session.AUTO_ACKNOWLEDGE = 1 Session.CLIENT_ACKNOWLEDGE = 2 Session.DUPS_OK_ACKNOWLEDGE = 3 Session.SESSION_TRANSACTED = 0
JEUS MQ는 메시지 송수신 성능을 극대화시켜야 하는 경우를 위해 추가적인 ACKNOWLEDGE 모드를 지원한다.
jeus.jms.JeusSession.NONE_ACKNOWLEDGE = -1
앞서 커넥션 공유에서 설명했듯이 일반적인 애플리케이션 사용 패턴에서는 Client Facility들을(Connection, Session, Message Producer) 반복적으로 생성하여 사용한다. 그런데 이런 객체들은 한 번 생성될 때마다 서버와 관리 메시지를 주고받게 되고 사용자 메시지를 한 번 보내기 위해서 실제로는 더 많은 수의 관리 메시지를 주고받게 되어 성능에 영향을 미친다.
이런 문제를 해결하기 위해서 JEUS MQ에서는 Client Facility들을 Pooling하는 기능을 제공한다. 이 기능은 트랜잭션이 아니거나 JEUS MQ 장애 극복이 설정되지 않은 환경에서 Message Producer를 사용하는 경우에만 적용되며 메시지 수신자나 트랜잭션이 사용될 때는 Pooling을 하지 않고 사용되었던 Client Facility들은 close할 때 바로 제거한다.
이 기능은 선택적으로 적용할 수 있으며 다음과 같은 클라이언트 JVM 옵션을 설정할 수 있다.
-Djeus.jms.client.use-pooled-connection-factory=<boolean value>
Client Facility Pooling을 사용할지 여부를 설정한다. (기본값: true)
-Djeus.jms.client.pooled-connection.check-period=<long value>
사용되지 않는 Pooling된 객체들을 삭제할 검사 주기를 설정한다. (기본값: 60000(1분), 단위: ms )
-Djeus.jms.client.pooled-connection.unused-timeout=<long value>
현재 Pooling된 객체들 중에 사용되지 않은 시간이 이 시간보다 길면 제거한다.
(기본값: 120000(2분), 단위: ms)
트랜잭션 처리된 세션을 제외하면 JMS의 기본 ACKNOWLEDGE 모드는 메시지를 수신하는 경우에만 의미가 있지만 NONE_ACKNOWLEDGE 모드는 메시지 송신 동작에도 영향을 미친다.
이 ACKNOWLEDGE 모드를 사용하면 성능은 높아지지만 JMS의 중요한 특징 중 하나인 신뢰성있는 메시지의 송수신은 어렵게 된다. 그러므로 해당 세션을 통해 클라이언트가 다룰 메시지의 특성과 성능 및 신뢰도 요구 사항, 그리고 기타 주변 상황을 고려하여 NONE_ACKNOWLEDGE 모드 사용 여부를 판단해야 한다.
FileMessage를 주고받거나 트랜잭션 처리된 세션 내에서는 NONE_ACKNOWLEDGE 모드도 AUTO_ACKNOWLEDGE 모드와 동일하게 동작한다.
클라이언트가 송신한 메시지가 JMS 서버에 도착했다는 것을 보장하기 위해서는 서버로부터 어떤 응답이 올 때까지 클라이언트 스레드는 MessageProducer.send() 메소드를 호출한 시점에서 멈춰 있어야 한다.
반면에 NONE_ACKNOWLEDGE 모드를 사용하면 MessageProducer.send() 메소드가 서버로 JMS 메시지를 전송한 뒤 바로 리턴하므로 대기시간을 줄일 수 있어 메시지 송신 성능을 높일 수 있다.
다음은 일반적인 경우와 NONE_ACKNOWLEDGE 모드일 때 메시지 송신 방식의 차이를 보여준다. 회색 상자는 서버에 메시지가 도착했음을 기록으로 남기는 작업을 나타낸다.
다음과 같은 경우 메시지가 유실될 수 있다.
클라이언트에서 서버로 메시지가 전송되는 과정에서 네트워크 장애가 발생하는 경우
JEUS MQ 서버에 도착한 메시지가 Destination에 추가되기 전에 JEUS MQ 서버가 장애를 일으키는 경우
이렇게 유실된 메시지는 메시지 전달 방식을 DeliveryMode.PERSISTENT로 지정한 경우에도 복구되지 않는다.
JMS에서는 메시지 전달을 보장하기 위해 메시지를 수신한 클라이언트로부터 메시지가 도착했다는 Acknowledge를 받을 때까지 서버에서 메시지 정보를 삭제하지 않는다. 이 방식은 메시징의 신뢰성은 높여주지만 네트워크 통신이 한 번 더 발생하고 JMS 서버가 메시지 상태를 복잡하게 관리해야 하므로 성능면에서는 좋지 않다.
세션의 ACKNOWLEDGE 모드를 NONE_ACKNOWLEDGE 모드로 설정하면 JEUS MQ 서버는 네트워크의 메시지를 수신하는 클라이언트로 JMS 메시지를 전송한 뒤 클라이언트의 Acknowledge 없이 서버에서 바로 메시지 정보를 삭제하므로 결과적으로 메시지 수신 속도를 더 빠르게 할 수 있다.
다음은 AUTO_ACKNOWLEDGE 모드일 때와 NONE_ACKNOWLEDGE 모드일 때 메시지 수신 방식의 차이를 보여준다. 회색 상자는 클라이언트에 전송한 메시지 정보를 서버에서 삭제하는 작업을 나타낸다.
다음과 같은 경우 메시지가 유실될 수 있다.
서버에서 클라이언트로 메시지가 전송되는 과정에서 네트워크 장애가 발생하는 경우
JEUS MQ 클라이언트 라이브러리에서 메시지를 처리하는 도중 클라이언트가 오류를 일으키는 경우
클라이언트가 등록한 MessageListener 객체의 onMessage() 메소드 내에서 예외가 발생하는 경우
유실된 메시지는 Session.recover() 메소드를 호출해도 다시 수신할 수 없다.
본 절에서는 JEUS MQ가 JMS 스펙 이외의 부가 기능을 제공하기 위해 JMS 메시지를 확장한 부분에 대해 설명한다.
JMS는 다음과 같은 메시지 헤더 필드들을 정의하고 있다.
JMSDestination
JMSDeliveryMode
JMSMessageID
JMSTimestamp
JMSCorrelationID
JMSReplyTo
JMSRedelivered
JMSType
JMSExpiration
JMSPriority
JEUS MQ는 메시지에 유일한 메시지 ID를 부여하므로 다음과 같은 기능은 제공하지 않는다.
MessageProducer.setDisableMessageID(boolean) 메소드를 사용하여 JMS 메시지에 메시지 ID를 할당하지 않는 기능
MessageProducer.setDisableTimestamp(boolean) 메소드를 사용하여 JMS 메시지에 Timestamp를 부여하지 않는 기능
관리자 설정에 의해 클라이언트에서 설정한 JMSDeliveryMode, JMSExpiration, JMSPriority 필드값을 덮어쓰는 기능
승인 모드(Acknowledge Mode)를 NONE_ACKNOWLEDGE 모드로 설정한 세션을 통해 메시지를 송신한 뒤 Message.getJMSMessageID()를 호출하면 메시지 ID가 null로 나타난다. 이는 MessageProducer.send() 메소드가 서버로부터 응답을 받기 전에 리턴하기 때문이다. 자세한 내용은 “2.3.6. NONE_ACKNOWLEDGE 모드”를 참고한다.
JMS는 다음과 같이 "JMSX?"로 시작하는 프로퍼티 이름을 정의하고 있다.
JMSXUserID
JMSXAppID
JMSXDeliveryCount
JMSXGroupID
JMSXGroupSeq
JMSXProducerTXID
JMSXConsumerTXID
JMSXRcvTimestamp
JMSXState
"JMSX?" 메시지 프로퍼티에 대한 지원은 필수 사항이 아니며, JEUS MQ에서는 이 프로퍼티들을 사용하지 않는다.
다음은 JEUS MQ가 지원하는 전용 메시지 프로퍼티이다.
JMS_JEUS_Schedule
JEUS MQ 서버가 메시지 수신자에게 메시지 전달을 미루는 시간이다. JEUS MQ 서버는 송신된 메시지가 서버에 도착한 시각(JMSTimestamp 필드값)으로부터 이 시간만큼이 지나기 전에는 메시지 수신자에게 메시지를 전달하지 않는다. 설정값은 long 형태로 설정한다. (단위: ms)
Message.setLongProperty("JMS_JEUS_Schedule", <long value>);
JMS_JEUS_Compaction
메시지 바디를 압축해서 전송할지 여부를 설정한다. true로 설정하면 네트워크를 통해 메시지를 전송할 때 ZLIB 라이브러리를 이용하여 메시지의 바디 부분을 압축한다.
Message.setBooleanProperty("JMS_JEUS_Compaction", <boolean value>);
JMS_JEUS_RedeliveryLimit
메시지를 수신자에 재전송할 수 있는 최대 횟수이다. 이 횟수를 넘기면 메시지는 더 이상 재전송되지 않고 비정상 메시지 Destination(절 2.2.3. “비정상 메시지 Destination”)에 저장된다.
Message.setIntProperty("JMS_JEUS_RedeliveryLimit", <integer value>);
JMS 스펙은 메시지 바디의 형태에 따라 5가지 타입의 메시지를 정의하고 있다.
각 타입의 메시지는 다음 API를 사용해서 세션 객체로부터 생성할 수 있다.
public StreamMessage createStreamMessage() throws JMSException; public MapMessage createMapMessage() throws JMSException; public TextMessage createTextMessage() throws JMSException; public TextMessage createTextMessage(String text) throws JMSException; public ObjectMessage createObjectMessage() throws JMSException; public ObjectMessage createObjectMessage(Serializable object) throws JMSException; public BytesMessage createBytesMessage() throws JMSException;
JEUS MQ는 JMS 기본 메시지 타입 외에 FileMessage를 추가로 지원한다.
JMS는 메시지 기반으로 동작하기 때문에 메모리에 메시지 내용이 모두 올라와 있어야만 메시지 송수신이 가능하다. 이는 메시지의 크기가 매우 클 경우 클라이언트나 서버에서 메모리 Overflow가 발생할 수 있으므로 문제가 된다.
JEUS MQ가 제공하는 FileMessage를 사용하면 파일의 내용을 블록 단위로 전송하기 때문에 이와 같은 문제를 피할 수 있다.
jeus.jms.JeusSession 클래스에 정의되어 있는 다음 메소드를 사용해 FileMessage를 생성한다.
public jeus.jms.FileMessage createFileMessage() throws javax.jms.JMSException; public jeus.jms.FileMessage createFileMessage(java.net.URL url) throws javax.jms.JMSException;
JEUS MQ 클라이언트 라이브러리를 통해 생성한 Session, QueueSession, TopicSession 객체는 각각 jeus.jms.JeusSession, jeus.jms.JeusQueueSession, jeus.jms.JeusTopicSesison으로 캐스팅할 수 있다.
jeus.jms.FileMessage 인터페이스 정의는 다음과 같다.
public interface FileMessage extends javax.jms.Message { public java.net.URL getURL(); public void setURL(java.net.URL url) throws javax.jms.MessageNotWriteableException; public boolean isURLOnly(); public void setURLOnly(boolean urlOnly); }
메시지를 송신하기 전에 setURL() 메소드로 전송할 파일의 URL을 지정할 수 있다. 또는 메시지를 생성할 때 JeusSession.createFileMessage() 메소드에 파라미터로 넘겨줘도 된다.
urlOnly 속성은 메시지 수신자에게 서버에 위치한 파일의 URL만 전송할지 여부를 결정한다. 이 속성값에 따라 수신한 FileMessage에 대해 getURL() 메소드를 호출했을 때 얻는 URL이 다르다.
[표 2.1] urlOnly 속성값에 따른 getURL() 값의 의미
urlOnly | getURL() |
---|---|
true | JEUS MQ 서버에 위치한 파일의 URL이다. 이 URL을 사용하여 HTTP, FTP와 같은 별도의 프로토콜을 통해 파일 수신이 가능하다. |
false | JEUS MQ 클라이언트 라이브러리가 파일의 내용까지 전송받아 로컬에 저장한 임시 파일의 URL이다. 자세한 내용은 임시 파일 저장 경로를 참고한다. |
FileMessage를 송신할 때 MessageProducer.send() 메소드는 항상(세션의 승인 모드(Acknowledge Mode)를 NONE_ACKNOWLEDGE로 사용하는 경우에도) 서버에 파일 내용이 모두 전송된 뒤에 리턴한다.
FileMessage에 포함된 파일은 기본적으로 4KB 단위로 나누어 전송된다. 이 블록 사이즈는 JEUS MQ 클라이언트를 실행할 때 "-Djeus.jms.file.blocksize=<integer value>"와 같이 시스템 프로퍼티를 설정해서 변경할 수 있다.
메시지 수신자가 FileMessage를 수신할 때 파일을 같이 전송받는 경우 전송받은 파일은 임시 파일 경로에 저장된다. 임시 파일 저장 경로는 다음 조건을 검사해서 결정한다.
JEUS에 deploy되어 있는 애플리케이션인 경우
SERVER_HOME/.workspace/client/
jeus.jms.client.workdir 시스템 프로퍼티가 설정되어 있는 경우
시스템 프로퍼티로 지정한 경로
그 밖의 경우
$USER_HOME/.jeusmq_client_work/
FileMessage를 전송하는 간단한 예를 통해 FileMessage API를 사용하는 과정을 설명한다.
다음의 Java 코드는 "/home/jeus/send_test/send.file"이라는 파일을 FileMessage를 사용하여 송신하는 예제이다.
[예 2.2] FileMessage 송신
. . .
jeus.jms.JeusSession session = (jeus.jms.JeusSession)
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
jeus.jms.FileMessage message = session.createFileMessage();
File file = new File("/home/jeus/send_test/send.file");
message.setURL(file.toURI.toURL());
producer.send(message);
. . .
다음은 수신한 파일의 URL로부터 InputStream을 얻어 파일 내용을 "/home/jeus/recv_test/recv.file"이라는 이름의 파일에 쓰는 예제이다.
[예 2.3] FileMessage 수신
. . .
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
if (message instanceof jeus.jms.FileMessage) {
URL url = ((jeus.jms.FileMessage) message).getURL();
if (url != null) {
InputStream inputStream = url.openStream();
BufferedInputStream bufInputStream = new BufferedInputStream(inputStream);
File outFile = new File("/home/jeus/recv_test/recv.file");
FileOutputStream fileOutputStream = new FileOutputStream(outFile);
BufferedOutputStream bufOutputStream =
new BufferedOutputStream(fileOutputStream);
int buf;
while ((buf = bufInputStream.read()) != -1) {
bufOutputStream.write(buf);
}
bufOutputStream.close();
bufInputStream.close();
}
}
. . .
본 절에서는 JEUS MQ가 지원하는 로컬 트랜잭션, 분산(XA) 트랜잭션에서 각각의 특성과 트랜잭션 적용 범위, 그리고 트랜잭션 처리에 있어서 클라이언트 개발자가 알아야 할 내용을 설명한다.
JMS 트랜잭션은 하나의 세션 내에서 메시지를 송신하고 수신하는 작업을 포함한다. JMS 스펙은 세션 내에서 시작, 완료되는 로컬 트랜잭션과 하나 이상의 JMS 세션 및 EJB나 JDBC 등의 기타 트랜잭션 리소스의 작업을 포함하는 분산 트랜잭션을 정의하고 있다.
트랜잭션에 참여하는 세션을 통해서 송신한 메시지는 실제 서버에 도착한 것으로 간주되지 않기 때문에 동일한 세션에서 생성한 메시지 수신자에 전달되지 않는다. 또한 동일한 세션을 이용해서 생성된 큐 브라우저를 통해서도 확인할 수 없다.
로컬 트랜잭션은 Connection.createSession(boolean transacted, int acknowledgeMode) 메소드에 transacted 파라미터 값을 true로 설정하고 생성한 세션에 의해 수행되며 이전에 commit이나 rollback이 수행된 시점 또는 처음 세션이 생성된 시점 이후로 해당 세션을 통해 이루어진 모든 메시징 작업들을 포함한다. 즉, 모든 작업은 어느 하나의 트랜잭션에 속하게 된다는 것을 의미한다.
하나의 로컬 트랜잭션 내에서 여러 개의 세션을 처리하는 것은 불가능하지만 하나 이상의 메시지 송신자를 생성하여 다수의 Destination에 메시지를 송신하는 것은 가능하다. 마찬가지로 다수의 Destination으로부터 메시지를 수신할 수 있다.
다음은 로컬 트랜잭션에 참여하는 작업들과 그 범위를 나타낸 것이다.
로컬 트랜잭션의 완료는 세션의 commit(), rollback() API를 사용해서 이루어진다. 트랜잭션 처리된 세션은 항상 특정 트랜잭션에 참가하고 있으므로 명시적으로 트랜잭션을 시작하는 API는 존재하지 않는다.
로컬 트랜잭션은 분산 트랜잭션과 달리 트랜잭션의 commit과 rollback이 클라이언트에 의해 직접 일어날 수 있으므로 비동기적인 메시지의 처리를 트랜잭션 내에서 하는 것이 가능하다.
JMS 스펙은 XASession이 제공하는 XAResource를 트랜잭션에 등록함으로써 XA 트랜잭션에 참여할 수 있다. 이는 JMS의 부가적인 스펙으로서 실제 구현 방식은 각 벤더에 따라 다르다.
JEUS MQ 클라이언트에서 XASession을 사용할 때에는 다음 사항에 주의해야 한다.
XASession에 대해 Session.getAcknowledgeMode()를 호출하면 이 XASession이 글로벌 트랜잭션에 참여하고 있는 경우 Session.SESSION_TRANSACTED를 리턴하고, 그렇지 않은 경우 Session.AUTO_ACKNOWLEDGE를 리턴한다.
글로벌 트랜잭션에 참여하고 있는 XASession에 대해 Session.commit()이나 Session.rollback()을 호출하면 TransactionInProgressException이나 IllegalStateException이 발생한다.
JEUS MQ 클라이언트 라이브러리는 XASession의 생성 시점과 무관하게 JMS API를 이용하는 스레드의 글로벌 트랜잭션에 XAResource를 등록시킨다. 이렇게 클라이언트 스레드의 트랜잭션이 전파되기 위해서는 명시적인 API 호출이 필요하다. 따라서 JEUS MQ의 분산 트랜잭션 참가는 동기적 형태의 API 호출을 통해서만 이루어진다. 메시지를 송신하거나 동기적으로 수신하는 경우가 이에 해당하며 MessageListener를 통해 비동기적인 메시지 수신을 분산 트랜잭션에 포함시켜서는 안 된다.
분산 트랜잭션 내에서 비동기적으로 수신한 메시지를 처리하고 싶다면 MDB를 사용할 수 있다. 자세한 내용은 “JEUS EJB 안내서”의 “제9장 Message Driven Bean(MDB)”을 참고한다.
JEUS MQ 서버는 트랜잭션 내에서 진행 중인 세션의 작업들을 스토리지에 보관하며 불의의 사고로 인한 서버의 재기동에도 해당 트랜잭션의 작업을 복구할 수 있다. 트랜잭션 매니저는 XASession에서 얻어진 XAResource를 통해서 JEUS MQ 내에 진행 중인 트랜잭션 ID들을 얻을 수 있으며, 이를 이용하여 해당 트랜잭션을 commit하거나 rollback할 수 있다.
트랜잭션 중에 장애가 발생할 경우 좀 더 빨리 장애를 극복하기 위해서는 JEUS MQ 장애 극복 기능을 사용할 것을 권장한다. 자세한 내용은 “제5장 JEUS MQ 장애 극복”을 참고한다.
JEUS MQ 서버는 In-Doubt 상태가 아닌 트랜잭션의 작업은 기동할 때 자동적으로 rollback 처리를 한다.