Repository: camel Updated Branches: refs/heads/camel-2.12.x a6a68e453 -> fa2373562 refs/heads/camel-2.13.x 49821e854 -> dff3b533b refs/heads/master 8945cf186 -> 67ee84ed0
CAMEL-7715: Fix SJMS ThreadPool for SjmsConsumer and SjmsProducer. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/67ee84ed Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/67ee84ed Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/67ee84ed Branch: refs/heads/master Commit: 67ee84ed03ca8cd63df8d783bfc256c646e67067 Parents: 8945cf1 Author: Cristiano Nicolai <cristiano.nico...@gmail.com> Authored: Tue Aug 19 11:21:24 2014 +1000 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Tue Aug 19 18:24:07 2014 +0800 ---------------------------------------------------------------------- .../camel/component/sjms/SjmsConsumer.java | 7 +- .../camel/component/sjms/SjmsProducer.java | 7 +- .../sjms/threadpool/ThreadPoolTest.java | 112 +++++++++++++++++++ 3 files changed, 122 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/67ee84ed/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java index 5b01c3a..4ae566d 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java @@ -48,7 +48,7 @@ import org.apache.camel.spi.Synchronization; public class SjmsConsumer extends DefaultConsumer { protected MessageConsumerPool consumers; - private final ExecutorService executor; + private ExecutorService executor; /** * A pool of MessageConsumerResources created at the initialization of the associated consumer. @@ -135,7 +135,6 @@ public class SjmsConsumer extends DefaultConsumer { public SjmsConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); - this.executor = endpoint.getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer"); } @Override @@ -146,6 +145,7 @@ public class SjmsConsumer extends DefaultConsumer { @Override protected void doStart() throws Exception { super.doStart(); + this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsConsumer"); consumers = new MessageConsumerPool(); consumers.fillPool(); } @@ -157,6 +157,9 @@ public class SjmsConsumer extends DefaultConsumer { consumers.drainPool(); consumers = null; } + if(this.executor!=null){ + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor); + } } /** http://git-wip-us.apache.org/repos/asf/camel/blob/67ee84ed/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java index c5d9c1f..582d657 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java @@ -123,16 +123,16 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { } private MessageProducerPool producers; - private final ExecutorService executor; + private ExecutorService executor; public SjmsProducer(Endpoint endpoint) { super(endpoint); - this.executor = endpoint.getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer"); } @Override protected void doStart() throws Exception { super.doStart(); + this.executor = getEndpoint().getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, "SjmsProducer"); if (getProducers() == null) { setProducers(new MessageProducerPool()); getProducers().fillPool(); @@ -146,6 +146,9 @@ public abstract class SjmsProducer extends DefaultAsyncProducer { getProducers().drainPool(); setProducers(null); } + if(this.executor!=null){ + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(this.executor); + } } public abstract MessageProducerResources doCreateProducerModel() throws Exception; http://git-wip-us.apache.org/repos/asf/camel/blob/67ee84ed/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/threadpool/ThreadPoolTest.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/threadpool/ThreadPoolTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/threadpool/ThreadPoolTest.java new file mode 100644 index 0000000..1bff29e --- /dev/null +++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/threadpool/ThreadPoolTest.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.sjms.threadpool; + +import java.lang.management.ManagementFactory; +import java.util.Set; + +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectInstance; +import javax.management.ObjectName; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sjms.support.JmsTestSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit test for CAMEL-7715. + * + */ +public class ThreadPoolTest extends JmsTestSupport { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolTest.class); + private static final String FROM_ROUTE = "from"; + private static final String TO_ROUTE = "to"; + + @Override + protected boolean useJmx() { + return true; + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("sjms:queue:foo").routeId(FROM_ROUTE); + from("sjms:queue:foo").to("log:test.log.1?showBody=true").routeId(TO_ROUTE); + } + }; + } + + /** + * Test that only 2 thread pools are created on start + * + * @throws Exception + */ + @Test + public void testContextStart() throws Exception { + assertProducerThreadPoolCount(1); + assertConsumerThreadPoolCount(1); + } + + /** + * Test that ThreadPool is removed when producer is removed + * @throws Exception + */ + @Test + public void testProducerThreadThreadPoolRemoved() throws Exception { + context.stopRoute(FROM_ROUTE); + assertProducerThreadPoolCount(0); + } + + /** + * Test that ThreadPool is removed when consumer is removed + * @throws Exception + */ + @Test + public void testConsumerThreadThreadPoolRemoved() throws Exception { + context.stopRoute(TO_ROUTE); + assertConsumerThreadPoolCount(0); + } + + private void assertProducerThreadPoolCount(final int count) throws Exception { + assertEquals(count, getMbeanCount("\"InOnlyProducer")); + } + + private void assertConsumerThreadPoolCount(final int count) throws Exception { + assertEquals(count, getMbeanCount("\"SjmsConsumer")); + } + + private int getMbeanCount(final String name) throws MalformedObjectNameException { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + Set<ObjectInstance> mbeans = mbs.queryMBeans(new ObjectName("org.apache.camel:type=threadpools,*"), null); + LOGGER.debug("mbeans size: " + mbeans.size()); + int count = 0; + for (ObjectInstance mbean : mbeans) { + LOGGER.debug("mbean: {}", mbean); + if (mbean.getObjectName().getKeyProperty("name").startsWith(name)) { + count++; + } + } + return count; + } + +}