[
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51160#action_51160
]
Christopher Hunt commented on CAMEL-1510:
-----------------------------------------
Hi Martin,
I'm still leaning to the patch that I provided... I think that it closely
resembles the code that is already there which is essentially flawed only in
the sense that it interrupts when adding an exchange.
A couple of observations with your changes:
- cancel still needs to interrupt - you really want to interrupt with
cancellations.
- cancelRequested is not being protected within the cancel method.
- cancelRequested wouldn't be required if cancel interrupts.
- drainQueueTo will need to protect the queue also.
... my code did do all of the above.
Kind regards,
Christopher
> BatchProcessor interrupt has side effects
> -----------------------------------------
>
> Key: CAMEL-1510
> URL: https://issues.apache.org/activemq/browse/CAMEL-1510
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 1.6.0, 2.0-M1
> Environment: Mac OS X
> Reporter: Christopher Hunt
> Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle
> of processing exchanges, and the processing involves something slow like
> establishing a JMS connection over SSL or queuing to an asynchronous
> processor, then the processing can become interrupted. The consequence of
> this side effect is that the batch sender thread rarely gets the opportunity
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in
> continuously adding exchanges to the aggregator, the threshold becoming
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer
> grained concurrency controls being used instead of relying upon interrupting
> a thread. Perhaps something like the following (untested) re-write of the
> sender:
> {code}
> private class BatchSender extends Thread {
> private Queue<Exchange> queue;
> private boolean exchangeQueued = false;
> private Lock queueMutex = new ReentrantLock();
> private Condition queueCondition = queueMutex.newCondition();
> public BatchSender() {
> super("Batch Sender");
> this.queue = new LinkedList<Exchange>();
> }
> public void cancel() {
> interrupt();
> }
> private void drainQueueTo(Collection<Exchange> collection, int
> batchSize) {
> for (int i = 0; i < batchSize; ++i) {
> Exchange e = queue.poll();
> if (e != null) {
> collection.add(e);
> } else {
> break;
> }
> }
> }
> public void enqueueExchange(Exchange exchange) {
> queueMutex.lock();
> try {
> queue.add(exchange);
> exchangeQueued = true;
> } finally {
> queueMutex.unlock();
> }
> }
> @Override
> public void run() {
> queueMutex.lock();
> try {
> do {
> try {
> if (!exchangeQueued) {
> queueCondition.await(batchTimeout,
> TimeUnit.MILLISECONDS);
> if (!exchangeQueued) {
> drainQueueTo(collection, batchSize);
> }
> }
> if (exchangeQueued) {
> exchangeQueued = false;
> queueMutex.unlock();
> try {
> while (isInBatchCompleted(queue.size())) {
> queueMutex.lock();
> try {
> drainQueueTo(collection, batchSize);
> } finally {
> queueMutex.unlock();
> }
> }
> if (!isOutBatchCompleted()) {
> continue;
> }
> } finally {
> queueMutex.lock();
> }
> }
> queueMutex.unlock();
> try {
> try {
> sendExchanges();
> } catch (Exception e) {
> getExceptionHandler().handleException(e);
> }
> } finally {
> queueMutex.lock();
> }
> } catch (InterruptedException e) {
> break;
> }
> } while (true);
> } finally {
> queueMutex.unlock();
> }
> }
> private void sendExchanges() throws Exception {
> Iterator<Exchange> iter = collection.iterator();
> while (iter.hasNext()) {
> Exchange exchange = iter.next();
> iter.remove();
> processExchange(exchange);
> }
> }
> }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed
> its access. In addition any queuing of exchanges is noted. This should result
> in less locking.
> The main change though is that queuing an exchange does not interrupt the
> batch sender's current activity.
> I hope that this sample is useful.
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.