Author: wtam
Date: Thu Apr 16 17:13:15 2009
New Revision: 765689

URL: http://svn.apache.org/viewvc?rev=765689&view=rev
Log:
Merged revisions 765686 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r765686 | wtam | 2009-04-16 12:56:08 -0400 (Thu, 16 Apr 2009) | 1 line
  
  [CAMEL-1510] BatchProcessor interrupt has side effects (submitted on behalf 
of Christopher Hunt)
........

Modified:
    camel/branches/camel-1.x/   (props changed)
    
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Apr 16 17:13:15 2009
@@ -1 +1 @@
-/camel/trunk:736980,739733,739904,740251,740295,740306,740596,740663,741848,742231,742705,742739,742854,742856,742898,742906,743613,743762,743773,743920,743959-743960,744123,745105,745367,745541,745751,745826,745978,746269,746872,746895,746962,747258,747678-747704,748392,748436,748821,749563-749564,749574,749628-749629,749936,749956,750017,750334,750396,750761,750796,752068,752117,752418,752751-752755,752764-752773,752956,753087,753101,753175,755136,755487,756313,756348,756870,756939,757636,757693,757743,757865,758539,758563,758600,758617,758692,758990,759362,759453,759887,759931,760003,760890,760909,760937,761194,761536,761583,761607,762047,762633,762650,762935,763095,763484,763551,765154
+/camel/trunk:736980,739733,739904,740251,740295,740306,740596,740663,741848,742231,742705,742739,742854,742856,742898,742906,743613,743762,743773,743920,743959-743960,744123,745105,745367,745541,745751,745826,745978,746269,746872,746895,746962,747258,747678-747704,748392,748436,748821,749563-749564,749574,749628-749629,749936,749956,750017,750334,750396,750761,750796,752068,752117,752418,752751-752755,752764-752773,752956,753087,753101,753175,755136,755487,756313,756348,756870,756939,757636,757693,757743,757865,758539,758563,758600,758617,758692,758990,759362,759453,759887,759931,760003,760890,760909,760937,761194,761536,761583,761607,762047,762633,762650,762935,763095,763484,763551,765154,765686

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=765689&r1=765688&r2=765689&view=diff
==============================================================================
--- 
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
 (original)
+++ 
camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
 Thu Apr 16 17:13:15 2009
@@ -18,7 +18,12 @@
 
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -28,13 +33,12 @@
 import org.apache.camel.util.ServiceHelper;
 
 /**
- * A base class for any kind of {...@link Processor} which implements some 
kind of
- * batch processing.
+ * A base class for any kind of {...@link Processor} which implements some 
kind of batch processing.
  * 
  * @version $Revision$
  */
 public class BatchProcessor extends ServiceSupport implements Processor {
-    
+
     public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
     public static final int DEFAULT_BATCH_SIZE = 100;
 
@@ -77,9 +81,9 @@
     }
 
     /**
-     * Sets the <b>in</b> batch size. This is the number of incoming exchanges 
that this batch processor
-     * will process before its completed. The default value is {...@link 
#DEFAULT_BATCH_SIZE}.
-     *
+     * Sets the <b>in</b> batch size. This is the number of incoming exchanges 
that this batch processor will
+     * process before its completed. The default value is {...@link 
#DEFAULT_BATCH_SIZE}.
+     * 
      * @param batchSize the size
      */
     public void setBatchSize(int batchSize) {
@@ -91,10 +95,10 @@
     }
 
     /**
-     * Sets the <b>out</b> batch size. If the batch processor holds more 
exchanges than this out size then
-     * the completion is triggered. Can for instance be used to ensure that 
this batch is completed when
-     * a certain number of exchanges has been collected. By default this 
feature is <b>not</b> enabled.
-     *
+     * Sets the <b>out</b> batch size. If the batch processor holds more 
exchanges than this out size then the
+     * completion is triggered. Can for instance be used to ensure that this 
batch is completed when a certain
+     * number of exchanges has been collected. By default this feature is 
<b>not</b> enabled.
+     * 
      * @param outBatchSize the size
      */
     public void setOutBatchSize(int outBatchSize) {
@@ -114,16 +118,16 @@
     }
 
     /**
-     * A strategy method to decide if the "in" batch is completed.  That is, 
whether the resulting 
-     * exchanges in the in queue should be drained to the "out" collection.
+     * A strategy method to decide if the "in" batch is completed. That is, 
whether the resulting exchanges in
+     * the in queue should be drained to the "out" collection.
      */
     protected boolean isInBatchCompleted(int num) {
         return num >= batchSize;
     }
-    
+
     /**
-     * A strategy method to decide if the "out" batch is completed. That is, 
whether the resulting 
-     * exchange in the out collection should be sent.
+     * A strategy method to decide if the "out" batch is completed. That is, 
whether the resulting exchange in
+     * the out collection should be sent.
      */
     protected boolean isOutBatchCompleted() {
         if (outBatchSize == 0) {
@@ -134,9 +138,8 @@
     }
 
     /**
-     * Strategy Method to process an exchange in the batch. This method allows
-     * derived classes to perform custom processing before or after an
-     * individual exchange is processed
+     * Strategy Method to process an exchange in the batch. This method allows 
derived classes to perform
+     * custom processing before or after an individual exchange is processed
      */
     protected void processExchange(Exchange exchange) throws Exception {
         processor.process(exchange);
@@ -168,53 +171,119 @@
      * Sender thread for queued-up exchanges.
      */
     private class BatchSender extends Thread {
-        
-        private volatile boolean cancelRequested;
 
-        private LinkedBlockingQueue<Exchange> queue;
-        
+        private Queue<Exchange> queue;
+        private Lock queueLock = new ReentrantLock();
+        private boolean exchangeEnqueued;
+        private Condition exchangeEnqueuedCondition = queueLock.newCondition();
+
         public BatchSender() {
             super("Batch Sender");
-            this.queue = new LinkedBlockingQueue<Exchange>();
+            this.queue = new LinkedList<Exchange>();
         }
 
         @Override
         public void run() {
-            while (true) {
-                try {
-                    Thread.sleep(batchTimeout);
-                    queue.drainTo(collection, batchSize);  
-                } catch (InterruptedException e) {
-                    if (cancelRequested) {
-                        return;
-                    }
-                    
-                    while (isInBatchCompleted(queue.size())) {
-                        queue.drainTo(collection, batchSize);  
-                    }
-                    
-                    if (!isOutBatchCompleted()) {
-                        continue;
+            // Wait until one of either:
+            // * an exchange being queued;
+            // * the batch timeout expiring; or
+            // * the thread being cancelled.
+            //
+            // If an exchange is queued then we need to determine whether the
+            // batch is complete. If it is complete then we send out the 
batched
+            // exchanges. Otherwise we move back into our wait state.
+            //
+            // If the batch times out then we send out the batched exchanges
+            // collected so far.
+            //
+            // If we receive an interrupt then all blocking operations are
+            // interrupted and our thread terminates.
+            //
+            // The goal of the following algorithm in terms of synchronisation
+            // is to provide fine grained locking i.e. retaining the lock only
+            // when required. Special consideration is given to releasing the
+            // lock when calling an overloaded method such as 
isInBatchComplete,
+            // isOutBatchComplete and around sendExchanges. The latter is
+            // especially important as the process of sending out the exchanges
+            // would otherwise block new exchanges from being queued.
+
+            queueLock.lock();
+            try {
+                do {
+                    try {
+                        if (!exchangeEnqueued) {
+                            exchangeEnqueuedCondition.await(batchTimeout, 
TimeUnit.MILLISECONDS);
+                        }
+
+                        if (!exchangeEnqueued) {
+                            drainQueueTo(collection, batchSize);
+                        } else {             
+                            exchangeEnqueued = false;
+                            while (isInBatchCompleted(queue.size())) {   
+                                drainQueueTo(collection, batchSize);
+                            }
+                            
+                            queueLock.unlock();
+                            try {
+                                if (!isOutBatchCompleted()) {
+                                    continue;
+                                }
+                            } finally {
+                                queueLock.lock();
+                            }
+                        }
+
+                        queueLock.unlock();
+                        try {
+                            try {
+                                sendExchanges();
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException(e);
+                            }
+                        } finally {
+                            queueLock.lock();
+                        }
+
+                    } catch (InterruptedException e) {
+                        break;
                     }
-                }
-                try {
-                    sendExchanges();
-                } catch (Exception e) {
-                    getExceptionHandler().handleException(e);
+
+                } while (true);
+
+            } finally {
+                queueLock.unlock();
+            }
+        }
+
+        /**
+         * This method should be called with queueLock held
+         */
+        private void drainQueueTo(Collection<Exchange> collection, int 
batchSize) {
+            for (int i = 0; i < batchSize; ++i) {
+                Exchange e = queue.poll();
+                if (e != null) {
+                    collection.add(e);
+                } else {
+                    break;
                 }
             }
         }
-        
+
         public void cancel() {
-            cancelRequested = true;
             interrupt();
         }
-     
+
         public void enqueueExchange(Exchange exchange) {
-            queue.add(exchange);
-            interrupt();
+            queueLock.lock();
+            try {
+                queue.add(exchange);
+                exchangeEnqueued = true;
+                exchangeEnqueuedCondition.signal();
+            } finally {
+                queueLock.unlock();
+            }
         }
-        
+
         private void sendExchanges() throws Exception {
             Iterator<Exchange> iter = collection.iterator();
             while (iter.hasNext()) {
@@ -224,5 +293,5 @@
             }
         }
     }
-    
+
 }


Reply via email to