[ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51223#action_51223 ]
Christopher Hunt commented on CAMEL-1510:
-----------------------------------------

I have now run all of the camel-core tests and things appear fine:

{code}
cd Development/Eclipse/camel-workspace/camel-trunk/
cd camel-core/
mvn test
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Building Camel :: Core
[INFO]    task-segment: [test]
[INFO] ------------------------------------------------------------------------
[INFO] [resources:resources]
[INFO] Using default encoding to copy filtered resources.
[INFO] [compiler:compile]
[INFO] Compiling 1 source file to /Volumes/Users HD/Users/huntc/Development/Eclipse/camel-workspace/camel-trunk/camel-core/target/classes
[INFO] [resources:testResources]
[INFO] Using default encoding to copy filtered resources.
[INFO] [compiler:testCompile]
[INFO] Nothing to compile - all classes are up to date
[INFO] [surefire:test]
[INFO] Surefire report directory: /Volumes/Users HD/Users/huntc/Development/Eclipse/camel-workspace/camel-trunk/camel-core/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.camel.management.JmxInstrumentationWithConnectorTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 7.151 sec
Running org.apache.camel.issues.InterceptorLogTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.821 sec
Running org.apache.camel.processor.RemoveHeaderTest
...
Running org.apache.camel.converter.ConverterTest
Tests run: 15, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.703 sec <<< FAILURE!
...
Running org.apache.camel.util.jndi.JndiTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.175 sec
Running org.apache.camel.component.bean.CustomParameterMappingStrategyTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.799 sec

Results :

Failed tests:

Tests run: 941, Failures: 1, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[ERROR] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
{code}

ConverterTest fails because I am running on Mac OS X and have a path that includes a space:

{code}
ConverterTest
org.apache.camel.converter.ConverterTest
testFileToString(org.apache.camel.converter.ConverterTest)
junit.framework.AssertionFailedError: Should have returned a String!
    at junit.framework.Assert.fail(Assert.java:47)
    at junit.framework.Assert.assertTrue(Assert.java:20)
    at junit.framework.Assert.assertNotNull(Assert.java:217)
    at org.apache.camel.converter.ConverterTest.testFileToString(ConverterTest.java:166)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:585)
    at junit.framework.TestCase.runTest(TestCase.java:164)
    at junit.framework.TestCase.runBare(TestCase.java:130)
    at junit.framework.TestResult$1.protect(TestResult.java:106)
    at junit.framework.TestResult.runProtected(TestResult.java:124)
    at junit.framework.TestResult.run(TestResult.java:109)
    at junit.framework.TestCase.run(TestCase.java:120)
    at junit.framework.TestSuite.runTest(TestSuite.java:230)
    at junit.framework.TestSuite.run(TestSuite.java:225)
    at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:81)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:45)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
{code}

(that's an unrelated problem)

Please note that I have not attempted to apply my changes to StreamResequencer. Martin highlighted that StreamResequencer has a similar structure to BatchProcessor.

> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Priority: Critical
>         Attachments: BatchProcessor.java.20.diff
>
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt
> method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle
> of processing exchanges, and the processing involves something slow like
> establishing a JMS connection over SSL or queuing to an asynchronous
> processor, then the processing can become interrupted. The consequence of
> this side effect is that the batch sender thread rarely gets the opportunity
> to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during some performance testing that resulted in
> continuously adding exchanges to the aggregator, the threshold becoming
> reached, and then trying to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct then I would recommend finer
> grained concurrency controls being used instead of relying upon interrupting
> a thread. Perhaps something like the following (untested) re-write of the
> sender:
> {code}
>     private class BatchSender extends Thread {
>         private Queue<Exchange> queue;
>         private boolean exchangeQueued = false;
>         private Lock queueMutex = new ReentrantLock();
>         private Condition queueCondition = queueMutex.newCondition();
>
>         public BatchSender() {
>             super("Batch Sender");
>             this.queue = new LinkedList<Exchange>();
>         }
>
>         public void cancel() {
>             interrupt();
>         }
>
>         private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>             for (int i = 0; i < batchSize; ++i) {
>                 Exchange e = queue.poll();
>                 if (e != null) {
>                     collection.add(e);
>                 } else {
>                     break;
>                 }
>             }
>         }
>
>         public void enqueueExchange(Exchange exchange) {
>             queueMutex.lock();
>             try {
>                 queue.add(exchange);
>                 exchangeQueued = true;
>                 queueCondition.signal();
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>
>         @Override
>         public void run() {
>             queueMutex.lock();
>             try {
>                 do {
>                     try {
>                         if (!exchangeQueued) {
>                             queueCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
>                             if (!exchangeQueued) {
>                                 drainQueueTo(collection, batchSize);
>                             }
>                         }
>                         if (exchangeQueued) {
>                             exchangeQueued = false;
>                             queueMutex.unlock();
>                             try {
>                                 while (isInBatchCompleted(queue.size())) {
>                                     queueMutex.lock();
>                                     try {
>                                         drainQueueTo(collection, batchSize);
>                                     } finally {
>                                         queueMutex.unlock();
>                                     }
>                                 }
>                                 if (!isOutBatchCompleted()) {
>                                     continue;
>                                 }
>                             } finally {
>                                 queueMutex.lock();
>                             }
>                         }
>                         queueMutex.unlock();
>                         try {
>                             try {
>                                 sendExchanges();
>                             } catch (Exception e) {
>                                 getExceptionHandler().handleException(e);
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     } catch (InterruptedException e) {
>                         break;
>                     }
>                 } while (true);
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>
>         private void sendExchanges() throws Exception {
>             Iterator<Exchange> iter = collection.iterator();
>             while (iter.hasNext()) {
>                 Exchange exchange = iter.next();
>                 iter.remove();
>                 processExchange(exchange);
>             }
>         }
>     }
> {code}
> I have replaced the concurrent queue with a regular linked list and mutexed
> its access. In addition any queuing of exchanges is noted. This should result
> in less locking.
> The main change though is that queuing an exchange does not interrupt the
> batch sender's current activity.
> I hope that this sample is useful.
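
The central point of the quoted rewrite, waking the sender with a Condition signal rather than Thread.interrupt(), can be shown in isolation. The following is a minimal standalone sketch and not Camel code: SignalDemo, Worker, the String items, the 5 ms sleep standing in for slow processing, and the 50 ms wait timeout are all assumptions made for illustration. It only demonstrates that enqueueing while the worker is busy does not interrupt the work in progress; it does not reproduce the batch-size and timeout logic of BatchProcessor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: SignalDemo and Worker are hypothetical names, not Camel classes.
class SignalDemo {

    static class Worker extends Thread {
        private final Queue<String> queue = new ArrayDeque<>();
        private final Lock lock = new ReentrantLock();
        private final Condition itemQueued = lock.newCondition();
        final AtomicInteger processed = new AtomicInteger();
        final AtomicInteger interruptedWhileBusy = new AtomicInteger();

        Worker() {
            super("Demo Worker");
            setDaemon(true);
        }

        // Producers call this; it wakes a waiting worker but never
        // interrupts one that is already processing.
        void enqueue(String item) {
            lock.lock();
            try {
                queue.add(item);
                itemQueued.signal();
            } finally {
                lock.unlock();
            }
        }

        // Interrupt is reserved for shutdown, as in cancel() in the quoted code.
        void cancel() {
            interrupt();
        }

        @Override
        public void run() {
            List<String> batch = new ArrayList<>();
            while (true) {
                lock.lock();
                try {
                    // Loop guards against timeouts and spurious wakeups.
                    while (queue.isEmpty()) {
                        itemQueued.await(50, TimeUnit.MILLISECONDS);
                    }
                    batch.addAll(queue);
                    queue.clear();
                } catch (InterruptedException e) {
                    return; // cancelled while idle
                } finally {
                    lock.unlock();
                }
                // Process outside the lock so producers are not blocked.
                for (String item : batch) {
                    try {
                        Thread.sleep(5); // stand-in for slow processing
                        processed.incrementAndGet();
                    } catch (InterruptedException e) {
                        interruptedWhileBusy.incrementAndGet();
                        return;
                    }
                }
                batch.clear();
            }
        }
    }

    // Returns {items processed, processing steps that were interrupted}.
    static int[] demo(int items) {
        Worker worker = new Worker();
        worker.start();
        try {
            for (int i = 0; i < items; i++) {
                worker.enqueue("exchange-" + i);
                Thread.sleep(1); // keep enqueueing while the worker is busy
            }
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
            while (worker.processed.get() < items && System.nanoTime() < deadline) {
                Thread.sleep(5);
            }
            worker.cancel();
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] {worker.processed.get(), worker.interruptedWhileBusy.get()};
    }

    public static void main(String[] args) {
        int[] result = demo(20);
        System.out.println("processed=" + result[0]
                + " interruptedWhileBusy=" + result[1]);
    }
}
```

If enqueue() called interrupt() on the worker instead of signalling the condition, the Thread.sleep standing in for slow processing could throw InterruptedException part-way through a batch, which is the side effect described in the issue.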