Updated Branches:
  refs/heads/camel-2.11.x de0f04819 -> 77e49c6bf

CAMEL-6374: Fixed seda/vm with multiple consumers to continue to work if routes 
is stopped/remove etc. As well better stop/shutdown logic to cater for still 
active consumers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/77e49c6b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/77e49c6b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/77e49c6b

Branch: refs/heads/camel-2.11.x
Commit: 77e49c6bfd3838dbcef05542eba6205fd04a8f12
Parents: de0f048
Author: Claus Ibsen <[email protected]>
Authored: Fri May 17 13:16:53 2013 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Fri May 17 13:17:25 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/seda/SedaConsumer.java  |   18 ++--
 .../apache/camel/component/seda/SedaEndpoint.java  |   45 +++++++---
 .../vm/VmMultipleConsumersKeepRouteTest.java       |   63 ++++++++++++++
 .../vm/VmMultipleConsumersRemoteRouteTest.java     |   66 +++++++++++++++
 4 files changed, 172 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/77e49c6b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 09cbda7..1b7f42b 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -175,6 +175,9 @@ public class SedaConsumer extends ServiceSupport implements 
Consumer, Runnable,
             try {
                 // use the end user configured poll timeout
                 exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Polled queue {} with timeout {} ms. -> {}", new 
Object[]{ObjectHelper.getIdentityHashCode(queue), pollTimeout, exchange});
+                }
                 if (exchange != null) {
                     try {
                         // send a new copied exchange with new camel context
@@ -242,18 +245,17 @@ public class SedaConsumer extends ServiceSupport 
implements Consumer, Runnable,
      * @throws Exception can be thrown if processing of the exchange failed
      */
     protected void sendToConsumers(final Exchange exchange) throws Exception {
+        // validate multiple consumers has been enabled
         int size = endpoint.getConsumers().size();
+        if (size > 1 && !endpoint.isMultipleConsumersSupported()) {
+            throw new IllegalStateException("Multiple consumers for the same 
endpoint is not allowed: " + endpoint);
+        }
 
         // if there are multiple consumers then multicast to them
-        if (size > 1) {
-
-            // validate multiple consumers has been enabled
-            if (!endpoint.isMultipleConsumersSupported()) {
-                throw new IllegalStateException("Multiple consumers for the 
same endpoint is not allowed: " + endpoint);
-            }
+        if (endpoint.isMultipleConsumersSupported()) {
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Multicasting to {} consumers for Exchange: {}", 
endpoint.getConsumers().size(), exchange);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Multicasting to {} consumers for Exchange: {}", 
size, exchange);
             }
 
             // handover completions, as we need to done this when the 
multicast is done

http://git-wip-us.apache.org/repos/asf/camel/blob/77e49c6b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index 60b6c6a..5d36d26 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -142,17 +142,19 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
     }
 
     protected synchronized void updateMulticastProcessor() throws Exception {
+        // only needed if we support multiple consumers
+        if (!isMultipleConsumersSupported()) {
+            return;
+        }
+
+        // stop old before we create a new
         if (consumerMulticastProcessor != null) {
             ServiceHelper.stopService(consumerMulticastProcessor);
+            consumerMulticastProcessor = null;
         }
 
         int size = getConsumers().size();
-        if (size == 0 && multicastExecutor != null) {
-            // stop the multicast executor as its not needed anymore when size 
is zero
-            
getCamelContext().getExecutorServiceManager().shutdownGraceful(multicastExecutor);
-            multicastExecutor = null;
-        }
-        if (size > 1) {
+        if (size >= 1) {
             if (multicastExecutor == null) {
                 // create multicast executor as we need it when we have more 
than 1 processor
                 multicastExecutor = 
getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, 
URISupport.sanitizeUri(getEndpointUri()) + "(multicast)");
@@ -165,9 +167,6 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
             // create multicast processor
             multicastStarted = false;
             consumerMulticastProcessor = new 
MulticastProcessor(getCamelContext(), processors, null, true, 
multicastExecutor, false, false, false, 0, null, false);
-        } else {
-            // not needed
-            consumerMulticastProcessor = null;
         }
     }
 
@@ -387,11 +386,35 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
     }
 
     @Override
-    protected void doShutdown() throws Exception {
+    public void stop() throws Exception {
+        if (getConsumers().isEmpty()) {
+            super.stop();
+        } else {
+            LOG.debug("There is still active consumers.");
+        }
+    }
+
+    @Override
+    public void shutdown() throws Exception {
+        if (shutdown.get()) {
+            LOG.trace("Service already shut down");
+            return;
+        }
+
         // notify component we are shutting down this endpoint
         if (getComponent() != null) {
             getComponent().onShutdownEndpoint(this);
         }
+
+        if (getConsumers().isEmpty()) {
+            super.shutdown();
+        } else {
+            LOG.debug("There is still active consumers.");
+        }
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
         // shutdown thread pool if it was in use
         if (multicastExecutor != null) {
             
getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor);
@@ -400,7 +423,5 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
 
         // clear queue, as we are shutdown, so if re-created then the queue 
must be updated
         queue = null;
-
-        super.doShutdown();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/77e49c6b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersKeepRouteTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersKeepRouteTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersKeepRouteTest.java
new file mode 100644
index 0000000..d4171a1
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersKeepRouteTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+
+public class VmMultipleConsumersKeepRouteTest extends TestCase {
+
+    public void testVmMultipleConsumersKeepRoute() throws Exception {
+        CamelContext camelContext = new DefaultCamelContext();
+        ProducerTemplate producerTemplate = 
camelContext.createProducerTemplate();
+
+        RouteBuilder builder = new RouteBuilder(camelContext) {
+            @Override
+            public void configure() throws Exception {
+                
from("vm:producer?multipleConsumers=true").routeId("route1").to("mock:route1");
+            }
+
+        };
+        RouteBuilder builder2 = new RouteBuilder(camelContext) {
+            @Override
+            public void configure() throws Exception {
+                
from("vm:producer?multipleConsumers=true").routeId("route2").to("mock:route2");
+            }
+        };
+        camelContext.addRoutes(builder);
+        camelContext.addRoutes(builder2);
+
+        camelContext.start();
+
+        MockEndpoint mock1 = (MockEndpoint) 
camelContext.getEndpoint("mock:route1");
+        MockEndpoint mock2 = (MockEndpoint) 
camelContext.getEndpoint("mock:route2");
+        mock1.expectedMessageCount(100);
+        mock2.expectedMessageCount(100);
+
+        for (int i = 0; i < 100; i++) {
+            producerTemplate.sendBody("vm:producer?multipleConsumers=true", i);
+        }
+
+        MockEndpoint.assertIsSatisfied(mock1, mock2);
+
+        camelContext.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/77e49c6b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersRemoteRouteTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersRemoteRouteTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersRemoteRouteTest.java
new file mode 100644
index 0000000..41997fe
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/vm/VmMultipleConsumersRemoteRouteTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vm;
+
+import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+
+public class VmMultipleConsumersRemoteRouteTest extends TestCase {
+
+    public void testVmMultipleConsumersRemoteRoute() throws Exception {
+        CamelContext camelContext = new DefaultCamelContext();
+        ProducerTemplate producerTemplate = 
camelContext.createProducerTemplate();
+
+        RouteBuilder builder = new RouteBuilder(camelContext) {
+            @Override
+            public void configure() throws Exception {
+                
from("vm:producer?multipleConsumers=true").routeId("route1").to("mock:route1");
+            }
+
+        };
+        RouteBuilder builder2 = new RouteBuilder(camelContext) {
+            @Override
+            public void configure() throws Exception {
+                
from("vm:producer?multipleConsumers=true").routeId("route2").to("mock:route2");
+            }
+        };
+        camelContext.addRoutes(builder);
+        camelContext.addRoutes(builder2);
+
+        camelContext.start();
+
+        camelContext.stopRoute("route2");
+        camelContext.removeRoute("route2");
+
+        MockEndpoint mock1 = (MockEndpoint) 
camelContext.getEndpoint("mock:route1");
+        MockEndpoint mock2 = (MockEndpoint) 
camelContext.getEndpoint("mock:route2");
+        mock1.expectedMessageCount(100);
+        mock2.expectedMessageCount(0);
+
+        for (int i = 0; i < 100; i++) {
+            producerTemplate.sendBody("vm:producer?multipleConsumers=true", i);
+        }
+
+        MockEndpoint.assertIsSatisfied(mock1, mock2);
+
+        camelContext.stop();
+    }
+}

Reply via email to