Thanks Kraythe! It worked.
On Tue, Dec 24, 2013 at 3:20 AM, kraythe . <kray...@gmail.com> wrote: > Ewww... I hate spring xml DSL (fluent Java or Scala is much better) .. > anyway after my eyes are done bleeding ... it seems like you are wrapping > the "possible failure" calls in a doTry/catch. Since you are handling the > transactions, camel assumes all is well and that you meant to do that, so > an exception wont be propagated and the transaction wont be rolled back. If > you want to have a transaction be rolled back you have to let the exception > propagate while you have a transaction error handler in scope or > setRollbackOnly() on the transaction when you catch the exception: Such as: > > from(endpointAMQ(QUEUE_DB_MONITOR_EVENT)) // Monitor the event queue > .routeId(ROUTE_ID_AUTOMATION_CASES_DB_PROCESSOR) // give the route > an ID > > .onException(Exception.class).useOriginalMessage().handled(true).maximumRedeliveries(0) > // catch exceptions > .setHeader(Exchange.FAILURE_ROUTE_ID, property(Exchange. > FAILURE_ROUTE_ID)) // set route that errored > .setHeader(Exchange.EXCEPTION_CAUGHT, simple( > "${exception.stacktrace}")) // Store reason for error > .to(ExchangePattern.InOnly, > endpointAMQ(QUEUE_CASE_AUTOMATION_DEAD)).end() > // to DLQ > .transacted(KEY_TXPOLICY_REQUIRED) // make the route transacted > // rest of route > > As you can see I store the exception stack trace and other information in > the headers of the failed exchange, send the exchange to the dead letter > queue and then so on. So if there is an exception in this route, NOTHING > will get rolled back which is what I want since I cant reprocess the > message. So why transacted? If someone pulls the plug on the machine or > uses kill -9 on the process, I want AMQ to retain the message. Now if I > wanted a rollback I could add the call .rollback() right after my > onException() > call and before the .end call. That would tell the exception error handler > (which is a transaction handler) to rollback AMQ. > > Now if you want to rollback both a database and AMQ transaction you will > need a JTA transaction manager. > > > > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA* > *Author of: Hardcore Java (2003) and Maintainable Java (2012)* > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39 > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>* > > > On Mon, Dec 23, 2013 at 2:11 AM, Tarun Kumar <agrawal.taru...@gmail.com > >wrote: > > > Hi Kraythe, > > > > Thanks for replying. > > > > Here is how my route looks like: > > > > <camel:route id="createMigrationRoute"> > > > > <camel:from uri="timer://foo?period=1000"></camel:from> > > > > <camel:transacted> > > > > <camel:doTry> > > > > <camel:bean ref="abc" method="fetchMessage"></camel:bean> > > > > <camel:bean ref="pqr" method="processMessage"></camel:bean> > > > > <camel:doCatch> > > > > <camel:exception>java.lang.IOException</camel:exception> > > > > <camel:bean ref="def" method="handleException" /> > > > > </camel:doCatch> > > > > <camel:doFinally> > > > > <camel:bean ref="dsd" method="me2"></camel:bean> > > > > </camel:doFinally> > > > > </camel:doTry> > > > > </camel:transacted> > > > > </camel:route> > > > > > > And beans looks like this: > > > > > > <bean id="catalogJms" > class="org.apache.camel.component.jms.JmsComponent"> > > > > <property name="connectionFactory"> > > > > <bean class="org.apache.activemq.ActiveMQConnectionFactory"> > > > > <property name="brokerURL"> > > > > <value>tcp://localhost:61616</value> > > > > </property> > > > > </bean> > > > > </property> > > > > <property name="transacted" value="true"></property> > > > > <property name="transactionManager" ref="jmsTransactionManager" /> > > > > <property name="acknowledgementModeName"> > > > > <value>CLIENT_ACKNOWLEDGE</value> > > > > </property> > > > > </bean> > > > > > > <bean id="poolConnectionFactory" class= > > "org.apache.activemq.pool.PooledConnectionFactory"> > > > > <property name="maxConnections" value="8" /> > > > > <property name="connectionFactory" ref="jmsConnectionFactory" /> > > > > </bean> > > > > > > <bean id="jmsConnectionFactory" class= > > "org.apache.activemq.ActiveMQConnectionFactory"> > > > > <property name="brokerURL" > > > > value= > > "vm://localhost:61616?broker.persistent=false&broker.useJmx=false" /> > > > > </bean> > > > > > > <!-- setup spring jms TX manager --> > > > > <bean id="jmsTransactionManager" > > > > class="org.springframework.jms.connection.JmsTransactionManager"> > > > > <property name="connectionFactory" ref="poolConnectionFactory" /> > > > > </bean> > > > > > > And fetchMessage in abc bean class looks like: > > > > > > public String fetchMessage() { > > > > Exchange exchange = consumer > > > > .receive( > > > > > "catalogJms:queue:queue1?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE&transacted=true", > > 5000); > > > > return exchange.getIn().getBody(TextMessage.class).toString(); > > > > } > > > > > > It still consumes message (QueueSize decreases by one) even when it fails > > in pqr bean with any exception apart from IOException. > > > > Please let me know what am i missing here? > > > > > > > > > > On Sat, Dec 21, 2013 at 10:01 PM, kraythe . <kray...@gmail.com> wrote: > > > > > In that case you are going about it wrong. Camel has this support built > > in > > > for you. If the route is declared to be transacted with > > > .transacted(myRequirdPolicy) then camel will handle that all for you > with > > > that one line. Put the transacted line right after your exception > > handleing > > > and before the main route and then go about your business. If you want > a > > > composite transaction that rolls back both the DB and JMS work then you > > > will need to configure a jta transaction manager. Just google 'camel > > > jta transacted route' and you will get code examples and so on. > > > > > > On Saturday, December 21, 2013, Tarun Kumar wrote: > > > > > > > Thanks for replying. Here is my usecase: > > > > > > > > consume a message from the queue. Do some transformation on the > messge. > > > > Then, persist the transformed message to datastore. > > > > Once, transformed message is written to datastore, send ack. Reason i > > > want > > > > to send ack later is, in case my application goes down post message > > > > consumption and before writing to datastore, i should be able to > fetch > > > same > > > > message again from JMS queue. Hope that clarifies. > > > > > > > > Thanks! > > > > > > > > > > > > > > > > On Sat, Dec 21, 2013 at 9:14 PM, kraythe . <kray...@gmail.com > > > <javascript:;>> > > > > wrote: > > > > > > > > > I don't think the question is quite clear. In JMS you can only > > consume > > > a > > > > > message off a queue once. You can't consume it and leave it on the > > > queue. > > > > > Why do want to not ack the message? Queues are like throwing candy > > > into a > > > > > room of kindergarten children. All will scramble for the candy but > > each > > > > > piece will get consumed by only one child. The only time JMS > messages > > > > > aren't caked is when they are failed in delivery. And even then, > the > > > only > > > > > use case for leaving them on the queue is because of a failed > > > transaction > > > > > in a transactional route, > > > > > > > > > > Tell me what is the use case you are trying to implement and I > might > > be > > > > > able to help you down another path. > > > > > > > > > > On Saturday, December 21, 2013, Tarun Kumar wrote: > > > > > > > > > > > Hi, > > > > > > > > > > > > Any help here will be highly appreciated. > > > > > > > > > > > > > > > > > > On Fri, Dec 20, 2013 at 9:55 AM, Tarun Kumar < > > > > agrawal.taru...@gmail.com <javascript:;> > > > > > <javascript:;> > > > > > > >wrote: > > > > > > > > > > > > > I am using polling consumer. > > > > > > > > > > > > > > from("timer://foo?period=5000").bean(cool, > "someBusinessLogic"); > > > > > > > > > > > > > > public static class MyCoolBean { > > > > > > > > > > > > > > private ConsumerTemplate consumer; > > > > > > > > > > > > > > public void setConsumer(ConsumerTemplate consumer) { > > > > > > > this.consumer = consumer; > > > > > > > } > > > > > > > > > > > > > > public void someBusinessLogic() { > > > > > > > > > > > > > > Exchange exchange > > > > > > > = > > > > > > > > > > > > > > > > > > > > > consumer.receive("catalogJms:queue:queueName?mapJmsMessage=false&acknowledgementModeName=CLIENT_ACKNOWLEDGE"); > > > > > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > > Even though i am not sending acknowledgement for received > > message, > > > > each > > > > > > > time it is polling different message. Any idea why that's > > > happening? > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA* > > > > > *Author of: Hardcore Java (2003) and Maintainable Java (2012)* > > > > > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39 > > > > > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>* > > > > > > > > > > > > > > > > > > -- > > > *Robert Simmons Jr. MSc. - Lead Java Architect @ EA* > > > *Author of: Hardcore Java (2003) and Maintainable Java (2012)* > > > *LinkedIn: **http://www.linkedin.com/pub/robert-simmons/40/852/a39 > > > <http://www.linkedin.com/pub/robert-simmons/40/852/a39>* > > > > > >