Author: davsclaus Date: Mon Jun 4 07:50:55 2012 New Revision: 1345854 URL: http://svn.apache.org/viewvc?rev=1345854&view=rev Log: CAMEL-5309: Fixed issue when reusing previous jms endpoint with old reply manager, when readding a route. The reply manager in use should be be be associated with its producer, and also tied to the lifecycle of the producer.
Added: camel/branches/camel-2.9.x/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java - copied unchanged from r1345844, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1345844 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1345854&r1=1345853&r2=1345854&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Jun 4 07:50:55 2012 @@ -16,11 +16,8 @@ */ package org.apache.camel.component.jms; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -43,16 +40,12 @@ import org.apache.camel.Service; import org.apache.camel.ServiceStatus; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.component.jms.reply.PersistentQueueReplyManager; -import org.apache.camel.component.jms.reply.ReplyManager; -import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.SynchronousDelegateProducer; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.UnsafeUriCharactersEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,10 +74,6 @@ public class JmsEndpoint extends Default private Destination destination; private String selector; private JmsConfiguration configuration; - private final Map<String, ReplyManager> replyToReplyManager = new HashMap<String, ReplyManager>(); - private ReplyManager replyManager; - // scheduled executor to check for timeout (reply not received) - private ScheduledExecutorService replyManagerExecutorService; private final AtomicBoolean running = new AtomicBoolean(); private volatile boolean destroying; @@ -177,14 +166,6 @@ public class JmsEndpoint extends Default notifyAll(); } } - public void destroyMessageListenerContainer(final AbstractMessageListenerContainer listenerContainer) { - destroying = true; - this.getReplyManagerExecutorService().execute(new Runnable() { - public void run() { - destroyMessageListenerContainerInternal(listenerContainer); - } - }); - } public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception { return configuration.createMessageListenerContainer(this); @@ -383,31 +364,6 @@ public class JmsEndpoint extends Default return true; } - public synchronized ReplyManager getReplyManager() throws Exception { - if (replyManager == null) { - // use a temporary queue - replyManager = new TemporaryQueueReplyManager(); - replyManager.setEndpoint(this); - replyManager.setScheduledExecutorService(getReplyManagerExecutorService()); - ServiceHelper.startService(replyManager); - } - return replyManager; - } - - public synchronized ReplyManager getReplyManager(String replyTo) throws Exception { - ReplyManager answer = replyToReplyManager.get(replyTo); - if (answer == null) { - // use a persistent queue - answer = new PersistentQueueReplyManager(); - answer.setEndpoint(this); - answer.setScheduledExecutorService(getReplyManagerExecutorService()); - ServiceHelper.startService(answer); - // remember this manager so we can re-use it - replyToReplyManager.put(replyTo, answer); - } - return answer; - } - public boolean isPubSubDomain() { return pubSubDomain; } @@ -443,7 +399,6 @@ public class JmsEndpoint extends Default return metadata; } - /** * Returns the {@link JmsOperations} used for metadata operations such as creating temporary destinations */ @@ -455,14 +410,6 @@ public class JmsEndpoint extends Default return template; } - protected synchronized ScheduledExecutorService getReplyManagerExecutorService() { - if (replyManagerExecutorService == null) { - String name = "JmsReplyManagerTimeoutChecker[" + getEndpointConfiguredDestinationName() + "]"; - replyManagerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); - } - return replyManagerExecutorService; - } - /** * State whether this endpoint is running (eg started) */ @@ -478,18 +425,6 @@ public class JmsEndpoint extends Default @Override protected void doStop() throws Exception { running.set(false); - - if (replyManager != null) { - ServiceHelper.stopService(replyManager); - replyManager = null; - } - - if (!replyToReplyManager.isEmpty()) { - for (ReplyManager replyManager : replyToReplyManager.values()) { - ServiceHelper.stopService(replyManager); - } - replyToReplyManager.clear(); - } } // Delegated properties from the configuration Modified: camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1345854&r1=1345853&r2=1345854&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original) +++ camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Jun 4 07:50:55 2012 @@ -17,6 +17,7 @@ package org.apache.camel.component.jms; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.Destination; @@ -29,11 +30,14 @@ import org.apache.camel.Exchange; import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.RuntimeExchangeException; import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate; +import org.apache.camel.component.jms.reply.PersistentQueueReplyManager; import org.apache.camel.component.jms.reply.ReplyManager; +import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager; import org.apache.camel.component.jms.reply.UseMessageIdAsCorrelationIdMessageSentCallback; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.ValueHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +46,7 @@ import org.springframework.jms.core.Mess import org.springframework.jms.support.JmsUtils; import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName; + /** * @version */ @@ -59,6 +64,11 @@ public class JmsProducer extends Default this.endpoint = endpoint; } + @Override + public JmsEndpoint getEndpoint() { + return (JmsEndpoint) super.getEndpoint(); + } + protected void initReplyManager() { if (!started.get()) { synchronized (this) { @@ -76,12 +86,12 @@ public class JmsProducer extends Default } if (endpoint.getReplyTo() != null) { - replyManager = endpoint.getReplyManager(endpoint.getReplyTo()); + replyManager = createReplyManager(endpoint.getReplyTo()); if (LOG.isDebugEnabled()) { LOG.debug("Using JmsReplyManager: {} to process replies from: {}", replyManager, endpoint.getReplyTo()); } } else { - replyManager = endpoint.getReplyManager(); + replyManager = createReplyManager(); LOG.debug("Using JmsReplyManager: {} to process replies from temporary queue", replyManager); } } catch (Exception e) { @@ -92,6 +102,16 @@ public class JmsProducer extends Default } } + protected void unInitReplyManager() { + try { + ServiceHelper.stopService(replyManager); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + started.set(false); + } + } + public boolean process(Exchange exchange, AsyncCallback callback) { // deny processing if we are not started if (!isRunAllowed()) { @@ -444,5 +464,35 @@ public class JmsProducer extends Default protected void doStop() throws Exception { super.doStop(); + + // must stop/un-init reply manager if it was in use + unInitReplyManager(); } + + protected ReplyManager createReplyManager() throws Exception { + // use a temporary queue + ReplyManager replyManager = new TemporaryQueueReplyManager(getEndpoint().getCamelContext()); + replyManager.setEndpoint(getEndpoint()); + + String name = "JmsReplyManagerTimeoutChecker[" + getEndpoint().getEndpointConfiguredDestinationName() + "]"; + ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); + replyManager.setScheduledExecutorService(replyManagerExecutorService); + ServiceHelper.startService(replyManager); + + return replyManager; + } + + protected ReplyManager createReplyManager(String replyTo) throws Exception { + // use a persistent queue + ReplyManager replyManager = new PersistentQueueReplyManager(getEndpoint().getCamelContext()); + replyManager.setEndpoint(getEndpoint()); + + String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]"; + ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); + replyManager.setScheduledExecutorService(replyManagerExecutorService); + ServiceHelper.startService(replyManager); + + return replyManager; + } + } Modified: camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1345854&r1=1345853&r2=1345854&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original) +++ camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Mon Jun 4 07:50:55 2012 @@ -24,6 +24,7 @@ import javax.jms.Message; import javax.jms.Session; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.component.jms.DefaultSpringErrorHandler; import org.apache.camel.component.jms.ReplyToType; @@ -41,6 +42,10 @@ public class PersistentQueueReplyManager private String replyToSelectorValue; private MessageSelectorCreator dynamicMessageSelector; + public PersistentQueueReplyManager(CamelContext camelContext) { + super(camelContext); + } + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) { // add to correlation map Modified: camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1345854&r1=1345853&r2=1345854&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original) +++ camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Mon Jun 4 07:50:55 2012 @@ -24,6 +24,7 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.component.jms.JmsEndpoint; @@ -44,6 +45,7 @@ import org.springframework.jms.listener. public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager { protected final Logger log = LoggerFactory.getLogger(getClass()); + protected final CamelContext camelContext; protected ScheduledExecutorService executorService; protected JmsEndpoint endpoint; protected Destination replyTo; @@ -52,6 +54,10 @@ public abstract class ReplyManagerSuppor protected final long replyToTimeout = 10000; protected CorrelationTimeoutMap correlation; + public ReplyManagerSupport(CamelContext camelContext) { + this.camelContext = camelContext; + } + public void setScheduledExecutorService(ScheduledExecutorService executorService) { this.executorService = executorService; } @@ -228,6 +234,12 @@ public abstract class ReplyManagerSuppor listenerContainer.destroy(); listenerContainer = null; } + + // must also stop executor service + if (executorService != null) { + camelContext.getExecutorServiceManager().shutdownNow(executorService); + executorService = null; + } } } Modified: camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1345854&r1=1345853&r2=1345854&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original) +++ camel/branches/camel-2.9.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Mon Jun 4 07:50:55 2012 @@ -23,6 +23,7 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.component.jms.DefaultSpringErrorHandler; import org.springframework.jms.listener.AbstractMessageListenerContainer; @@ -36,6 +37,10 @@ import org.springframework.jms.support.d */ public class TemporaryQueueReplyManager extends ReplyManagerSupport { + public TemporaryQueueReplyManager(CamelContext camelContext) { + super(camelContext); + } + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) { // add to correlation map