[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-19 Thread Martin Krasser (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51281#action_51281
 ] 

Martin Krasser commented on CAMEL-1510:
---

Hi Christopher,

Spurious wakeups are ok for the stream resequencer. If they occur, the 
ResequencerEngine ensures that elements are delivered only if they have really 
timed out. We can leave the code as is.


> BatchProcessor interrupt has side effects
> -----------------------------------------
>
> Key: CAMEL-1510
> URL: https://issues.apache.org/activemq/browse/CAMEL-1510
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 1.6.0, 2.0-M1
> Environment: Mac OS X
> Reporter: Christopher Hunt
> Assignee: William Tam
> Priority: Critical
> Fix For: 2.0.0, 1.6.1
>
> Attachments: BatchProcessor-lockmin.java.20.diff, 
> BatchProcessor.java.20.diff, camel-core-1.x.patch, camel-core-2.x.patch
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt 
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle 
> of processing exchanges, and the processing involves something slow like 
> establishing a JMS connection over SSL or queuing to an asynchronous 
> processor, then the processing can become interrupted. The consequence of 
> this side effect is that the batch sender thread rarely gets the opportunity 
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in 
> continuously adding exchanges to the aggregator, the threshold becoming 
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer 
> grained concurrency controls being used instead of relying upon interrupting 
> a thread. Perhaps something like the following (untested) re-write of the 
> sender:
> {code}
> private class BatchSender extends Thread {
>     private Queue queue;
>     private boolean exchangeQueued = false;
>     private Lock queueMutex = new ReentrantLock();
>     private Condition queueCondition = queueMutex.newCondition();
>     public BatchSender() {
>         super("Batch Sender");
>         this.queue = new LinkedList();
>     }
>     public void cancel() {
>         interrupt();
>     }
>     private void drainQueueTo(Collection collection, int batchSize) {
>         for (int i = 0; i < batchSize; ++i) {
>             Exchange e = queue.poll();
>             if (e != null) {
>                 collection.add(e);
>             } else {
>                 break;
>             }
>         }
>     }
>     public void enqueueExchange(Exchange exchange) {
>         queueMutex.lock();
>         try {
>             queue.add(exchange);
>             exchangeQueued = true;
>             queueCondition.signal();
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>     @Override
>     public void run() {
>         queueMutex.lock();
>         try {
>             do {
>                 try {
>                     if (!exchangeQueued) {
>                         queueCondition.await(batchTimeout,
>                                 TimeUnit.MILLISECONDS);
>                         if (!exchangeQueued) {
>                             drainQueueTo(collection, batchSize);
>                         }
>                     }
>                     if (exchangeQueued) {
>                         exchangeQueued = false;
>                         queueMutex.unlock();
>                         try {
>                             while (isInBatchCompleted(queue.size())) {
>                                 queueMutex.lock();
>                                 try {
>                                     drainQueueTo(collection, batchSize);
>                                 } finally {
>                                     queueMutex.unlock();
>                                 }
>                             }
>                             if (!isOutBatchCompleted()) {
>                                 continue;
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     }
>                     queueMutex.unlock();
>                     try {
>                         try {
>                             sendExchanges();
> 

[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-18 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51273#action_51273
 ] 

Christopher Hunt commented on CAMEL-1510:
-

Hi Martin,

I just had a look at your changes to StreamResequencer and wonder if you need 
to signal that a request has been delivered using a boolean as well as relying 
on the condition variable. Perhaps you do not need to discriminate between a 
timeout and a delivered request, but I thought you should know that a 
condition variable can wake up spuriously on some platforms, i.e. without a 
timeout and without a signal.

From the Java 5 javadoc:

_When waiting upon a Condition, a "spurious wakeup" is permitted to occur, in 
general, as a concession to the underlying platform semantics. This has little 
practical impact on most application programs as a Condition should always be 
waited upon in a loop, testing the state predicate that is being waited for. An 
implementation is free to remove the possibility of spurious wakeups but it is 
recommended that applications programmers always assume that they can occur and 
so always wait in a loop._

'hope that this is useful to you.

Kind regards,
Christopher
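
For readers following the thread, the wait-in-a-loop idiom from the javadoc quoted above can be sketched roughly as follows. This is only an illustration under stated assumptions: DeliverySignal, deliver() and awaitDelivery() are hypothetical names and are not taken from StreamResequencer or BatchProcessor. The boolean is the state predicate, and the remaining time returned by awaitNanos() lets the caller tell a genuine timeout from a spurious wakeup.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper (not Camel code): a flag guarded by a lock and a condition. */
final class DeliverySignal {
    private final Lock lock = new ReentrantLock();
    private final Condition delivered = lock.newCondition();
    private boolean requestDelivered; // the state predicate, guarded by lock

    /** Producer side: set the flag first, then signal the waiter. */
    void deliver() {
        lock.lock();
        try {
            requestDelivered = true;
            delivered.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits until a request has been delivered or the timeout has elapsed.
     * Returns true for a delivery and false for a genuine timeout.
     */
    boolean awaitDelivery(long timeoutMillis) {
        long nanosLeft = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            // A spurious wakeup leaves the flag false with time remaining,
            // so the loop simply goes back to waiting.
            while (!requestDelivered && nanosLeft > 0L) {
                nanosLeft = delivered.awaitNanos(nanosLeft);
            }
            boolean result = requestDelivered;
            requestDelivered = false; // consume the signal
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

Whether a consumer needs the boolean at all depends on whether it re-checks its own state after waking, which is the point raised above.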


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-16 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51242#action_51242
 ] 

William Tam commented on CAMEL-1510:


Submitted Christopher's second patch (r765824 and r765825).  Thanks!


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-16 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51240#action_51240
 ] 

Christopher Hunt commented on CAMEL-1510:
-

Hi William,

Thanks for submitting my patch to BatchProcessor. Thank you also for reviewing 
the code and noticing that queue.size() wasn't protected. I obtained your 
version of BatchProcessor from the trunk and further noticed that the call to 
isOutBatchCompleted can be safely performed while retaining the queue lock. I 
was originally under the impression (through not looking) that 
isInBatchCompleted and isOutBatchCompleted could be overridden. Since they are 
private, this cannot be the case, and thus they can be invoked while retaining 
the queue lock. The code is nicely simplified by removing the locking around 
these calls.

I have attached a minor patch reflecting the above after having performed the 
camel-core test cases successfully again. The patch is for the 2.0 source.

Kind regards,
Christopher


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-16 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51238#action_51238
 ] 

William Tam commented on CAMEL-1510:


Submitted Martin's patch to trunk (r765729) and 1.x (r765731).


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-16 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51236#action_51236
 ] 

William Tam commented on CAMEL-1510:


To answer Christopher's question:
"Just wondering... the batch processor's collection should always be a 
thread-safe type of collection. Is this the case in practice? If the collection 
is not thread safe then the batch sender run method will contend with the 
processor's isOutBatchCompleted(), doStop() and getCollection() methods."

The batch processor's collection is accessed exclusively by the BatchSender 
thread (with the exception of the doStop() method), so the collection 
does not need to be a thread-safe type.  The doStop() method is called during 
shutdown and it interrupts the BatchSender thread before calling clear() on the 
collection, so it should be fine.  getCollection() is a protected method 
and it never gets called.  We should probably get rid of getCollection() and 
make the isInBatchCompleted() and isOutBatchCompleted() methods private.  
Thoughts?
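
The thread-confinement argument above can be illustrated with a small sketch. Everything here is hypothetical (ConfinedBatcher, collect() and stop() are illustrative names, not the BatchProcessor code), and it makes one assumption the comment does not state: stop() waits with join() for the worker to finish before it touches the collection, so the plain ArrayList is never accessed by two threads at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration of a collection confined to one worker thread. */
final class ConfinedBatcher {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<String>();
    // Plain ArrayList: only the worker touches it while the worker is alive.
    private final List<String> collection = new ArrayList<String>();
    private final Thread worker = new Thread(this::collect, "Confined Batcher");

    private void collect() {
        try {
            while (true) {
                collection.add(inbox.take()); // blocks until an item arrives
            }
        } catch (InterruptedException e) {
            // Interrupted by stop(): return and let the thread end.
        }
    }

    void start() {
        worker.start();
    }

    void enqueue(String item) {
        inbox.add(item);
    }

    /**
     * Interrupts the worker, waits for it to end, then clears the collection.
     * Returns the number of elements cleared, or -1 if the caller was
     * interrupted while waiting (the collection is left untouched then).
     */
    int stop() {
        worker.interrupt();
        try {
            worker.join(); // assumption: wait before touching the collection
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        int cleared = collection.size();
        collection.clear();
        return cleared;
    }
}
```

The join() also gives the stopping thread a happens-before edge over the worker's writes. Without it, safety would rest on the worker not touching the collection again after the interrupt.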


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-16 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51235#action_51235
 ] 

William Tam commented on CAMEL-1510:


Christopher's patch has been submitted to trunk (r765686) and 1.x (r765689).   
Martin, I will submit your fix to StreamResequencer whenever you are ready. 

Thanks!



[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-16 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51234#action_51234
 ] 

William Tam commented on CAMEL-1510:


The run() method body would look like this.   Do you see any problem?

{code}
queueLock.lock();
try {
    do {
        try {
            if (!exchangeEnqueued) {
                exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
            }

            // Wake up either because of a timeout or because messages have been enqueued
            while (isInBatchCompleted(queue.size())) {
                drainQueueTo(collection, batchSize);
            }

            if (exchangeEnqueued) {
                exchangeEnqueued = false;
            }

            queueLock.unlock();
            try {
                if (!isOutBatchCompleted()) {
                    continue;
                }

                try {
                    sendExchanges();
                } catch (Exception e) {
                    getExceptionHandler().handleException(e);
                }
            } finally {
                queueLock.lock();
            }

        } catch (InterruptedException e) {
            break;
        }

    } while (true);

} finally {
    queueLock.unlock();
}
{code}


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-16 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51230#action_51230
 ] 

William Tam commented on CAMEL-1510:


Thanks guys.  Christopher, I have a few questions/comments on the patch.

1. Please use spaces instead of tabs for indentation.  You can run the 
style checker with "mvn -Psourcecheck".
2. In the run() method, I am not sure if it is 100% safe to unlock before 
calling queue.size().
{code}
queueLock.unlock();
try {
    while (isInBatchCompleted(queue.size())) {
        queueLock.lock();
        try {
            drainQueueTo(collection, batchSize);
        } finally {
            queueLock.unlock();
        }
    }

    if (!isOutBatchCompleted()) {
        continue;
    }
} finally {
    queueLock.lock();
}
{code}

3. In the run() method, the first call to "drainQueueTo(...)" should probably 
have a while loop around it (like the second call) since we don't know how many 
messages are on the queue (it could be 3X the InBatchSize, for example).  We 
could move the while loop into the drainQueueTo() method.
{code}
if (!exchangeEnqueued) {
    drainQueueTo(collection, batchSize);
{code}
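
Points 2 and 3 can be sketched together as below. This is a hedged illustration, not the patch itself: LockedDrain is a hypothetical stand-in for the sender's queue handling, and the loop condition `queue.size() >= batchSize` is an assumption about what isInBatchCompleted() checks. The queue size is only read while the lock is held, and the loop lives inside drainQueueTo() so that a backlog of several batches is moved in one call.

```java
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the sender's queue handling; names are illustrative. */
final class LockedDrain {
    private final Lock queueLock = new ReentrantLock();
    private final Queue<String> queue = new LinkedList<String>();

    void enqueue(String item) {
        queueLock.lock();
        try {
            queue.add(item);
        } finally {
            queueLock.unlock();
        }
    }

    /**
     * Moves up to batchSize elements per pass and keeps going while at least
     * one more full batch is queued. Returns the number of elements moved.
     */
    int drainQueueTo(Collection<String> collection, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int moved = 0;
        queueLock.lock();
        try {
            do {
                for (int i = 0; i < batchSize; ++i) {
                    String e = queue.poll();
                    if (e == null) {
                        break; // queue is empty
                    }
                    collection.add(e);
                    ++moved;
                }
                // queue.size() is read only while the lock is held
            } while (queue.size() >= batchSize);
            return moved;
        } finally {
            queueLock.unlock();
        }
    }
}
```

With seven queued items and a batch size of two, the first call moves six items in three passes and leaves one behind. A later call picks up the remainder, because one pass always runs.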



[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-15 Thread Martin Krasser (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51224#action_51224
 ] 

Martin Krasser commented on CAMEL-1510:
---

I'll provide a patch for the StreamResequencer within the next days.


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-15 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51223#action_51223
 ] 

Christopher Hunt commented on CAMEL-1510:
-

I have now run all of the camel-core tests and things appear fine:

{code}
cd Development/Eclipse/camel-workspace/camel-trunk/
cd camel-core/
mvn test
[INFO] Scanning for projects...
[INFO] 
[INFO] Building Camel :: Core
[INFO]    task-segment: [test]
[INFO] 
[INFO] [resources:resources]
[INFO] Using default encoding to copy filtered resources.
[INFO] [compiler:compile]
[INFO] Compiling 1 source file to /Volumes/Users HD/Users/huntc/Development/Eclipse/camel-workspace/camel-trunk/camel-core/target/classes
[INFO] [resources:testResources]
[INFO] Using default encoding to copy filtered resources.
[INFO] [compiler:testCompile]
[INFO] Nothing to compile - all classes are up to date
[INFO] [surefire:test]
[INFO] Surefire report directory: /Volumes/Users HD/Users/huntc/Development/Eclipse/camel-workspace/camel-trunk/camel-core/target/surefire-reports

---
 T E S T S
---
Running org.apache.camel.management.JmxInstrumentationWithConnectorTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 7.151 sec
Running org.apache.camel.issues.InterceptorLogTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.821 sec
Running org.apache.camel.processor.RemoveHeaderTest
...
Running org.apache.camel.converter.ConverterTest
Tests run: 15, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.703 sec <<< FAILURE!
...
Running org.apache.camel.util.jndi.JndiTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.175 sec
Running org.apache.camel.component.bean.CustomParameterMappingStrategyTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.799 sec

Results :

Failed tests: 

Tests run: 941, Failures: 1, Errors: 0, Skipped: 0

[INFO] 
[ERROR] BUILD FAILURE
[INFO] 
{code}

ConverterTest fails because I am running on Mac OS X and have a path that 
includes a space:

{code}
ConverterTest
org.apache.camel.converter.ConverterTest
testFileToString(org.apache.camel.converter.ConverterTest)
junit.framework.AssertionFailedError: Should have returned a String!
at junit.framework.Assert.fail(Assert.java:47)
at junit.framework.Assert.assertTrue(Assert.java:20)
at junit.framework.Assert.assertNotNull(Assert.java:217)
at org.apache.camel.converter.ConverterTest.testFileToString(ConverterTest.java:166)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:585)
at junit.framework.TestCase.runTest(TestCase.java:164)
at junit.framework.TestCase.runBare(TestCase.java:130)
at junit.framework.TestResult$1.protect(TestResult.java:106)
at junit.framework.TestResult.runProtected(TestResult.java:124)
at junit.framework.TestResult.run(TestResult.java:109)
at junit.framework.TestCase.run(TestCase.java:120)
at junit.framework.TestSuite.runTest(TestSuite.java:230)
at junit.framework.TestSuite.run(TestSuite.java:225)
at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:81)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:45)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)

{code}

(that's an unrelated problem)
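
For readers wondering how a space in a path can break a file-based test: the comment above does not say what ConverterTest does internally, so the following is only a guess at one common mechanism, not a diagnosis. When a file location is obtained as a URL, the space is encoded as %20, and building a File from URL.getFile() keeps the encoded form. The class name SpacePathSketch and the example path are made up for the illustration.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;

// Hypothetical illustration; not taken from ConverterTest or camel-core.
final class SpacePathSketch {

    static URL toUrl(File file) {
        try {
            return file.toURI().toURL(); // spaces become %20 in the URL
        } catch (MalformedURLException e) {
            throw new IllegalStateException(e);
        }
    }

    // Fragile: getFile() returns the still-encoded path, so the File points
    // at a name containing "%20" that usually does not exist on disk.
    static File viaGetFile(URL url) {
        return new File(url.getFile());
    }

    // More robust: going through a URI decodes %20 back into a space.
    static File viaToUri(URL url) {
        try {
            return new File(url.toURI());
        } catch (URISyntaxException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        URL url = toUrl(new File("/Volumes/Users HD/demo.txt"));
        System.out.println(viaGetFile(url).getPath());
        System.out.println(viaToUri(url).getPath());
    }
}
```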

Please note that I have not attempted to apply my changes to StreamResequencer. 
Martin highlighted that StreamResequencer has a similar structure to 
BatchProcessor.


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-15 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51222#action_51222
 ] 

William Tam commented on CAMEL-1510:


Hi Christopher,
Thanks for working on the issue. I'd say at least run all the tests in 
camel-core.


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-14 Thread Martin Krasser (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51209#action_51209
 ] 

Martin Krasser commented on CAMEL-1510:
---

Thank you guys for this great discussion. It would be great to have a patch for 
the 2.x trunk and the 1.x branch. If I can help you with that, please let me 
know.


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-14 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51208#action_51208
 ] 

Christopher Hunt commented on CAMEL-1510:
-

Sure thing. I should be able to do this tomorrow - shall I work off the 2.0 
trunk?


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-14 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51207#action_51207
 ] 

William Tam commented on CAMEL-1510:


@Christopher
You are pretty close to having a patch, right? Would you mind creating one?


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-14 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51203#action_51203
 ] 

William Tam commented on CAMEL-1510:


@Martin
Let me try to answer your question regarding isInBatchCompleted(). The original 
patch from CAMEL-1037 has an issue. Suppose the batch size is a very small 
number (say 2). Someone can send a large number (say 1000) of messages to the 
BatchProcessor in a short period of time, which can cause the queue size to 
become much greater than the batch size. The reason is that enqueueExchange 
only interrupts the Sender thread if it is sleeping. If the Sender thread is 
not sleeping, it only drains 2 messages from the queue, so the queue can back up 
pretty easily. When that happens, messages are stuck on the queue until 
batchTimeout expires, and even then only 2 messages (batchSize) are drained for 
each batchTimeout.

The "while (isInBatchCompleted(queue.size()))" loop is a solution for that 
issue. We actually introduced new parameters InBatchSize and OutBatchSize. 
InBatchSize is how big the queue can grow before draining the messages to the 
collection. OutBatchSize is how big the collection can grow before messages are 
sent.
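
To put rough numbers on the backlog described above, here is a small sketch. It is not the BatchProcessor code: the class name BatchThresholdSketch, the ">=" comparisons for the two thresholds, and the 1000 ms batchTimeout are assumptions made for the example; only the batch size of 2 and the 1000 messages come from the comment.

```java
// Hypothetical sketch of the two thresholds; the method names follow the
// comment above, but the comparison semantics (>=) are an assumption.
final class BatchThresholdSketch {

    // The queue has grown enough to be drained into the collection.
    static boolean isInBatchCompleted(int queueSize, int inBatchSize) {
        return queueSize >= inBatchSize;
    }

    // The collection has grown enough for its messages to be sent.
    static boolean isOutBatchCompleted(int collectionSize, int outBatchSize) {
        return collectionSize >= outBatchSize;
    }

    // Number of timeouts needed to clear a backlog when each timeout
    // drains only batchSize messages (ceiling division).
    static long timeoutsToClear(long backlog, long batchSize) {
        return (backlog + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        long timeouts = timeoutsToClear(1000, 2); // 1000 messages, batch size 2
        long batchTimeoutMillis = 1000;           // assumed value, for illustration
        System.out.println(timeouts + " timeouts, about "
                + (timeouts * batchTimeoutMillis / 1000) + " s");
    }
}
```

Under these assumptions the backlog takes 500 timeouts to clear, which is why draining in a loop while the in-batch threshold is met matters.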




[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-13 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51174#action_51174
 ] 

Christopher Hunt commented on CAMEL-1510:
-

Hi guys,

Thanks for this great dialogue.

With regard to:

_I think that's also an issue in the original code (i.e. in the codebase, not 
Christopher's proposal): the batch sender has always been interrupted whenever 
a message was enqueued. From a user's perspective there's no noticeable batch 
timeout, for example, when the batchTimeout is set to 1000 ms and every 300 ms 
there is a message coming in (assuming the batch size is 100). Normally, the 
batch processor should send 3 messages after the timeout occurs but using the 
original code it would send more (I'd expect 100 messages)_

I do not believe that is the case with the original code or my proposal. The 
queue is only drained into the collection if the batch times out. Using my code 
to illustrate:

{code}
if (!exchangeQueued) {
  drainQueueTo(collection, batchSize);
}
{code}

Furthermore sendExchanges would not even be called unless the batch is complete 
given the following block:

{code}
if (exchangeQueued) {
    exchangeQueued = false;
    queueMutex.unlock();
    try {
        while (isInBatchCompleted(queue.size())) {
            queueMutex.lock();
            try {
                drainQueueTo(collection, batchSize);
            } finally {
                queueMutex.unlock();
            }
        }

        if (!isOutBatchCompleted()) {
            continue;
        }
    } finally {
        queueMutex.lock();
    }
}
{code}

... if the batch is not completed then the loop will continue.

On the re-factoring, please bear in mind that, in essence, all that I have done 
is move blocks of existing code around and use a condition to signal when 
adding to the queue.
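
For anyone following along, the "condition instead of interrupt" idea can be shown in isolation. This is a simplified sketch and not the proposed patch: the class name SignalledQueueSketch, the String element type, and the takeBatch method are invented for the illustration, and unlike the proposal it hands back a batch as soon as anything is queued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the batch sender; not Camel's BatchProcessor.
final class SignalledQueueSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // Producer side: wake the consumer with a signal rather than
    // Thread.interrupt(), so in-flight processing is never interrupted.
    void enqueue(String item) {
        lock.lock();
        try {
            queue.add(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Consumer side: wait up to timeoutMillis for work, then take at most
    // batchSize items. The caller processes the batch outside the lock.
    List<String> takeBatch(int batchSize, long timeoutMillis) {
        List<String> batch = new ArrayList<>();
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            // Re-check the queue after every wake-up to tolerate spurious wakeups.
            while (queue.isEmpty() && remainingNanos > 0) {
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            }
            while (batch.size() < batchSize && !queue.isEmpty()) {
                batch.add(queue.poll());
            }
        } catch (InterruptedException e) {
            // Interrupts are now reserved for real cancellation: restore the flag.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return batch;
    }

    public static void main(String[] args) {
        SignalledQueueSketch sketch = new SignalledQueueSketch();
        sketch.enqueue("a");
        sketch.enqueue("b");
        sketch.enqueue("c");
        System.out.println(sketch.takeBatch(2, 100));
        System.out.println(sketch.takeBatch(2, 100));
    }
}
```

The point of the design is that enqueue() only touches the condition, so a slow send (for example a JMS connection over SSL) running outside the lock is left alone.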


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-13 Thread Martin Krasser (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51171#action_51171
 ] 

Martin Krasser commented on CAMEL-1510:
---

My feedback on Christopher's comments:

1) ok
2) ok
3) ok
4) I added {{isInBatchCompleted()}} here because otherwise we'd always stop the 
batch sender from waiting {{batchTimeout}} ms. I think that's also an issue in 
the original code (i.e. in the codebase, not Christopher's proposal): the batch 
sender has always been interrupted whenever a message was enqueued. From a 
user's perspective there's no noticeable batch timeout, for example, when the 
{{batchTimeout}} is set to 1000 ms and every 300 ms there is a message coming in 
(assuming the batch size is 100). Normally, the batch processor should send 3 
messages after the timeout occurs, but using the original code it would send more 
(I'd expect 100 messages). The unit test testing the batch timeout only works 
because it only sends a single message (BTW the original patch from CAMEL-1037 
honored the batch timeout). I didn't verify these statements in a unit test; I 
just derived them from looking at the code. We should consider that when fixing 
this issue (maybe Christopher's initial proposal is not 'built on proven 
code'). Regarding all other comments for this point: ok (it makes sense to do 
more fine-grained locking).
5) Actually, I don't fully understand why a {{while 
(isInBatchCompleted(queue.size()))}} is used here anyway. Any thoughts? 
Regarding lock granularity: more fine-grained locking would make sense here 
too.

Furthermore, I agree with William that Christopher's original proposal is a bit 
difficult to read. Some refactorings would help. 
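
To illustrate point 4, here is a minimal sketch (not the Camel code, and not either proposal) of a batcher that signals the waiting thread only once a full batch is queued, so that a steady trickle of messages does not cut the timeout short. The class name {{TimeoutHonoringBatcher}} and its methods are hypothetical; only {{Lock}}, {{Condition}} and {{awaitNanos}} come from {{java.util.concurrent.locks}}.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the batch sender's queue handling; not Camel code.
class TimeoutHonoringBatcher<T> {
    private final Queue<T> queue = new LinkedList<T>();
    private final Lock queueMutex = new ReentrantLock();
    private final Condition batchFull = queueMutex.newCondition();
    private final int batchSize;

    TimeoutHonoringBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    // Adds under the lock and signals only when a full batch is available.
    void enqueue(T item) {
        queueMutex.lock();
        try {
            queue.add(item);
            if (queue.size() >= batchSize) {
                batchFull.signal();
            }
        } finally {
            queueMutex.unlock();
        }
    }

    // Waits until the batch is full or the timeout has elapsed, then
    // returns up to batchSize items. The loop re-checks the queue size,
    // so an early or spurious wakeup just resumes waiting.
    List<T> nextBatch(long timeoutMillis) throws InterruptedException {
        queueMutex.lock();
        try {
            long remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (queue.size() < batchSize && remaining > 0) {
                remaining = batchFull.awaitNanos(remaining);
            }
            List<T> batch = new ArrayList<T>();
            while (batch.size() < batchSize && !queue.isEmpty()) {
                batch.add(queue.poll());
            }
            return batch;
        } finally {
            queueMutex.unlock();
        }
    }
}
```

With a batch size of 100, a timeout of 1000 ms and one message every 300 ms, a consumer looping on {{nextBatch(1000)}} would be expected to wake on the timeout with a handful of messages rather than on every enqueue.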


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-13 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51169#action_51169
 ] 

Christopher Hunt commented on CAMEL-1510:
-

My feedback on Martin's proposal:

1. cancelRequested is not required.
2. Line 19: move the lock before the while statement so that the block becomes 
(also using do instead of while) - less locking i.e.:

{code}
queueMutex.lock();
try {
  do {
    try {
      boolean signalled = queueCondition.await(batchTimeout,
          TimeUnit.MILLISECONDS);
      processEnqueuedExchanges(signalled);
    } catch (InterruptedException e) {
      break;
    } catch (Exception e) {
      // TODO: handle exception ...
      e.printStackTrace();
    }

  } while (true);
} finally {
  queueMutex.unlock();
}
{code}

3. Preserve the existing code for the cancel method i.e. it should continue 
performing an interrupt i.e.:

{code}
public void cancel() {
  interrupt();
}
{code}

4. enqueueExchange needs to add to the queue while the queue is locked i.e.:

{code}
  public void enqueueExchange(Exchange exchange) {
    queueMutex.lock();
    try {
      queue.add(exchange);
      if (isInBatchCompleted(queue.size())) {
        queueCondition.signal();
      }
    } finally {
      queueMutex.unlock();
    }
  }
{code}

NOTE: isInBatchCompleted is called while the queue is locked - my example did 
not do this. My focus was on keeping the locks locked minimally given the goal 
of performance and throughput.

5. processEnqueuedExchanges does not need to check if the batch is cancelled as 
the interrupt would already have raised an exception i.e.:

{code}
  private void processEnqueuedExchanges(boolean signalled) throws Exception {

    if (!signalled) {
      drainQueueTo(collection, batchSize);
    } else {
      while (isInBatchCompleted(queue.size())) {
        drainQueueTo(collection, batchSize);
      }

      if (!isOutBatchCompleted()) {
        return;
      }
    }

    try {
      sendExchanges();
    } catch (Exception e) {
      getExceptionHandler().handleException(e);
    }

  }
{code}

NOTE: isInBatchCompleted is now being called a second time - once inside 
enqueueExchange and now here.

NOTE: sendExchanges is being called while the queue is locked. If there is some 
slow IO occurring (as was indeed the case when I originally found this issue) 
then nothing can be added to the queue during sendExchanges.

My focus with the original code submission was on minimising lock contentions 
while retaining a structure that built on proven code.
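
The last NOTE can be sketched as follows: drain the queue while holding the lock, release it, and only then do the slow send. This is a minimal illustration under my own assumptions, not the patch itself; the class {{DrainThenSend}}, its {{String}} payloads and the {{lockHeldDuringSend}} flag are hypothetical and exist only to make the behaviour observable.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: the lock guards the queue only, never the send.
class DrainThenSend {
    private final Queue<String> queue = new LinkedList<String>();
    private final ReentrantLock queueMutex = new ReentrantLock();
    final List<String> sent = new ArrayList<String>();
    boolean lockHeldDuringSend; // recorded purely for demonstration

    void enqueue(String exchange) {
        queueMutex.lock();
        try {
            queue.add(exchange);
        } finally {
            queueMutex.unlock();
        }
    }

    void drainAndSend(int batchSize) {
        List<String> batch = new ArrayList<String>();
        queueMutex.lock();
        try {
            // Copy at most batchSize entries out of the shared queue.
            for (int i = 0; i < batchSize; i++) {
                String e = queue.poll();
                if (e == null) {
                    break;
                }
                batch.add(e);
            }
        } finally {
            queueMutex.unlock();
        }
        // The lock is released here, so producers can keep enqueuing
        // while the (potentially slow) send is in progress.
        send(batch);
    }

    private void send(List<String> batch) {
        lockHeldDuringSend = queueMutex.isHeldByCurrentThread();
        sent.addAll(batch);
    }
}
```

The trade-off is the one discussed above: more lock/unlock calls in exchange for producers never blocking behind slow IO.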


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-13 Thread William Tam (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51168#action_51168
 ] 

William Tam commented on CAMEL-1510:


Thanks for the great discussion and proposal. I personally think Martin's 
version of BatchSender is a bit easier to read. If they both solve the 
performance issue, I'd give it a +1. I think we agree to call interrupt() in 
cancel(). Regarding the last bullet, it looks like drainQueueTo() is called 
with queueMutex already held, right?

> BatchProcessor interrupt has side effects
> -
>
> Key: CAMEL-1510
> URL: https://issues.apache.org/activemq/browse/CAMEL-1510
> Project: Apache Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 1.6.0, 2.0-M1
> Environment: Mac OS X
>Reporter: Christopher Hunt
>Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt 
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle 
> of processing exchanges, and the processing involves something slow like 
> establishing a JMS connection over SSL or queuing to an asynchronous 
> processor, then the processing can become interrupted. The consequence of 
> this side effect is that the batch sender thread rarely gets the opportunity 
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in 
> continuously adding exchanges to the aggregator, the threshold becoming 
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer 
> grained concurrency controls being used instead of relying upon interrupting 
> a thread. Perhaps something like the following (untested) re-write of the 
> sender:
> {code}
> private class BatchSender extends Thread {
>     private Queue queue;
>     private boolean exchangeQueued = false;
>     private Lock queueMutex = new ReentrantLock();
>     private Condition queueCondition = queueMutex.newCondition();
>     public BatchSender() {
>         super("Batch Sender");
>         this.queue = new LinkedList();
>     }
>     public void cancel() {
>         interrupt();
>     }
>     private void drainQueueTo(Collection collection, int batchSize) {
>         for (int i = 0; i < batchSize; ++i) {
>             Exchange e = queue.poll();
>             if (e != null) {
>                 collection.add(e);
>             } else {
>                 break;
>             }
>         }
>     }
>     public void enqueueExchange(Exchange exchange) {
>         queueMutex.lock();
>         try {
>             queue.add(exchange);
>             exchangeQueued = true;
>             queueCondition.signal();
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>     @Override
>     public void run() {
>         queueMutex.lock();
>         try {
>             do {
>                 try {
>                     if (!exchangeQueued) {
>                         queueCondition.await(batchTimeout,
>                                 TimeUnit.MILLISECONDS);
>                         if (!exchangeQueued) {
>                             drainQueueTo(collection, batchSize);
>                         }
>                     }
>                     if (exchangeQueued) {
>                         exchangeQueued = false;
>                         queueMutex.unlock();
>                         try {
>                             while (isInBatchCompleted(queue.size())) {
>                                 queueMutex.lock();
>                                 try {
>                                     drainQueueTo(collection, batchSize);
>                                 } finally {
>                                     queueMutex.unlock();
>                                 }
>                             }
>                             if (!isOutBatchCompleted()) {
>                                 continue;
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     }
>                     queueMutex.unlock();
>                     try {
>                         try {
>                             sendExchanges();
>                         } catch (Exception e) {
>                             getExceptionHandl

[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-13 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51163#action_51163
 ] 

Christopher Hunt commented on CAMEL-1510:
-

"my intention was to provide an implementation that signals the batch sender to 
stop waiting when the batch size has been reached and to continue processing. I 
think we should keep that. "

I see the problem here - I forgot to include a signal of the condition 
variable. I will update my original comment to reflect this. My sincere 
apologies for the confusion. Would you mind re-reviewing my code?

"Maybe we should also consider to have a shared implementation for the 
wait/signal/cancel mechanisms for the BatchProcessor and the StreamResequencer, 
otherwise, we'd need to implement similar things in two different places. "

I agree, a shared batch sender style of class should be useful.

"Do you want to provide a patch file plus some tests or should we wait for 
comments from one of the committers on how to proceed?"

I'm happy to provide a patch file, though I did have difficulty building the 
camel distro. I could try again.

I think that it would be great to receive some more feedback on this incident.

Thanks again for the dialogue.


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-13 Thread Martin Krasser (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51162#action_51162
 ] 

Martin Krasser commented on CAMEL-1510:
---

Christopher, my intention was to provide an implementation that signals the 
batch sender to stop waiting when the batch size has been reached and to 
continue processing. I think we should keep that. This is especially useful 
when the batch timeout is set to a high value. It prevents the batch sender 
from unnecessarily waiting when the batch size has already been reached. From 
what I've seen in your proposal this is not the case, i.e. the batch sender 
continues to wait even if the in-batch size has been reached before the 
timeout. Changing cancellation to use {{interrupt()}} makes sense to me.

Maybe we should also consider having a shared implementation of the 
wait/signal/cancel mechanisms for the {{BatchProcessor}} and the 
{{StreamResequencer}}; otherwise, we'd need to implement similar things in two 
different places. Do you want to provide a patch file plus some tests or should 
we wait for comments from one of the committers on how to proceed?



[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-12 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51160#action_51160
 ] 

Christopher Hunt commented on CAMEL-1510:
-

Hi Martin,

I'm still leaning to the patch that I provided... I think that it closely 
resembles the code that is already there which is essentially flawed only in 
the sense that it interrupts when adding an exchange. 

A couple of observations with your changes:
- cancel still needs to interrupt - you really want to interrupt with 
cancellations.
- cancelRequested is not being protected within the cancel method.
- cancelRequested wouldn't be required if cancel interrupts.
- drainQueueTo will need to protect the queue also.

... my code did do all of the above.
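
As a small illustration of the first three observations, the following sketch shows a thread that is cancelled purely by {{interrupt()}} while it waits on a {{Condition}}, with no separate {{cancelRequested}} flag. The class name {{InterruptibleSender}} and the {{exitedViaInterrupt}} field are hypothetical and only there to make the outcome visible; this is not the patch.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of interrupt-based cancellation.
class InterruptibleSender extends Thread {
    private final Lock queueMutex = new ReentrantLock();
    private final Condition queueCondition = queueMutex.newCondition();
    volatile boolean exitedViaInterrupt;

    InterruptibleSender() {
        super("Sketch Sender");
    }

    public void cancel() {
        // No extra flag needed: await() reports the interrupt itself.
        interrupt();
    }

    @Override
    public void run() {
        queueMutex.lock();
        try {
            while (true) {
                // Throws InterruptedException if the thread is, or
                // becomes, interrupted; the lock is re-acquired first.
                queueCondition.await(60, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            exitedViaInterrupt = true;
        } finally {
            queueMutex.unlock();
        }
    }
}
```

Because {{await}} re-acquires the lock before throwing, the {{unlock()}} in the {{finally}} block is always balanced.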

Kind regards,
Christopher


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-12 Thread Martin Krasser (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51159#action_51159
 ] 

Martin Krasser commented on CAMEL-1510:
---

Hi Christopher,

Agreed that it's a bit more elegant to use the locking mechanism from 
{{java.util.concurrent.locks}} when using Java 5 or higher :) I just tried to 
solve the problem using {{ReentrantLock}} and {{Condition}} too, but instead of 
using an {{exchangeQueued}} variable I let the {{enqueueExchange()}} and 
{{cancel()}} methods _signal_ the batch sender to resume processing. I 
tested the following code with the {{AggregatorTest}} unit tests.

{code:java}
private class BatchSender extends Thread {

    private volatile boolean cancelRequested;

    private Queue queue;

    private Lock queueMutex = new ReentrantLock();

    private Condition queueCondition = queueMutex.newCondition();

    public BatchSender() {
        super("Batch Sender");
        this.queue = new LinkedList();
    }

    @Override
    public void run() {
        while (true) {
            queueMutex.lock();
            try {
                boolean signalled = queueCondition.await(batchTimeout,
                        TimeUnit.MILLISECONDS);
                processEnqueuedExchanges(signalled);
            } catch (InterruptedException e) {
                break;
            } catch (Exception e) {
                // TODO: handle exception ...
                e.printStackTrace();
            } finally {
                queueMutex.unlock();
            }

        }
    }

    public void cancel() {
        cancelRequested = true;
        queueMutex.lock();
        try {
            queueCondition.signal();
        } finally {
            queueMutex.unlock();
        }
    }

    public void enqueueExchange(Exchange exchange) {
        queue.add(exchange);
        queueMutex.lock();
        try {
            if (isInBatchCompleted(queue.size())) {
                queueCondition.signal();
            }
        } finally {
            queueMutex.unlock();
        }
    }

    private void processEnqueuedExchanges(boolean signalled) throws
            Exception {

        if (!signalled) {
            drainQueueTo(collection, batchSize);
        } else {
            if (cancelRequested) {
                return;
            }
            while (isInBatchCompleted(queue.size())) {
                drainQueueTo(collection, batchSize);
            }

            if (!isOutBatchCompleted()) {
                return;
            }
        }


        try {
            sendExchanges();
        } catch (Exception e) {
            getExceptionHandler().handleException(e);
        }

    }

    private void sendExchanges() throws Exception {
        ...
    }

    private void drainQueueTo(Collection collection, int
            batchSize) {
        ...
    }

}
{code}

Does this make sense to you?

BTW similar changes should also be applied to the stream resequencer. Let's 
close this issue only when both the {{BatchProcessor}} and 
{{StreamResequencer}} are fixed.



[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-12 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51158#action_51158
 ] 

Christopher Hunt commented on CAMEL-1510:
-

Hi Martin,

Thank you for replying.

I presume by synchronised you mean that enqueueExchange and sendExchanges 
lock on some shared mutex.

I wonder whether, with your suggestion, you might also have to synchronise 
other things that can be overridden, e.g. isInBatchCompleted and 
isOutBatchCompleted. Who knows what these methods could eventually be 
doing?

Personally I prefer to see the batch sender wake on known conditions, i.e. 
timeout or exchange enqueued. I also feel that interrupts are a little brutal 
and should be used sparingly.

In addition I think that what I have proposed (albeit untested) would be more 
efficient as there is only one lock in play. The present solution has the lock 
associated with the blocking queue. You would of course be adding another 
lock, with the potential for a deadlock.

Thanks for the continued dialogue.

Kind regards,
Christopher


[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-12 Thread Martin Krasser (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51156#action_51156
 ] 

Martin Krasser commented on CAMEL-1510:
---

Christopher, sorry for reacting so late - I didn't see the JIRA notification 
earlier. I wonder if it's sufficient to make the methods {{enqueueExchange()}} 
and {{sendExchanges()}} synchronized to solve that problem. What do you think?

> BatchProcessor interrupt has side effects
> -
>
> Key: CAMEL-1510
> URL: https://issues.apache.org/activemq/browse/CAMEL-1510
> Project: Apache Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 1.6.0, 2.0-M1
> Environment: Mac OS X
>Reporter: Christopher Hunt
>Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt 
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle 
> of processing exchanges, and the processing involves something slow like 
> establishing a JMS connection over SSL or queuing to an asynchronous 
> processor, then the processing can become interrupted. The consequence of 
> this side effect is that the batch sender thread rarely gets the opportunity 
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in 
> continuously adding exchanges to the aggregator, the threshold becoming 
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer 
> grained concurrency controls being used instead of relying upon interrupting 
> a thread. Perhaps something like the following (untested) re-write of the 
> sender:
> {code}
> private class BatchSender extends Thread {
>     private Queue queue;
>     private boolean exchangeQueued = false;
>     private Lock queueMutex = new ReentrantLock();
>     private Condition queueCondition = queueMutex.newCondition();
>     public BatchSender() {
>         super("Batch Sender");
>         this.queue = new LinkedList();
>     }
>     public void cancel() {
>         interrupt();
>     }
>     private void drainQueueTo(Collection collection, int batchSize) {
>         for (int i = 0; i < batchSize; ++i) {
>             Exchange e = queue.poll();
>             if (e != null) {
>                 collection.add(e);
>             } else {
>                 break;
>             }
>         }
>     }
>     public void enqueueExchange(Exchange exchange) {
>         queueMutex.lock();
>         try {
>             queue.add(exchange);
>             exchangeQueued = true;
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>     @Override
>     public void run() {
>         queueMutex.lock();
>         try {
>             do {
>                 try {
>                     if (!exchangeQueued) {
>                         queueCondition.await(batchTimeout,
>                                 TimeUnit.MILLISECONDS);
>                         if (!exchangeQueued) {
>                             drainQueueTo(collection, batchSize);
>                         }
>                     }
>                     if (exchangeQueued) {
>                         exchangeQueued = false;
>                         queueMutex.unlock();
>                         try {
>                             while (isInBatchCompleted(queue.size())) {
>                                 queueMutex.lock();
>                                 try {
>                                     drainQueueTo(collection, batchSize);
>                                 } finally {
>                                     queueMutex.unlock();
>                                 }
>                             }
>                             if (!isOutBatchCompleted()) {
>                                 continue;
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     }
>                     queueMutex.unlock();
>                     try {
>                         try {
>                             sendExchanges();
>                         } catch (Exception e) {
>                             getExceptionHandler().handleException(e);
>                         }
>                     } finally {
>                         queueMutex.lock();
>

[jira] Commented: (CAMEL-1510) BatchProcessor interrupt has side effects

2009-04-01 Thread Christopher Hunt (JIRA)

[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=50968#action_50968
 ] 

Christopher Hunt commented on CAMEL-1510:
-

Just wondering... the batch processor's collection should always be a 
thread-safe collection type. Is this the case in practice? If the collection 
is not thread-safe, then the batch sender's run() method will race with the 
processor's isOutBatchCompleted(), doStop() and getCollection() methods.
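
As an illustration of the concern, here is a minimal sketch of one way a 
plain collection can be made safe for concurrent adds, using the JDK's 
Collections.synchronizedCollection wrapper. The class and method names are 
assumptions for the example and are not taken from camel-core; whether the 
batch processor's collection is wrapped like this in practice is exactly the 
open question.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

// Hypothetical example; names are not part of camel-core.
class SafeCollectionSketch {

    // Two threads add perThread elements each to the shared collection and
    // the final size is returned. With a synchronized wrapper no add is lost.
    static int concurrentAdds(Collection<Integer> target, int perThread) {
        Runnable adder = () -> {
            for (int i = 0; i < perThread; i++) {
                target.add(i);
            }
        };
        Thread a = new Thread(adder);
        Thread b = new Thread(adder);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return target.size();
    }

    public static void main(String[] args) {
        Collection<Integer> safe =
                Collections.synchronizedCollection(new ArrayList<Integer>());
        System.out.println(concurrentAdds(safe, 10_000));
    }
}
```

Note that the wrapper only protects individual calls such as add() and 
size(). Iterating over the collection still requires the caller to 
synchronize on it manually, so compound operations in the sender and the 
processor would need a shared lock either way.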


> BatchProcessor interrupt has side effects
> -
>
> Key: CAMEL-1510
> URL: https://issues.apache.org/activemq/browse/CAMEL-1510
> Project: Apache Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 1.6.0
> Environment: Mac OS X
> Reporter: Christopher Hunt
> Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt 
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle 
> of processing exchanges, and the processing involves something slow like 
> establishing a JMS connection over SSL or queuing to an asynchronous 
> processor, then the processing can become interrupted. The consequence of 
> this side effect is that the batch sender thread rarely gets the opportunity 
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in 
> continuously adding exchanges to the aggregator, the threshold becoming 
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer 
> grained concurrency controls being used instead of relying upon interrupting 
> a thread. Perhaps something like the following (untested) re-write of the 
> sender:
> {code}
> private class BatchSender extends Thread {
>     private Queue<Exchange> queue;
>     private boolean exchangeQueued = false;
>     private Lock queueMutex = new ReentrantLock();
>     private Condition queueCondition = queueMutex.newCondition();
>
>     public BatchSender() {
>         super("Batch Sender");
>         this.queue = new LinkedList<Exchange>();
>     }
>
>     public void cancel() {
>         interrupt();
>     }
>
>     private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>         for (int i = 0; i < batchSize; ++i) {
>             Exchange e = queue.poll();
>             if (e != null) {
>                 collection.add(e);
>             } else {
>                 break;
>             }
>         }
>     }
>
>     public void enqueueExchange(Exchange exchange) {
>         queueMutex.lock();
>         try {
>             queue.add(exchange);
>             exchangeQueued = true;
>         } finally {
>             queueMutex.unlock();
>         }
>     }
>
>     @Override
>     public void run() {
>         queueMutex.lock();
>         try {
>             do {
>                 try {
>                     if (!exchangeQueued) {
>                         queueCondition.await(batchTimeout,
>                                 TimeUnit.MILLISECONDS);
>                         if (!exchangeQueued) {
>                             drainQueueTo(collection, batchSize);
>                         }
>                     }
>                     if (exchangeQueued) {
>                         exchangeQueued = false;
>                         queueMutex.unlock();
>                         try {
>                             while (isInBatchCompleted(queue.size())) {
>                                 queueMutex.lock();
>                                 try {
>                                     drainQueueTo(collection, batchSize);
>                                 } finally {
>                                     queueMutex.unlock();
>                                 }
>                             }
>                             if (!isOutBatchCompleted()) {
>                                 continue;
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     }
>                     queueMutex.unlock();
>                     try {
>                         try {
>                             sendExchanges();
>                         } catch (Exception e) {
>                             getExceptionHandler().handleException(e);
>                         }
>                     }