I am using ServiceMix and I am attempting to use XA transactions correctly. 
Basically, I want my route wrapped in an XA transaction in a way that
ActiveMQ and Microsoft SqlServer persistance is atomic.

Ideally, I want all transactions that fail X amount of times to move into a
Dead Letter Queue for later processing.

---------------------------------------------------------------

Currently for the class "ServiceMixXaRollbackTest" below...

If a transactions is to fail more than 2 times it will move to the Dead
Letter Queue:  "DLQ.my_test_thirdparty".

However if I comment out the code:

onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to(
"activemq:queue:DLQ.my_test_thirdparty" );

Then the JMS XML message that has been dropped into "my_test_thirdparty"
will get consumed and disappear after 4 retries.


---------------------------------------------------------------

Is the way in which I undertake my Camel Error Handling correct?  Any help
would be much appreciated.



Regards,
Mark



---------------------------------------------------------------
Below are some of the articles that may be related.


http://servicemix.396122.n5.nabble.com/smx4-camel2-2-transactional-error-handling-td420449.html
http://camel.465427.n5.nabble.com/Transaction-Error-Handler-with-Dead-Letter-Channel-td3232320.html
http://tmielke.blogspot.com/2011/07/error-handling-in-camel-for-jms.html
http://weblog.plexobject.com/?p=1672
http://camel.apache.org/transactionerrorhandler.html
http://camel.apache.org/transactional-client.html

---------------------------------------------------------------




                                
                                


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* I have a one "src/main/resources/META-INF/spring/resource-context.xml"
file:
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans";
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
    xmlns:osgi="http://www.springframework.org/schema/osgi";
    xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0";
    xmlns:broker="http://activemq.apache.org/schema/core";
    xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                            http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
                               http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd
                               http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
                               http://servicemix.apache.org/file/1.0
http://servicemix.apache.org/file/1.0/servicemix-file.xsd
                               http://servicemix.apache.org/cxfse/1.0
http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
                               http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";>

    <bean id="AtomikosTransactionManager"
class="com.atomikos.icatch.jta.UserTransactionManager"
        init-method="init" destroy-method="close">
        <property name="forceShutdown" value="false" />
    </bean>

    <bean id="AtomikosUserTransaction"
class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="1000" />
    </bean>

    <bean id="JtaTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="AtomikosTransactionManager"
/>
        <property name="userTransaction" ref="AtomikosUserTransaction" />
    </bean>

    <bean id="activemq"
class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <bean id="ConnectionFactory"
class="com.atomikos.jms.AtomikosConnectionFactoryBean"
        init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="amq1" />
        <property name="xaConnectionFactory" ref="activemq" />
    </bean>


    <bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean">
        <property name="uniqueResourceName" value="ds1" />
        <property name="xaDataSource" ref="dataSourceRaw" />
        <property name="testQuery" value="select 1" />
    </bean>

    <bean id="dataSourceRaw"
class="com.microsoft.sqlserver.jdbc.SQLServerXADataSource">
        <property name="serverName" value="localhost" />
        <property name="portNumber" value="1433" />
        <property name="selectMethod" value="cursor" />
        <property name="databaseName" value="_theDatabaseName_" />
        <property name="user" value="_theUserName_" />
        <property name="password" value="_thePassword_" />
    </bean>

</beans>


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* I have one properties file "src/main/resouces/jta.properties" file:
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
com.atomikos.icatch.console_file_name = tm-dev.out
com.atomikos.icatch.log_base_name = tmlog-dev
com.atomikos.icatch.tm_unique_name = tmdev
com.atomikos.icatch.serial_jta_transactions=false
com.atomikos.icatch.automatic_resource_registration=true
com.atomikos.icatch.max_actives=15000
com.atomikos.icatch.max_timeout=3600000
com.atomikos.icatch.output_dir=atomikosxatm/
com.atomikos.icatch.log_base_dir=atomikosxatm/


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* I use the embedded ServiceMix 4.3 ActiveMq instance, otherwise you could
use the following file "src/test/resources/broker-context.xml":
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans";
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
    xmlns:osgi="http://www.springframework.org/schema/osgi";
    xmlns:cxfse="http://servicemix.apache.org/cxfse/1.0";
    xmlns:broker="http://activemq.apache.org/schema/core";
    xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                            http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
                               http://www.springframework.org/schema/osgi
http://www.springframework.org/schema/osgi/spring-osgi.xsd
                               http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
                               http://servicemix.apache.org/file/1.0
http://servicemix.apache.org/file/1.0/servicemix-file.xsd
                               http://servicemix.apache.org/cxfse/1.0
http://servicemix.apache.org/schema/servicemix-cxf-se-2011.01.xsd
                               http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd";>


    
    <broker:broker id="my-broker" useJmx="false" persistent="false"
brokerName="localhost">
        <broker:transportConnectors>
            <broker:transportConnector uri="tcp://localhost:61616"/>
        </broker:transportConnectors>
    </broker:broker>

</beans>


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* I have one java test class to test successful XA Commit.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package test.xa;

import javax.sql.DataSource;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.language.XPath;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder;
import org.apache.camel.test.CamelSpringTestSupport;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class ServiceMixXaCommitTest extends CamelSpringTestSupport
{
    protected JdbcTemplate jdbc;

    @BeforeMethod
    public void setupDatabase() throws Exception {
        super.setUp();
        DataSource ds = context.getRegistry().lookup("dataSourceRaw",
DataSource.class);
        jdbc = new JdbcTemplate(ds);

//        jdbc = context.getRegistry().lookup("jdbc", JdbcTemplate.class);
        try
        {
            jdbc.execute( "drop table messaging.my_test_thirdparty" );
        }
        catch ( Exception e )
        {
            // ignore
        }
        jdbc.execute("create table messaging.my_test_thirdparty (
thirdparty_id varchar(10), name varchar(128), created varchar(20),
status_code varchar(3) )");
    }

    @AfterMethod
    public void restoreDatabase() throws Exception {
        jdbc.execute("drop table messaging.my_test_thirdparty");
    }

    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();

        ActiveMQXAConnectionFactory connectionFactory =
applicationContext.getBean("activemq", ActiveMQXAConnectionFactory.class);
        camelContext.addComponent("activemq",
JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        camelContext.addComponent("jms",
JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

//        DataSource dataSource = applicationContext.getBean("dataSource",
DataSource.class);
//
//        JdbcComponent jdbcComponent = new JdbcComponent();
//        jdbcComponent.setDataSource(dataSource);
//        camelContext.addComponent("jdbc", jdbcComponent);


        camelContext.addRoutes( createRouteBuilder() );

        return camelContext;
    }



    @Override
    protected AbstractXmlApplicationContext createApplicationContext() {
        return new ClassPathXmlApplicationContext(new
String[]{"META-INF/spring/resource-context.xml"});
    }

    @Test
    public void testXaRollbackAfterDb() throws Exception {

        // This database table should be empty
        Assert.assertEquals(jdbc.queryForInt("select count(*) from
my_test_thirdparty"), 0);

        String xml = "<?xml version=\"1.0\"?><thirdparty
id=\"123\"><name>Foo
Bar</name><date>201110140815</date><code>200</code></thirdparty>";
        template.sendBody("activemq:queue:my_test_thirdparty", xml);

        // Wait for route to fail
        Thread.sleep(15000);

        // There should be 1 row in the database
        Assert.assertEquals(jdbc.queryForInt("select count(*) from
my_test_thirdparty"), 1);

        // Check ActiveMq to ensure final state
        String dlq =
consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty",
String.class);
        Assert.assertNull(dlq, "Should not find message message");

    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new SpringRouteBuilder() {

            @Override
            public void configure() throws Exception {


//              Non-transactional dead letter queue.
//               
errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ").maximumRedeliveries(2).redeliveryDelay(500));


                ErrorHandlerBuilder errorHandlerBuilder =
transactionErrorHandler();

                RedeliveryPolicy redeliveryPolicy =
((TransactionErrorHandlerBuilder)errorHandlerBuilder).getRedeliveryPolicy();
                redeliveryPolicy.setRedeliveryDelay( 500 );
                redeliveryPolicy.setBackOffMultiplier(2);
                redeliveryPolicy.setUseExponentialBackOff(true);
                redeliveryPolicy.setMaximumRedeliveryDelay( 30 * 60 * 1000
); // Max = 30 minutes
                redeliveryPolicy.setMaximumRedeliveries(4);

                errorHandler(errorHandlerBuilder);

                // Without the below onException call, the JMS Queue Message
gets consumed and disappears.
               
onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to(
"activemq:queue:DLQ.my_test_thirdparty" );


                from("activemq:queue:my_test_thirdparty")
                    .transacted()
                    .log("+++ Before Database Call +++")
                    .bean(ServiceMixXaCommitTest.class, "toSql")
                    .to("jdbc:dataSource")
                    .log("+++ After Database Call +++")
                    ;


            }

        };
    }


    /*
     * <?xml version="1.0"?><thirdparty id="123"><name>Foo
Bar</name><date>201110140815</date><code>200</code></thirdparty>
     *
     */
    public static String toSql(@XPath("thirdparty/@id") int thirdpartyId,
                        @XPath("thirdparty/name/text()") String name,
                        @XPath("thirdparty/date/text()") long created,
                        @XPath("thirdparty/code/text()") int status_code) {

        if (thirdpartyId <= 0) {
            throw new IllegalArgumentException("ThirdPartyId is invalid, was
" + thirdpartyId);
        }

        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id,
name, created, status_code) VALUES (");
        sb.append("'").append(thirdpartyId).append("', ");
        sb.append("'").append(name).append("', ");
        sb.append("'").append(created).append("', ");
        sb.append("'").append(status_code).append("') ");

        return sb.toString();
    }


}


+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* I have one java test class to test successful XA Rollback.
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

package test.xa;

import javax.sql.DataSource;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.language.XPath;
import org.apache.camel.processor.RedeliveryPolicy;
import org.apache.camel.spring.SpringRouteBuilder;
import org.apache.camel.spring.spi.TransactionErrorHandlerBuilder;
import org.apache.camel.test.CamelSpringTestSupport;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class ServiceMixXaRollbackTest extends CamelSpringTestSupport
{
    protected JdbcTemplate jdbc;

    @BeforeMethod
    public void setupDatabase() throws Exception {
        super.setUp();
        DataSource ds = context.getRegistry().lookup("dataSourceRaw",
DataSource.class);
        jdbc = new JdbcTemplate(ds);

//        jdbc = context.getRegistry().lookup("jdbc", JdbcTemplate.class);

        try
        {
            jdbc.execute( "drop table messaging.my_test_thirdparty" );
        }
        catch ( Exception e )
        {
            // ignore
        }
        jdbc.execute("create table messaging.my_test_thirdparty (
thirdparty_id varchar(10), name varchar(128), created varchar(20),
status_code varchar(3) )");
    }

    @AfterMethod
    public void restoreDatabase() throws Exception {
        jdbc.execute("drop table messaging.my_test_thirdparty");
    }

    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();

        ActiveMQXAConnectionFactory connectionFactory =
applicationContext.getBean("activemq", ActiveMQXAConnectionFactory.class);
        camelContext.addComponent("activemq",
JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        camelContext.addComponent("jms",
JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

//        DataSource dataSource = applicationContext.getBean("dataSource",
DataSource.class);
//
//        JdbcComponent jdbcComponent = new JdbcComponent();
//        jdbcComponent.setDataSource(dataSource);
//        camelContext.addComponent("jdbc", jdbcComponent);


        camelContext.addRoutes( createRouteBuilder() );

        return camelContext;
    }



    @Override
    protected AbstractXmlApplicationContext createApplicationContext() {
        return new ClassPathXmlApplicationContext(new
String[]{"META-INF/spring/resource-context.xml"});
    }

    @Test
    public void testXaRollbackAfterDb() throws Exception {

        // This database table should be empty
        Assert.assertEquals(jdbc.queryForInt("select count(*) from
my_test_thirdparty"), 0);

        String xml = "<?xml version=\"1.0\"?><thirdparty
id=\"123\"><name>Foo
Bar</name><date>201110140815</date><code>200</code></thirdparty>";
        template.sendBody("activemq:queue:my_test_thirdparty", xml);

        // Wait for route to fail
        Thread.sleep(15000);

        // The database should NOT have any new rows inserted to it.
        Assert.assertEquals(jdbc.queryForInt("select count(*) from
my_test_thirdparty"), 0);

        // Check ActiveMq to ensure final state
        String dlq =
consumer.receiveBodyNoWait("activemq:queue:DLQ.my_test_thirdparty",
String.class);
        Assert.assertNotNull(dlq, "Should not lose message");
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new SpringRouteBuilder() {

            @Override
            public void configure() throws Exception {


//              Non-transactional dead letter queue.
//               
errorHandler(deadLetterChannel("activemq:queue:ActiveMQ.DLQ").maximumRedeliveries(2).redeliveryDelay(500));


                ErrorHandlerBuilder errorHandlerBuilder =
transactionErrorHandler();

                RedeliveryPolicy redeliveryPolicy =
((TransactionErrorHandlerBuilder)errorHandlerBuilder).getRedeliveryPolicy();
                redeliveryPolicy.setRedeliveryDelay( 500 );
                redeliveryPolicy.setBackOffMultiplier(2);
                redeliveryPolicy.setUseExponentialBackOff(true);
                redeliveryPolicy.setMaximumRedeliveryDelay( 30 * 60 * 1000
); // Max = 30 minutes
                redeliveryPolicy.setMaximumRedeliveries(4);

                errorHandler(errorHandlerBuilder);

                // Without the below onException call, the JMS Queue Message
gets consumed and disappears.
               
onException(IllegalArgumentException.class).maximumRedeliveries(2).useOriginalMessage().to(
"activemq:queue:DLQ.my_test_thirdparty" );


                from("activemq:queue:my_test_thirdparty")
                    .transacted()
                    .log("+++ Before Database Call +++")
                    .bean(ServiceMixXaRollbackTest.class, "toSql")
                    .to("jdbc:dataSource")
                    .log("+++ After Database Call +++")
                    .throwException(new IllegalArgumentException("Unexpected
Exception"))
                    ;


            }

        };
    }


    /*
     * <?xml version="1.0"?><thirdparty id="123"><name>Foo
Bar</name><date>201110140815</date><code>200</code></thirdparty>
     *
     */
    public static String toSql(@XPath("thirdparty/@id") int thirdpartyId,
                        @XPath("thirdparty/name/text()") String name,
                        @XPath("thirdparty/date/text()") long created,
                        @XPath("thirdparty/code/text()") int status_code) {

        if (thirdpartyId <= 0) {
            throw new IllegalArgumentException("ThirdPartyId is invalid, was
" + thirdpartyId);
        }

        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO messaging.my_test_thirdparty (thirdparty_id,
name, created, status_code) VALUES (");
        sb.append("'").append(thirdpartyId).append("', ");
        sb.append("'").append(name).append("', ");
        sb.append("'").append(created).append("', ");
        sb.append("'").append(status_code).append("') ");

        return sb.toString();
    }


}


--
View this message in context: 
http://servicemix.396122.n5.nabble.com/JMS-and-Database-interactions-under-the-same-transactional-context-tp4762819p4911770.html
Sent from the ServiceMix - User mailing list archive at Nabble.com.

Reply via email to