Hi Willem. Oh, we should not put the duplicate correlation id in the map, but instead use putIfAbsent, so we only put it if the id is not already present. If there is a duplicate we should leave the old one as-is, and then throw an exception so the new one is never sent as a request/reply, but fails.
Then there are no issues at all, and things such as the warn log can be removed, etc. On Fri, Jan 9, 2015 at 8:24 AM, <ningji...@apache.org> wrote: > Repository: camel > Updated Branches: > refs/heads/camel-2.13.x 09d34bd3a -> 387f571cd > refs/heads/camel-2.14.x 9bfb2092c -> 34cd058ce > > > CAMEL-8204 Throw exception when the correlation id is not unique > > > Project: http://git-wip-us.apache.org/repos/asf/camel/repo > Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/34cd058c > Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/34cd058c > Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/34cd058c > > Branch: refs/heads/camel-2.14.x > Commit: 34cd058ced4d1b8c1e8b02bdcafd6c573aad6f2d > Parents: 9bfb209 > Author: Willem Jiang <willem.ji...@gmail.com> > Authored: Thu Jan 8 22:48:44 2015 +0800 > Committer: Willem Jiang <willem.ji...@gmail.com> > Committed: Fri Jan 9 15:20:39 2015 +0800 > > ---------------------------------------------------------------------- > .../component/jms/reply/QueueReplyManager.java | 15 +++---- > .../jms/reply/ReplyManagerSupport.java | 19 +++++++++ > .../jms/reply/TemporaryQueueReplyManager.java | 12 ++---- > .../jms/JmsRequestReplyCorrelationTest.java | 42 ++++++++++++++++++++ > 4 files changed, 69 insertions(+), 19 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java > ---------------------------------------------------------------------- > diff --git > a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java > > b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java > index e494b83..eef52de 100644 > --- > a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java > +++ > 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java > @@ -18,6 +18,7 @@ package org.apache.camel.component.jms.reply; > > import java.math.BigInteger; > import java.util.Random; > + > import javax.jms.Destination; > import javax.jms.JMSException; > import javax.jms.Message; > @@ -46,17 +47,11 @@ public class QueueReplyManager extends > ReplyManagerSupport { > super(camelContext); > } > > - public String registerReply(ReplyManager replyManager, Exchange > exchange, AsyncCallback callback, > - String originalCorrelationId, String > correlationId, long requestTimeout) { > - // add to correlation map > - QueueReplyHandler handler = new QueueReplyHandler(replyManager, > exchange, callback, > + protected ReplyHandler createReplyHandler(ReplyManager replyManager, > Exchange exchange, AsyncCallback callback, > + String originalCorrelationId, > String correlationId, long requestTimeout) { > + return new QueueReplyHandler(replyManager, exchange, callback, > originalCorrelationId, correlationId, requestTimeout); > - ReplyHandler result = correlation.put(correlationId, handler, > requestTimeout); > - if (result != null) { > - log.warn("The correlationId [{}] is not unique, some reply > message would be ignored and the request thread could be blocked.", > correlationId); > - } > - return correlationId; > - } > + } > > public void updateCorrelationId(String correlationId, String > newCorrelationId, long requestTimeout) { > log.trace("Updated provisional correlationId [{}] to expected > correlationId [{}]", correlationId, newCorrelationId); > > http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java > ---------------------------------------------------------------------- > diff --git > a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java > > 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java > index a93a36b..ff2f344 100644 > --- > a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java > +++ > b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java > @@ -19,6 +19,7 @@ package org.apache.camel.component.jms.reply; > import java.util.concurrent.CountDownLatch; > import java.util.concurrent.ScheduledExecutorService; > import java.util.concurrent.TimeUnit; > + > import javax.jms.Destination; > import javax.jms.JMSException; > import javax.jms.Message; > @@ -94,6 +95,24 @@ public abstract class ReplyManagerSupport extends > ServiceSupport implements Repl > } > return replyTo; > } > + > + public String registerReply(ReplyManager replyManager, Exchange > exchange, AsyncCallback callback, > + String originalCorrelationId, String > correlationId, long requestTimeout) { > + // add to correlation map > + QueueReplyHandler handler = new QueueReplyHandler(replyManager, > exchange, callback, > + originalCorrelationId, correlationId, requestTimeout); > + ReplyHandler result = correlation.put(correlationId, handler, > requestTimeout); > + if (result != null) { > + String logMessage = String.format("The correlationId [%s] is not > unique.", correlationId); > + log.warn("{}, some reply message would be ignored and the > request thread could be blocked.", logMessage); > + throw new IllegalArgumentException(logMessage); > + } > + return correlationId; > + } > + > + > + protected abstract ReplyHandler createReplyHandler(ReplyManager > replyManager, Exchange exchange, AsyncCallback callback, > + String originalCorrelationId, String > correlationId, long requestTimeout); > > public void onMessage(Message message) { > String correlationID = null; > > 
http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > ---------------------------------------------------------------------- > diff --git > a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > > b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > index 788994b..54915e0 100644 > --- > a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > +++ > b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java > @@ -57,15 +57,9 @@ public class TemporaryQueueReplyManager extends > ReplyManagerSupport { > return super.getReplyTo(); > } > > - public String registerReply(ReplyManager replyManager, Exchange > exchange, AsyncCallback callback, > - String originalCorrelationId, String > correlationId, long requestTimeout) { > - // add to correlation map > - TemporaryQueueReplyHandler handler = new > TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, > correlationId, requestTimeout); > - ReplyHandler result = correlation.put(correlationId, handler, > requestTimeout); > - if (result != null) { > - log.error("The correlationId [{}] is not unique, some reply > message would be ignored and the request thread could be blocked.", > correlationId); > - } > - return correlationId; > + protected ReplyHandler createReplyHandler(ReplyManager replyManager, > Exchange exchange, AsyncCallback callback, > + String originalCorrelationId, > String correlationId, long requestTimeout) { > + return new TemporaryQueueReplyHandler(this, exchange, callback, > originalCorrelationId, correlationId, requestTimeout); > } > > public void updateCorrelationId(String correlationId, String > newCorrelationId, long requestTimeout) { > > 
http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java > ---------------------------------------------------------------------- > diff --git > a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java > > b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java > index d535f95..634e975 100644 > --- > a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java > +++ > b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java > @@ -62,6 +62,41 @@ public class JmsRequestReplyCorrelationTest extends > CamelTestSupport { > assertEquals(REPLY_BODY, out.getOut().getBody(String.class)); > assertEquals("a", out.getOut().getHeader("JMSCorrelationID")); > } > + > + /** > + * As the correlationID should be unique when receiving the reply > message, > + * now we just expect to get an exception here. 
> + */ > + @Test > + public void testRequestReplyCorrelationWithDuplicateId() throws > Exception { > + MockEndpoint result = getMockEndpoint("mock:result"); > + result.expectedMessageCount(1); > + > + // just send out the request to fill the correlation id first > + template.asyncSend("jms:queue:helloDelay", new Processor() { > + public void process(Exchange exchange) throws Exception { > + exchange.setPattern(ExchangePattern.InOut); > + Message in = exchange.getIn(); > + in.setBody("Hello World"); > + in.setHeader("JMSCorrelationID", "b"); > + } > + }); > + > + Exchange out = template.send("jms:queue:helloDelay", > ExchangePattern.InOut, new Processor() { > + public void process(Exchange exchange) throws Exception { > + Message in = exchange.getIn(); > + in.setBody("Hello World"); > + in.setHeader("JMSCorrelationID", "b"); > + } > + }); > + > + result.assertIsSatisfied(); > + > + assertNotNull("We are expecting the exception here!", > out.getException()); > + assertTrue("Get a wrong exception", out.getException() instanceof > IllegalArgumentException); > + > + } > + > > /** > * When the setting useMessageIdAsCorrelationid is false and > @@ -211,6 +246,13 @@ public class JmsRequestReplyCorrelationTest extends > CamelTestSupport { > > assertNotNull(exchange.getIn().getHeader("JMSReplyTo")); > } > }).to("mock:result"); > + > + > from("jms:queue:helloDelay").delay().constant(2000).process(new Processor() { > + public void process(Exchange exchange) throws Exception { > + exchange.getIn().setBody(REPLY_BODY); > + > assertNotNull(exchange.getIn().getHeader("JMSReplyTo")); > + } > + }).to("mock:result"); > } > }; > } > -- Claus Ibsen ----------------- Red Hat, Inc. Email: cib...@redhat.com Twitter: davsclaus Blog: http://davsclaus.com Author of Camel in Action: http://www.manning.com/ibsen hawtio: http://hawt.io/ fabric8: http://fabric8.io/