[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51281#action_51281 ]

Martin Krasser commented on CAMEL-1510:
---------------------------------------

Hi Christopher,

Spurious wakeups are OK for the stream resequencer. If they occur, the ResequencerEngine makes sure that elements are delivered only if they have really timed out. We can leave the code as is.

> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Assignee: William Tam
>            Priority: Critical
>             Fix For: 2.0.0, 1.6.1
>
>         Attachments: BatchProcessor-lockmin.java.20.diff, BatchProcessor.java.20.diff, camel-core-1.x.patch, camel-core-2.x.patch
>
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle
> of processing exchanges, and the processing involves something slow like
> establishing a JMS connection over SSL or queuing to an asynchronous
> processor, then the processing can become interrupted. The consequence of
> this side effect is that the batch sender thread rarely gets the opportunity
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in
> continuously adding exchanges to the aggregator, the threshold becoming
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer
> grained concurrency controls being used instead of relying upon interrupting
> a thread. Perhaps something like the following (untested) re-write of the
> sender:
> {code}
> private class BatchSender extends Thread {
>     private Queue<Exchange> queue;
>     private boolean exchangeQueued = false;
>     private Lock queueMutex = new ReentrantLock();
>     private Condition queueCondition = queueMutex.newCondition();
>
>     public BatchSender() {
>         super("Batch Sender");
>         this.queue = new LinkedList<Exchange>();
>     }
>
>     public void cancel() {
>         interrupt();
>     }
>
>     private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>         for (int i = 0; i < batchSize; ++i) {
>             Exchange e = queue.poll();
>             if (e != null) {
>                 collection.add(e);
>             } else {
>                 break;
>             }
>         }
>     }
>
>     public void enqueueExchange(Exchange exchange) {
>         queueMutex.lock();
>         try {
>             queue.add(exchange);
>             exchangeQueued = true;
>             queueCondition.signal();
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>
>     @Override
>     public void run() {
>         queueMutex.lock();
>         try {
>             do {
>                 try {
>                     if (!exchangeQueued) {
>                         queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
>                         if (!exchangeQueued) {
>                             drainQueueTo(collection, batchSize);
>                         }
>                     }
>                     if (exchangeQueued) {
>                         exchangeQueued = false;
>                         queueMutex.unlock();
>                         try {
>                             while (isInBatchCompleted(queue.size())) {
>                                 queueMutex.lock();
>                                 try {
>                                     drainQueueTo(collection, batchSize);
>                                 } finally {
>                                     queueMutex.unlock();
>                                 }
>                             }
>                             if (!isOutBatchCompleted()) {
>                                 continue;
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     }
>                     queueMutex.unlock();
>                     try {
>                         try {
>                             sendExchanges();
>                         } catch (Exception e) {
>                             getExceptionHandler().handleException(e);
>                         }
>                     } finally {
>                         queueMutex.lock();
>                     }
>                 } catch (InterruptedEx
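
The side effect reported in this issue can be reproduced in isolation. The following is a minimal sketch, not Camel code: `WakeupDemo`, `slowWorkAborted` and the 300 ms sleep (standing in for slow work such as a JMS send) are hypothetical names and values chosen for illustration. It compares notifying a busy worker through `Thread.interrupt()` with notifying it through a `Condition` signal.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; none of these names come from Camel.
class WakeupDemo {
    private static final Lock LOCK = new ReentrantLock();
    private static final Condition WAKEUP = LOCK.newCondition();

    /**
     * Starts a worker that is busy with slow blocking work, notifies it of new
     * input while it is busy, and reports whether the slow work was aborted
     * by an InterruptedException.
     */
    static boolean slowWorkAborted(boolean notifyByInterrupt) {
        CountDownLatch busy = new CountDownLatch(1);
        boolean[] aborted = {false};
        Thread worker = new Thread(() -> {
            busy.countDown();          // from here on the worker counts as "processing"
            try {
                Thread.sleep(300);     // stand-in for slow blocking work
            } catch (InterruptedException e) {
                aborted[0] = true;     // the work was cut short by the interrupt
            }
        });
        worker.start();
        try {
            busy.await();
            if (notifyByInterrupt) {
                worker.interrupt();    // hits whatever the worker is doing right now
            } else {
                LOCK.lock();
                try {
                    WAKEUP.signal();   // only affects threads waiting on WAKEUP; none here
                } finally {
                    LOCK.unlock();
                }
            }
            worker.join();             // join() also makes aborted[0] visible to this thread
        } catch (InterruptedException e) {
            throw new IllegalStateException("demo thread was interrupted", e);
        }
        return aborted[0];
    }

    public static void main(String[] args) {
        System.out.println("interrupt aborted slow work: " + slowWorkAborted(true));
        System.out.println("signal aborted slow work: " + slowWorkAborted(false));
    }
}
```

In this sketch the interrupt aborts the sleep whether it arrives before or during it, while the signal leaves the busy worker alone. That is the property the proposed lock-and-condition rewrite relies on.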
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51273#action_51273 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

Hi Martin,

I just had a look at your changes to StreamResequencer and wonder whether you need to signal that a request has been delivered using a boolean as well as relying on the condition variable. Perhaps you do not need to discriminate between a timeout and a delivered request, but I thought you should know that a condition variable can wake up spuriously on some platforms, i.e. with neither a timeout nor a signal.

From the Java 5 javadoc:

_When waiting upon a Condition, a "spurious wakeup" is permitted to occur, in general, as a concession to the underlying platform semantics. This has little practical impact on most application programs as a Condition should always be waited upon in a loop, testing the state predicate that is being waited for. An implementation is free to remove the possibility of spurious wakeups but it is recommended that applications programmers always assume that they can occur and so always wait in a loop._

I hope that this is useful to you.

Kind regards,
Christopher
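
The javadoc advice quoted above (wait in a loop, testing a state predicate) might look like the following minimal sketch. It is not the StreamResequencer code: `GuardedWait`, `deliver` and `awaitDelivery` are hypothetical names, and the boolean field plays the role of the "request delivered" flag suggested in the comment.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; these names are not taken from Camel.
class GuardedWait {
    private final Lock lock = new ReentrantLock();
    private final Condition delivered = lock.newCondition();
    private boolean requestDelivered;  // state predicate, guarded by lock

    /** Records a delivery and wakes one waiting thread. */
    void deliver() {
        lock.lock();
        try {
            requestDelivered = true;
            delivered.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits until a delivery is recorded or the timeout elapses.
     * Returns true for a delivery, false for a timeout or an interrupt.
     */
    boolean awaitDelivery(long timeoutMillis) {
        long nanosLeft = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // Re-check the predicate after every wakeup: a spurious wakeup
            // leaves requestDelivered false and nanosLeft positive, so we wait again.
            while (!requestDelivered) {
                if (nanosLeft <= 0L) {
                    return false;      // genuine timeout
                }
                nanosLeft = delivered.awaitNanos(nanosLeft);
            }
            requestDelivered = false;  // consume the delivery
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt for the caller
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

Because `awaitNanos` returns an estimate of the remaining wait time, the loop can tell a timeout from a signal without trusting the wakeup itself. As Martin notes in his reply, a consumer that re-validates its own state after waking (as the ResequencerEngine is said to do) can tolerate spurious wakeups without such a flag.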
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51242#action_51242 ]

William Tam commented on CAMEL-1510:
------------------------------------

Submitted Christopher's second patch (r765824 and r765825). Thanks!
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51240#action_51240 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

Hi William,

Thanks for submitting my patch to BatchProcessor. Thank you also for reviewing the code and noticing that queue.size() wasn't protected.

I obtained your version of BatchProcessor from the trunk and further noticed that the call to isOutBatchCompleted can be safely performed while retaining the queue lock. I was originally under the impression (through not looking) that isInBatchCompleted and isOutBatchCompleted were overridable. Since they are private, this cannot be the case, and thus they can be invoked while retaining the queue lock. The code is nicely simplified by removing the locking around these calls.

I have attached a minor patch reflecting the above, after having run the camel-core test cases successfully again. The patch is for the 2.0 source.

Kind regards,
Christopher
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51238#action_51238 ]

William Tam commented on CAMEL-1510:
------------------------------------

Submitted Martin's patch to trunk (r765729) and 1.x (r765731).
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51236#action_51236 ]

William Tam commented on CAMEL-1510:
------------------------------------

To answer Christopher's question:

"Just wondering... the batch processor's collection should always be a thread-safe type of collection. Is this the case in practice? If the collection is not thread safe then the batch sender run method will contend with the processor's isOutBatchCompleted(), doStop() and getCollection() methods."

The batch processor's collection is accessed exclusively by the BatchSender thread (with the exception of the doStop() method), so the collection does not need to be a thread-safe type. The doStop() method is called during shutdown, and it interrupts the BatchSender thread before calling clear() on the collection, so it should be fine. getCollection() is a protected method and it never gets called. We should probably get rid of getCollection() and make the In/OutBatchCompleted() methods private. Thoughts?
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51235#action_51235 ]

William Tam commented on CAMEL-1510:
------------------------------------

Christopher's patch has been submitted to trunk (r765686) and 1.x (r765689). Martin, I will submit your fix to StreamResequencer whenever you are ready. Thanks!
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51234#action_51234 ]

William Tam commented on CAMEL-1510:
------------------------------------

The run() method body would look like this. Do you see any problem?

{code}
queueLock.lock();
try {
    do {
        try {
            if (!exchangeEnqueued) {
                exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
            }

            // Wake up either because of a timeout or because messages have been enqueued
            while (isInBatchCompleted(queue.size())) {
                drainQueueTo(collection, batchSize);
            }

            if (exchangeEnqueued) {
                exchangeEnqueued = false;
            }

            queueLock.unlock();
            try {
                if (!isOutBatchCompleted()) {
                    continue;
                }
                try {
                    sendExchanges();
                } catch (Exception e) {
                    getExceptionHandler().handleException(e);
                }
            } finally {
                queueLock.lock();
            }
        } catch (InterruptedException e) {
            break;
        }
    } while (true);
} finally {
    queueLock.unlock();
}
{code}
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51230#action_51230 ]

William Tam commented on CAMEL-1510:
------------------------------------

Thanks guys. Christopher, I have a few questions/comments on the patch.

1. Please use spaces instead of tabs for indentation. You can run the style checker with "mvn -Psourcecheck".

2. In the run() method, I am not sure it is 100% safe to unlock before calling queue.size().

{code}
queueLock.unlock();
try {
    while (isInBatchCompleted(queue.size())) {
        queueLock.lock();
        try {
            drainQueueTo(collection, batchSize);
        } finally {
            queueLock.unlock();
        }
    }
    if (!isOutBatchCompleted()) {
        continue;
    }
} finally {
    queueLock.lock();
}
{code}

3. In the run() method, the first call to drainQueueTo(...) should probably have a while loop around it (like the second call), since we don't know how many messages are on the queue (there could be 3x the in-batch size, for example). We could move the while loop into the drainQueueTo() method.

{code}
if (!exchangeEnqueued) {
    drainQueueTo(collection, batchSize);
}
{code}
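
Points 2 and 3 above can be combined in one place: check the queue size and poll under the same lock, and keep draining while full batches remain. The following is a minimal sketch under stated assumptions, not the committed BatchProcessor code. `DrainSketch` and `drainCompleteBatchesTo` are hypothetical names, `String` stands in for `Exchange`, and the in-batch test is assumed to be `queue.size() >= batchSize`.

```java
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration; String stands in for Camel's Exchange.
class DrainSketch {
    private final Queue<String> queue = new LinkedList<>();
    private final Lock queueLock = new ReentrantLock();
    private final int batchSize;

    DrainSketch(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
    }

    void enqueue(String item) {
        queueLock.lock();
        try {
            queue.add(item);
        } finally {
            queueLock.unlock();
        }
    }

    /**
     * Moves complete batches to the target for as long as at least batchSize
     * items are queued. The size check and the polls happen under the same
     * lock, so a producer cannot change the queue in between them.
     * Returns the number of items moved.
     */
    int drainCompleteBatchesTo(Collection<String> target) {
        int moved = 0;
        queueLock.lock();
        try {
            // Assumed in-batch test: a batch is complete once batchSize items are queued.
            while (queue.size() >= batchSize) {
                for (int i = 0; i < batchSize; i++) {
                    target.add(queue.poll());
                    moved++;
                }
            }
        } finally {
            queueLock.unlock();
        }
        return moved;
    }
}
```

With a batch size of 2 and seven queued items, one call moves six items and leaves one queued, which covers the "3x the in-batch size" case without the caller needing its own loop.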
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51224#action_51224 ]

Martin Krasser commented on CAMEL-1510:
---------------------------------------

I'll provide a patch for the StreamResequencer within the next few days.
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51223#action_51223 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

I have now run all of the camel-core tests and things appear fine:

{code}
cd Development/Eclipse/camel-workspace/camel-trunk/
cd camel-core/
mvn test
[INFO] Scanning for projects...
[INFO]
[INFO] Building Camel :: Core
[INFO]    task-segment: [test]
[INFO]
[INFO] [resources:resources]
[INFO] Using default encoding to copy filtered resources.
[INFO] [compiler:compile]
[INFO] Compiling 1 source file to /Volumes/Users HD/Users/huntc/Development/Eclipse/camel-workspace/camel-trunk/camel-core/target/classes
[INFO] [resources:testResources]
[INFO] Using default encoding to copy filtered resources.
[INFO] [compiler:testCompile]
[INFO] Nothing to compile - all classes are up to date
[INFO] [surefire:test]
[INFO] Surefire report directory: /Volumes/Users HD/Users/huntc/Development/Eclipse/camel-workspace/camel-trunk/camel-core/target/surefire-reports

---
 T E S T S
---
Running org.apache.camel.management.JmxInstrumentationWithConnectorTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 7.151 sec
Running org.apache.camel.issues.InterceptorLogTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.821 sec
Running org.apache.camel.processor.RemoveHeaderTest
...
Running org.apache.camel.converter.ConverterTest
Tests run: 15, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.703 sec <<< FAILURE!
...
Running org.apache.camel.util.jndi.JndiTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.175 sec
Running org.apache.camel.component.bean.CustomParameterMappingStrategyTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.799 sec

Results :

Failed tests:

Tests run: 941, Failures: 1, Errors: 0, Skipped: 0

[INFO]
[ERROR] BUILD FAILURE
[INFO]
{code}

ConverterTest fails because I am running on Mac OS X and have a path that includes a space:

{code}
ConverterTest
org.apache.camel.converter.ConverterTest
testFileToString(org.apache.camel.converter.ConverterTest)
junit.framework.AssertionFailedError: Should have returned a String!
    at junit.framework.Assert.fail(Assert.java:47)
    at junit.framework.Assert.assertTrue(Assert.java:20)
    at junit.framework.Assert.assertNotNull(Assert.java:217)
    at org.apache.camel.converter.ConverterTest.testFileToString(ConverterTest.java:166)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:585)
    at junit.framework.TestCase.runTest(TestCase.java:164)
    at junit.framework.TestCase.runBare(TestCase.java:130)
    at junit.framework.TestResult$1.protect(TestResult.java:106)
    at junit.framework.TestResult.runProtected(TestResult.java:124)
    at junit.framework.TestResult.run(TestResult.java:109)
    at junit.framework.TestCase.run(TestCase.java:120)
    at junit.framework.TestSuite.runTest(TestSuite.java:230)
    at junit.framework.TestSuite.run(TestSuite.java:225)
    at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:81)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:45)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
{code}

(that's an unrelated problem)

Please note that I have not attempted to apply my changes to StreamResequencer. Martin highlighted that StreamResequencer has a similar structure to BatchProcessor.
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51222#action_51222 ]

William Tam commented on CAMEL-1510:
------------------------------------

Hi Christopher,

Thanks for working on the issue. I'd say at least run all the tests in camel-core.
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51209#action_51209 ]

Martin Krasser commented on CAMEL-1510:
---------------------------------------

Thank you guys for this great discussion. It would be great to have a patch for the 2.x trunk and the 1.x branch. If I can help you with that, please let me know.
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51208#action_51208 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

Sure thing. I should be able to do this tomorrow. Shall I work off the 2.0 trunk?
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51207#action_51207 ]

William Tam commented on CAMEL-1510:
------------------------------------

@Christopher You are pretty close to having a patch, right? Would you mind creating one?
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51203#action_51203 ]

William Tam commented on CAMEL-1510:
------------------------------------

@Martin Let me try to answer your question regarding isInBatchCompleted(): the original patch from CAMEL-1037 has an issue. Suppose the batch size is a very small number (say 2). Someone can send a large number (say 1000) of messages to the BatchProcessor in a short period of time, which can cause the queue size to become much greater than the batch size. The reason is that enqueueExchange only interrupts the sender thread if it is sleeping. If the sender thread is not sleeping, it only drains 2 messages from the queue, so the queue can back up pretty easily. When that happens, messages are stuck on the queue until batchTimeout expires, and even then only 2 messages (batchSize) are drained per batchTimeout.

The "while (isInBatchCompleted(queue.size()))" loop is a solution for that issue. We actually introduced new parameters, InBatchSize and OutBatchSize. InBatchSize is how big the queue can grow before the messages are drained to the collection. OutBatchSize is how big the collection can grow before the messages are sent.
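The backlog William describes can be illustrated with a small single-threaded model. It counts how many sender wake-ups are needed to empty a backlog of 1000 messages with a batch size of 2, first when each wake-up drains one batch, then when each wake-up keeps draining while the in-batch threshold is met. The class BacklogSketch and its methods are hypothetical, and the threshold test `queue.size() >= inBatchSize` is an assumption about what isInBatchCompleted() checks.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the sender's draining behaviour; not the BatchProcessor code.
class BacklogSketch {
    private static Queue<Integer> fill(int backlog) {
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < backlog; i++) {
            queue.add(i);
        }
        return queue;
    }

    private static void drain(Queue<Integer> queue, int batchSize) {
        for (int i = 0; i < batchSize && !queue.isEmpty(); i++) {
            queue.poll();
        }
    }

    // Each wake-up (for example each batchTimeout expiry) drains one batch only.
    // Assumes batchSize >= 1.
    static int wakeUpsWithSingleDrain(int backlog, int batchSize) {
        Queue<Integer> queue = fill(backlog);
        int wakeUps = 0;
        while (!queue.isEmpty()) {
            drain(queue, batchSize);
            wakeUps++;
        }
        return wakeUps;
    }

    // Each wake-up keeps draining while the queue still holds a full in-batch
    // (assumed meaning of isInBatchCompleted). Assumes inBatchSize >= 1.
    static int wakeUpsWithLoopedDrain(int backlog, int inBatchSize) {
        Queue<Integer> queue = fill(backlog);
        int wakeUps = 0;
        while (!queue.isEmpty()) {
            do {
                drain(queue, inBatchSize);
            } while (queue.size() >= inBatchSize);
            wakeUps++;
        }
        return wakeUps;
    }

    public static void main(String[] args) {
        System.out.println(wakeUpsWithSingleDrain(1000, 2)); // prints 500
        System.out.println(wakeUpsWithLoopedDrain(1000, 2)); // prints 1
    }
}
```

If each wake-up in the first variant corresponds to one batchTimeout, the last message waits roughly 500 timeouts, which is the stall the looped drain is meant to avoid.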
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51174#action_51174 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

Hi guys,

Thanks for this great dialogue. With regards to:

_I think that's also an issue in the original code (i.e. in the codebase, not Christopher's proposal): the batch sender has always been interrupted whenever a message was enqueued. From a user's perspective there's no noticeable batch timeout, for example, when the batchTimeout is set to 1000 ms and every 300 ms there is a message coming in (assuming the batch size is 100). Normally, the batch processor should send 3 messages after the timeout occurs but using the original code it would send more (I'd expect 100 messages)_

I do not believe that is the case with the original code or my proposal. The queue is only drained into the collection if the batch times out. Using my code to illustrate:

{code}
if (!exchangeQueued) {
    drainQueueTo(collection, batchSize);
}
{code}

Furthermore, sendExchanges would not even be called unless the batch is complete, given the following block:

{code}
if (exchangeQueued) {
    exchangeQueued = false;
    queueMutex.unlock();
    try {
        while (isInBatchCompleted(queue.size())) {
            queueMutex.lock();
            try {
                drainQueueTo(collection, batchSize);
            } finally {
                queueMutex.unlock();
            }
        }
        if (!isOutBatchCompleted()) {
            continue;
        }
    } finally {
        queueMutex.lock();
    }
}
{code}

... if the batch is not completed then the loop will continue.

On the refactoring, please bear in mind that, in essence, all I have done is move blocks of existing code around and use a condition to signal when adding to the queue.
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51171#action_51171 ]

Martin Krasser commented on CAMEL-1510:
---------------------------------------

My feedback on Christopher's comments:

1) ok

2) ok

3) ok

4) I added {{isInBatchCompleted()}} here because otherwise we'd always stop the batch sender from waiting {{batchTimeout}} ms. I think that's also an issue in the original code (i.e. in the codebase, not Christopher's proposal): the batch sender has always been interrupted whenever a message was enqueued. From a user's perspective there's no noticeable batch timeout, for example, when the {{batchTimeout}} is set to 1000 ms and every 300 ms there is a message coming in (assuming the batch size is 100). Normally, the batch processor should send 3 messages after the timeout occurs but using the original code it would send more (I'd expect 100 messages). The unit test for the batch timeout only works because it sends just a single message (BTW the original patch from CAMEL-1037 honored the batch timeout). I didn't verify these statements in a unit test; I just derived them from looking at the code. We should consider that when fixing this issue (maybe Christopher's initial proposal is not 'built on proven code'). Regarding all other comments for this point: ok (it makes sense to do more fine-grained locking).

5) Actually, I don't fully understand why a {{while (isInBatchCompleted(queue.size()))}} is used here anyway. Any thoughts? Regarding lock granularity: more fine-grained locking would make sense here too.

Furthermore, I agree with William that Christopher's original proposal is a bit difficult to read. Some refactoring would help.
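The arithmetic behind Martin's point 4 can be written down as a toy model. It compares a wait with a fixed deadline against a wait whose full timeout restarts on every arrival, using his numbers (batchTimeout 1000 ms, one message every 300 ms, batch size 100). This models Martin's reading only, which Christopher disputes in his reply; it is not derived from the BatchProcessor source, and the class TimeoutSketch and its methods are hypothetical.

```java
// Toy arithmetic model of the two timeout behaviours discussed in point 4.
class TimeoutSketch {
    // Messages collected when a fixed deadline of timeoutMs expires, with one
    // arrival every intervalMs and a cap of batchSize.
    static int collectedAtFixedDeadline(long timeoutMs, long intervalMs, int batchSize) {
        long arrivals = timeoutMs / intervalMs;
        return (int) Math.min(arrivals, batchSize);
    }

    // Messages collected before the first send when every arrival restarts
    // the full timeout: the wait only ends on a gap of at least timeoutMs,
    // or when the batch is full.
    static int collectedWithRestartedTimeout(long timeoutMs, long intervalMs, int batchSize) {
        int collected = 0;
        long waitStart = 0;
        long nextArrival = intervalMs;
        while (collected < batchSize) {
            if (nextArrival - waitStart >= timeoutMs) {
                break; // the timeout elapses before the next message arrives
            }
            collected++;
            waitStart = nextArrival;
            nextArrival += intervalMs;
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(collectedAtFixedDeadline(1000, 300, 100));      // prints 3
        System.out.println(collectedWithRestartedTimeout(1000, 300, 100)); // prints 100
    }
}
```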
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51169#action_51169 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

My feedback on Martin's proposal:

1. cancelRequested is not required.

2. Line 19: move the lock before the while statement so that the block becomes (also using do instead of while) - less locking i.e.:
{code}
queueMutex.lock();
try {
    do {
        try {
            boolean signalled = queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
            processEnqueuedExchanges(signalled);
        } catch (InterruptedException e) {
            break;
        } catch (Exception e) {
            // TODO: handle exception ...
            e.printStackTrace();
        }
    } while (true);
} finally {
    queueMutex.unlock();
}
{code}

3. Preserve the existing code for the cancel method i.e. it should continue performing an interrupt i.e.:
{code}
public void cancel() {
    interrupt();
}
{code}

4. enqueueExchange needs to add to the queue while the queue is locked i.e.:
{code}
public void enqueueExchange(Exchange exchange) {
    queueMutex.lock();
    try {
        queue.add(exchange);
        if (isInBatchCompleted(queue.size())) {
            queueCondition.signal();
        }
    } finally {
        queueMutex.unlock();
    }
}
{code}
NOTE: isInBatchCompleted is called while the queue is locked - my example did not do this. My focus was on keeping the locks locked minimally given the goal of performance and throughput.

5. processEnqueuedExchanges does not need to check if the batch is cancelled as the interrupt would have previously called an exception i.e.:
{code}
private void processEnqueuedExchanges(boolean signalled) throws Exception {
    if (!signalled) {
        drainQueueTo(collection, batchSize);
    } else {
        while (isInBatchCompleted(queue.size())) {
            drainQueueTo(collection, batchSize);
        }
        if (!isOutBatchCompleted()) {
            return;
        }
    }
    try {
        sendExchanges();
    } catch (Exception e) {
        getExceptionHandler().handleException(e);
    }
}
{code}
NOTE: isInBatchCompleted is now being called a second time - once inside enqueueExchange and now here.
NOTE: sendExchanges is being called while the queue is locked. If there is some slow IO occurring (as was indeed the case with my determining this issue originally) then nothing can be added to the queue during sendExchanges. My focus with the original code submission was on minimising lock contentions while retaining a structure that built on proven code.
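The concern about sendExchanges running while the queue is locked can be illustrated with a minimal sketch. It is not the code from either proposal: the class name LockScopeSketch, the method takeBatch, and the use of String in place of Exchange are assumptions made only so the example stands alone. The idea shown is to hold the lock just long enough to copy a batch out of the queue, so that any slow send happens after the lock has been released and producers are never blocked by I/O.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the batch sender; String replaces Exchange.
class LockScopeSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Lock queueMutex = new ReentrantLock();
    private final Condition queueCondition = queueMutex.newCondition();
    private final int batchSize;

    LockScopeSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Producers add under the lock and signal once a full batch is available.
    void enqueue(String exchange) {
        queueMutex.lock();
        try {
            queue.add(exchange);
            if (queue.size() >= batchSize) {
                queueCondition.signal();
            }
        } finally {
            queueMutex.unlock();
        }
    }

    // Waits until a full batch is queued or the timeout elapses, then copies
    // at most batchSize elements out of the queue. The lock is released
    // before the method returns, so the caller sends without holding it.
    List<String> takeBatch(long timeoutMillis) throws InterruptedException {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        List<String> batch = new ArrayList<>();
        queueMutex.lock();
        try {
            // Re-check the predicate in a loop: a wakeup alone proves nothing.
            while (queue.size() < batchSize && nanos > 0) {
                nanos = queueCondition.awaitNanos(nanos);
            }
            for (int i = 0; i < batchSize; i++) {
                String e = queue.poll();
                if (e == null) {
                    break;
                }
                batch.add(e);
            }
        } finally {
            queueMutex.unlock();
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        LockScopeSketch sender = new LockScopeSketch(2);
        sender.enqueue("a");
        sender.enqueue("b");
        sender.enqueue("c");
        System.out.println(sender.takeBatch(100)); // prints [a, b]
        System.out.println(sender.takeBatch(100)); // prints [c] after the timeout
    }
}
```

Checking the queue size in a loop, rather than trusting the return value of the wait, also covers the case where a signal arrives before the sender starts waiting.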
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51168#action_51168 ]

William Tam commented on CAMEL-1510:
------------------------------------

Thanks for the great discussion and proposal. I personally think Martin's version of BatchSender is a bit easier to read. If they both solve the performance issue, I'd give it a +1. I think we agree to call interrupt() in cancel(). Regarding the last bullet, it looks like drainQueueTo() is called with queueMutex already held, right?
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51163#action_51163 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

"my intention was to provide an implementation that signals the batch sender to stop waiting when the batch size has been reached and to continue processing. I think we should keep that."

I see the problem here - I forgot to include a signal of the condition variable. I will update my original comment to reflect this. My sincere apologies for the confusion. Would you mind re-reviewing my code?

"Maybe we should also consider to have a shared implementation for the wait/signal/cancel mechanisms for the BatchProcessor and the StreamResequencer, otherwise, we'd need to implement similar things in two different places."

I agree, a shared batch sender style of class should be useful.

"Do you want to provide a patch file plus some tests or should we wait for comments from one of the committers how to proceed?"

I'm happy to provide a patch file, though I did have difficulty building the camel distro. I could try again. I think that it would be great to receive some more feedback on this incident.

Thanks again for the dialogue.
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51162#action_51162 ]

Martin Krasser commented on CAMEL-1510:
---------------------------------------

Christopher,

my intention was to provide an implementation that signals the batch sender to stop waiting when the batch size has been reached and to continue processing. I think we should keep that. This is especially useful when the batch timeout is set to a high value. It prevents the batch sender from unnecessarily waiting when the batch size has already been reached. From what I've seen in your proposal this is not the case i.e. the batch sender continues to wait even if the in-batch size has been reached before the timeout.

Changing cancellation to use {{interrupt()}} makes sense to me.

Maybe we should also consider having a shared implementation for the wait/signal/cancel mechanisms for the {{BatchProcessor}} and the {{StreamResequencer}}, otherwise, we'd need to implement similar things in two different places.

Do you want to provide a patch file plus some tests or should we wait for comments from one of the committers how to proceed?
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51160#action_51160 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

Hi Martin,

I'm still leaning to the patch that I provided... I think that it closely resembles the code that is already there which is essentially flawed only in the sense that it interrupts when adding an exchange.

A couple of observations with your changes:
- cancel still needs to interrupt - you really want to interrupt with cancellations.
- cancelRequested is not being protected within the cancel method.
- cancelRequested wouldn't be required if cancel interrupts.
- drainQueueTo will need to protect the queue also.

... my code did do all of the above.

Kind regards,
Christopher
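The point that cancelRequested becomes unnecessary once cancel interrupts can be shown with a small sketch. The class name CancelByInterruptSketch and the ten-second wait are assumptions for illustration only; the real sender would process exchanges inside the loop. A thread blocked in a timed wait on a Condition receives an InterruptedException when interrupted, and that exception alone is enough to leave the run loop.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, stripped-down sender: it only waits, so that the
// cancellation path is the single thing being demonstrated.
class CancelByInterruptSketch extends Thread {
    private final Lock queueMutex = new ReentrantLock();
    private final Condition queueCondition = queueMutex.newCondition();

    CancelByInterruptSketch() {
        super("Batch Sender (sketch)");
    }

    // Cancellation is just an interrupt; no separate flag is kept.
    void cancel() {
        interrupt();
    }

    @Override
    public void run() {
        queueMutex.lock();
        try {
            while (true) {
                // Stands in for the batch timeout; real work would follow here.
                queueCondition.await(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting (or before the wait began): stop.
        } finally {
            queueMutex.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CancelByInterruptSketch sender = new CancelByInterruptSketch();
        sender.start();
        sender.cancel();
        sender.join();
        System.out.println(sender.isAlive()); // prints false
    }
}
```

The wait also throws if the interrupt status was already set when it was entered, so a cancel issued just before the thread starts waiting is not lost.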
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51159#action_51159 ]

Martin Krasser commented on CAMEL-1510:
---------------------------------------

Hi Christopher,

agreed that it's a bit more elegant to use the locking mechanism from {{java.util.concurrent.locks}} when using Java 5 or higher :) I just tried to solve the problem using {{ReentrantLock}} and {{Condition}} too but instead of using an {{exchangeQueued}} variable I let the {{enqueueExchange()}} and the {{cancel()}} methods _signal_ the batch sender to resume processing. I tested the following code with the {{AggregatorTest}} unit tests.

{code:java}
private class BatchSender extends Thread {

    private volatile boolean cancelRequested;

    private Queue queue;
    private Lock queueMutex = new ReentrantLock();
    private Condition queueCondition = queueMutex.newCondition();

    public BatchSender() {
        super("Batch Sender");
        this.queue = new LinkedList();
    }

    @Override
    public void run() {
        while (true) {
            queueMutex.lock();
            try {
                boolean signalled = queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
                processEnqueuedExchanges(signalled);
            } catch (InterruptedException e) {
                break;
            } catch (Exception e) {
                // TODO: handle exception ...
                e.printStackTrace();
            } finally {
                queueMutex.unlock();
            }
        }
    }

    public void cancel() {
        cancelRequested = true;
        queueMutex.lock();
        try {
            queueCondition.signal();
        } finally {
            queueMutex.unlock();
        }
    }

    public void enqueueExchange(Exchange exchange) {
        queue.add(exchange);
        queueMutex.lock();
        try {
            if (isInBatchCompleted(queue.size())) {
                queueCondition.signal();
            }
        } finally {
            queueMutex.unlock();
        }
    }

    private void processEnqueuedExchanges(boolean signalled) throws Exception {
        if (!signalled) {
            drainQueueTo(collection, batchSize);
        } else {
            if (cancelRequested) {
                return;
            }
            while (isInBatchCompleted(queue.size())) {
                drainQueueTo(collection, batchSize);
            }
            if (!isOutBatchCompleted()) {
                return;
            }
        }
        try {
            sendExchanges();
        } catch (Exception e) {
            getExceptionHandler().handleException(e);
        }
    }

    private void sendExchanges() throws Exception {
        ...
    }

    private void drainQueueTo(Collection collection, int batchSize) {
        ...
    }
}
{code}

Does this make sense to you?

BTW similar changes should also be applied to the stream resequencer. Let's close this issue only when both the {{BatchProcessor}} and {{StreamResequencer}} are fixed.
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51158#action_51158 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

Hi Martin,

Thank you for replying. I presume by synchronised you mean that the enqueueExchange and sendExchanges lock on some shared mutex.

I wonder with your suggestion if you might also have to try synchronising with other things that can then be overloaded e.g. isInBatchCompleted and isOutBatchCompleted. Who would know what these methods could eventually be doing?

Personally I prefer to see the batch sender awake from known conditions i.e. timeout or exchange enqueued. For some reason I also feel that interrupts are a little brutal and should be used sparingly.

In addition I think that what I have proposed (albeit untested) would be more efficient as there is only one lock in play. The present solution has the lock associated within the blocking queue. You would of course be adding another lock with the potential for a deadlock.

Thanks for the continued dialogue.

Kind regards,
Christopher
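The side effect described in this issue, where an interrupt meant only to wake a sleeping sender lands while it is busy processing, can be reproduced in isolation. The following is a sketch under assumptions: the class and method names are hypothetical, and Thread.sleep stands in for slow work such as establishing a JMS connection. It is not the BatchProcessor code itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demonstration of an interrupt hitting a thread that is
// in the middle of slow, blocking work rather than idly waiting.
class InterruptSideEffectSketch {

    static boolean sendWasInterrupted() throws InterruptedException {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        CountDownLatch sending = new CountDownLatch(1);

        Thread sender = new Thread(() -> {
            sending.countDown(); // the "send" is now in progress
            try {
                Thread.sleep(5_000); // stands in for slow I/O during sending
            } catch (InterruptedException e) {
                interrupted.set(true); // the send was cut short
            }
        }, "Batch Sender (sketch)");

        sender.start();
        sending.await();
        // What an enqueue-time interrupt amounts to: it cannot tell whether
        // the sender is waiting for work or already processing a batch.
        sender.interrupt();
        sender.join();
        return interrupted.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendWasInterrupted()); // prints true
    }
}
```

Waking the sender through a condition variable avoids this, because a signal only affects a thread that is actually waiting on that condition.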
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51156#action_51156 ]

Martin Krasser commented on CAMEL-1510:
---------------------------------------

Christopher,

sorry for reacting so late - didn't see the JIRA notification earlier. I wonder if it's sufficient to make the methods {{enqueueExchange()}} and {{sendExchanges()}} synchronized to solve that problem. What do you think?
[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects
[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=50968#action_50968 ]

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

Just wondering... the batch processor's collection should always be a thread-safe type of collection. Is this the case in practice? If the collection is not thread safe then the batch sender run method will contend with the processor's isOutBatchCompleted(), doStop() and getCollection() methods.