Author: davsclaus Date: Mon Jun 4 07:06:55 2012 New Revision: 1345844 URL: http://svn.apache.org/viewvc?rev=1345844&view=rev Log: CAMEL-5309: Fixed issue when reusing previous jms endpoint with old reply manager, when re-adding a route. The reply manager in use should be associated with its producer, and also tied to the lifecycle of the producer.
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java - copied, changed from r1345686, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1345844&r1=1345843&r2=1345844&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Mon Jun 4 07:06:55 2012 @@ -16,11 +16,8 @@ */ package org.apache.camel.component.jms; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -43,16 +40,12 @@ import org.apache.camel.Service; import org.apache.camel.ServiceStatus; import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedResource; 
-import org.apache.camel.component.jms.reply.PersistentQueueReplyManager; -import org.apache.camel.component.jms.reply.ReplyManager; -import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.SynchronousDelegateProducer; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.UnsafeUriCharactersEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,10 +74,6 @@ public class JmsEndpoint extends Default private Destination destination; private String selector; private JmsConfiguration configuration; - private final Map<String, ReplyManager> replyToReplyManager = new HashMap<String, ReplyManager>(); - private ReplyManager replyManager; - // scheduled executor to check for timeout (reply not received) - private ScheduledExecutorService replyManagerExecutorService; private final AtomicBoolean running = new AtomicBoolean(); private volatile boolean destroying; @@ -177,14 +166,6 @@ public class JmsEndpoint extends Default notifyAll(); } } - public void destroyMessageListenerContainer(final AbstractMessageListenerContainer listenerContainer) { - destroying = true; - this.getReplyManagerExecutorService().execute(new Runnable() { - public void run() { - destroyMessageListenerContainerInternal(listenerContainer); - } - }); - } public AbstractMessageListenerContainer createMessageListenerContainer() throws Exception { return configuration.createMessageListenerContainer(this); @@ -389,31 +370,6 @@ public class JmsEndpoint extends Default return true; } - public synchronized ReplyManager getReplyManager() throws Exception { - if (replyManager == null) { - // use a temporary queue - replyManager = new TemporaryQueueReplyManager(); - replyManager.setEndpoint(this); - 
replyManager.setScheduledExecutorService(getReplyManagerExecutorService()); - ServiceHelper.startService(replyManager); - } - return replyManager; - } - - public synchronized ReplyManager getReplyManager(String replyTo) throws Exception { - ReplyManager answer = replyToReplyManager.get(replyTo); - if (answer == null) { - // use a persistent queue - answer = new PersistentQueueReplyManager(); - answer.setEndpoint(this); - answer.setScheduledExecutorService(getReplyManagerExecutorService()); - ServiceHelper.startService(answer); - // remember this manager so we can re-use it - replyToReplyManager.put(replyTo, answer); - } - return answer; - } - public boolean isPubSubDomain() { return pubSubDomain; } @@ -449,7 +405,6 @@ public class JmsEndpoint extends Default return metadata; } - /** * Returns the {@link JmsOperations} used for metadata operations such as creating temporary destinations */ @@ -461,14 +416,6 @@ public class JmsEndpoint extends Default return template; } - protected synchronized ScheduledExecutorService getReplyManagerExecutorService() { - if (replyManagerExecutorService == null) { - String name = "JmsReplyManagerTimeoutChecker[" + getEndpointConfiguredDestinationName() + "]"; - replyManagerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); - } - return replyManagerExecutorService; - } - protected ExecutorService getAsyncStartStopExecutorService() { if (getComponent() == null) { throw new IllegalStateException("AsyncStartStopListener requires JmsComponent to be configured on this endpoint: " + this); @@ -492,23 +439,6 @@ public class JmsEndpoint extends Default @Override protected void doStop() throws Exception { running.set(false); - - if (replyManager != null) { - ServiceHelper.stopService(replyManager); - replyManager = null; - } - - if (!replyToReplyManager.isEmpty()) { - for (ReplyManager replyManager : replyToReplyManager.values()) { - ServiceHelper.stopService(replyManager); - } - 
replyToReplyManager.clear(); - } - - if (replyManagerExecutorService != null) { - getCamelContext().getExecutorServiceManager().shutdownNow(replyManagerExecutorService); - replyManagerExecutorService = null; - } } // Delegated properties from the configuration Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=1345844&r1=1345843&r2=1345844&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java Mon Jun 4 07:06:55 2012 @@ -17,6 +17,7 @@ package org.apache.camel.component.jms; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.Destination; @@ -29,11 +30,14 @@ import org.apache.camel.Exchange; import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.RuntimeExchangeException; import org.apache.camel.component.jms.JmsConfiguration.CamelJmsTemplate; +import org.apache.camel.component.jms.reply.PersistentQueueReplyManager; import org.apache.camel.component.jms.reply.ReplyManager; +import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager; import org.apache.camel.component.jms.reply.UseMessageIdAsCorrelationIdMessageSentCallback; import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.ValueHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +46,7 @@ import org.springframework.jms.core.Mess 
import org.springframework.jms.support.JmsUtils; import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName; + /** * @version */ @@ -59,6 +64,11 @@ public class JmsProducer extends Default this.endpoint = endpoint; } + @Override + public JmsEndpoint getEndpoint() { + return (JmsEndpoint) super.getEndpoint(); + } + protected void initReplyManager() { if (!started.get()) { synchronized (this) { @@ -76,12 +86,12 @@ public class JmsProducer extends Default } if (endpoint.getReplyTo() != null) { - replyManager = endpoint.getReplyManager(endpoint.getReplyTo()); + replyManager = createReplyManager(endpoint.getReplyTo()); if (LOG.isDebugEnabled()) { LOG.debug("Using JmsReplyManager: {} to process replies from: {}", replyManager, endpoint.getReplyTo()); } } else { - replyManager = endpoint.getReplyManager(); + replyManager = createReplyManager(); LOG.debug("Using JmsReplyManager: {} to process replies from temporary queue", replyManager); } } catch (Exception e) { @@ -92,6 +102,16 @@ public class JmsProducer extends Default } } + protected void unInitReplyManager() { + try { + ServiceHelper.stopService(replyManager); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + started.set(false); + } + } + public boolean process(Exchange exchange, AsyncCallback callback) { // deny processing if we are not started if (!isRunAllowed()) { @@ -444,5 +464,35 @@ public class JmsProducer extends Default protected void doStop() throws Exception { super.doStop(); + + // must stop/un-init reply manager if it was in use + unInitReplyManager(); } + + protected ReplyManager createReplyManager() throws Exception { + // use a temporary queue + ReplyManager replyManager = new TemporaryQueueReplyManager(getEndpoint().getCamelContext()); + replyManager.setEndpoint(getEndpoint()); + + String name = "JmsReplyManagerTimeoutChecker[" + getEndpoint().getEndpointConfiguredDestinationName() + "]"; + ScheduledExecutorService 
replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); + replyManager.setScheduledExecutorService(replyManagerExecutorService); + ServiceHelper.startService(replyManager); + + return replyManager; + } + + protected ReplyManager createReplyManager(String replyTo) throws Exception { + // use a persistent queue + ReplyManager replyManager = new PersistentQueueReplyManager(getEndpoint().getCamelContext()); + replyManager.setEndpoint(getEndpoint()); + + String name = "JmsReplyManagerTimeoutChecker[" + replyTo + "]"; + ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); + replyManager.setScheduledExecutorService(replyManagerExecutorService); + ServiceHelper.startService(replyManager); + + return replyManager; + } + } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1345844&r1=1345843&r2=1345844&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Mon Jun 4 07:06:55 2012 @@ -24,6 +24,7 @@ import javax.jms.Message; import javax.jms.Session; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.component.jms.DefaultSpringErrorHandler; import org.apache.camel.component.jms.ReplyToType; @@ -41,6 +42,10 @@ public class PersistentQueueReplyManager private String replyToSelectorValue; private 
MessageSelectorCreator dynamicMessageSelector; + public PersistentQueueReplyManager(CamelContext camelContext) { + super(camelContext); + } + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) { // add to correlation map Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1345844&r1=1345843&r2=1345844&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Mon Jun 4 07:06:55 2012 @@ -24,6 +24,7 @@ import javax.jms.JMSException; import javax.jms.Message; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.component.jms.JmsEndpoint; @@ -45,6 +46,7 @@ import org.springframework.jms.listener. 
public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager { protected final Logger log = LoggerFactory.getLogger(getClass()); + protected final CamelContext camelContext; protected ScheduledExecutorService executorService; protected JmsEndpoint endpoint; protected Destination replyTo; @@ -53,6 +55,10 @@ public abstract class ReplyManagerSuppor protected final long replyToTimeout = 10000; protected CorrelationTimeoutMap correlation; + public ReplyManagerSupport(CamelContext camelContext) { + this.camelContext = camelContext; + } + public void setScheduledExecutorService(ScheduledExecutorService executorService) { this.executorService = executorService; } @@ -229,6 +235,12 @@ public abstract class ReplyManagerSuppor listenerContainer.destroy(); listenerContainer = null; } + + // must also stop executor service + if (executorService != null) { + camelContext.getExecutorServiceManager().shutdownNow(executorService); + executorService = null; + } } } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1345844&r1=1345843&r2=1345844&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Mon Jun 4 07:06:55 2012 @@ -23,6 +23,7 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.component.jms.DefaultSpringErrorHandler; import org.springframework.jms.listener.AbstractMessageListenerContainer; @@ 
-36,6 +37,10 @@ import org.springframework.jms.support.d */ public class TemporaryQueueReplyManager extends ReplyManagerSupport { + public TemporaryQueueReplyManager(CamelContext camelContext) { + super(camelContext); + } + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) { // add to correlation map Copied: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java (from r1345686, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java&r1=1345686&r2=1345844&rev=1345844&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyExclusiveReplyToRemoveAddRouteTest.java Mon Jun 4 07:06:55 2012 @@ -19,47 +19,40 @@ package org.apache.camel.component.jms; import javax.jms.ConnectionFactory; import org.apache.camel.CamelContext; -import org.apache.camel.CamelExecutionException; -import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.test.junit4.CamelTestSupport; -import org.apache.camel.util.StopWatch; import org.junit.Test; import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; /** - * Using exclusive fixed replyTo queues should be faster as there is no need for - * JMSMessage selectors. - * - * @version + * @version */ -public class JmsRequestReplyExclusiveReplyToTest extends CamelTestSupport { +public class JmsRequestReplyExclusiveReplyToRemoveAddRouteTest extends CamelTestSupport { @Test public void testJmsRequestReplyExclusiveFixedReplyTo() throws Exception { - StopWatch watch = new StopWatch(); + assertEquals("Hello A", template.requestBody("direct:start", "A")); - assertEquals("Hello A", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "A")); - assertEquals("Hello B", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "B")); - assertEquals("Hello C", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "C")); - assertEquals("Hello D", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "D")); - assertEquals("Hello E", template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Exclusive", "E")); + // stop and remove route + context.stopRoute("start"); + context.removeRoute("start"); - long delta = watch.stop(); - assertTrue("Should be faster than about 4 seconds, was: " + delta, delta < 4200); - } + // add new route using same jms endpoint uri + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start2").routeId("start2") + .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive") + .to("log:start2"); + } + }); + // and it should still work - @Test - public void testInvalidConfiguration() throws Exception { - try { - template.requestBody("activemq:queue:foo?replyTo=bar&replyToType=Temporary", "Hello World"); - fail("Should have thrown exception"); - } catch (CamelExecutionException e) { - assertIsInstanceOf(FailedToCreateProducerException.class, e.getCause()); - 
assertIsInstanceOf(IllegalArgumentException.class, e.getCause().getCause()); - assertEquals("ReplyToType Temporary is not supported when replyTo bar is also configured.", e.getCause().getCause().getMessage()); - } + assertEquals("Hello B", template.requestBody("direct:start2", "B")); + assertEquals("Hello C", template.requestBody("direct:start2", "C")); + assertEquals("Hello D", template.requestBody("direct:start2", "D")); + assertEquals("Hello E", template.requestBody("direct:start2", "E")); } protected CamelContext createCamelContext() throws Exception { @@ -74,7 +67,11 @@ public class JmsRequestReplyExclusiveRep return new RouteBuilder() { @Override public void configure() throws Exception { - from("activemq:queue:foo") + from("direct:start").routeId("start") + .to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive") + .to("log:start"); + + from("activemq:queue:foo").routeId("foo") .transform(body().prepend("Hello ")); } };