I am using ServiceMix and I am attempting to use XA transactions correctly. Basically, I want my route wrapped in an XA transaction in a way that ActiveMQ and Microsoft SqlServer persistance is atomic.
Ideally, I want all transactions that fail X amount of times to move into a Dead Letter Queue for later processing. --------------------------------------------------------------- Currently for the class "ServiceMixXaRollbackTest" below... If a transactions is to fail more than 2 times it will move to the Dead Letter Queue: "DLQ.my_test_thirdparty". However if I comment out the code: onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" ); Then the JMS XML message that has been dropped into "my_test_thirdparty" will get consumed and disappear after 4 retries. --------------------------------------------------------------- Is the way in which I undertake my Camel Error Handling correct? Any help would be much appreciated. Regards, Mark --------------------------------------------------------------- Below are some of the articles that may be related. http://servicemix.396122.n5.nabble.com/smx4-camel2-2-transactional-error-handling-td420449.html http://camel.465427.n5.nabble.com/Transaction-Error-Handler-with-Dead-Letter-Channel-td3232320.html http://tmielke.blogspot.com/2011/07/error-handling-in-camel-for-jms.html http://weblog.plexobject.com/?p=1672 http://camel.apache.org/transactionerrorhandler.html http://camel.apache.org/transactional-client.html --------------------------------------------------------------- +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * I have a one "src/main/resources/META-INF/spring/resource-context.xml" file: +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi" xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0" xmlns:broker="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <bean id="AtomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close"> <property name="forceShutdown" value="false" /> </bean> <bean id="AtomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"> <property name="transactionTimeout" value="1000" /> </bean> <bean id="JtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"> <property name="transactionManager" ref="AtomikosTransactionManager" /> <property name="userTransaction" ref="AtomikosUserTransaction" /> </bean> <bean id="activemq" class="org.apache.activemq.ActiveMQXAConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616" /> </bean> <bean id="ConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close"> <property name="uniqueResourceName" value="amq1" /> <property name="xaConnectionFactory" ref="activemq" /> </bean> <bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"> <property name="uniqueResourceName" value="ds1" /> <property name="xaDataSource" ref="dataSourceRaw" /> <property name="testQuery" value="select 1" /> </bean> <bean id="dataSourceRaw" class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource"> <property name="serverName" value="localhost" /> <property name="portNumber" value="1433" /> <property name="selectMethod" value="cursor" /> <property name="databaseName" value="_theDatabaseName_" /> <property name="user" value="_theUserName_" /> <property name="password" value="_thePassword_" /> </bean> </beans> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * I have one properties file "src/main/resouces/jta.properties" file: +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory com.atomikos.icatch.console_file_name = tm-dev.out com.atomikos.icatch.log_base_name = tmlog-dev com.atomikos.icatch.tm_unique_name = tmdev com.atomikos.icatch.serial_jta_transactions=false com.atomikos.icatch.automatic_resource_registration=true com.atomikos.icatch.max_actives=15000 com.atomikos.icatch.max_timeout=3600000 com.atomikos.icatch.output_dir=atomikosxatm/ com.atomikos.icatch.log_base_dir=atomikosxatm/ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * I use the embedded ServiceMix 4.3 ActiveMq instance, otherwise you could use the following file "src/test/resources/broker-context.xml": +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:osgi="http://www.springframework.org/schema/osgi" xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0" xmlns:broker="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd http://www.springframework.org/schema/osgi http://www.springframework.org/schema/osgi/spring-osgi.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://servicemix.apache.org/file/1.0 http://servicemix.apache.org/file/1.0/servicemix-file.xsd http://servicemix.apache.org/cxfse/1.0 http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <broker:broker id="my-broker" useJmx="false" persistent="false" brokerName="localhost"> <broker:transportConnectors> <broker:transportConnector uri="tcp://localhost:61616"/> </broker:transportConnectors> </broker:broker> </beans> +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * I have one java test class to test successful XA Commit. +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ package test.xa; import javax.sql.DataSource; import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.language.XPath; import org.apache.camel.processor.RedeliveryPolicy; import org.apache.camel.spring.SpringRouteBuilder; import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder; import org.apache.camel.test.CamelSpringTestSupport; import org.springframework.context.support.AbstractXmlApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jdbc.core.JdbcTemplate; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class ServiceMixXaCommitTest extends CamelSpringTestSupport { protected JdbcTemplate jdbc; @BeforeMethod public void setupDatabase() throws Exception { super.setUp(); DataSource ds = context.getRegistry().lookup("dataSourceRaw", DataSource.class); jdbc = new JdbcTemplate(ds); // jdbc = context.getRegistry().lookup("jdbc", JdbcTemplate.class); try { jdbc.execute( "drop table messaging.my_test_thirdparty" ); } catch ( Exception e ) { // ignore } jdbc.execute("create table messaging.my_test_thirdparty ( thirdparty_id varchar(10), name varchar(128), created varchar(20), status_code varchar(3) )"); } @AfterMethod public void restoreDatabase() throws Exception { jdbc.execute("drop table messaging.my_test_thirdparty"); } @Override protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext(); ActiveMQXAConnectionFactory connectionFactory = applicationContext.getBean("activemq", ActiveMQXAConnectionFactory.class); camelContext.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); // DataSource dataSource = applicationContext.getBean("dataSource", DataSource.class); // // JdbcComponent jdbcComponent = new JdbcComponent(); // jdbcComponent.setDataSource(dataSource); // camelContext.addComponent("jdbc", jdbcComponent); camelContext.addRoutes( createRouteBuilder() ); return camelContext; } @Override protected AbstractXmlApplicationContext createApplicationContext() { return new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/resource-context.xml"}); } @Test public void testXaRollbackAfterDb() throws Exception { // This database table should be empty Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0); String xml = "<?xml version=\"1.0\"?><thirdparty id=\"123\"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>"; template.sendBody("activemq:queue:my_test_thirdparty", xml); // Wait for route to fail Thread.sleep(15000); // There should be 1 row in the database Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 1); // Check ActiveMq to ensure final state String dlq = consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty", String.class); Assert.assertNull(dlq, "Should not find message message"); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new SpringRouteBuilder() { @Override public void configure() throws Exception { // Non-transactional dead letter queue. // errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ").maximumRedeliveries(2).redeliveryDelay(500)); ErrorHandlerBuilder errorHandlerBuilder = transactionErrorHandler(); RedeliveryPolicy redeliveryPolicy = ((TransactionErrorHandlerBuilder)errorHandlerBuilder).getRedeliveryPolicy(); redeliveryPolicy.setRedeliveryDelay( 500 ); redeliveryPolicy.setBackOffMultiplier(2); redeliveryPolicy.setUseExponentialBackOff(true); redeliveryPolicy.setMaximumRedeliveryDelay( 30 * 60 * 1000 ); // Max = 30 minutes redeliveryPolicy.setMaximumRedeliveries(4); errorHandler(errorHandlerBuilder); // Without the below onException call, the JMS Queue Message gets consumed and disappears. onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" ); from("activemq:queue:my_test_thirdparty") .transacted() .log("+++ Before Database Call +++") .bean(ServiceMixXaCommitTest.class, "toSql") .to("jdbc:dataSource") .log("+++ After Database Call +++") ; } }; } /* * <?xml version="1.0"?><thirdparty id="123"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty> * */ public static String toSql(@XPath("thirdparty/@id") int thirdpartyId, @XPath("thirdparty/name/text()") String name, @XPath("thirdparty/date/text()") long created, @XPath("thirdparty/code/text()") int status_code) { if (thirdpartyId <= 0) { throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId); } StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES ("); sb.append("'").append(thirdpartyId).append("', "); sb.append("'").append(name).append("', "); sb.append("'").append(created).append("', "); sb.append("'").append(status_code).append("') "); return sb.toString(); } } +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * I have one java test class to test successful XA Rollback. +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ package test.xa; import javax.sql.DataSource; import org.apache.activemq.ActiveMQXAConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.language.XPath; import org.apache.camel.processor.RedeliveryPolicy; import org.apache.camel.spring.SpringRouteBuilder; import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder; import org.apache.camel.test.CamelSpringTestSupport; import org.springframework.context.support.AbstractXmlApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jdbc.core.JdbcTemplate; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class ServiceMixXaRollbackTest extends CamelSpringTestSupport { protected JdbcTemplate jdbc; @BeforeMethod public void setupDatabase() throws Exception { super.setUp(); DataSource ds = context.getRegistry().lookup("dataSourceRaw", DataSource.class); jdbc = new JdbcTemplate(ds); // jdbc = context.getRegistry().lookup("jdbc", JdbcTemplate.class); try { jdbc.execute( "drop table messaging.my_test_thirdparty" ); } catch ( Exception e ) { // ignore } jdbc.execute("create table messaging.my_test_thirdparty ( thirdparty_id varchar(10), name varchar(128), created varchar(20), status_code varchar(3) )"); } @AfterMethod public void restoreDatabase() throws Exception { jdbc.execute("drop table messaging.my_test_thirdparty"); } @Override protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext(); ActiveMQXAConnectionFactory connectionFactory = applicationContext.getBean("activemq", ActiveMQXAConnectionFactory.class); camelContext.addComponent("activemq", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); // DataSource dataSource = applicationContext.getBean("dataSource", DataSource.class); // // JdbcComponent jdbcComponent = new JdbcComponent(); // jdbcComponent.setDataSource(dataSource); // camelContext.addComponent("jdbc", jdbcComponent); camelContext.addRoutes( createRouteBuilder() ); return camelContext; } @Override protected AbstractXmlApplicationContext createApplicationContext() { return new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/resource-context.xml"}); } @Test public void testXaRollbackAfterDb() throws Exception { // This database table should be empty Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0); String xml = "<?xml version=\"1.0\"?><thirdparty id=\"123\"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty>"; template.sendBody("activemq:queue:my_test_thirdparty", xml); // Wait for route to fail Thread.sleep(15000); // The database should NOT have any new rows inserted to it. Assert.assertEquals(jdbc.queryForInt("select count(*) from my_test_thirdparty"), 0); // Check ActiveMq to ensure final state String dlq = consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty", String.class); Assert.assertNotNull(dlq, "Should not lose message"); } @Override protected RouteBuilder createRouteBuilder() throws Exception { return new SpringRouteBuilder() { @Override public void configure() throws Exception { // Non-transactional dead letter queue. // errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ").maximumRedeliveries(2).redeliveryDelay(500)); ErrorHandlerBuilder errorHandlerBuilder = transactionErrorHandler(); RedeliveryPolicy redeliveryPolicy = ((TransactionErrorHandlerBuilder)errorHandlerBuilder).getRedeliveryPolicy(); redeliveryPolicy.setRedeliveryDelay( 500 ); redeliveryPolicy.setBackOffMultiplier(2); redeliveryPolicy.setUseExponentialBackOff(true); redeliveryPolicy.setMaximumRedeliveryDelay( 30 * 60 * 1000 ); // Max = 30 minutes redeliveryPolicy.setMaximumRedeliveries(4); errorHandler(errorHandlerBuilder); // Without the below onException call, the JMS Queue Message gets consumed and disappears. onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to( "activemq:queue:DLQ.my_test_thirdparty" ); from("activemq:queue:my_test_thirdparty") .transacted() .log("+++ Before Database Call +++") .bean(ServiceMixXaRollbackTest.class, "toSql") .to("jdbc:dataSource") .log("+++ After Database Call +++") .throwException(new IllegalArgumentException("Unexpected Exception")) ; } }; } /* * <?xml version="1.0"?><thirdparty id="123"><name>Foo Bar</name><date>201110140815</date><code>200</code></thirdparty> * */ public static String toSql(@XPath("thirdparty/@id") int thirdpartyId, @XPath("thirdparty/name/text()") String name, @XPath("thirdparty/date/text()") long created, @XPath("thirdparty/code/text()") int status_code) { if (thirdpartyId <= 0) { throw new IllegalArgumentException("ThirdPartyId is invalid, was " + thirdpartyId); } StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id, name, created, status_code) VALUES ("); sb.append("'").append(thirdpartyId).append("', "); sb.append("'").append(name).append("', "); sb.append("'").append(created).append("', "); sb.append("'").append(status_code).append("') "); return sb.toString(); } } -- View this message in context: http://servicemix.396122.n5.nabble.com/JMS-and-Database-interactions-under-the-same-transactional-context-tp4762819p4911770.html Sent from the ServiceMix - User mailing list archive at Nabble.com.
