[ 
https://issues.apache.org/jira/browse/CAMEL-15580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad Harvey updated CAMEL-15580:
--------------------------------
    Attachment: potentialPatch.txt

> SJMS Batch Consumer startup race condition
> ------------------------------------------
>
>                 Key: CAMEL-15580
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15580
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sjms
>    Affects Versions: 3.4.3
>            Reporter: Brad Harvey
>            Priority: Major
>         Attachments: potentialPatch.txt
>
>
> There is a race condition between the SJMS Batch Consumer route start thread
> and the batch consumption loop thread.  When the race is triggered, the batch
> consumption loop exits early and the SJMS Batch Consumer never reads any
> JMS messages.
> In short:
>  * The AtomicBoolean running is used as a flag to shut down the batch
> consumption loop
>  * The batch consumption loop is submitted to another thread, and running is
> set to true only after that
>  * This means the batch consumption loop sometimes sees running as false
> during startup and exits immediately
> The easiest way to reproduce it is to add a sleep to
> SjmsBatchConsumer$StartConsumerTask#run:
>  
> {code:java}
> final List<AtomicBoolean> triggers = new ArrayList<>();
> for (int i = 0; i < consumerCount; i++) {
>     BatchConsumptionLoop loop = new BatchConsumptionLoop();
>     loop.setKeepAliveDelay(keepAliveDelay);
>     triggers.add(loop.getCompletionTimeoutTrigger());
>     /*
>      * Note: Batch consumption loop is submitted to another thread here
>      */
>     jmsConsumerExecutors.submit(loop);
> }
> if (completionInterval > 0) {
>     // trigger completion based on interval
>     timeoutCheckerExecutorService.scheduleAtFixedRate(
>             new CompletionIntervalTask(triggers),
>             completionInterval, completionInterval, TimeUnit.MILLISECONDS);
> }
> if (attempt > 1) {
>     LOG.info("Successfully refreshed connection after {} attempts.", attempt);
> }
> /*
>  * Note: Add this sleep to reproduce the race condition, simulating
>  * this thread being pre-empted by other work
>  */
> Thread.sleep(100);  
> LOG.info("Started {} consumer(s) for {}:{}",
>         consumerCount, destinationName, completionSize);
> /*
>  * Note: running is only changed to true here, but the batch consumption loop
>  * that reads this value was submitted to another thread earlier
>  */
> running.set(true);
> return;
>  {code}
>  
> The batch consumption loop checks the running flag like this:
> {code:java}
> private void consumeBatchesOnLoop(final Session session,
>         final MessageConsumer consumer) throws JMSException {
>     final boolean usingTimeout = completionTimeout > 0;
>     LOG.trace("BatchConsumptionTask +++ start +++");
>     while (running.get()) {
> {code}
>  
> Usually a second check would cause everything to loop again: the task may
> see running as false but still see isStarting() as true.
> {code:java}
> } while (running.get() || isStarting());
> {code}
> But with asyncStartListener enabled I think that isStarting() is likely to be 
> false as well.
>  
> I believe this issue is causing fairly frequent intermittent test failures in
> our CI environment (Jenkins slaves in Kubernetes, Linux).  But I've been
> unable to reproduce it on my laptop (Windows) without adding the artificial
> delay on the main thread.
> I've been able to get thread dumps from the CI environment showing the
> executor waiting for a task instead of executing the batch consumption loop:
> {code:java}
> "Camel (camel-8) thread #125 - SjmsBatchConsumer" 
>    java.lang.Thread.State: WAITING
>         at sun.misc.Unsafe.park(Native Method)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>  {code}
>  
> Usually they should look like this:
> {code:java}
> "Camel (camel-8) thread #123 - SjmsBatchConsumer" 
>    java.lang.Thread.State: TIMED_WAITING
>         at java.lang.Object.wait(Native Method)
>         at org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74)
>         at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486)
>         at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:429)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.access$1300(SjmsBatchConsumer.java:383)
>         at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:326)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> I also get trace logs where the batch consumption task starts and ends
> within the same millisecond:
> {code:java}
>       Line 4377: 2020-09-24 03:16:41.567 DEBUG||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Attempt #1. Starting 1 consumer(s) for myqueue:300
>       Line 4415: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask +++ start +++
>       Line 4416: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask +++ end +++
>       Line 4435: 2020-09-24 03:16:41.568 INFO ||| 4604 --- [artStopListener] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Started 1 consumer(s) for myqueue:300
> {code}
>  
> Side note: Could the queue name be added to the thread name?  The JMS 
> component consumers do that.
>  
>  
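
The ordering problem described in the report can be reproduced outside Camel
with plain java.util.concurrent types.  What follows is only a minimal sketch,
not the SjmsBatchConsumer code and not the contents of the attached patch:
the class StartupRaceSketch, the method iterations and the three-pass limit
are hypothetical names and values chosen for illustration.  A CountDownLatch
stands in for the Thread.sleep(100) from the report so that the unlucky
interleaving happens on every run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the start task and the batch consumption loop.
class StartupRaceSketch {

    /** Returns how many passes the worker loop completed (0 means it exited early). */
    public static int iterations(boolean setRunningBeforeSubmit) {
        AtomicBoolean running = new AtomicBoolean(false);
        AtomicInteger passes = new AtomicInteger();
        CountDownLatch loopFinished = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            if (setRunningBeforeSubmit) {
                // Alternative ordering: the flag is true before the worker can read it.
                running.set(true);
            }
            executor.submit(() -> {
                // Stand-in for consumeBatchesOnLoop: capped at three passes so the demo ends.
                while (running.get() && passes.get() < 3) {
                    passes.incrementAndGet();
                }
                loopFinished.countDown();
            });
            if (!setRunningBeforeSubmit) {
                // Reported ordering: the start thread is delayed (here: until the
                // worker has already given up) and only then sets the flag.
                loopFinished.await(5, TimeUnit.SECONDS);
                running.set(true);
            }
            loopFinished.await(5, TimeUnit.SECONDS);
            return passes.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("flag set after submit:  " + iterations(false));
        System.out.println("flag set before submit: " + iterations(true));
    }
}
```

Run as written, main prints 0 for the first call and 3 for the second.
Setting the flag before submitting the loop is one possible way to close the
window; whether it suits the consumer's stop and restart handling would still
need to be checked against the real code.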



--
This message was sent by Atlassian Jira
(v8.3.4#803005)