On 8/21/06, Eugene Prokopiev <[EMAIL PROTECTED]> wrote:
Hi,

I need to use XA connection and XA session created from it in
separate threads. Example context looks like:

<beans>

        <bean id="broker" class="org.apache.activemq.broker.BrokerService"
init-method="start" destroy-method="stop">
                <property name="persistent" value="false"/>
                <property name="transportConnectorURIs">
                        <list>
                                <value>tcp://localhost:5000</value>
                        </list>
                </property>
        </bean>

        <bean id="jotm"
class="org.springframework.transaction.jta.JotmFactoryBean"/>
        <bean id="jotmTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
                <property name="userTransaction" ref="jotm"/>
        </bean>

        <bean id="connectionFactory"
class="org.apache.activemq.ActiveMQXAConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:5000" />
        </bean>

        <bean id="messageReceiver"
class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean">
         <property name="transactionManager" ref="jotmTransactionManager"/>
         <property name="transactionAttributes">
             <props>
                 <prop key="*">PROPAGATION_REQUIRED</prop>
             </props>
         </property>
         <property name="target">
                <bean class="simple.MessageReceiverSimple">
                                <property name="jmsTemplate">
                                        <bean 
class="org.springframework.jms.core.JmsTemplate">
                                                <property name="connectionFactory" 
ref="connectionFactory"/>
                                                <property name="defaultDestinationName" 
value="messages.input"/>
                                        </bean>
                                </property>
                        </bean>
         </property>
                <property name="proxyTargetClass" value="true"/>
     </bean>

</beans>

MessageReceiverSimple.java is:

public class MessageReceiverSimple {

        private Log log = LogFactory.getLog(getClass());

        private JmsTemplate jmsTemplate;

        public void setJmsTemplate(JmsTemplate jmsTemplate) {
                this.jmsTemplate = jmsTemplate;
        }

        public void receive() {
                Thread readerThread = new Thread(new Runnable(){
                        public void run() {
                                while(!Thread.currentThread().isInterrupted()) {
                                        Message message = jmsTemplate.receive();
                                        log.debug(message);
                                }
                        }
                });
                readerThread.start();
        }

}

In this example code plain JMS API can be used instead of JmsTemplate
but it is not important in this case. Result will be the same.

On running this example I got:

javax.jms.JMSException: Session's XAResource has not been enlisted in a
distributed transaction.
        at
org.apache.activemq.ActiveMQXASession.doStartTransaction(ActiveMQXASession.java:109)
        at
org.apache.activemq.ActiveMQMessageConsumer.acknowledge(ActiveMQMessageConsumer.java:711)
        at
org.apache.activemq.ActiveMQMessageConsumer.dispose(ActiveMQMessageConsumer.java:572)
        at
org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:515)
        at
org.springframework.jms.support.JmsUtils.closeMessageConsumer(JmsUtils.java:105)
        at 
org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:714)
        at 
org.springframework.jms.core.JmsTemplate.doReceive(JmsTemplate.java:677)
        at 
org.springframework.jms.core.JmsTemplate$9.doInJms(JmsTemplate.java:635)
        at 
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:432)
        at 
org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:632)
        at 
org.springframework.jms.core.JmsTemplate.receive(JmsTemplate.java:619)
        at simple.MessageReceiverSimple$1.run(MessageReceiverSimple.java:22)
        at java.lang.Thread.run(Thread.java:595)

So, I write simple ActiveMQXAConnectionFactory wrapper for enlisting
Session's XAResource in distributed transaction:

public class ActiveMQXAConnectionFactory implements ConnectionFactory,
XAConnectionFactory {

        private XAConnectionFactory connectionFactory;
        private JtaTransactionManager transactionManager;

        public void setConnectionFactory(XAConnectionFactory connectionFactory) 
{
                this.connectionFactory = connectionFactory;
        }

        public void setTransactionManager(JtaTransactionManager
transactionManager) {
                this.transactionManager = transactionManager;
        }

        public Connection createConnection() throws JMSException {
                return createXAConnection();
        }

        public Connection createConnection(String userName, String password)
throws JMSException {
                return createXAConnection(userName, password);
        }

        public XAConnection createXAConnection() throws JMSException {
                XAConnection connection = 
connectionFactory.createXAConnection();
                return new ActiveMQXAConnection(connection, transactionManager);
        }

        public XAConnection createXAConnection(String userName, String
password) throws JMSException {
                XAConnection connection =
connectionFactory.createXAConnection(userName, password);
                return new ActiveMQXAConnection(connection, transactionManager);
        }

        public class ActiveMQXAConnection implements XAConnection {

                private XAConnection connection;
                private JtaTransactionManager transactionManager;

                public ActiveMQXAConnection(XAConnection connection,
JtaTransactionManager transactionManager) {
                        this.connection = connection;
                        this.transactionManager = transactionManager;
                }

                public Session createSession(boolean transacted, int 
acknowledgeMode)
throws JMSException {
                        return createXASession();
                }

                public XASession createXASession() throws JMSException {
                        XASession session = connection.createXASession();
                        try {
                                transactionManager.getUserTransaction().begin();

transactionManager.getTransactionManager().getTransaction().enlistResource(session.getXAResource());
                        } catch (Exception e) {
                                e.printStackTrace();
                        }
                        return session;
                }

                public void close() throws JMSException {
                        connection.close();
                }

                public ConnectionConsumer createConnectionConsumer(Destination 
arg0,
String arg1, ServerSessionPool arg2, int arg3) throws JMSException {
                        return connection.createConnectionConsumer(arg0, arg1, 
arg2, arg3);
                }

                public ConnectionConsumer createDurableConnectionConsumer(Topic 
arg0,
String arg1, String arg2, ServerSessionPool arg3, int arg4) throws
JMSException {
                        return connection.createDurableConnectionConsumer(arg0, 
arg1, arg2,
arg3, arg4);
                }

                public String getClientID() throws JMSException {
                        return connection.getClientID();
                }

                public ExceptionListener getExceptionListener() throws 
JMSException {
                        return connection.getExceptionListener();
                }

                public ConnectionMetaData getMetaData() throws JMSException {
                        return connection.getMetaData();
                }

                public void setClientID(String arg0) throws JMSException {
                        connection.setClientID(arg0);
                }

                public void setExceptionListener(ExceptionListener arg0) throws
JMSException {
                        connection.setExceptionListener(arg0);
                }

                public void start() throws JMSException {
                        connection.start();
                }

                public void stop() throws JMSException {
                        connection.stop();
                }

        }

}

On using it instead of org.apache.activemq.ActiveMQXAConnectionFactory
it I got:

INFO  BrokerService - ActiveMQ null JMS Message Broker (localhost) is
starting
INFO  BrokerService - For help or more information please see:
http://incubator.apache.org/activemq/
INFO  TransportServerThreadSupport - Listening for connections at:
tcp://prokopiev.stc.donpac.ru:5000
INFO  TransportConnector - Connector tcp://prokopiev.stc.donpac.ru:5000
Started
INFO  BrokerService - ActiveMQ JMS Message Broker (localhost,
ID:prokopiev.stc.donpac.ru-40533-1156158196797-0:0) started
INFO  jotm - JOTM started with a local transaction factory which is not
bound.
INFO  jotm - CAROL initialization
INFO  ConfigurationRepository - No protocols were defined for property
'carol.protocols', trying with default protocol = 'jrmp'.
INFO  jta - JOTM 2.0.10
INFO  JtaTransactionManager - Using JTA UserTransaction:
[EMAIL PROTECTED]
INFO  JtaTransactionManager - Using JTA TransactionManager:
[EMAIL PROTECTED]
INFO  ManagementContext - JMX consoles can connect to
service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO  DefaultAopProxyFactory - CGLIB2 available: proxyTargetClass
feature enabled
DEBUG MessageReceiverSimple - ActiveMQObjectMessage {commandId = 5,
responseRequired = true, messageId =
ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1:1,
originalDestination = null, originalTransactionId = null, producerId =
ID:prokopiev.stc.donpac.ru-40541-1156158200439-0:0:1:1, destination =
queue://messages.input, transactionId = null, expiration = 0, timestamp
= 1156158200912, arrival = 0, correlationId = null, replyTo = null,
persistent = true, type = null, priority = 4, groupID = null,
groupSequence = 0, targetConsumerId = null, compressed = false, userID =
null, content = [EMAIL PROTECTED],
marshalledProperties = null, dataStructure = null, redeliveryCounter =
0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody
= true}
INFO  jotm - set rollback only (tx=bb14:38:0:011d45be9b8fb301e8...44b402:)

So, my questions are:

1. My ActiveMQXAConnection.createXASession() implementation looks like
dirty hack and can't work propertly because transaction started but not
commited anywhere. What is the rigth place to start and commit/rollback
transaction?

Normally the JCA container does this. If you are using Spring then the
Spring Transaction Manager or Message Listener container shoudl do
this - not the connection factory


2. Is it possible to include similar ActiveMQXAConnectionFactory
implementation into ActiveMQ? It will be very useful for using with
Spring DefaultMessageListenerContainer for example.

I'd rather fix Spring's container to work with any JMS provider
properly than adding a dirty hack to ActiveMQ

Enlistment is the responsibility of the container - be it Jencks, MDB
container or Spring.


 Now JTA transactions
can't work with ActiveMQ/Spring/DefaultMessageListenerContainer.

You'd best ask the Spring guys - I'm not sure if the Spring container
supports JTA
--

James
-------
http://radio.weblogs.com/0112098/

Reply via email to