Updated Branches:
  refs/heads/master 6adb14060 -> accd3b54e

CAMEL-6405: Added option purgeWhenStopping to seda/vm component so you can 
decide not to process remaining in memory exchanges if stopping, so you can 
stop fast.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/accd3b54
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/accd3b54
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/accd3b54

Branch: refs/heads/master
Commit: accd3b54e2e8ba41c64d13ec31fde19e5db0105a
Parents: 6adb140
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu May 30 15:05:43 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu May 30 15:05:43 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/seda/SedaConsumer.java  |   11 ++-
 .../apache/camel/component/seda/SedaEndpoint.java  |   11 +++
 .../component/seda/SedaPurgeWhenStoppingTest.java  |   68 +++++++++++++++
 3 files changed, 89 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/accd3b54/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index 2c8f476..1e0bc58 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -102,7 +102,11 @@ public class SedaConsumer extends ServiceSupport 
implements Consumer, Runnable,
     }
 
     public int getPendingExchangesSize() {
-        // number of pending messages on the queue
+        // the route is shutting down, so either we should purge the queue,
+        // or return how many exchanges are still on the queue
+        if (endpoint.isPurgeWhenStopping()) {
+            endpoint.purgeQueue();
+        }
         return endpoint.getQueue().size();
     }
 
@@ -314,6 +318,11 @@ public class SedaConsumer extends ServiceSupport 
implements Consumer, Runnable,
     }
 
     protected void doStop() throws Exception {
+        // ensure queue is purged if we stop the consumer
+        if (endpoint.isPurgeWhenStopping()) {
+            endpoint.purgeQueue();
+        }
+
         endpoint.onStopped(this);
         
         shutdownExecutor();

http://git-wip-us.apache.org/repos/asf/camel/blob/accd3b54/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index 4682b3d..03b6e63 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -74,6 +74,7 @@ public class SedaEndpoint extends DefaultEndpoint implements 
BrowsableEndpoint,
     private volatile boolean multicastStarted;
     private boolean blockWhenFull;
     private int pollTimeout = 1000;
+    private boolean purgeWhenStopping;
 
     public SedaEndpoint() {
     }
@@ -249,6 +250,15 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
         this.pollTimeout = pollTimeout;
     }
 
+    @ManagedAttribute
+    public boolean isPurgeWhenStopping() {
+        return purgeWhenStopping;
+    }
+
+    public void setPurgeWhenStopping(boolean purgeWhenStopping) {
+        this.purgeWhenStopping = purgeWhenStopping;
+    }
+
     public boolean isSingleton() {
         return true;
     }
@@ -270,6 +280,7 @@ public class SedaEndpoint extends DefaultEndpoint 
implements BrowsableEndpoint,
      */
     @ManagedOperation(description = "Purges the seda queue")
     public void purgeQueue() {
+        LOG.debug("Purging queue with {} exchanges", queue.size());
         queue.clear();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/accd3b54/camel-core/src/test/java/org/apache/camel/component/seda/SedaPurgeWhenStoppingTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/component/seda/SedaPurgeWhenStoppingTest.java
 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaPurgeWhenStoppingTest.java
new file mode 100644
index 0000000..36a7f32
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/component/seda/SedaPurgeWhenStoppingTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class SedaPurgeWhenStoppingTest extends ContextTestSupport {
+
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    public void testPurgeWhenStopping() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        for (int i = 0; i < 100; i++) {
+            template.sendBody("seda:foo", "Message " + i);
+        }
+
+        context.startRoute("myRoute");
+        latch.await(5, TimeUnit.SECONDS);
+        context.stopRoute("myRoute");
+
+        mock.setAssertPeriod(2000);
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("seda:foo?purgeWhenStopping=true").routeId("myRoute").noAutoStartup()
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws 
Exception {
+                            latch.countDown();
+                            Thread.sleep(500);
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+}

Reply via email to