Author: davsclaus
Date: Mon Nov 22 10:37:14 2010
New Revision: 1037666

URL: http://svn.apache.org/viewvc?rev=1037666&view=rev
Log:
CAMEL-3348: Fixed issue with seda consumer in rare condition may lose exchange 
during graceful shutdown on high loaded systems.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=1037666&r1=1037665&r2=1037666&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
 Mon Nov 22 10:37:14 2010
@@ -16,11 +16,10 @@
  */
 package org.apache.camel.component.seda;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -36,7 +35,6 @@ import org.apache.camel.processor.Multic
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -48,6 +46,10 @@ import org.apache.commons.logging.LogFac
 public class SedaConsumer extends ServiceSupport implements Consumer, 
Runnable, ShutdownAware {
     private static final transient Log LOG = 
LogFactory.getLog(SedaConsumer.class);
 
+    // use a task counter to help ensure we can graceful shutdown the seda 
consumers without
+    // causing any exchanges to be lost due a tiny loophole between the 
exchange is polled
+    // and when its registered as in flight exchange
+    private final AtomicInteger tasks = new AtomicInteger();
     private SedaEndpoint endpoint;
     private AsyncProcessor processor;
     private ExecutorService executor;
@@ -90,24 +92,35 @@ public class SedaConsumer extends Servic
 
     public int getPendingExchangesSize() {
         // number of pending messages on the queue
-        return endpoint.getQueue().size();
+        int answer = endpoint.getQueue().size();
+        if (answer == 0) {
+            // if there are no pending exchanges we at first must ensure that
+            // all tasks has been completed and the thread is stopped, to avoid
+            // any condition which otherwise would cause an exchange to be lost
+
+            // we think there are 0 pending exchanges but we are only 100% sure
+            // when all the running tasks has been shutdown, so they do not
+            // somehow have polled an Exchange which we otherwise may loose
+            // due the Exchange takes a little while before its enlisted in the
+            // in flight registry (to let Camel know there is an Exchange in 
progress)
+            answer = tasks.get();
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Pending exchanges " + answer);
+        }
+        return answer;
     }
 
     public void run() {
         BlockingQueue<Exchange> queue = endpoint.getQueue();
         while (queue != null && isRunAllowed()) {
-            final Exchange exchange;
+            Exchange exchange = null;
             try {
                 exchange = queue.poll(1000, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Sleep interrupted, are we stopping? " + 
(isStopping() || isStopped()));
-                }
-                continue;
-            }
-            if (exchange != null) {
-                if (isRunAllowed()) {
+                if (exchange != null) {
                     try {
+                        tasks.incrementAndGet();
                         sendToConsumers(exchange);
 
                         // log exception if an exception occurred and was not 
handled
@@ -116,22 +129,27 @@ public class SedaConsumer extends Servic
                         }
                     } catch (Exception e) {
                         getExceptionHandler().handleException("Error 
processing exchange", exchange, e);
+                    } finally {
+                        tasks.decrementAndGet();
                     }
+                }
+            } catch (InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Sleep interrupted, are we stopping? " + 
(isStopping() || isStopped()));
+                }
+                continue;
+            } catch (Throwable e) {
+                if (exchange != null) {
+                    getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
                 } else {
-                    if (LOG.isWarnEnabled()) {
-                        LOG.warn("This consumer is stopped during polling an 
exchange, so putting it back on the seda queue: " + exchange);
-                    }
-                    try {
-                        queue.put(exchange);
-                    } catch (InterruptedException e) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Sleep interrupted, are we stopping? " + 
(isStopping() || isStopped()));
-                        }
-                        continue;
-                    }
+                    getExceptionHandler().handleException(e);
                 }
             }
         }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Ending this polling consumer thread, there are still " 
+ tasks.get() + " threads left.");
+        }
     }
 
     /**
@@ -175,6 +193,8 @@ public class SedaConsumer extends Servic
     }
 
     protected void doStart() throws Exception {
+        tasks.set(0);
+
         int poolSize = endpoint.getConcurrentConsumers();
         executor = endpoint.getCamelContext().getExecutorServiceStrategy()
                         .newFixedThreadPool(this, endpoint.getEndpointUri(), 
poolSize);
@@ -187,7 +207,9 @@ public class SedaConsumer extends Servic
     protected void doStop() throws Exception {
         endpoint.onStopped(this);
         // must shutdown executor on stop to avoid overhead of having them 
running
-        
endpoint.getCamelContext().getExecutorServiceStrategy().shutdown(executor);
+        // use shutdown now to force the tasks which are polling for new 
exchanges
+        // to stop immediately to avoid them picking up new exchanges arriving 
in the mean time
+        
endpoint.getCamelContext().getExecutorServiceStrategy().shutdownNow(executor);
         executor = null;
     }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java?rev=1037666&r1=1037665&r2=1037666&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ShutdownDeferTest.java
 Mon Nov 22 10:37:14 2010
@@ -50,8 +50,8 @@ public class ShutdownDeferTest extends C
 
         context.stop();
 
-        // should route about 4 - 5 (in some rare cases it will only route 4)
-        assertTrue("Should complete all messages, was " + 
bar.getReceivedCounter(), bar.getReceivedCounter() >= 4);
+        // should route all 5
+        assertEquals(5, bar.getReceivedCounter());
     }
 
     @Override


Reply via email to