Yeah, no problem. I was just hoping someone would have the answer. I keep
plugging away at it. It's probably some transaction issue, I don't know.
Perhaps you can answer this :) When I send a message to an activemq endpoint
with the InOnly exchange pattern, will that message be subject to the
transaction? I.e., if the transaction fails, will it get popped off AMQ?
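
As a point of reference for the question above: in a plain JMS transacted
session, messages sent inside the transaction only become visible on commit,
and a rollback discards them and returns the consumed messages for redelivery.
The sketch below models that in plain Java. It is a toy, not Camel or ActiveMQ
code: ToyQueue, ToyTransactedSession, and ToyTransactionDemo are hypothetical
names, and it assumes the receive and the send share one transacted session,
which in Camel depends on how the activemq component and the transaction
manager are configured.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory queue; stands in for a broker destination (assumption, not ActiveMQ). */
final class ToyQueue {
    final Deque<String> messages = new ArrayDeque<>();
}

/**
 * Toy model of a transacted JMS-style session: sends are buffered and
 * receives are tentative until commit() or rollback() is called.
 */
final class ToyTransactedSession {
    /** A message paired with the queue it was sent to or received from. */
    private static final class Pending {
        final ToyQueue queue;
        final String body;

        Pending(ToyQueue queue, String body) {
            this.queue = queue;
            this.body = body;
        }
    }

    private final List<Pending> pendingSends = new ArrayList<>();
    private final List<Pending> pendingReceives = new ArrayList<>();

    /** Takes the head message tentatively; returns null if the queue is empty. */
    String receive(ToyQueue source) {
        String body = source.messages.pollFirst();
        if (body != null) {
            pendingReceives.add(new Pending(source, body));
        }
        return body;
    }

    /** Buffers a send; nothing is visible on the target queue until commit(). */
    void send(ToyQueue target, String body) {
        pendingSends.add(new Pending(target, body));
    }

    /** Publishes the buffered sends and acknowledges the tentative receives. */
    void commit() {
        for (Pending send : pendingSends) {
            send.queue.messages.addLast(send.body);
        }
        pendingSends.clear();
        pendingReceives.clear();
    }

    /** Discards the buffered sends and puts received messages back for redelivery. */
    void rollback() {
        pendingSends.clear();
        // Restore in reverse order so the original queue order is preserved.
        for (int i = pendingReceives.size() - 1; i >= 0; i--) {
            Pending received = pendingReceives.get(i);
            received.queue.messages.addFirst(received.body);
        }
        pendingReceives.clear();
    }
}

final class ToyTransactionDemo {
    public static void main(String[] args) {
        ToyQueue inbox = new ToyQueue();
        ToyQueue dead = new ToyQueue();
        inbox.messages.add("case-0");

        ToyTransactedSession session = new ToyTransactedSession();
        String body = session.receive(inbox);
        session.send(dead, body); // buffered, not yet on the dead queue
        session.rollback();       // send discarded, inbox message restored

        // prints: dead=[] inbox=[case-0]
        System.out.println("dead=" + dead.messages + " inbox=" + inbox.messages);
    }
}
```

In this model the message sent to the dead queue disappears together with the
rolled-back transaction, which matches the symptom described in the quoted
messages. If the producer endpoint is not enlisted in the same transaction, the
send is not buffered this way and a rollback would not retract it.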

*Robert Simmons Jr. MSc. - Lead Java Architect @ EA*
*Author of: Hardcore Java (2003) and Maintainable Java (2012)*
*LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39
<http://www.linkedin.com/pub/robert-simmons/40/852/a39>*


On Tue, Apr 29, 2014 at 2:55 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:

> On Mon, Apr 28, 2014 at 8:48 PM, kraythe . <kray...@gmail.com> wrote:
> > No one has any idea on this? It would be really great if I could find the
> > formula to get this to work.
> >
>
> I don't think people always have the time to help, especially when
> it's more complicated, with transactions and a lot of Camel route code.
>
> If you want to get priority help, there are some companies that offer
> that:
> http://camel.apache.org/commercial-camel-offerings.html
>
> >
> > On Tue, Apr 15, 2014 at 5:15 PM, kraythe . <kray...@gmail.com> wrote:
> >
> >> So I think there is a problem with the way rollback is implemented in
> >> relation to Camel. As far as I can tell there is no way to get the
> >> following working.
> >>
> >> package com.ea.wwce.camel.test.utilities;
> >>
> >> import com.ea.wwce.camel.utilities.data.RecordList;
> >> import com.ea.wwce.camel.utilities.transactions.TxnHelper;
> >> import org.apache.camel.ExchangePattern;
> >> import org.apache.camel.builder.AdviceWithRouteBuilder;
> >> import org.apache.camel.builder.RouteBuilder;
> >> import org.apache.camel.component.mock.MockEndpoint;
> >> import org.testng.annotations.Test;
> >> import static com.ea.wwce.camel.test.utilities.TransactionTestTools.*;
> >> import static com.ea.wwce.camel.utilities.activemq.ActiveMQHelper.endpointAMQ;
> >> import static com.ea.wwce.camel.utilities.jackson.RecordSerialization.toListOfJsonStrings;
> >> import static org.apache.camel.ExchangePattern.InOnly;
> >>
> >> /** This test suite validates the transaction configuration in the test
> >> suite. */
> >> public class JMSOnlyTransactionTest extends AMQRouteTestSupport {
> >>   private static final String QUEUE_DEAD = "dead";
> >>   private static final String QUEUE_INBOX = "inbox";
> >>   private static final String QUEUE_OUTBOX = "outbox";
> >>   private static final String ROUTE_ID_FEED = "Feed";
> >>   private static final String ROUTE_ID_TEST_ROUTE = "TestRoute";
> >>   private static final String ROUTE_ID_RESULTS = "ResultsRoute";
> >>   private static final String ROUTE_ID_DEAD = "DeadRoute";
> >>   private static final String DIRECT_FEED_INBOX = "direct:feed_inbox";
> >>
> >>   private static final String MOCK_END = "mock:end";
> >>   private static final String MOCK_BEFORE_TO_QUEUE = "mock:before_to_queue";
> >>   private static final String MOCK_AFTER_TO_QUEUE = "mock:after_to_queue";
> >>
> >>   /** Mock endpoints. */
> >>   private MockEndpoint mockEnd, mockDead, mockOutbox, mockBeforeToQueue,
> >>       mockAfterToQueue;
> >>
> >>   /** Helper to initialize mocks in the test. */
> >>   private void initMocks() {
> >>     mockEnd = assertAndGetMockEndpoint(MOCK_END);
> >>     mockDead = assertAndGetMockEndpoint(MOCK_DEAD);
> >>     mockBeforeToQueue = assertAndGetMockEndpoint(MOCK_BEFORE_TO_QUEUE);
> >>     mockAfterToQueue = assertAndGetMockEndpoint(MOCK_AFTER_TO_QUEUE);
> >>     mockOutbox = assertAndGetMockEndpoint(mockEndpointAMQ(QUEUE_OUTBOX));
> >>   }
> >>
> >>   @Override
> >>   protected RouteBuilder createRouteBuilder() {
> >>     System.out.println("createRouteBuilder");
> >>     return new RouteBuilder(this.context) {
> >>       @Override
> >>       public void configure() {
> >>         getContext().setTracing(true);
> >>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>             .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
> >>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>             .unmarshal(dfCaseRecord).to(MOCK_END);
> >>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
> >>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
> >>             .onException(RuntimeException.class).handled(true).useOriginalMessage()
> >>               .to(InOnly, endpointAMQ(QUEUE_DEAD)).end()
> >>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRES_NEW)
> >>             .unmarshal(dfCaseRecord)
> >>             .to(MOCK_BEFORE_TO_QUEUE)
> >>             .marshal(dfCaseRecord)
> >>             .to(endpointAMQ(QUEUE_OUTBOX))
> >>             .unmarshal(dfCaseRecord)
> >>             .to(MOCK_AFTER_TO_QUEUE);
> >>       }
> >>     };
> >>   }
> >>
> >>   /** Advice the route, mocking ActiveMQ endpoints. */
> >>   protected void adviceRoute() throws Exception {
> >>     this.context.getRouteDefinition(ROUTE_ID_TEST_ROUTE).adviceWith(
> >>         this.context, new AdviceWithRouteBuilder() {
> >>           @Override
> >>           public void configure() throws Exception {
> >>             mockEndpoints("activemq:queue:*");
> >>           }
> >>         }
> >>     );
> >>   }
> >>
> >>   @Test
> >>   public void testNormalRouting() throws Exception {
> >>     adviceRoute();
> >>     startCamelContext();
> >>     initMocks();
> >>     final RecordList cases = casesA();
> >>
> >>     mockEnd.expectedBodiesReceived(cases);
> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>
> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >>
> >>     mockBeforeToQueue.assertIsSatisfied();
> >>     mockAfterToQueue.assertIsSatisfied();
> >>     mockEnd.assertIsSatisfied();
> >>     mockDead.assertIsSatisfied();
> >>     mockOutbox.assertIsSatisfied();
> >>   }
> >>
> >>   @Test
> >>   public void testRollbackBeforeEnqueue() throws Exception {
> >>     adviceRoute();
> >>     startCamelContext();
> >>     initMocks();
> >>     final RecordList cases = casesA();
> >>
> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
> >>     mockDead.expectedBodiesReceived(cases.get(0));
> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>     mockBeforeToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >>     mockOutbox.expectedBodiesReceivedInAnyOrder(
> >>         toListOfJsonStrings(mapper, cases.get(1), cases.get(2)));
> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
> >>
> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >>
> >>     mockBeforeToQueue.assertIsSatisfied();
> >>     mockAfterToQueue.assertIsSatisfied();
> >>     mockEnd.assertIsSatisfied();
> >>     mockDead.assertIsSatisfied();
> >>     mockOutbox.assertIsSatisfied();
> >>   }
> >>
> >>   @Test
> >>   public void testRollbackAfterEnqueue() throws Exception {
> >>     adviceRoute();
> >>     startCamelContext();
> >>     initMocks();
> >>     final RecordList cases = casesA();
> >>
> >>     mockEnd.expectedBodiesReceivedInAnyOrder(cases.get(1), cases.get(2));
> >>     mockDead.expectedBodiesReceivedInAnyOrder(cases.get(0));
> >>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
> >>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >>
> >>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >>
> >>     mockBeforeToQueue.assertIsSatisfied();
> >>     mockAfterToQueue.assertIsSatisfied();
> >>     mockDead.assertIsSatisfied();
> >>     mockOutbox.assertIsSatisfied();
> >>     mockEnd.assertIsSatisfied();
> >>   }
> >> }
> >>
> >> I have tried dozens of combinations in the onException clause and nothing
> >> works. Adding markRollbackOnly() or rollback() only succeeds in rolling
> >> back the dead letter channel as well. Setting handled(false) causes AMQ
> >> to resubmit the transaction and rolls back the dead letter channel. If
> >> anyone has a combination that works, I would be grateful.
> >>
> >>
> >> On Mon, Apr 14, 2014 at 1:10 PM, kraythe . <kray...@gmail.com> wrote:
> >>
> >>> So, in the ongoing quest for the perfect transaction configuration, we
> >>> have an interesting use case. Consider the following route in a test:
> >>>
> >>>   @Override
> >>>   protected RouteBuilder createRouteBuilder() {
> >>>     System.out.println("createRouteBuilder");
> >>>     return new RouteBuilder(this.context) {
> >>>       @Override
> >>>       public void configure() {
> >>>         getContext().setTracing(true);
> >>>         from(DIRECT_FEED_INBOX).routeId(ROUTE_ID_FEED)
> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>>             .split(body()).marshal(dfCaseRecord).to(endpointAMQ(QUEUE_INBOX));
> >>>         from(endpointAMQ(QUEUE_OUTBOX)).routeId(ROUTE_ID_RESULTS)
> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>>             .unmarshal(dfCaseRecord).to(MOCK_END);
> >>>         from(endpointAMQ(QUEUE_DEAD)).routeId(ROUTE_ID_DEAD)
> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>>             .unmarshal(dfCaseRecord).to(MOCK_DEAD);
> >>>         from(endpointAMQ(QUEUE_INBOX)).routeId(ROUTE_ID_TEST_ROUTE)
> >>>             .onException(RuntimeException.class).handled(true).useOriginalMessage()
> >>>               .to(endpointAMQ(QUEUE_DEAD)).end()
> >>>             .transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED)
> >>>             .unmarshal(dfCaseRecord)
> >>>             .to(MOCK_BEFORE_TO_QUEUE)
> >>>             .marshal(dfCaseRecord)
> >>>             .to(endpointAMQ(QUEUE_OUTBOX))
> >>>             .unmarshal(dfCaseRecord)
> >>>             .to(MOCK_AFTER_TO_QUEUE);
> >>>       }
> >>>     };
> >>>   }
> >>>
> >>> Note that transacted(TxnHelper.KEY_TXNPOLICY_REQUIRED) simply looks up
> >>> the transaction policy by name, as it is just a string key for our JNDI
> >>> registry. Our test case looks like the following.
> >>>
> >>>   @Test
> >>>   public void testRollbackAfterEnqueue() throws Exception {
> >>>     adviceRoute();
> >>>     startCamelContext();
> >>>     initMocks();
> >>>     final RecordList cases = casesA();
> >>>
> >>>     mockEnd.expectedMessageCount(2);
> >>>     mockEnd.expectedBodiesReceived(cases.get(1), cases.get(2));
> >>>     mockDead.expectedMessageCount(1);
> >>>     mockDead.expectedBodiesReceived(cases.get(0));
> >>>     mockBeforeToQueue.expectedMessageCount(3);
> >>>     mockBeforeToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>>     mockOutbox.expectedMessageCount(3);
> >>>     mockOutbox.expectedBodiesReceivedInAnyOrder(toListOfJsonStrings(mapper, cases));
> >>>     mockAfterToQueue.expectedMessageCount(3);
> >>>     mockAfterToQueue.expectedBodiesReceivedInAnyOrder(cases);
> >>>     mockAfterToQueue.whenExchangeReceived(1, EXCEPTION_PROCESSOR);
> >>>
> >>>     template.sendBody(DIRECT_FEED_INBOX, cases);
> >>>
> >>>     mockBeforeToQueue.assertIsSatisfied();
> >>>     mockAfterToQueue.assertIsSatisfied();
> >>>     mockEnd.assertIsSatisfied();
> >>>     mockDead.assertIsSatisfied();
> >>>     mockOutbox.assertIsSatisfied();
> >>>   }
> >>>
> >>> In this route the goal is that if any exception is thrown, even after
> >>> the message is enqueued, the work should be rolled back: the message
> >>> that caused the exception should go to the DLQ and the messages in the
> >>> outbox should be rolled back, but the message from the inbox should not
> >>> be put back on the queue. This is proving to be a bit of a juggling act.
> >>>
> >>> I tried putting markRollbackOnly() in the exception handler after the
> >>> to(dead), but that rolled back the dead letter queue and the outbox and
> >>> then redelivered the inbox message. Removing the markRollbackOnly() means
> >>> that the message arrives at dead and is off the inbox, but it doesn't get
> >>> removed from the outbox.
> >>>
> >>> So I am looking for the right combination of calls to accomplish what I
> >>> want to do. Any suggestions?
> >>>
> >>> Thanks in advance.
> >>>
> >>
> >>
>
>
>
> --
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cib...@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> hawtio: http://hawt.io/
> fabric8: http://fabric8.io/
>
