Author: wtam Date: Thu Apr 16 17:13:15 2009 New Revision: 765689 URL: http://svn.apache.org/viewvc?rev=765689&view=rev Log: Merged revisions 765686 via svnmerge from https://svn.apache.org/repos/asf/camel/trunk
........ r765686 | wtam | 2009-04-16 12:56:08 -0400 (Thu, 16 Apr 2009) | 1 line [CAMEL-1510] BatchProcessor interrupt has side effects (submitted on behalf of Christopher Hunt) ........ Modified: camel/branches/camel-1.x/ (props changed) camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Propchange: camel/branches/camel-1.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Apr 16 17:13:15 2009 @@ -1 +1 @@ -/camel/trunk:736980,739733,739904,740251,740295,740306,740596,740663,741848,742231,742705,742739,742854,742856,742898,742906,743613,743762,743773,743920,743959-743960,744123,745105,745367,745541,745751,745826,745978,746269,746872,746895,746962,747258,747678-747704,748392,748436,748821,749563-749564,749574,749628-749629,749936,749956,750017,750334,750396,750761,750796,752068,752117,752418,752751-752755,752764-752773,752956,753087,753101,753175,755136,755487,756313,756348,756870,756939,757636,757693,757743,757865,758539,758563,758600,758617,758692,758990,759362,759453,759887,759931,760003,760890,760909,760937,761194,761536,761583,761607,762047,762633,762650,762935,763095,763484,763551,765154 +/camel/trunk:736980,739733,739904,740251,740295,740306,740596,740663,741848,742231,742705,742739,742854,742856,742898,742906,743613,743762,743773,743920,743959-743960,744123,745105,745367,745541,745751,745826,745978,746269,746872,746895,746962,747258,747678-747704,748392,748436,748821,749563-749564,749574,749628-749629,749936,749956,750017,750334,750396,750761,750796,752068,752117,752418,752751-752755,752764-752773,752956,753087,753101,753175,755136,755487,756313,756348,756870,756939,757636,757693,757743,757865,758539,758563,758600,758617,758692,758990,759362,759453,759887,759931,760003,760890,760909,760937,761194,761536,761583,761607,762047,762633,762650,762935,763095,763484,763551,765154,765686 Propchange: camel/branches/camel-1.x/ 
------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=765689&r1=765688&r2=765689&view=diff ============================================================================== --- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original) +++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Thu Apr 16 17:13:15 2009 @@ -18,7 +18,12 @@ import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -28,13 +33,12 @@ import org.apache.camel.util.ServiceHelper; /** - * A base class for any kind of {@link Processor} which implements some kind of - * batch processing. + * A base class for any kind of {@link Processor} which implements some kind of batch processing. * * @version $Revision$ */ public class BatchProcessor extends ServiceSupport implements Processor { - + public static final long DEFAULT_BATCH_TIMEOUT = 1000L; public static final int DEFAULT_BATCH_SIZE = 100; @@ -77,9 +81,9 @@ } /** - * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor - * will process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}. - * + * Sets the <b>in</b> batch size. 
This is the number of incoming exchanges that this batch processor will + * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}. + * * @param batchSize the size */ public void setBatchSize(int batchSize) { @@ -91,10 +95,10 @@ } /** - * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then - * the completion is triggered. Can for instance be used to ensure that this batch is completed when - * a certain number of exchanges has been collected. By default this feature is <b>not</b> enabled. - * + * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the + * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain + * number of exchanges has been collected. By default this feature is <b>not</b> enabled. + * * @param outBatchSize the size */ public void setOutBatchSize(int outBatchSize) { @@ -114,16 +118,16 @@ } /** - * A strategy method to decide if the "in" batch is completed. That is, whether the resulting - * exchanges in the in queue should be drained to the "out" collection. + * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in + * the in queue should be drained to the "out" collection. */ protected boolean isInBatchCompleted(int num) { return num >= batchSize; } - + /** - * A strategy method to decide if the "out" batch is completed. That is, whether the resulting - * exchange in the out collection should be sent. + * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in + * the out collection should be sent. */ protected boolean isOutBatchCompleted() { if (outBatchSize == 0) { @@ -134,9 +138,8 @@ } /** - * Strategy Method to process an exchange in the batch. 
This method allows - * derived classes to perform custom processing before or after an - * individual exchange is processed + * Strategy Method to process an exchange in the batch. This method allows derived classes to perform + * custom processing before or after an individual exchange is processed */ protected void processExchange(Exchange exchange) throws Exception { processor.process(exchange); @@ -168,53 +171,119 @@ * Sender thread for queued-up exchanges. */ private class BatchSender extends Thread { - - private volatile boolean cancelRequested; - private LinkedBlockingQueue<Exchange> queue; - + private Queue<Exchange> queue; + private Lock queueLock = new ReentrantLock(); + private boolean exchangeEnqueued; + private Condition exchangeEnqueuedCondition = queueLock.newCondition(); + public BatchSender() { super("Batch Sender"); - this.queue = new LinkedBlockingQueue<Exchange>(); + this.queue = new LinkedList<Exchange>(); } @Override public void run() { - while (true) { - try { - Thread.sleep(batchTimeout); - queue.drainTo(collection, batchSize); - } catch (InterruptedException e) { - if (cancelRequested) { - return; - } - - while (isInBatchCompleted(queue.size())) { - queue.drainTo(collection, batchSize); - } - - if (!isOutBatchCompleted()) { - continue; + // Wait until one of either: + // * an exchange being queued; + // * the batch timeout expiring; or + // * the thread being cancelled. + // + // If an exchange is queued then we need to determine whether the + // batch is complete. If it is complete then we send out the batched + // exchanges. Otherwise we move back into our wait state. + // + // If the batch times out then we send out the batched exchanges + // collected so far. + // + // If we receive an interrupt then all blocking operations are + // interrupted and our thread terminates. + // + // The goal of the following algorithm in terms of synchronisation + // is to provide fine grained locking i.e. retaining the lock only + // when required. 
Special consideration is given to releasing the + // lock when calling an overloaded method such as isInBatchComplete, + // isOutBatchComplete and around sendExchanges. The latter is + // especially important as the process of sending out the exchanges + // would otherwise block new exchanges from being queued. + + queueLock.lock(); + try { + do { + try { + if (!exchangeEnqueued) { + exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS); + } + + if (!exchangeEnqueued) { + drainQueueTo(collection, batchSize); + } else { + exchangeEnqueued = false; + while (isInBatchCompleted(queue.size())) { + drainQueueTo(collection, batchSize); + } + + queueLock.unlock(); + try { + if (!isOutBatchCompleted()) { + continue; + } + } finally { + queueLock.lock(); + } + } + + queueLock.unlock(); + try { + try { + sendExchanges(); + } catch (Exception e) { + getExceptionHandler().handleException(e); + } + } finally { + queueLock.lock(); + } + + } catch (InterruptedException e) { + break; } - } - try { - sendExchanges(); - } catch (Exception e) { - getExceptionHandler().handleException(e); + + } while (true); + + } finally { + queueLock.unlock(); + } + } + + /** + * This method should be called with queueLock held + */ + private void drainQueueTo(Collection<Exchange> collection, int batchSize) { + for (int i = 0; i < batchSize; ++i) { + Exchange e = queue.poll(); + if (e != null) { + collection.add(e); + } else { + break; } } } - + public void cancel() { - cancelRequested = true; interrupt(); } - + public void enqueueExchange(Exchange exchange) { - queue.add(exchange); - interrupt(); + queueLock.lock(); + try { + queue.add(exchange); + exchangeEnqueued = true; + exchangeEnqueuedCondition.signal(); + } finally { + queueLock.unlock(); + } } - + private void sendExchanges() throws Exception { Iterator<Exchange> iter = collection.iterator(); while (iter.hasNext()) { @@ -224,5 +293,5 @@ } } } - + }