Hi Willem

Oh we should not put the duplicate correlation id, in the map, but
instead use a putIfAbsent, so we only put if its not unique. If there
is a duplicate we should leave the old one as-is. And then thrown an
exception so the new one never is sent as a request/reply, but fails.

Then there is no issues at all, such as the warn log can be removed etc.

On Fri, Jan 9, 2015 at 8:24 AM,  <ningji...@apache.org> wrote:
> Repository: camel
> Updated Branches:
>   refs/heads/camel-2.13.x 09d34bd3a -> 387f571cd
>   refs/heads/camel-2.14.x 9bfb2092c -> 34cd058ce
>
>
> CAMEL-8204 Throw exception when the correlation id is not unique
>
>
> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/34cd058c
> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/34cd058c
> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/34cd058c
>
> Branch: refs/heads/camel-2.14.x
> Commit: 34cd058ced4d1b8c1e8b02bdcafd6c573aad6f2d
> Parents: 9bfb209
> Author: Willem Jiang <willem.ji...@gmail.com>
> Authored: Thu Jan 8 22:48:44 2015 +0800
> Committer: Willem Jiang <willem.ji...@gmail.com>
> Committed: Fri Jan 9 15:20:39 2015 +0800
>
> ----------------------------------------------------------------------
>  .../component/jms/reply/QueueReplyManager.java  | 15 +++----
>  .../jms/reply/ReplyManagerSupport.java          | 19 +++++++++
>  .../jms/reply/TemporaryQueueReplyManager.java   | 12 ++----
>  .../jms/JmsRequestReplyCorrelationTest.java     | 42 ++++++++++++++++++++
>  4 files changed, 69 insertions(+), 19 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
> ----------------------------------------------------------------------
> diff --git 
> a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
>  
> b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
> index e494b83..eef52de 100644
> --- 
> a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
> +++ 
> b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
> @@ -18,6 +18,7 @@ package org.apache.camel.component.jms.reply;
>
>  import java.math.BigInteger;
>  import java.util.Random;
> +
>  import javax.jms.Destination;
>  import javax.jms.JMSException;
>  import javax.jms.Message;
> @@ -46,17 +47,11 @@ public class QueueReplyManager extends 
> ReplyManagerSupport {
>          super(camelContext);
>      }
>
> -    public String registerReply(ReplyManager replyManager, Exchange 
> exchange, AsyncCallback callback,
> -                                String originalCorrelationId, String 
> correlationId, long requestTimeout) {
> -        // add to correlation map
> -        QueueReplyHandler handler = new QueueReplyHandler(replyManager, 
> exchange, callback,
> +    protected ReplyHandler createReplyHandler(ReplyManager replyManager, 
> Exchange exchange, AsyncCallback callback,
> +                                              String originalCorrelationId, 
> String correlationId, long requestTimeout) {
> +        return new QueueReplyHandler(replyManager, exchange, callback,
>                  originalCorrelationId, correlationId, requestTimeout);
> -        ReplyHandler result = correlation.put(correlationId, handler, 
> requestTimeout);
> -        if (result != null) {
> -            log.warn("The correlationId [{}] is not unique, some reply 
> message would be ignored and the request thread could be blocked.", 
> correlationId);
> -        }
> -        return correlationId;
> -    }
> +    }
>
>      public void updateCorrelationId(String correlationId, String 
> newCorrelationId, long requestTimeout) {
>          log.trace("Updated provisional correlationId [{}] to expected 
> correlationId [{}]", correlationId, newCorrelationId);
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
> ----------------------------------------------------------------------
> diff --git 
> a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
>  
> b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
> index a93a36b..ff2f344 100644
> --- 
> a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
> +++ 
> b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
> @@ -19,6 +19,7 @@ package org.apache.camel.component.jms.reply;
>  import java.util.concurrent.CountDownLatch;
>  import java.util.concurrent.ScheduledExecutorService;
>  import java.util.concurrent.TimeUnit;
> +
>  import javax.jms.Destination;
>  import javax.jms.JMSException;
>  import javax.jms.Message;
> @@ -94,6 +95,24 @@ public abstract class ReplyManagerSupport extends 
> ServiceSupport implements Repl
>          }
>          return replyTo;
>      }
> +
> +    public String registerReply(ReplyManager replyManager, Exchange 
> exchange, AsyncCallback callback,
> +                                String originalCorrelationId, String 
> correlationId, long requestTimeout) {
> +        // add to correlation map
> +        QueueReplyHandler handler = new QueueReplyHandler(replyManager, 
> exchange, callback,
> +                originalCorrelationId, correlationId, requestTimeout);
> +        ReplyHandler result = correlation.put(correlationId, handler, 
> requestTimeout);
> +        if (result != null) {
> +            String logMessage = String.format("The correlationId [%s] is not 
> unique.", correlationId);
> +            log.warn("{}, some reply message would be ignored and the 
> request thread could be blocked.",  logMessage);
> +            throw new IllegalArgumentException(logMessage);
> +        }
> +        return correlationId;
> +    }
> +
> +
> +    protected abstract ReplyHandler createReplyHandler(ReplyManager 
> replyManager, Exchange exchange, AsyncCallback callback,
> +                                String originalCorrelationId, String 
> correlationId, long requestTimeout);
>
>      public void onMessage(Message message) {
>          String correlationID = null;
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> ----------------------------------------------------------------------
> diff --git 
> a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
>  
> b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> index 788994b..54915e0 100644
> --- 
> a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> +++ 
> b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
> @@ -57,15 +57,9 @@ public class TemporaryQueueReplyManager extends 
> ReplyManagerSupport {
>          return super.getReplyTo();
>      }
>
> -    public String registerReply(ReplyManager replyManager, Exchange 
> exchange, AsyncCallback callback,
> -                                String originalCorrelationId, String 
> correlationId, long requestTimeout) {
> -        // add to correlation map
> -        TemporaryQueueReplyHandler handler = new 
> TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, 
> correlationId, requestTimeout);
> -        ReplyHandler result = correlation.put(correlationId, handler, 
> requestTimeout);
> -        if (result != null) {
> -            log.error("The correlationId [{}] is not unique, some reply 
> message would be ignored and the request thread could be blocked.", 
> correlationId);
> -        }
> -        return correlationId;
> +    protected ReplyHandler createReplyHandler(ReplyManager replyManager, 
> Exchange exchange, AsyncCallback callback,
> +                                              String originalCorrelationId, 
> String correlationId, long requestTimeout) {
> +        return new TemporaryQueueReplyHandler(this, exchange, callback, 
> originalCorrelationId, correlationId, requestTimeout);
>      }
>
>      public void updateCorrelationId(String correlationId, String 
> newCorrelationId, long requestTimeout) {
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/34cd058c/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
> ----------------------------------------------------------------------
> diff --git 
> a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
>  
> b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
> index d535f95..634e975 100644
> --- 
> a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
> +++ 
> b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyCorrelationTest.java
> @@ -62,6 +62,41 @@ public class JmsRequestReplyCorrelationTest extends 
> CamelTestSupport {
>          assertEquals(REPLY_BODY, out.getOut().getBody(String.class));
>          assertEquals("a", out.getOut().getHeader("JMSCorrelationID"));
>      }
> +
> +    /**
> +     * As the correlationID should be unique when receiving the reply 
> message,
> +     * now we just expect to get an exception here.
> +     */
> +    @Test
> +    public void testRequestReplyCorrelationWithDuplicateId() throws 
> Exception {
> +        MockEndpoint result = getMockEndpoint("mock:result");
> +        result.expectedMessageCount(1);
> +
> +        // just send out the request to fill the correlation id first
> +        template.asyncSend("jms:queue:helloDelay", new Processor() {
> +            public void process(Exchange exchange) throws Exception {
> +                exchange.setPattern(ExchangePattern.InOut);
> +                Message in = exchange.getIn();
> +                in.setBody("Hello World");
> +                in.setHeader("JMSCorrelationID", "b");
> +            }
> +        });
> +
> +        Exchange out = template.send("jms:queue:helloDelay", 
> ExchangePattern.InOut, new Processor() {
> +            public void process(Exchange exchange) throws Exception {
> +                Message in = exchange.getIn();
> +                in.setBody("Hello World");
> +                in.setHeader("JMSCorrelationID", "b");
> +            }
> +        });
> +
> +        result.assertIsSatisfied();
> +
> +        assertNotNull("We are expecting the exception here!", 
> out.getException());
> +        assertTrue("Get a wrong exception", out.getException() instanceof 
> IllegalArgumentException);
> +
> +    }
> +
>
>      /**
>       * When the setting useMessageIdAsCorrelationid is false and
> @@ -211,6 +246,13 @@ public class JmsRequestReplyCorrelationTest extends 
> CamelTestSupport {
>                          
> assertNotNull(exchange.getIn().getHeader("JMSReplyTo"));
>                      }
>                  }).to("mock:result");
> +
> +                
> from("jms:queue:helloDelay").delay().constant(2000).process(new Processor() {
> +                    public void process(Exchange exchange) throws Exception {
> +                        exchange.getIn().setBody(REPLY_BODY);
> +                        
> assertNotNull(exchange.getIn().getHeader("JMSReplyTo"));
> +                    }
> +                }).to("mock:result");
>              }
>          };
>      }
>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cib...@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/

Reply via email to