On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kray...@gmail.com> wrote: > No one has any idea on this? It would be really great if I could find the > formula to get this to work. >
I don't think people always have the time to help, especially when it's more complicated, with transactions and a lot of Camel route code. If you want to get priority help, there are some companies that offer that: http://camel.apache.org/commercial-camel-offerings.html > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA* > *Author of: Hardcore Java (2003) and Maintainable Java (2012)* > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39 > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>* > > > On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kray...@gmail.com> wrote: > >> So I think there is a problem with the way rollback is implemented in >> relation to Camel. As far as I can tell there is no way to get the >> following working. >> >> package com.ea.wwce.camel.test.utilities; >> >> import com.ea.wwce.camel.utilities.data.RecordList; >> import com.ea.wwce.camel.utilities.transactions.TxnHelper; >> import org.apache.camel.ExchangePattern; >> import org.apache.camel.builder.AdviceWithRouteBuilder; >> import org.apache.camel.builder.RouteBuilder; >> import org.apache.camel.component.mock.MockEndpoint; >> import org.testng.annotations.Test; >> import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*; >> import static >> com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ; >> import static >> com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings; >> import static org.apache.camel.ExchangePattern.InOnly; >> >> /** This test suite validates the transaction configuration in the test >> suite. 
*/ >> public class JMSOnlyTransactionTest extends AMQRouteTestSupport { >> private static final String QUEUE_DEAD = "dead"; >> private static final String QUEUE_INBOX = "inbox"; >> private static final String QUEUE_OUTBOX = "outbox"; >> private static final String ROUTE_ID_FEED = "Feed"; >> private static final String ROUTE_ID_TEST_ROUTE = "TestRoute"; >> private static final String ROUTE_ID_RESULTS = "ResultsRoute"; >> private static final String ROUTE_ID_DEAD = "DeadRoute"; >> private static final String DIRECT_FEED_INBOX = "direct:feed_inbox"; >> >> private static final String MOCK_END = "mock:end"; >> private static final String MOCK_BEFORE_TO_QUEUE = >> "mock:before_to_queue"; >> private static final String MOCK_AFTER_TO_QUEUE = "mock:after_to_queue"; >> >> /** Mock endpoints. */ >> private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue, >> mockAfterToQueue; >> >> /** Helper to initialize mocks in the test. */ >> private void initMocks() { >> mockEnd = assertAndGetMockEndpoint(MOCK_END); >> mockDead = assertAndGetMockEndpoint(MOCK_DEAD); >> mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE); >> mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE); >> mockOutbox = assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX)); >> } >> >> @Override >> protected RouteBuilder createRouteBuilder() { >> System.out.println("createRouteBuilder"); >> return new RouteBuilder(this.context) { >> @Override >> public void configure() { >> getContext().setTracing(true); >> from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED) >> .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) >> >> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX)); >> from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS) >> .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) >> .unmarshal(dfCaseRecord).to(MOCK_END); >> from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD) >> .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) >> .unmarshal(dfCaseRecord).to(MOCK_DEAD); >> 
from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE) >> >> .onException(RuntimeException.class).handled(true).useOriginalMessage() >> .to(InOnly, endpointAMQ(QUEUE_DEAD)).end() >> .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW) >> .unmarshal(dfCaseRecord) >> .to(MOCK_BEFORE_TO_QUEUE) >> .marshal(dfCaseRecord) >> .to(endpointAMQ(QUEUE_OUTBOX)) >> .unmarshal(dfCaseRecord) >> .to(MOCK_AFTER_TO_QUEUE); >> } >> }; >> } >> >> /** Advice the route, mocking ActiveMQ endpoints. */ >> protected void adviceRoute() throws Exception { >> this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith( >> this.context, new AdviceWithRouteBuilder() { >> @Override >> public void configure() throws Exception { >> mockEndpoints("activemq:queue:*"); >> } >> } >> ); >> } >> >> @Test >> public void testNormalRouting() throws Exception { >> adviceRoute(); >> startCamelContext(); >> initMocks(); >> final RecordList cases = casesA(); >> >> mockEnd.expectedBodiesReceived(cases); >> mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases); >> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, >> cases)); >> mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases); >> >> template.sendBody(DIRECT_FEED_INBOX, cases); >> >> mockBeforeToQueue.assertIsSatisfied(); >> mockAfterToQueue.assertIsSatisfied(); >> mockEnd.assertIsSatisfied(); >> mockDead.assertIsSatisfied(); >> mockOutbox.assertIsSatisfied(); >> } >> >> @Test >> public void testRollbackBeforeEnqueue() throws Exception { >> adviceRoute(); >> startCamelContext(); >> initMocks(); >> final RecordList cases = casesA(); >> >> mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2)); >> mockDead.expectedBodiesReceived(cases.get(0)); >> mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases); >> mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR); >> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, >> cases.get(1), cases.get(2))); >> 
mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1), >> cases.get(2)); >> >> template.sendBody(DIRECT_FEED_INBOX, cases); >> >> mockBeforeToQueue.assertIsSatisfied(); >> mockAfterToQueue.assertIsSatisfied(); >> mockEnd.assertIsSatisfied(); >> mockDead.assertIsSatisfied(); >> mockOutbox.assertIsSatisfied(); >> } >> >> @Test >> public void testRollbackAfterEnqueue() throws Exception { >> adviceRoute(); >> startCamelContext(); >> initMocks(); >> final RecordList cases = casesA(); >> >> mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2)); >> mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0)); >> mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases); >> >> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, >> cases)); >> mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases); >> mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR); >> >> template.sendBody(DIRECT_FEED_INBOX, cases); >> >> mockBeforeToQueue.assertIsSatisfied(); >> mockAfterToQueue.assertIsSatisfied(); >> mockDead.assertIsSatisfied(); >> mockOutbox.assertIsSatisfied(); >> mockEnd.assertIsSatisfied(); >> } >> } >> >> I have tried dozens of combinations in the onException clause and nothing >> works. Adding markRollbackOnly() or rollback() only succeeds in rolling >> back the dead letter channel as well. Setting handled(false) causes AMQ >> to resubmit the transaction and rolls back the dead letter channel. I have >> tried a dozen combinations, so if anyone has one that works I would be >> grateful. >> >> *Robert Simmons Jr. MSc. - Lead Java Architect @ EA* >> *Author of: Hardcore Java (2003) and Maintainable Java (2012)* >> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39 >> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>* >> >> >> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . 
<kray...@gmail.com> wrote: >> >>> So, in the ongoing perfect transaction configuration we have an >>> interesting use case: Consider the following route in a test: >>> >>> @Override >>> protected RouteBuilder createRouteBuilder() { >>> System.out.println("createRouteBuilder"); >>> return new RouteBuilder(this.context) { >>> @Override >>> public void configure() { >>> getContext().setTracing(true); >>> from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED) >>> .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) >>> >>> .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX)); >>> from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS) >>> .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) >>> .unmarshal(dfCaseRecord).to(MOCK_END); >>> from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD) >>> .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) >>> .unmarshal(dfCaseRecord).to(MOCK_DEAD); >>> from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE) >>> >>> .onException(RuntimeException.class).handled(true).useOriginalMessage().to(endpointAMQ(QUEUE_DEAD)).end() >>> .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) >>> .unmarshal(dfCaseRecord) >>> .to(MOCK_BEFORE_TO_QUEUE) >>> .marshal(dfCaseRecord) >>> .to(endpointAMQ(QUEUE_OUTBOX)) >>> .unmarshal(dfCaseRecord) >>> .to(MOCK_AFTER_TO_QUEUE); >>> } >>> }; >>> } >>> >>> Note that the transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks >>> up the transaction policy by name as it is just a string key for our JNDI >>> registry. Our test case looks like the following. 
>>> >>> @Test >>> public void testRollbackAfterEnqueue() throws Exception { >>> adviceRoute(); >>> startCamelContext(); >>> initMocks(); >>> final RecordList cases = casesA(); >>> >>> mockEnd.expectedMessageCount(2); >>> mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2)); >>> mockDead.expectedMessageCount(1); >>> mockDead.expectedBodiesReceived(cases.get(0)); >>> mockBeforeToQueue.expectedMessageCount(3); >>> mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases); >>> mockOutbox.expectedMessageCount(3); >>> >>> mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, >>> cases)); >>> mockAfterToQueue.expectedMessageCount(3); >>> mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases); >>> mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR); >>> >>> template.sendBody(DIRECT_FEED_INBOX, cases); >>> >>> mockBeforeToQueue.assertIsSatisfied(); >>> mockAfterToQueue.assertIsSatisfied(); >>> mockEnd.assertIsSatisfied(); >>> mockDead.assertIsSatisfied(); >>> mockOutbox.assertIsSatisfied(); >>> } >>> >>> In this route the goal is that if any exceptions are thrown even after >>> the message is enqueued, it should be rolled back, the message that >>> excepted should go to the DLQ and the messages in the outbox should be >>> rolled back but the message from the inbox should not be put back on the >>> queue. This is proving to be a bit of a juggling act. >>> >>> I tried putting markRollBackOnly() in the exception handler after the >>> to(dead) but that rolled back the dead letter queue and outbox and then >>> redelivered the inbox message. Removing the markRollbackOnly() means >>> that the message arrives at dead and is off the inbox but it doesn't get >>> removed from the outbox. >>> >>> So I am looking for the right combination of calls to accomplish what I >>> want to do. Any suggestions? >>> >>> Thanks in advance. >>> >>> *Robert Simmons Jr. MSc. 
- Lead Java Architect @ EA* >>> *Author of: Hardcore Java (2003) and Maintainable Java (2012)* >>> *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39 >>> <http://www.linkedin.com/pub/robert-simmons/40/852/a39>* >>> >> >> -- Claus Ibsen ----------------- Red Hat, Inc. Email: cib...@redhat.com Twitter: davsclaus Blog: http://davsclaus.com Author of Camel in Action: http://www.manning.com/ibsen hawtio: http://hawt.io/ fabric8: http://fabric8.io/