Author: davsclaus
Date: Mon Jun  4 07:50:55 2012
New Revision: 1345854

URL: http://svn.apache.org/viewvc?rev=1345854&view=rev
Log:
CAMEL-5309: Fixed issue when reusing previous jms endpoint with old reply 
manager, when readding a route. The reply manager in use should be 
associated with its producer, and also tied to the lifecycle of the producer.

Added:
    
camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java
      - copied unchanged from r1345844, 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java
Modified:
    camel/branches/camel-2.9.x/   (props changed)
    
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
    
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
    
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
    
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1345844

Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1345854&r1=1345853&r2=1345854&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 Mon Jun  4 07:50:55 2012
@@ -16,11 +16,8 @@
  */
 package org.apache.camel.component.jms;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -43,16 +40,12 @@ import org.apache.camel.Service;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
-import org.apache.camel.component.jms.reply.ReplyManager;
-import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.HeaderFilterStrategyAware;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.UnsafeUriCharactersEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,10 +74,6 @@ public class JmsEndpoint extends Default
     private Destination destination;
     private String selector;
     private JmsConfiguration configuration;
-    private final Map<String, ReplyManager> replyToReplyManager = new 
HashMap<String, ReplyManager>();
-    private ReplyManager replyManager;
-    // scheduled executor to check for timeout (reply not received)
-    private ScheduledExecutorService replyManagerExecutorService;
     private final AtomicBoolean running = new AtomicBoolean();
     private volatile boolean destroying;
 
@@ -177,14 +166,6 @@ public class JmsEndpoint extends Default
             notifyAll();
         }
     }
-    public void destroyMessageListenerContainer(final 
AbstractMessageListenerContainer listenerContainer) {
-        destroying = true;
-        this.getReplyManagerExecutorService().execute(new Runnable() {
-            public void run() {
-                destroyMessageListenerContainerInternal(listenerContainer);
-            }
-        });
-    }
 
     public AbstractMessageListenerContainer createMessageListenerContainer() 
throws Exception {
         return configuration.createMessageListenerContainer(this);
@@ -383,31 +364,6 @@ public class JmsEndpoint extends Default
         return true;
     }
 
-    public synchronized ReplyManager getReplyManager() throws Exception {
-        if (replyManager == null) {
-            // use a temporary queue
-            replyManager = new TemporaryQueueReplyManager();
-            replyManager.setEndpoint(this);
-            
replyManager.setScheduledExecutorService(getReplyManagerExecutorService());
-            ServiceHelper.startService(replyManager);
-        }
-        return replyManager;
-    }
-
-    public synchronized ReplyManager getReplyManager(String replyTo) throws 
Exception {
-        ReplyManager answer = replyToReplyManager.get(replyTo);
-        if (answer == null) {
-            // use a persistent queue
-            answer = new PersistentQueueReplyManager();
-            answer.setEndpoint(this);
-            
answer.setScheduledExecutorService(getReplyManagerExecutorService());
-            ServiceHelper.startService(answer);
-            // remember this manager so we can re-use it
-            replyToReplyManager.put(replyTo, answer);
-        }
-        return answer;
-    }
-
     public boolean isPubSubDomain() {
         return pubSubDomain;
     }
@@ -443,7 +399,6 @@ public class JmsEndpoint extends Default
         return metadata;
     }
 
-
     /**
      * Returns the {@link JmsOperations} used for metadata operations such as 
creating temporary destinations
      */
@@ -455,14 +410,6 @@ public class JmsEndpoint extends Default
         return template;
     }
 
-    protected synchronized ScheduledExecutorService 
getReplyManagerExecutorService() {
-        if (replyManagerExecutorService == null) {
-            String name = "JmsReplyManagerTimeoutChecker[" + 
getEndpointConfiguredDestinationName() + "]";
-            replyManagerExecutorService = 
getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
-        }
-        return replyManagerExecutorService;
-    }
-
     /**
      * State whether this endpoint is running (eg started)
      */
@@ -478,18 +425,6 @@ public class JmsEndpoint extends Default
     @Override
     protected void doStop() throws Exception {
         running.set(false);
-
-        if (replyManager != null) {
-            ServiceHelper.stopService(replyManager);
-            replyManager = null;
-        }
-
-        if (!replyToReplyManager.isEmpty()) {
-            for (ReplyManager replyManager : replyToReplyManager.values()) {
-                ServiceHelper.stopService(replyManager);
-            }
-            replyToReplyManager.clear();
-        }
     }
 
     // Delegated properties from the configuration

Modified: 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1345854&r1=1345853&r2=1345854&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
 Mon Jun  4 07:50:55 2012
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms;
 
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -29,11 +30,14 @@ import org.apache.camel.Exchange;
 import org.apache.camel.FailedToCreateProducerException;
 import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate;
+import org.apache.camel.component.jms.reply.PersistentQueueReplyManager;
 import org.apache.camel.component.jms.reply.ReplyManager;
+import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager;
 import 
org.apache.camel.component.jms.reply.UseMessageIdAsCorrelationIdMessageSentCallback;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.ValueHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +46,7 @@ import org.springframework.jms.core.Mess
 import org.springframework.jms.support.JmsUtils;
 
 import static 
org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
+
 /**
  * @version 
  */
@@ -59,6 +64,11 @@ public class JmsProducer extends Default
         this.endpoint = endpoint;
     }
 
+    @Override
+    public JmsEndpoint getEndpoint() {
+        return (JmsEndpoint) super.getEndpoint();
+    }
+
     protected void initReplyManager() {
         if (!started.get()) {
             synchronized (this) {
@@ -76,12 +86,12 @@ public class JmsProducer extends Default
                     }
 
                     if (endpoint.getReplyTo() != null) {
-                        replyManager = 
endpoint.getReplyManager(endpoint.getReplyTo());
+                        replyManager = 
createReplyManager(endpoint.getReplyTo());
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("Using JmsReplyManager: {} to process 
replies from: {}", replyManager, endpoint.getReplyTo());
                         }
                     } else {
-                        replyManager = endpoint.getReplyManager();
+                        replyManager = createReplyManager();
                         LOG.debug("Using JmsReplyManager: {} to process 
replies from temporary queue", replyManager);
                     }
                 } catch (Exception e) {
@@ -92,6 +102,16 @@ public class JmsProducer extends Default
         }
     }
 
+    protected void unInitReplyManager() {
+        try {
+            ServiceHelper.stopService(replyManager);
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        } finally {
+            started.set(false);
+        }
+    }
+
     public boolean process(Exchange exchange, AsyncCallback callback) {
         // deny processing if we are not started
         if (!isRunAllowed()) {
@@ -444,5 +464,35 @@ public class JmsProducer extends Default
 
     protected void doStop() throws Exception {
         super.doStop();
+
+        // must stop/un-init reply manager if it was in use
+        unInitReplyManager();
     }
+
+    protected ReplyManager createReplyManager() throws Exception {
+        // use a temporary queue
+        ReplyManager replyManager = new 
TemporaryQueueReplyManager(getEndpoint().getCamelContext());
+        replyManager.setEndpoint(getEndpoint());
+
+        String name = "JmsReplyManagerTimeoutChecker[" + 
getEndpoint().getEndpointConfiguredDestinationName() + "]";
+        ScheduledExecutorService replyManagerExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+        replyManager.setScheduledExecutorService(replyManagerExecutorService);
+        ServiceHelper.startService(replyManager);
+
+        return replyManager;
+    }
+
+    protected ReplyManager createReplyManager(String replyTo) throws Exception 
{
+        // use a persistent queue
+        ReplyManager replyManager = new 
PersistentQueueReplyManager(getEndpoint().getCamelContext());
+        replyManager.setEndpoint(getEndpoint());
+
+        String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]";
+        ScheduledExecutorService replyManagerExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
+        replyManager.setScheduledExecutorService(replyManagerExecutorService);
+        ServiceHelper.startService(replyManager);
+
+        return replyManager;
+    }
+
 }

Modified: 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1345854&r1=1345853&r2=1345854&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
 Mon Jun  4 07:50:55 2012
@@ -24,6 +24,7 @@ import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
 import org.apache.camel.component.jms.ReplyToType;
@@ -41,6 +42,10 @@ public class PersistentQueueReplyManager
     private String replyToSelectorValue;
     private MessageSelectorCreator dynamicMessageSelector;
 
+    public PersistentQueueReplyManager(CamelContext camelContext) {
+        super(camelContext);
+    }
+
     public String registerReply(ReplyManager replyManager, Exchange exchange, 
AsyncCallback callback,
                                 String originalCorrelationId, String 
correlationId, long requestTimeout) {
         // add to correlation map

Modified: 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1345854&r1=1345853&r2=1345854&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
 Mon Jun  4 07:50:55 2012
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.component.jms.JmsEndpoint;
@@ -44,6 +45,7 @@ import org.springframework.jms.listener.
 public abstract class ReplyManagerSupport extends ServiceSupport implements 
ReplyManager {
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    protected final CamelContext camelContext;
     protected ScheduledExecutorService executorService;
     protected JmsEndpoint endpoint;
     protected Destination replyTo;
@@ -52,6 +54,10 @@ public abstract class ReplyManagerSuppor
     protected final long replyToTimeout = 10000;
     protected CorrelationTimeoutMap correlation;
 
+    public ReplyManagerSupport(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     public void setScheduledExecutorService(ScheduledExecutorService 
executorService) {
         this.executorService = executorService;
     }
@@ -228,6 +234,12 @@ public abstract class ReplyManagerSuppor
             listenerContainer.destroy();
             listenerContainer = null;
         }
+
+        // must also stop executor service
+        if (executorService != null) {
+            
camelContext.getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
     }
 
 }

Modified: 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1345854&r1=1345853&r2=1345854&view=diff
==============================================================================
--- 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
 (original)
+++ 
camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
 Mon Jun  4 07:50:55 2012
@@ -23,6 +23,7 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -36,6 +37,10 @@ import org.springframework.jms.support.d
  */
 public class TemporaryQueueReplyManager extends ReplyManagerSupport {
 
+    public TemporaryQueueReplyManager(CamelContext camelContext) {
+        super(camelContext);
+    }
+
     public String registerReply(ReplyManager replyManager, Exchange exchange, 
AsyncCallback callback,
                                 String originalCorrelationId, String 
correlationId, long requestTimeout) {
         // add to correlation map


Reply via email to