[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51230#action_51230 ]

William Tam edited comment on CAMEL-1510 at 4/16/09 9:15 AM:
-------------------------------------------------------------

Thanks, guys.  Christopher, I have a few questions/comments about the patch.

1. Please use spaces instead of tabs for indentation.  You can actually run the 
style checker with "mvn -Psourcecheck".

Actually, now that we don't use thread interruption as the signaling mechanism, 
we do have to put the drainQueueTo() call in a while loop.  (I removed some of 
my earlier comments.)  I will revise the patch and post it.
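
In other words, roughly this around the first drain, assuming the lock is held 
at that point (untested sketch, using the names from the proposed code):
{code}
// untested sketch: keep draining whole batches while the queue still holds a
// full in-batch, instead of making a single drainQueueTo() call
while (isInBatchCompleted(queue.size())) {
    drainQueueTo(collection, batchSize);
}
{code}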



      was (Author: wtam):
    Thanks, guys.  Christopher, I have a few questions/comments about the patch.

1. Please use spaces instead of tabs for indentation.  You can actually run the 
style checker with "mvn -Psourcecheck".
2. In the run() method, I am not sure if it is 100% safe to unlock before 
calling queue.size().
{code}
queueLock.unlock();
try {
    while (isInBatchCompleted(queue.size())) {
        queueLock.lock();
        try {
            drainQueueTo(collection, batchSize);
        } finally {
            queueLock.unlock();
        }
    }

    if (!isOutBatchCompleted()) {
        continue;
    }
} finally {
    queueLock.lock();
}
{code}
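
If keeping queue.size() under the lock turns out to be necessary, one possible 
shape would be something like this (untested sketch, keeping the existing names):
{code}
// untested sketch: hold the lock across the size check and the drain so the
// queue cannot change between queue.size() and drainQueueTo()
queueLock.lock();
try {
    while (isInBatchCompleted(queue.size())) {
        drainQueueTo(collection, batchSize);
    }
} finally {
    queueLock.unlock();
}
{code}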

3. (I know this is inherited from the original code.)  In the run() method, we 
probably should put a while loop around the first "drainQueueTo(...)" call (like 
the second call), since we don't know how many messages are on the queue (could 
be 3X the InBatchSize, for example).  Alternatively, we could move the while 
loop into the drainQueueTo() method; see the sketch after the snippet below.
{code}
if (!exchangeEnqueued) {
    drainQueueTo(collection, batchSize);
{code}
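
For example, moving the loop into drainQueueTo() itself could look something 
like this (untested sketch based on the proposed method):
{code}
// untested sketch: drainQueueTo() keeps draining whole batches itself, so a
// single call copes with a backlog larger than batchSize
private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
    do {
        for (int i = 0; i < batchSize; ++i) {
            Exchange e = queue.poll();
            if (e != null) {
                collection.add(e);
            } else {
                return;
            }
        }
    } while (isInBatchCompleted(queue.size()));
}
{code}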

Also, the following should be done regardless of exchangeEnqueued (this is 
inherited from the original code as well); see the sketch after the snippet 
below.
{code}
if (!isOutBatchCompleted()) {
    continue;
}
{code}
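
That is, the out-batch check would move outside the exchangeEnqueued branch, 
roughly like this (untested sketch of the run() loop body):
{code}
// untested sketch: check the out batch on every pass, whether or not an
// exchange was enqueued, before falling through to sendExchanges()
if (exchangeEnqueued) {
    exchangeEnqueued = false;
    // ... drain the queue as above ...
}
if (!isOutBatchCompleted()) {
    continue;
}
// otherwise fall through to sendExchanges()
{code}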
  
> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Assignee: William Tam
>            Priority: Critical
>         Attachments: BatchProcessor.java.20.diff
>
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt 
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle 
> of processing exchanges, and the processing involves something slow like 
> establishing a JMS connection over SSL or queuing to an asynchronous 
> processor, then the processing can become interrupted. The consequence of 
> this side effect is that the batch sender thread rarely gets the opportunity 
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in 
> exchanges being continuously added to the aggregator, the threshold being 
> reached, and then the aggregated result being enqueued to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend using 
> finer-grained concurrency controls instead of relying upon interrupting a 
> thread. Perhaps something like the following (untested) re-write of the 
> sender:
> {code}
>     private class BatchSender extends Thread {
>         private Queue<Exchange> queue;
>         private boolean exchangeQueued = false;
>         private Lock queueMutex = new ReentrantLock();
>         private Condition queueCondition = queueMutex.newCondition();
>         public BatchSender() {
>             super("Batch Sender");
>             this.queue = new LinkedList<Exchange>();
>         }
>         public void cancel() {
>             interrupt();
>         }
>         private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>             for (int i = 0; i < batchSize; ++i) {
>                 Exchange e = queue.poll();
>                 if (e != null) {
>                     collection.add(e);
>                 } else {
>                     break;
>                 }
>             }
>         }
>         public void enqueueExchange(Exchange exchange) {
>             queueMutex.lock();
>             try {
>                 queue.add(exchange);
>                 exchangeQueued = true;
>                 queueCondition.signal();
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         @Override
>         public void run() {
>             queueMutex.lock();
>             try {
>                 do {
>                     try {
>                         if (!exchangeQueued) {
>                             queueCondition.await(batchTimeout,
>                                     TimeUnit.MILLISECONDS);
>                             if (!exchangeQueued) {
>                                 drainQueueTo(collection, batchSize);
>                             }
>                         }
>                         if (exchangeQueued) {
>                             exchangeQueued = false;
>                             queueMutex.unlock();
>                             try {
>                                 while (isInBatchCompleted(queue.size())) {
>                                     queueMutex.lock();
>                                     try {
>                                         drainQueueTo(collection, batchSize);
>                                     } finally {
>                                         queueMutex.unlock();
>                                     }
>                                 }
>                                 if (!isOutBatchCompleted()) {
>                                     continue;
>                                 }
>                             } finally {
>                                 queueMutex.lock();
>                             }
>                         }
>                         queueMutex.unlock();
>                         try {
>                             try {
>                                 sendExchanges();
>                             } catch (Exception e) {
>                                 getExceptionHandler().handleException(e);
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     } catch (InterruptedException e) {
>                         break;
>                     }
>                 } while (true);
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         private void sendExchanges() throws Exception {
>             Iterator<Exchange> iter = collection.iterator();
>             while (iter.hasNext()) {
>                 Exchange exchange = iter.next();
>                 iter.remove();
>                 processExchange(exchange);
>             }
>         }
>     }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed 
> its access. In addition any queuing of exchanges is noted. This should result 
> in less locking.
> The main change though is that queuing an exchange does not interrupt the 
> batch sender's current activity.
> I hope that this sample is useful.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
