[https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51159#action_51159]
Martin Krasser commented on CAMEL-1510:
---------------------------------------
Hi Christopher,
agreed that it's a bit more elegant to use the locking mechanism from
{{java.util.concurrent.locks}} when using Java 5 or higher :) I just tried to
solve the problem with {{ReentrantLock}} and {{Condition}} too, but instead of
using an {{exchangeQueued}} variable I let the {{enqueueExchange()}} and
{{cancel()}} methods _signal_ the batch sender to resume processing. I
tested the following code with the {{AggregatorTest}} unit tests.
{code:java}
private class BatchSender extends Thread {

    private volatile boolean cancelRequested;
    private Queue<Exchange> queue;
    private Lock queueMutex = new ReentrantLock();
    private Condition queueCondition = queueMutex.newCondition();

    public BatchSender() {
        super("Batch Sender");
        this.queue = new LinkedList<Exchange>();
    }

    @Override
    public void run() {
        while (true) {
            queueMutex.lock();
            try {
                boolean signalled = queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
                processEnqueuedExchanges(signalled);
            } catch (InterruptedException e) {
                break;
            } catch (Exception e) {
                // TODO: handle exception ...
                e.printStackTrace();
            } finally {
                queueMutex.unlock();
            }
        }
    }

    public void cancel() {
        cancelRequested = true;
        queueMutex.lock();
        try {
            queueCondition.signal();
        } finally {
            queueMutex.unlock();
        }
    }

    public void enqueueExchange(Exchange exchange) {
        queueMutex.lock();
        try {
            // add under the lock: LinkedList is not thread-safe
            queue.add(exchange);
            if (isInBatchCompleted(queue.size())) {
                queueCondition.signal();
            }
        } finally {
            queueMutex.unlock();
        }
    }

    private void processEnqueuedExchanges(boolean signalled) throws Exception {
        if (!signalled) {
            // timed out: send whatever has accumulated
            drainQueueTo(collection, batchSize);
        } else {
            if (cancelRequested) {
                return;
            }
            while (isInBatchCompleted(queue.size())) {
                drainQueueTo(collection, batchSize);
            }
            if (!isOutBatchCompleted()) {
                return;
            }
        }
        try {
            sendExchanges();
        } catch (Exception e) {
            getExceptionHandler().handleException(e);
        }
    }

    private void sendExchanges() throws Exception {
        ...
    }

    private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
        ...
    }
}
{code}
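For what it's worth, the contract this relies on, namely that {{Condition.await(long, TimeUnit)}} returns {{false}} when the waiting time elapses and {{true}} when the condition is signalled, can be exercised in isolation. A minimal sketch (class and method names are my own, for illustration only, not part of Camel):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TimedAwaitDemo {

    private static final Lock lock = new ReentrantLock();
    private static final Condition cond = lock.newCondition();

    // Nobody signals: await() detects the timeout and returns false,
    // which corresponds to the "batch timeout" branch above.
    static boolean awaitWithoutSignal() throws InterruptedException {
        lock.lock();
        try {
            return cond.await(100, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }

    // A second thread signals while we wait: await() returns true,
    // corresponding to the "batch completed or cancel requested" branch.
    static boolean awaitWithSignal() throws InterruptedException {
        Thread signaller = new Thread(new Runnable() {
            public void run() {
                lock.lock(); // acquirable only once the main thread is inside await()
                try {
                    cond.signal();
                } finally {
                    lock.unlock();
                }
            }
        });
        lock.lock();
        try {
            signaller.start(); // started while we still hold the lock, so the
                               // signal cannot fire before we start waiting
            return cond.await(5, TimeUnit.SECONDS);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("no signal, signalled = " + awaitWithoutSignal());
        System.out.println("with signal, signalled = " + awaitWithSignal());
    }
}
```

Note that the JDK permits spurious wakeups, so a production version should re-check its predicate (queue size, cancel flag) rather than trust the return value alone; the {{BatchSender}} above effectively does this by re-testing {{cancelRequested}} and {{isInBatchCompleted()}} after waking.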
Does this make sense to you?
BTW, similar changes should also be applied to the stream resequencer. Let's
close this issue only when both the {{BatchProcessor}} and the
{{StreamResequencer}} are fixed.
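The side effect the issue describes, an interrupt intended as a wake-up aborting an in-flight send, is easy to reproduce outside Camel. A minimal sketch, with a sleep standing in for a slow blocking call such as establishing a JMS connection over SSL (class and method names are mine, not Camel's):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptSideEffectDemo {

    // Simulates the old wake-up strategy: enqueueExchange() interrupts the
    // batch sender while it is inside a slow blocking call. The "send" here
    // is just a sleep; returns true if it was cut short by the interrupt.
    static boolean sendAbortedByInterrupt() throws InterruptedException {
        final AtomicBoolean aborted = new AtomicBoolean(false);
        Thread sender = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(10000); // stands in for a slow blocking send
                } catch (InterruptedException e) {
                    aborted.set(true);   // the send never completes
                }
            }
        }, "Batch Sender");
        sender.start();
        Thread.sleep(100);  // give the sender time to reach its blocking call
        sender.interrupt(); // the wake-up that BatchProcessor used to issue
        sender.join();
        return aborted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("send aborted by interrupt: " + sendAbortedByInterrupt());
    }
}
```

Even if the interrupt arrives before the sender enters its blocking call, {{Thread.sleep()}} throws immediately because the interrupt status is already set, so the send is aborted either way. This is exactly why signalling a {{Condition}} is the safer wake-up mechanism.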
> BatchProcessor interrupt has side effects
> -----------------------------------------
>
> Key: CAMEL-1510
> URL: https://issues.apache.org/activemq/browse/CAMEL-1510
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 1.6.0, 2.0-M1
> Environment: Mac OS X
> Reporter: Christopher Hunt
> Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle
> of processing exchanges, and the processing involves something slow like
> establishing a JMS connection over SSL or queuing to an asynchronous
> processor, then the processing can become interrupted. The consequence of
> this side effect is that the batch sender thread rarely gets the opportunity
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in
> continuously adding exchanges to the aggregator, the threshold becoming
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend using
> finer-grained concurrency controls instead of relying on interrupting a
> thread. Perhaps something like the following (untested) re-write of the
> sender:
> {code}
> private class BatchSender extends Thread {
>
>     private Queue<Exchange> queue;
>     private boolean exchangeQueued = false;
>     private Lock queueMutex = new ReentrantLock();
>     private Condition queueCondition = queueMutex.newCondition();
>
>     public BatchSender() {
>         super("Batch Sender");
>         this.queue = new LinkedList<Exchange>();
>     }
>
>     public void cancel() {
>         interrupt();
>     }
>
>     private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>         for (int i = 0; i < batchSize; ++i) {
>             Exchange e = queue.poll();
>             if (e != null) {
>                 collection.add(e);
>             } else {
>                 break;
>             }
>         }
>     }
>
>     public void enqueueExchange(Exchange exchange) {
>         queueMutex.lock();
>         try {
>             queue.add(exchange);
>             exchangeQueued = true;
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>
>     @Override
>     public void run() {
>         queueMutex.lock();
>         try {
>             do {
>                 try {
>                     if (!exchangeQueued) {
>                         queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
>                         if (!exchangeQueued) {
>                             drainQueueTo(collection, batchSize);
>                         }
>                     }
>                     if (exchangeQueued) {
>                         exchangeQueued = false;
>                         queueMutex.unlock();
>                         try {
>                             while (isInBatchCompleted(queue.size())) {
>                                 queueMutex.lock();
>                                 try {
>                                     drainQueueTo(collection, batchSize);
>                                 } finally {
>                                     queueMutex.unlock();
>                                 }
>                             }
>                             if (!isOutBatchCompleted()) {
>                                 continue;
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     }
>                     queueMutex.unlock();
>                     try {
>                         try {
>                             sendExchanges();
>                         } catch (Exception e) {
>                             getExceptionHandler().handleException(e);
>                         }
>                     } finally {
>                         queueMutex.lock();
>                     }
>                 } catch (InterruptedException e) {
>                     break;
>                 }
>             } while (true);
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>
>     private void sendExchanges() throws Exception {
>         Iterator<Exchange> iter = collection.iterator();
>         while (iter.hasNext()) {
>             Exchange exchange = iter.next();
>             iter.remove();
>             processExchange(exchange);
>         }
>     }
> }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed
> its access. In addition, any queuing of exchanges is recorded in the
> exchangeQueued flag. This should result in less locking.
> The main change though is that queuing an exchange does not interrupt the
> batch sender's current activity.
> I hope that this sample is useful.