Re: Concurrent Consumers creating duplicates
The problem was with the processor. I didn't realize that processor is a singleton. I have change the code to use bean with prototype scope and it resolved the issue. Thanks for looking into this Willem Jiang. -- View this message in context: http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158p5764298.html Sent from the Camel - Users mailing list archive at Nabble.com.
Re: Concurrent Consumers creating duplicates
The route looks good to me. Can you double check the JMS acknowledge setting of JMS connection? -- Willem Jiang Red Hat, Inc. Web: http://www.redhat.com Blog: http://willemjiang.blogspot.com (English) http://jnn.iteye.com (Chinese) Twitter: willemjiang Weibo: 姜宁willem On March 16, 2015 at 5:56:36 PM, kishore (kishoredevarase...@gmail.com) wrote: > Here is the WMQRouter. The Ack queue is for a different purpose. we have to > change the message adding some more data and send the Ack back. > > > /public class WMQRouter extends RouteBuilder { > > public static org.apache.log4j.Logger LOG = > org.apache.log4j.Logger.getLogger(WMQRouter.class); > > public String incomingQueue; > > public String outgoingQueue; > > public String backupFolder; > > public String ackQueue; > > public String getIncomingQueue() { > return incomingQueue; > } > > public void setIncomingQueue(String incomingQueue) { > this.incomingQueue = incomingQueue; > } > > public String getOutgoingQueue() { > return outgoingQueue; > } > > public void setOutgoingQueue(String outgoingQueue) { > this.outgoingQueue = outgoingQueue; > } > > public String getBackupFolder() { > return backupFolder; > } > > public void setBackupFolder(String backupFolder) { > this.backupFolder = backupFolder; > } > > public String getAckQueue() { > return ackQueue; > } > > public void setAckQueue(String ackQueue) { > this.ackQueue = ackQueue; > } > > /** > * The configure method is invoked by the Camel to process the message > * exchanges from the incoming queue and place the processed transaction > xml > * to corresponding queues This method also processes the on Exception > * scenario wherein invoking the Error Handler and places the failure > * message to the Failure Queue > */ > @Override > public void configure() throws Exception { > > String incomingQueue = "websphere:queue:" + getIncomingQueue(); > > String outgoingQueue = "websphere:queue:" + getOutgoingQueue(); > > String failureQueue = "websphere:queue:" + getAckQueue(); > > String backupFolder = "file:" + getBackupFolder(); > > FixParseValidator.init(); > > FixProcessor fixProcessor = new FixProcessor(); > FIXAckProcessor fixAckProcessor = new FIXAckProcessor(); > FIXErrorHandler fixErrorHandler = new FIXErrorHandler(); > > fixAckProcessor.setFixProcessor(fixProcessor); > fixErrorHandler.setFixProcessor(fixProcessor); > > onException(Exception.class).process(fixErrorHandler); > > onCompletion().process(fixAckProcessor).to(failureQueue); > > from(incomingQueue).to(backupFolder).process(fixProcessor) > .to(outgoingQueue); > } > } > / > > > > -- > View this message in context: > http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158p5764209.html > > Sent from the Camel - Users mailing list archive at Nabble.com. >
Re: Concurrent Consumers creating duplicates
Here is the WMQRouter. The Ack queue is for a different purpose. we have to change the message adding some more data and send the Ack back. /public class WMQRouter extends RouteBuilder { public static org.apache.log4j.Logger LOG = org.apache.log4j.Logger.getLogger(WMQRouter.class); public String incomingQueue; public String outgoingQueue; public String backupFolder; public String ackQueue; public String getIncomingQueue() { return incomingQueue; } public void setIncomingQueue(String incomingQueue) { this.incomingQueue = incomingQueue; } public String getOutgoingQueue() { return outgoingQueue; } public void setOutgoingQueue(String outgoingQueue) { this.outgoingQueue = outgoingQueue; } public String getBackupFolder() { return backupFolder; } public void setBackupFolder(String backupFolder) { this.backupFolder = backupFolder; } public String getAckQueue() { return ackQueue; } public void setAckQueue(String ackQueue) { this.ackQueue = ackQueue; } /** * The configure method is invoked by the Camel to process the message * exchanges from the incoming queue and place the processed transaction xml * to corresponding queues This method also processes the on Exception * scenario wherein invoking the Error Handler and places the failure * message to the Failure Queue */ @Override public void configure() throws Exception { String incomingQueue = "websphere:queue:" + getIncomingQueue(); String outgoingQueue = "websphere:queue:" + getOutgoingQueue(); String failureQueue = "websphere:queue:" + getAckQueue(); String backupFolder = "file:" + getBackupFolder(); FixParseValidator.init(); FixProcessor fixProcessor = new FixProcessor(); FIXAckProcessor fixAckProcessor = new FIXAckProcessor(); FIXErrorHandler fixErrorHandler = new FIXErrorHandler(); fixAckProcessor.setFixProcessor(fixProcessor); fixErrorHandler.setFixProcessor(fixProcessor); onException(Exception.class).process(fixErrorHandler); onCompletion().process(fixAckProcessor).to(failureQueue); from(incomingQueue).to(backupFolder).process(fixProcessor) .to(outgoingQueue); } } / -- View this message in context: http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158p5764209.html Sent from the Camel - Users mailing list archive at Nabble.com.
Re: Concurrent Consumers creating duplicates
I didn’t find the Route definition of wmqRoute, you may need to show us the code of com.broadridge.adapters.fixadapter.router.WMQRouter. BWT, why do your route need to specify the “ackQueue”? -- Willem Jiang Red Hat, Inc. Web: http://www.redhat.com Blog: http://willemjiang.blogspot.com (English) http://jnn.iteye.com (Chinese) Twitter: willemjiang Weibo: 姜宁willem On March 16, 2015 at 2:01:59 PM, kishore (kishoredevarase...@gmail.com) wrote: > Here is my camel-context.xml file > > > > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; > xmlns:context="http://www.springframework.org/schema/context"; > xmlns:camel="http://camel.apache.org/schema/spring"; > xsi:schemaLocation=" > http://www.springframework.org/schema/context > http://www.springframework.org/schema/context/spring-context-3.0.xsd > http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd > http://camel.apache.org/schema/spring > http://camel.apache.org/schema/spring/camel-spring.xsd";> > > > > class="com.broadridge.adapters.fixadapter.router.WMQRouter"> > > > > > > > > > > class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> > > > > > > > > > > > > > > > > > class="com.ibm.mq.jms.MQConnectionFactory"> > > > > > > > > > class="org.apache.camel.component.jms.JmsConfiguration"> > > > > > > > > -- > View this message in context: > http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158p5764201.html > > Sent from the Camel - Users mailing list archive at Nabble.com. >
Re: Concurrent Consumers creating duplicates
Here is my camel-context.xml file http://www.springframework.org/schema/beans"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xmlns:context="http://www.springframework.org/schema/context"; xmlns:camel="http://camel.apache.org/schema/spring"; xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd";> http://camel.apache.org/schema/spring";> -- View this message in context: http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158p5764201.html Sent from the Camel - Users mailing list archive at Nabble.com.
Re: Concurrent Consumers creating duplicates
You may need to check the acknowledge module setting of your JMS connection. BTW, can you show us the JMS endpoint setting that you have? -- Willem Jiang Red Hat, Inc. Web: http://www.redhat.com Blog: http://willemjiang.blogspot.com (English) http://jnn.iteye.com (Chinese) Twitter: willemjiang Weibo: 姜宁willem On March 15, 2015 at 3:09:28 PM, kishore (kishoredevarase...@gmail.com) wrote: > If I set the concurrentConsumers to 1 ,all the input messages are processed > fine and there are no duplicates. If I set concurrentConsumers to 5, some > messages are lost and some messages are duplicated. Can someone please > suggest a solution. > > > > -- > View this message in context: > http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158p5764168.html > > Sent from the Camel - Users mailing list archive at Nabble.com. >
Re: Concurrent Consumers creating duplicates
If I set the concurrentConsumers to 1 ,all the input messages are processed fine and there are no duplicates. If I set concurrentConsumers to 5, some messages are lost and some messages are duplicated. Can someone please suggest a solution. -- View this message in context: http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158p5764168.html Sent from the Camel - Users mailing list archive at Nabble.com.
Concurrent Consumers creating duplicates
Hello All, I have a simple route which takes messages from an incoming queue, processes the message and writes it an outgoing queue. When I log the exchange.getIn().getBody() inside my processor, I see duplicates. Here is the snippet of my route /MQQueueConnectionFactory cf = new MQQueueConnectionFactory(); // Config cf.setHostName("127.0.0.1"); cf.setPort(1414); cf.setTransportType(WMQConstants.WMQ_CM_CLIENT); cf.setQueueManager("TestQueueManager"); cf.setChannel("SYSTEM.DEF.SVRCONN"); cxt.addComponent("websphere-mq", JmsComponent.jmsComponent(cf)); cxt.addRoutes(new RouteBuilder() { public void configure() { from("websphere-mq:queue:incoming?concurrentConsumers=5").process(new FixProcessor()).to( "websphere-mq:queue:outgoing"); } });/ Here is the snippet of my processor /public void process(Exchange exchange) throws Exception { originalMessage = exchange.getIn().getBody(String.class); ExecutionTransform execTran = new ExecutionTransform(); LOG.info("originalMessage:" + originalMessage);/ Here is the snippet of sample message generator / MQQueueConnectionFactory cf = new MQQueueConnectionFactory(); // Config cf.setHostName("127.0.0.1"); cf.setPort(1414); cf.setTransportType(1); cf.setQueueManager("TestQueueManager"); cf.setChannel("SYSTEM.DEF.SVRCONN"); MQQueueConnection connection = (MQQueueConnection) cf.createQueueConnection(); MQQueueSession session = (MQQueueSession) connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); MQQueue queue = (MQQueue) session.createQueue("queue:///incoming"); MQQueueSender sender = (MQQueueSender) session.createSender(queue); // Start the connection connection.start(); for(int i =0 ; i <100 ; i++){ String msg = "Test Message " + i; JMSTextMessage message = (JMSTextMessage) session.createTextMessage(msg); System.out.println(message.toString()); sender.send(message); }/ After I send 100 messages, I randomly get the originalMessage printed as duplicate. Can someone please what might be wrong with this? Thanks, Kishore. -- View this message in context: http://camel.465427.n5.nabble.com/Concurrent-Consumers-creating-duplicates-tp5764158.html Sent from the Camel - Users mailing list archive at Nabble.com.