Author: davsclaus
Date: Fri Mar 22 11:17:15 2013
New Revision: 1459720

URL: http://svn.apache.org/r1459720
Log:
CAMEL-6199: Fixed camel-jms consumer to remove private thread pools used by 
listener container, when the consumer is stopped. For example when people add 
and remove routes at runtime.

Added:
    
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java
      - copied, changed from r1459644, 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAllowNullBodyTest.java
Modified:
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java?rev=1459720&r1=1459719&r2=1459720&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
 Fri Mar 22 11:17:15 2013
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.concurrent.ExecutorService;
 import javax.jms.Connection;
 
 import org.apache.camel.FailedToCreateConsumerException;
@@ -37,6 +38,8 @@ public class JmsConsumer extends Default
     private volatile AbstractMessageListenerContainer listenerContainer;
     private volatile EndpointMessageListener messageListener;
     private volatile boolean initialized;
+    private volatile ExecutorService executorService;
+    private volatile boolean shutdownExecutorService;
 
     public JmsConsumer(JmsEndpoint endpoint, Processor processor, 
AbstractMessageListenerContainer listenerContainer) {
         super(endpoint, processor);
@@ -76,6 +79,23 @@ public class JmsConsumer extends Default
     }
 
     /**
+     * Sets the {@link ExecutorService} the {@link 
AbstractMessageListenerContainer} is using (if any).
+     * <p/>
+     * The {@link AbstractMessageListenerContainer} may use a private thread 
pool, and then when this consumer
+     * is stopped, we need to shutdown this thread pool as well, to clean up 
all resources.
+     * If a shared thread pool is used by the {@link 
AbstractMessageListenerContainer} then the lifecycle
+     * of that shared thread pool is handled elsewhere (not by this consumer); 
and therefore
+     * the <tt>shutdownExecutorService</tt> parameter should be <tt>false</tt>.
+     *
+     * @param executorService         the thread pool
+     * @param shutdownExecutorService whether to shutdown the thread pool when 
this consumer stops
+     */
+    void setListenerContainerExecutorService(ExecutorService executorService, 
boolean shutdownExecutorService) {
+        this.executorService = executorService;
+        this.shutdownExecutorService = shutdownExecutorService;
+    }
+
+    /**
      * Starts the JMS listener container
      * <p/>
      * Can be used to start this consumer later if it was configured to not 
auto startup.
@@ -160,6 +180,12 @@ public class JmsConsumer extends Default
         // then we will use updated configuration from jms endpoint that may 
have been managed using JMX
         listenerContainer = null;
         messageListener = null;
+
+        // shutdown thread pool if listener container was using a private 
thread pool
+        if (shutdownExecutorService && executorService != null) {
+            
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+        }
+        executorService = null;
     }
 
     @Override

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1459720&r1=1459719&r2=1459720&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
 Fri Mar 22 11:17:15 2013
@@ -183,6 +183,12 @@ public class JmsEndpoint extends Default
                 log.debug("Using custom TaskExecutor: {} on listener 
container: {}", configuration.getTaskExecutor(), listenerContainer);
             }
             setContainerTaskExecutor(listenerContainer, 
configuration.getTaskExecutor());
+            // we are using a shared thread pool that this listener container 
is using.
+            // store a reference to the consumer, but we should not shutdown 
the thread pool when the consumer stops
+            // as the lifecycle of the shared thread pool is handled elsewhere
+            if (configuration.getTaskExecutor() instanceof ExecutorService) {
+                consumer.setListenerContainerExecutorService((ExecutorService) 
configuration.getTaskExecutor(), false);
+            }
         } else if ((listenerContainer instanceof 
DefaultJmsMessageListenerContainer && 
configuration.getDefaultTaskExecutorType() == null)
                 || !(listenerContainer instanceof 
DefaultJmsMessageListenerContainer)) {
             // preserve backwards compatibility if an explicit Default 
TaskExecutor Type was not set;
@@ -190,6 +196,9 @@ public class JmsEndpoint extends Default
             // use a cached pool as DefaultMessageListenerContainer will 
throttle pool sizing
             ExecutorService executor = 
getCamelContext().getExecutorServiceManager().newCachedThreadPool(consumer, 
consumerName);
             setContainerTaskExecutor(listenerContainer, executor);
+            // we created a new private thread pool that this listener 
container is using, now store a reference on the consumer
+            // so when the consumer is stopped we can shutdown the thread pool 
also, to ensure all resources are shut down
+            consumer.setListenerContainerExecutorService(executor, true);
         } else {
             // do nothing, as we're working with a 
DefaultJmsMessageListenerContainer with an explicit DefaultTaskExecutorType,
             // so DefaultJmsMessageListenerContainer#createDefaultTaskExecutor 
will handle the creation

Copied: 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java
 (from r1459644, 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAllowNullBodyTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAllowNullBodyTest.java&r1=1459644&r2=1459720&rev=1459720&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAllowNullBodyTest.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsAddAndRemoveRouteManagementTest.java
 Fri Mar 22 11:17:15 2013
@@ -16,71 +16,73 @@
  */
 package org.apache.camel.component.jms;
 
+import java.util.Set;
 import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.CamelExecutionException;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.management.DefaultManagementNamingStrategy;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
 import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
 
 /**
- *
+ * Test that all thread pools are removed when adding and removing a route 
dynamically
  */
-public class JmsAllowNullBodyTest extends CamelTestSupport {
+public class JmsAddAndRemoveRouteManagementTest extends CamelTestSupport {
 
-    @Test
-    public void testAllowNullBodyDefault() throws Exception {
-        getMockEndpoint("mock:result").expectedMessageCount(1);
-        getMockEndpoint("mock:result").message(0).body().isNull();
-        getMockEndpoint("mock:result").message(0).header("bar").isEqualTo(123);
-
-        // allow null body is default enabled
-        template.sendBodyAndHeader("activemq:queue:foo", null, "bar", 123);
+    @Override
+    protected boolean useJmx() {
+        return true;
+    }
 
-        assertMockEndpointsSatisfied();
+    protected MBeanServer getMBeanServer() {
+        return 
context.getManagementStrategy().getManagementAgent().getMBeanServer();
     }
 
     @Test
-    public void testAllowNullBody() throws Exception {
-        getMockEndpoint("mock:result").expectedMessageCount(1);
-        getMockEndpoint("mock:result").message(0).body().isNull();
-        getMockEndpoint("mock:result").message(0).header("bar").isEqualTo(123);
+    public void testAddAndRemoveRoute() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
 
-        template.sendBodyAndHeader("activemq:queue:foo?allowNullBody=true", 
null, "bar", 123);
+        Set<ObjectName> before = mbeanServer.queryNames(new 
ObjectName("*:type=threadpools,*"), null);
 
-        assertMockEndpointsSatisfied();
-    }
-
-    @Test
-    public void testAllowNullTextBody() throws Exception {
         getMockEndpoint("mock:result").expectedMessageCount(1);
-        getMockEndpoint("mock:result").message(0).body().isNull();
-        getMockEndpoint("mock:result").message(0).header("bar").isEqualTo(123);
 
-        
template.sendBodyAndHeader("activemq:queue:foo?allowNullBody=true&jmsMessageType=Text",
 null, "bar", 123);
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("activemq:queue:in").routeId("myNewRoute")
+                    .to("activemq:queue:foo");
+            }
+        });
+
+        Set<ObjectName> during = mbeanServer.queryNames(new 
ObjectName("*:type=threadpools,*"), null);
+        assertEquals("There should be one more thread pool in JMX", 
before.size() + 1, during.size());
+
+        template.sendBody("activemq:queue:in", "Hello World");
 
         assertMockEndpointsSatisfied();
-    }
 
-    @Test
-    public void testNoAllowNullBody() throws Exception {
-        try {
-            
template.sendBodyAndHeader("activemq:queue:foo?allowNullBody=false", null, 
"bar", 123);
-            fail("Should have thrown exception");
-        } catch (CamelExecutionException e) {
-            JMSException cause = assertIsInstanceOf(JMSException.class, 
e.getCause().getCause());
-            assertEquals("Cannot send message as message body is null, and 
option allowNullBody is false.", cause.getMessage());
-        }
+        // now stop and remove that route
+        context.stopRoute("myNewRoute");
+        context.removeRoute("myNewRoute");
+
+        Set<ObjectName> after = mbeanServer.queryNames(new 
ObjectName("*:type=threadpools,*"), null);
+        assertEquals("Should have removed all thread pools from removed 
route", before.size(), after.size());
     }
 
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
         ConnectionFactory connectionFactory = 
CamelJmsTestHelper.createConnectionFactory();
         camelContext.addComponent("activemq", 
jmsComponentAutoAcknowledge(connectionFactory));
+
+        DefaultManagementNamingStrategy naming = 
(DefaultManagementNamingStrategy) 
camelContext.getManagementStrategy().getManagementNamingStrategy();
+        naming.setHostName("localhost");
+        naming.setDomainName("org.apache.camel");
+
         return camelContext;
     }
 


Reply via email to