Updated Branches: refs/heads/master 8abd3bb53 -> 4dc4dc361
CAMEL-6374: Fixed seda/vm with multiple consumers to continue to work if routes are stopped/removed etc. Also improved the stop/shutdown logic to cater for still-active consumers. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4dc4dc36 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4dc4dc36 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4dc4dc36 Branch: refs/heads/master Commit: 4dc4dc3618eb4acb5f8eea86bd2479333a473a04 Parents: 8abd3bb Author: Claus Ibsen <[email protected]> Authored: Fri May 17 13:16:53 2013 +0200 Committer: Claus Ibsen <[email protected]> Committed: Fri May 17 13:16:53 2013 +0200 ---------------------------------------------------------------------- .../apache/camel/component/seda/SedaConsumer.java | 18 ++-- .../apache/camel/component/seda/SedaEndpoint.java | 45 +++++++--- .../vm/VmMultipleConsumersKeepRouteTest.java | 63 ++++++++++++++ .../vm/VmMultipleConsumersRemoteRouteTest.java | 66 +++++++++++++++ 4 files changed, 172 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4dc4dc36/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java index 09cbda7..1b7f42b 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java @@ -175,6 +175,9 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, try { // use the end user configured poll timeout exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); + if (LOG.isTraceEnabled()) { + LOG.trace("Polled queue {} with timeout {} ms. 
-> {}", new Object[]{ObjectHelper.getIdentityHashCode(queue), pollTimeout, exchange}); + } if (exchange != null) { try { // send a new copied exchange with new camel context @@ -242,18 +245,17 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, * @throws Exception can be thrown if processing of the exchange failed */ protected void sendToConsumers(final Exchange exchange) throws Exception { + // validate multiple consumers has been enabled int size = endpoint.getConsumers().size(); + if (size > 1 && !endpoint.isMultipleConsumersSupported()) { + throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint); + } // if there are multiple consumers then multicast to them - if (size > 1) { - - // validate multiple consumers has been enabled - if (!endpoint.isMultipleConsumersSupported()) { - throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint); - } + if (endpoint.isMultipleConsumersSupported()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Multicasting to {} consumers for Exchange: {}", endpoint.getConsumers().size(), exchange); + if (LOG.isTraceEnabled()) { + LOG.trace("Multicasting to {} consumers for Exchange: {}", size, exchange); } // handover completions, as we need to done this when the multicast is done http://git-wip-us.apache.org/repos/asf/camel/blob/4dc4dc36/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index 6a6c3d8..4682b3d 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -149,17 +149,19 @@ public class SedaEndpoint extends DefaultEndpoint implements 
BrowsableEndpoint, } protected synchronized void updateMulticastProcessor() throws Exception { + // only needed if we support multiple consumers + if (!isMultipleConsumersSupported()) { + return; + } + + // stop old before we create a new if (consumerMulticastProcessor != null) { ServiceHelper.stopService(consumerMulticastProcessor); + consumerMulticastProcessor = null; } int size = getConsumers().size(); - if (size == 0 && multicastExecutor != null) { - // stop the multicast executor as its not needed anymore when size is zero - getCamelContext().getExecutorServiceManager().shutdownGraceful(multicastExecutor); - multicastExecutor = null; - } - if (size > 1) { + if (size >= 1) { if (multicastExecutor == null) { // create multicast executor as we need it when we have more than 1 processor multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, URISupport.sanitizeUri(getEndpointUri()) + "(multicast)"); @@ -172,9 +174,6 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, // create multicast processor multicastStarted = false; consumerMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, false, 0, null, false); - } else { - // not needed - consumerMulticastProcessor = null; } } @@ -394,11 +393,35 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } @Override - protected void doShutdown() throws Exception { + public void stop() throws Exception { + if (getConsumers().isEmpty()) { + super.stop(); + } else { + LOG.debug("There is still active consumers."); + } + } + + @Override + public void shutdown() throws Exception { + if (shutdown.get()) { + LOG.trace("Service already shut down"); + return; + } + // notify component we are shutting down this endpoint if (getComponent() != null) { getComponent().onShutdownEndpoint(this); } + + if (getConsumers().isEmpty()) { + super.shutdown(); + } else { + 
LOG.debug("There is still active consumers."); + } + } + + @Override + protected void doShutdown() throws Exception { // shutdown thread pool if it was in use if (multicastExecutor != null) { getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor); @@ -407,7 +430,5 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint, // clear queue, as we are shutdown, so if re-created then the queue must be updated queue = null; - - super.doShutdown(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/4dc4dc36/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersKeepRouteTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersKeepRouteTest.java b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersKeepRouteTest.java new file mode 100644 index 0000000..d4171a1 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersKeepRouteTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.vm; + +import junit.framework.TestCase; +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; + +public class VmMultipleConsumersKeepRouteTest extends TestCase { + + public void testVmMultipleConsumersKeepRoute() throws Exception { + CamelContext camelContext = new DefaultCamelContext(); + ProducerTemplate producerTemplate = camelContext.createProducerTemplate(); + + RouteBuilder builder = new RouteBuilder(camelContext) { + @Override + public void configure() throws Exception { + from("vm:producer?multipleConsumers=true").routeId("route1").to("mock:route1"); + } + + }; + RouteBuilder builder2 = new RouteBuilder(camelContext) { + @Override + public void configure() throws Exception { + from("vm:producer?multipleConsumers=true").routeId("route2").to("mock:route2"); + } + }; + camelContext.addRoutes(builder); + camelContext.addRoutes(builder2); + + camelContext.start(); + + MockEndpoint mock1 = (MockEndpoint) camelContext.getEndpoint("mock:route1"); + MockEndpoint mock2 = (MockEndpoint) camelContext.getEndpoint("mock:route2"); + mock1.expectedMessageCount(100); + mock2.expectedMessageCount(100); + + for (int i = 0; i < 100; i++) { + producerTemplate.sendBody("vm:producer?multipleConsumers=true", i); + } + + MockEndpoint.assertIsSatisfied(mock1, mock2); + + camelContext.stop(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/4dc4dc36/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersRemoteRouteTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersRemoteRouteTest.java b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersRemoteRouteTest.java new file mode 100644 index 0000000..41997fe 
--- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersRemoteRouteTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vm; + +import junit.framework.TestCase; +import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; + +public class VmMultipleConsumersRemoteRouteTest extends TestCase { + + public void testVmMultipleConsumersRemoteRoute() throws Exception { + CamelContext camelContext = new DefaultCamelContext(); + ProducerTemplate producerTemplate = camelContext.createProducerTemplate(); + + RouteBuilder builder = new RouteBuilder(camelContext) { + @Override + public void configure() throws Exception { + from("vm:producer?multipleConsumers=true").routeId("route1").to("mock:route1"); + } + + }; + RouteBuilder builder2 = new RouteBuilder(camelContext) { + @Override + public void configure() throws Exception { + from("vm:producer?multipleConsumers=true").routeId("route2").to("mock:route2"); + } + }; + camelContext.addRoutes(builder); + 
camelContext.addRoutes(builder2); + + camelContext.start(); + + camelContext.stopRoute("route2"); + camelContext.removeRoute("route2"); + + MockEndpoint mock1 = (MockEndpoint) camelContext.getEndpoint("mock:route1"); + MockEndpoint mock2 = (MockEndpoint) camelContext.getEndpoint("mock:route2"); + mock1.expectedMessageCount(100); + mock2.expectedMessageCount(0); + + for (int i = 0; i < 100; i++) { + producerTemplate.sendBody("vm:producer?multipleConsumers=true", i); + } + + MockEndpoint.assertIsSatisfied(mock1, mock2); + + camelContext.stop(); + } +}
