BatchProcessor interrupt has side effects
-----------------------------------------

                 Key: CAMEL-1510
                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
             Project: Apache Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 1.6.0
         Environment: Mac OS X
            Reporter: Christopher Hunt
            Priority: Critical


I have noticed that the BatchProcessor class calls Thread.interrupt() from 
within its enqueueExchange method to wake the sender's run loop from its sleep.

The unfortunate side effect is that the interrupt can also arrive while the run 
loop is in the middle of processing exchanges. If that processing involves 
something slow, such as establishing a JMS connection over SSL or queuing to an 
asynchronous processor, the processing itself is interrupted. As a result the 
batch sender thread rarely gets the opportunity to complete its work properly, 
and exceptions reporting the interrupt are thrown.
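
To illustrate the side effect in isolation, here is a minimal stand-alone 
sketch. It does not use Camel at all: the class name InterruptSideEffectDemo, 
the thread name and the 5 second sleep (a stand-in for slow work such as 
opening a JMS connection) are all made up for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of Camel.
class InterruptSideEffectDemo {

    /**
     * Starts a worker that simulates slow exchange processing, interrupts it
     * the way a producer would to "wake" it, and reports whether the
     * processing was cut short by that interrupt.
     */
    public static boolean processingWasInterrupted() {
        final CountDownLatch processingStarted = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);

        Thread sender = new Thread(new Runnable() {
            public void run() {
                processingStarted.countDown();
                try {
                    // Stand-in for slow work in the middle of a batch.
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // The "wake up" interrupt landed during processing.
                    interrupted.set(true);
                }
            }
        }, "Batch Sender (demo)");
        sender.start();

        try {
            processingStarted.await();
            // The producer only means to wake a sleeping sender.
            sender.interrupt();
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("processing interrupted: " + processingWasInterrupted());
    }
}
```

The interrupt cannot tell a sender that is idle from one that is busy, so the 
slow step is aborted even though the caller only wanted to wake the loop.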

This became apparent during performance testing that continuously added 
exchanges to the aggregator: each time the batch threshold was reached, the 
sender tried to enqueue the aggregated result to a JMS queue.

If my analysis of the BatchProcessor is correct then I would recommend using 
finer-grained concurrency controls instead of relying upon interrupting a 
thread. Perhaps something like the following (untested) re-write of the sender:

{code}
        private class BatchSender extends Thread {
                private Queue<Exchange> queue;
                private boolean exchangeQueued = false;
                private Lock queueMutex = new ReentrantLock();
                private Condition queueCondition = queueMutex.newCondition();

                public BatchSender() {
                        super("Batch Sender");
                        this.queue = new LinkedList<Exchange>();
                }

                public void cancel() {
                        interrupt();
                }

                private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
                        collection.clear();
                        for (int i = 0; i < batchSize; ++i) {
                                Exchange e = queue.poll();
                                if (e != null) {
                                        collection.add(e);
                                } else {
                                        break;
                                }
                        }
                }

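                public void enqueueExchange(Exchange exchange) {
                        queueMutex.lock();
                        try {
                                queue.add(exchange);
                                exchangeQueued = true;
                                // Wake the sender if it is waiting on the condition;
                                // without this it only wakes when the timeout expires.
                                queueCondition.signal();
                        } finally {
                                queueMutex.unlock();
                        }
                }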

                @Override
                public void run() {
                        queueMutex.lock();
                        try {
                                do {
                                        try {
                                                if (!exchangeQueued) {
                                                        queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
                                                        if (!exchangeQueued) {
                                                                drainQueueTo(collection, batchSize);
                                                        }
                                                }

                                                if (exchangeQueued) {
                                                        exchangeQueued = false;
                                                        queueMutex.unlock();
                                                        try {
                                                                while (isInBatchCompleted(queue.size())) {
                                                                        queueMutex.lock();
                                                                        try {
                                                                                drainQueueTo(collection, batchSize);
                                                                        } finally {
                                                                                queueMutex.unlock();
                                                                        }
                                                                }

                                                                if (!isOutBatchCompleted()) {
                                                                        continue;
                                                                }
                                                        } finally {
                                                                queueMutex.lock();
                                                        }

                                                }

                                                queueMutex.unlock();
                                                try {
                                                        try {
                                                                sendExchanges();
                                                        } catch (Exception e) {
                                                                getExceptionHandler().handleException(e);
                                                        }
                                                } finally {
                                                        queueMutex.lock();
                                                }
                                        } catch (InterruptedException e) {
                                                break;
                                        }
                                } while (true);
                        } finally {
                                queueMutex.unlock();
                        }
                }

                private void sendExchanges() throws Exception {
                        Iterator<Exchange> iter = collection.iterator();
                        while (iter.hasNext()) {
                                Exchange exchange = iter.next();
                                iter.remove();
                                processExchange(exchange);
                        }
                }
        }
{code}

I have replaced the concurrent queue with a regular linked list and guarded 
access to it with a mutex. In addition, the exchangeQueued flag records whenever 
an exchange is queued. This should result in less locking.

The main change, though, is that queuing an exchange does not interrupt the 
batch sender's current activity.
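
The same idea can be reduced to a small stand-alone sketch, again without 
Camel. The class SignalledBatchQueue, its method names and the use of String 
items in place of Exchange are assumptions made only for this illustration. 
The producer signals a Condition under the lock, and the consumer waits on 
that Condition with a timeout, so no thread is ever interrupted to hand over 
work.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration class, not part of Camel.
class SignalledBatchQueue {
    private final Queue<String> queue = new LinkedList<String>();
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    /** Producer side: add an item and wake a waiting consumer without interrupting it. */
    public void enqueue(String item) {
        lock.lock();
        try {
            queue.add(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumer side: wait up to timeoutMillis for at least one item, then
     * remove and return up to batchSize items. The list is empty on timeout.
     */
    public List<String> takeBatch(int batchSize, long timeoutMillis) throws InterruptedException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // Loop to cope with spurious wake-ups; awaitNanos returns the time left.
            while (queue.isEmpty() && nanos > 0) {
                nanos = notEmpty.awaitNanos(nanos);
            }
            List<String> batch = new ArrayList<String>();
            while (batch.size() < batchSize && !queue.isEmpty()) {
                batch.add(queue.poll());
            }
            return batch;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SignalledBatchQueue q = new SignalledBatchQueue();
        q.enqueue("a");
        q.enqueue("b");
        q.enqueue("c");
        System.out.println(q.takeBatch(2, 100)); // [a, b]
        System.out.println(q.takeBatch(2, 100)); // [c]
        System.out.println(q.takeBatch(2, 100)); // [] after the 100 ms timeout
    }
}
```

The batch is processed after takeBatch returns and the lock has been released, 
so a producer calling enqueue at that point only touches the queue and never 
the thread doing the processing.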

I hope that this sample is useful.
