Author: davsclaus Date: Fri Mar 22 11:17:15 2013 New Revision: 1459720 URL: http://svn.apache.org/r1459720 Log: CAMEL-6199: Fixed camel-jms consumer to remove private thread pools used by listener container, when the consumer is stopped. For example when people add and remove routes at runtime.
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java - copied, changed from r1459644, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAllowNullBodyTest.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1459720&r1=1459719&r2=1459720&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java Fri Mar 22 11:17:15 2013 @@ -16,6 +16,7 @@ */ package org.apache.camel.component.jms; +import java.util.concurrent.ExecutorService; import javax.jms.Connection; import org.apache.camel.FailedToCreateConsumerException; @@ -37,6 +38,8 @@ public class JmsConsumer extends Default private volatile AbstractMessageListenerContainer listenerContainer; private volatile EndpointMessageListener messageListener; private volatile boolean initialized; + private volatile ExecutorService executorService; + private volatile boolean shutdownExecutorService; public JmsConsumer(JmsEndpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) { super(endpoint, processor); @@ -76,6 +79,23 @@ public class JmsConsumer extends Default } /** + * Sets the {@link ExecutorService} the {@link AbstractMessageListenerContainer} is using (if any). 
+ * <p/> + * The {@link AbstractMessageListenerContainer} may use a private thread pool, and then when this consumer + * is stopped, we need to shutdown this thread pool as well, to clean up all resources. + * If a shared thread pool is used by the {@link AbstractMessageListenerContainer} then the lifecycle + * of that shared thread pool is handled elsewhere (not by this consumer); and therefore + * the <tt>shutdownExecutorService</tt> parameter should be <tt>false</tt>. + * + * @param executorService the thread pool + * @param shutdownExecutorService whether to shutdown the thread pool when this consumer stops + */ + void setListenerContainerExecutorService(ExecutorService executorService, boolean shutdownExecutorService) { + this.executorService = executorService; + this.shutdownExecutorService = shutdownExecutorService; + } + + /** * Starts the JMS listener container * <p/> * Can be used to start this consumer later if it was configured to not auto startup. @@ -160,6 +180,12 @@ public class JmsConsumer extends Default // then we will use updated configuration from jms endpoint that may have been managed using JMX listenerContainer = null; messageListener = null; + + // shutdown thread pool if listener container was using a private thread pool + if (shutdownExecutorService && executorService != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService); + } + executorService = null; } @Override Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1459720&r1=1459719&r2=1459720&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Mar 22 11:17:15 2013 @@ -183,6 +183,12 @@ public class JmsEndpoint extends Default log.debug("Using custom TaskExecutor: {} on listener container: {}", configuration.getTaskExecutor(), listenerContainer); } setContainerTaskExecutor(listenerContainer, configuration.getTaskExecutor()); + // we are using a shared thread pool that this listener container is using. + // store a reference to the consumer, but we should not shutdown the thread pool when the consumer stops + // as the lifecycle of the shared thread pool is handled elsewhere + if (configuration.getTaskExecutor() instanceof ExecutorService) { + consumer.setListenerContainerExecutorService((ExecutorService) configuration.getTaskExecutor(), false); + } } else if ((listenerContainer instanceof DefaultJmsMessageListenerContainer && configuration.getDefaultTaskExecutorType() == null) || !(listenerContainer instanceof DefaultJmsMessageListenerContainer)) { // preserve backwards compatibility if an explicit Default TaskExecutor Type was not set; @@ -190,6 +196,9 @@ public class JmsEndpoint extends Default // use a cached pool as DefaultMessageListenerContainer will throttle pool sizing ExecutorService executor = getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer, consumerName); setContainerTaskExecutor(listenerContainer, executor); + // we created a new private thread pool that this listener container is using, now store a reference on the consumer + // so when the consumer is stopped we can shutdown the thread pool also, to ensure all resources is shutdown + consumer.setListenerContainerExecutorService(executor, true); } else { // do nothing, as we're working with a DefaultJmsMessageListenerContainer with an explicit DefaultTaskExecutorType, // so DefaultJmsMessageListenerContainer#createDefaultTaskExecutor will handle the creation Copied: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java (from r1459644, camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAllowNullBodyTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAllowNullBodyTest.java&r1=1459644&r2=1459720&rev=1459720&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAllowNullBodyTest.java (original) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java Fri Mar 22 11:17:15 2013 @@ -16,71 +16,73 @@ */ package org.apache.camel.component.jms; +import java.util.Set; import javax.jms.ConnectionFactory; -import javax.jms.JMSException; +import javax.management.MBeanServer; +import javax.management.ObjectName; import org.apache.camel.CamelContext; -import org.apache.camel.CamelExecutionException; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.management.DefaultManagementNamingStrategy; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; /** - * + * Test that all thread pools is removed when adding and removing a route dynamically */ -public class JmsAllowNullBodyTest extends CamelTestSupport { +public class JmsAddAndRemoveRouteManagementTest extends CamelTestSupport { - @Test - public void testAllowNullBodyDefault() throws Exception { - getMockEndpoint("mock:result").expectedMessageCount(1); - 
getMockEndpoint("mock:result").message(0).body().isNull(); - getMockEndpoint("mock:result").message(0).header("bar").isEqualTo(123); - - // allow null body is default enabled - template.sendBodyAndHeader("activemq:queue:foo", null, "bar", 123); + @Override + protected boolean useJmx() { + return true; + } - assertMockEndpointsSatisfied(); + protected MBeanServer getMBeanServer() { + return context.getManagementStrategy().getManagementAgent().getMBeanServer(); } @Test - public void testAllowNullBody() throws Exception { - getMockEndpoint("mock:result").expectedMessageCount(1); - getMockEndpoint("mock:result").message(0).body().isNull(); - getMockEndpoint("mock:result").message(0).header("bar").isEqualTo(123); + public void testAddAndRemoveRoute() throws Exception { + MBeanServer mbeanServer = getMBeanServer(); - template.sendBodyAndHeader("activemq:queue:foo?allowNullBody=true", null, "bar", 123); + Set<ObjectName> before = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null); - assertMockEndpointsSatisfied(); - } - - @Test - public void testAllowNullTextBody() throws Exception { getMockEndpoint("mock:result").expectedMessageCount(1); - getMockEndpoint("mock:result").message(0).body().isNull(); - getMockEndpoint("mock:result").message(0).header("bar").isEqualTo(123); - template.sendBodyAndHeader("activemq:queue:foo?allowNullBody=true&jmsMessageType=Text", null, "bar", 123); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("activemq:queue:in").routeId("myNewRoute") + .to("activemq:queue:foo"); + } + }); + + Set<ObjectName> during = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null); + assertEquals("There should be one more thread pool in JMX", before.size() + 1, during.size()); + + template.sendBody("activemq:queue:in", "Hello World"); assertMockEndpointsSatisfied(); - } - @Test - public void testNoAllowNullBody() throws Exception { - try { - 
template.sendBodyAndHeader("activemq:queue:foo?allowNullBody=false", null, "bar", 123); - fail("Should have thrown exception"); - } catch (CamelExecutionException e) { - JMSException cause = assertIsInstanceOf(JMSException.class, e.getCause().getCause()); - assertEquals("Cannot send message as message body is null, and option allowNullBody is false.", cause.getMessage()); - } + // now stop and remove that route + context.stopRoute("myNewRoute"); + context.removeRoute("myNewRoute"); + + Set<ObjectName> after = mbeanServer.queryNames(new ObjectName("*:type=threadpools,*"), null); + assertEquals("Should have removed all thread pools from removed route", before.size(), after.size()); } protected CamelContext createCamelContext() throws Exception { CamelContext camelContext = super.createCamelContext(); ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + + DefaultManagementNamingStrategy naming = (DefaultManagementNamingStrategy) camelContext.getManagementStrategy().getManagementNamingStrategy(); + naming.setHostName("localhost"); + naming.setDomainName("org.apache.camel"); + return camelContext; }