[ 
https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51223#action_51223
 ] 

Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

I have now ran all of the camel-core tests and things appear fine:

{code}
cd Development/Eclipse/camel-workspace/camel-trunk/
cd camel-core/
mvn test
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Building Camel :: Core
[INFO]    task-segment: [test]
[INFO] ------------------------------------------------------------------------
[INFO] [resources:resources]
[INFO] Using default encoding to copy filtered resources.
[INFO] [compiler:compile]
[INFO] Compiling 1 source file to /Volumes/Users 
HD/Users/huntc/Development/Eclipse/camel-workspace/camel-trunk/camel-core/target/classes
[INFO] [resources:testResources]
[INFO] Using default encoding to copy filtered resources.
[INFO] [compiler:testCompile]
[INFO] Nothing to compile - all classes are up to date
[INFO] [surefire:test]
[INFO] Surefire report directory: /Volumes/Users 
HD/Users/huntc/Development/Eclipse/camel-workspace/camel-trunk/camel-core/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.camel.management.JmxInstrumentationWithConnectorTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 7.151 sec
Running org.apache.camel.issues.InterceptorLogTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.821 sec
Running org.apache.camel.processor.RemoveHeaderTest
...
Running org.apache.camel.converter.ConverterTest
Tests run: 15, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.703 sec <<< 
FAILURE!
...
Running org.apache.camel.util.jndi.JndiTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.175 sec
Running org.apache.camel.component.bean.CustomParameterMappingStrategyTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.799 sec

Results :

Failed tests: 

Tests run: 941, Failures: 1, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[ERROR] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
{code}

ConverterTest fails because I am running on Mac OS X and have a path that 
includes a space:

{code}
ConverterTest
org.apache.camel.converter.ConverterTest
testFileToString(org.apache.camel.converter.ConverterTest)
junit.framework.AssertionFailedError: Should have returned a String!
        at junit.framework.Assert.fail(Assert.java:47)
        at junit.framework.Assert.assertTrue(Assert.java:20)
        at junit.framework.Assert.assertNotNull(Assert.java:217)
        at 
org.apache.camel.converter.ConverterTest.testFileToString(ConverterTest.java:166)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:585)
        at junit.framework.TestCase.runTest(TestCase.java:164)
        at junit.framework.TestCase.runBare(TestCase.java:130)
        at junit.framework.TestResult$1.protect(TestResult.java:106)
        at junit.framework.TestResult.runProtected(TestResult.java:124)
        at junit.framework.TestResult.run(TestResult.java:109)
        at junit.framework.TestCase.run(TestCase.java:120)
        at junit.framework.TestSuite.runTest(TestSuite.java:230)
        at junit.framework.TestSuite.run(TestSuite.java:225)
        at 
org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:81)
        at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:45)
        at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
        at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
        at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
        at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
        at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)

{code}

(that's an unlreated problem)

Please note that I have not attempted to apply my changes to StreamResequencer. 
Martin highlighted that StreamResequencer has a similar structure to 
BatchProcessor.

> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Priority: Critical
>         Attachments: BatchProcessor.java.20.diff
>
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt 
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle 
> of processing exchanges, and the processing involves something slow like 
> establishing a JMS connection over SSL or queuing to an asynchronous 
> processor, then the processing can become interrupted. The consequence of 
> this side effect is that the batch sender thread rarely gets the opportunity 
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in 
> continuously adding exchanges to the aggregator, the threshold becoming 
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer 
> grained concurrency controls being used instead of relying upon interrupting 
> a thread. Perhaps something like the following (untested) re-write of the 
> sender:
> {code}
>     private class BatchSender extends Thread {
>         private Queue<Exchange> queue;
>         private boolean exchangeQueued = false;
>         private Lock queueMutex = new ReentrantLock();
>         private Condition queueCondition = queueMutex.newCondition();
>         public BatchSender() {
>             super("Batch Sender");
>             this.queue = new LinkedList<Exchange>();
>         }
>         public void cancel() {
>             interrupt();
>         }
>         private void drainQueueTo(Collection<Exchange> collection, int 
> batchSize) {
>             for (int i = 0; i < batchSize; ++i) {
>                 Exchange e = queue.poll();
>                 if (e != null) {
>                     collection.add(e);
>                 } else {
>                     break;
>                 }
>             }
>         }
>         public void enqueueExchange(Exchange exchange) {
>             queueMutex.lock();
>             try {
>                 queue.add(exchange);
>                 exchangeQueued = true;
>                 queueCondition.signal();
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         @Override
>         public void run() {
>             queueMutex.lock();
>             try {
>                 do {
>                     try {
>                         if (!exchangeQueued) {
>                             queueCondition.await(batchTimeout,
>                                     TimeUnit.MILLISECONDS);
>                             if (!exchangeQueued) {
>                                 drainQueueTo(collection, batchSize);
>                             }
>                         }
>                         if (exchangeQueued) {
>                             exchangeQueued = false;
>                             queueMutex.unlock();
>                             try {
>                                 while (isInBatchCompleted(queue.size())) {
>                                     queueMutex.lock();
>                                     try {
>                                         drainQueueTo(collection, batchSize);
>                                     } finally {
>                                         queueMutex.unlock();
>                                     }
>                                 }
>                                 if (!isOutBatchCompleted()) {
>                                     continue;
>                                 }
>                             } finally {
>                                 queueMutex.lock();
>                             }
>                         }
>                         queueMutex.unlock();
>                         try {
>                             try {
>                                 sendExchanges();
>                             } catch (Exception e) {
>                                 getExceptionHandler().handleException(e);
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     } catch (InterruptedException e) {
>                         break;
>                     }
>                 } while (true);
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         private void sendExchanges() throws Exception {
>             Iterator<Exchange> iter = collection.iterator();
>             while (iter.hasNext()) {
>                 Exchange exchange = iter.next();
>                 iter.remove();
>                 processExchange(exchange);
>             }
>         }
>     }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed 
> its access. In addition any queuing of exchanges is noted. This should result 
> in less locking.
> The main change though is that queuing an exchange does not interrupt the 
> batch sender's current activity.
> I hope that this sample is useful.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to