I have discovered some issues with my code. The following appears to be a working example using Atomikos (resource-context.xml), with a test case (ServiceMixXaRollbackTest) that runs against a ServiceMix 4.3 ActiveMQ instance (activemq-broker.xml).
Unfortunately, on transaction rollback (and retry exhaustion) I am expecting my ActiveMq message to move to the Dead Letter Queue "DLQ.my_test_thirdparty" as specified in the activemq-broker.xml, since I don't override any message redelivery and dlq handling behaviour in Camel. Instead I am finding that the ActiveMq message lands in the queue "ActiveMQ.DLQ". What am I missing in my understanding on the behaviour of ActiveMq and Camel ? Updated code below.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Start code snippet: "src/main/resources/META-INF/spring/resource-context.xml"
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:osgi="http://www.springframework.org/schema/osgi"
       xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
                           http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
                           http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd
                           http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-3.0.xsd
                           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

  <!-- ===== Camel components ===== -->

  <!--
    Camel "activemq:" endpoint component, bound to the JTA transaction manager.
    NOTE(review): neither connectionFactory nor brokerURL is set here, so this
    component is not wired to atomikosConnectionFactory (tcp://localhost:61616).
    Presumably it falls back to the component's default broker URL; confirm,
    because a different broker would not apply the dead letter policy defined in
    activemq-broker.xml, which would explain messages landing in ActiveMQ.DLQ.
  -->
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jtaTransactionManager"/>
  </bean>

  <!-- Camel "jms:" endpoint component; same settings, likewise without a connectionFactory. -->
  <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jtaTransactionManager"/>
  </bean>

  <!-- Camel "sql:" and "jdbc:" components, both reading from the Atomikos-wrapped dataSource bean. -->
  <bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
    <property name="dataSource" ref="dataSource"/>
  </bean>

  <bean id="jdbc" class="org.apache.camel.component.jdbc.JdbcComponent">
    <property name="dataSource" ref="dataSource"/>
  </bean>

  <!-- ===== Atomikos / JTA ===== -->

  <!-- Atomikos transaction manager; lifecycle driven through init() / close(). -->
  <bean id="atomikosTransactionManager"
        class="com.atomikos.icatch.jta.UserTransactionManager"
        init-method="init"
        destroy-method="close">
    <property name="forceShutdown" value="false"/>
  </bean>

  <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="150"/>
  </bean>

  <!-- Spring JTA facade over the two Atomikos beans above. -->
  <bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager"/>
    <property name="userTransaction" ref="atomikosUserTransaction"/>
    <property name="allowCustomIsolationLevels" value="true"/>
  </bean>

  <!-- Camel transaction policy registered under the id PROPAGATION_REQUIRED. -->
  <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
    <property name="transactionManager" ref="jtaTransactionManager"/>
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/>
    <property name="transactionTemplate" ref="transactionTemplate"/>
  </bean>

  <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
    <property name="transactionManager" ref="jtaTransactionManager"/>
    <property name="isolationLevelName" value="ISOLATION_SERIALIZABLE"/>
  </bean>

  <!-- ===== Spring templates ===== -->

  <!-- Looked up by name ("jdbcTemplate") from the test case. -->
  <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="dataSource"/>
  </bean>

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="atomikosConnectionFactory"/>
  </bean>

  <!-- ===== XA resources ===== -->

  <!-- XA connection factory pointing at the broker's openwire connector. -->
  <bean id="jmsXaConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
  </bean>

  <!-- Atomikos wrapper around the XA connection factory; referenced only by jmsTemplate in this file. -->
  <bean id="atomikosConnectionFactory"
        class="com.atomikos.jms.AtomikosConnectionFactoryBean"
        init-method="init"
        destroy-method="close">
    <property name="uniqueResourceName" value="amq1"/>
    <property name="xaConnectionFactory" ref="jmsXaConnectionFactory"/>
    <property name="localTransactionMode" value="false"/>
  </bean>

  <!-- Atomikos wrapper around the SQL Server XA data source (declared inline as an inner bean). -->
  <bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
    <property name="uniqueResourceName" value="ds1"/>
    <property name="testQuery" value="select 1"/>
    <property name="xaDataSource">
      <bean class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
        <property name="serverName" value="localhost"/>
        <property name="portNumber" value="1433"/>
        <property name="selectMethod" value="cursor"/>
        <property name="databaseName" value="messaging"/>
        <property name="user" value="__user__"/>
        <property name="password" value="_password__"/>
      </bean>
    </property>
  </bean>

</beans>
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
End code snippet: "src/main/resources/META-INF/spring/resource-context.xml"
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Note 1 (Broke my transaction wrapping):
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="transacted" value="true"/>
  <property name="transactionManager" ref="jtaTransactionManager"/>
</bean>

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
  <property name="transacted" value="true"/>
  <property name="transactionManager" ref="jtaTransactionManager"/>
</bean>
+++++++++++++++++++ IS NOT EQUIVALENT TO +++++++++++++++++++
<!--
  The visible difference from the form above: this shared configuration also sets
  connectionFactory to atomikosConnectionFactory, which the form above leaves unset.
-->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="transactionManager" ref="jtaTransactionManager"/>
  <property name="connectionFactory" ref="atomikosConnectionFactory"/>
  <property name="transacted" value="true"/>
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig"/>
</bean>

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
  <property name="configuration" ref="jmsConfig"/>
</bean>
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Note 2 (Broke my transaction wrapping):
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
<bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
  <property name="uniqueResourceName" value="ds1"/>
  <property name="testQuery" value="select 1"/>
  <property name="xaDataSource">
    <bean class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
      <property name="serverName" value="localhost"/>
      <property name="portNumber" value="1433"/>
      <property name="selectMethod" value="cursor"/>
      <property name="databaseName" value="messaging"/>
      <property name="user" value="__user__"/>
      <property name="password" value="_password__"/>
    </bean>
  </property>
</bean>
+++++++++++++++++++ IS NOT EQUIVALENT TO +++++++++++++++++++
<!--
  NOTE(review): in this variant the id "dataSource" names the raw
  SQLServerXADataSource, and the Atomikos wrapper is called "atomikosDataSource".
  Beans that use ref="dataSource" (sql, jdbc and jdbcTemplate in
  resource-context.xml) would therefore no longer point at the
  AtomikosDataSourceBean; this is likely why the two forms behave differently.
  Confirm.
-->
<bean id="atomikosDataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
  <property name="uniqueResourceName" value="ds1"/>
  <property name="xaDataSource" ref="dataSource"/>
  <property name="testQuery" value="select 1"/>
</bean>

<bean id="dataSource" class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
  <property name="serverName" value="localhost"/>
  <property name="portNumber" value="1433"/>
  <property name="selectMethod" value="cursor"/>
  <property name="databaseName" value="messaging"/>
  <property name="user" value="__user__"/>
  <property name="password" value="_password__"/>
</bean>
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Start code snippet: "src/test/java/xa/ServiceMixXaRollbackTest.java"
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ package test.xa; import junit.framework.Assert; import org.apache.camel.CamelContext; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.language.XPath; import org.apache.camel.spring.SpringRouteBuilder; import org.apache.camel.test.CamelSpringTestSupport; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.springframework.context.support.AbstractXmlApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jdbc.core.JdbcTemplate; public class ServiceMixXaRollbackTest extends CamelSpringTestSupport { protected JdbcTemplate jdbc; @Override @BeforeClass public void setUp() throws Exception { super.setUp(); jdbc = context.getRegistry().lookup("jdbcTemplate", JdbcTemplate.class); try { jdbc.execute( "drop table messaging.my_test_thirdparty" ); } catch ( Exception e ) { // ignore } jdbc.execute("create table messaging.my_test_thirdparty ( thirdparty_id varchar(10), name varchar(128), created varchar(20), status_code varchar(3) )"); } @Override @AfterClass public void tearDown() throws Exception { jdbc.execute("drop table messaging.my_test_thirdparty"); } @Override protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext(); camelContext.addRoutes( createRouteBuilder() ); return camelContext; } @Override protected AbstractXmlApplicationContext createApplicationContext() { return new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/resource-context.xml"}); } @Test public void testXaRollbackAfterDb() throws Exception { Assert.assertEquals( 0, jdbc.queryForInt("select count(*) from my_test_thirdparty")); String xml = "<?xml version=\"1.0\"?><thirdparty id=\"123\"><name>THE TEST THIRDPARTY</name><date>201110140815</date><code>200</code></thirdparty>"; template.sendBody("activemq:queue:my_test_thirdparty", xml); 
Thread.sleep(15000); Assert.assertEquals( 0, jdbc.queryForInt("select count(*) from my_test_thirdparty")); String dlq = consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty", String.class); Assert.assertNotNull("Should not lose message", dlq); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new SpringRouteBuilder() { @Override public void configure() throws Exception { from("activemq:queue:my_test_thirdparty") .transacted() .log("+++ Before Database Call +++") .bean(ServiceMixXaRollbackTest.class, "toSql") .to("jdbc:dataSource") .log("+++ After Database Call +++") .throwException(new IllegalArgumentException("Unexpected Exception")) ; } }; } public static String toSql(@XPath("thirdparty/@id") int thirdpartyId, @XPath("thirdparty/name/text()") String name, @XPath("thirdparty/date/text()") long created, @XPath("thirdparty/code/text()") int status_code) { if (thirdpartyId <= 0) { throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId); } StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES ("); sb.append("'").append(thirdpartyId).append("', "); sb.append("'").append(name).append("', "); sb.append("'").append(created).append("', "); sb.append("'").append(status_code).append("') "); return sb.toString(); } } +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ End code snippet: "src/test/java/xa/ServiceMixXaRollbackTest.java" +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Start code snippet: "apache-servicemix-4.3.0/etc/activemq-broker.xml" ++ (Just removed comments from activemq-broker.xml, otherwise the same source xml as the default ServiceMix installation) +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ <blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" 
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0" xmlns:ext="http://aries.apache.org/blueprint/xmlns/blueprint-ext/v1.0.0" xmlns:amq="http://activemq.apache.org/schema/core"> <ext:property-placeholder /> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="default" dataDirectory="${karaf.data}/activemq/default" useShutdownHook="false"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" producerFlowControl="true" memoryLimit="10mb"> <pendingSubscriberPolicy> <vmCursor /> </pendingSubscriberPolicy> </policyEntry> <policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" /> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext createConnector="false"/> </managementContext> <persistenceAdapter> <kahaDB directory="${karaf.data}/activemq/default/kahadb"/> </persistenceAdapter> <transportConnectors> <transportConnector name="openwire" uri="tcp://localhost:61616"/> <transportConnector name="stomp" uri="stomp://localhost:61613"/> </transportConnectors> </broker> <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="maxConnections" value="8" /> <property name="connectionFactory" ref="activemqConnectionFactory" /> </bean> <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource"> <property name="transactionManager" ref="transactionManager" /> <property name="connectionFactory" ref="activemqConnectionFactory" /> <property name="resourceName" value="activemq.default" /> </bean> <reference id="transactionManager" interface="javax.transaction.TransactionManager" /> <service 
ref="pooledConnectionFactory" interface="javax.jms.ConnectionFactory"> <service-properties> <entry key="name" value="localhost"/> </service-properties> </service> </blueprint> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ End code snippet: "apache-servicemix-4.3.0/etc/activemq-broker.xml" +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -- View this message in context: http://servicemix.396122.n5.nabble.com/JMS-and-Database-interactions-under-the-same-transactional-context-tp4762819p4940331.html Sent from the ServiceMix - User mailing list archive at Nabble.com.
