[ https://issues.apache.org/jira/browse/CAMEL-15580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Brad Harvey updated CAMEL-15580: -------------------------------- Attachment: potentialPatch.txt > SJMS Batch Consumer startup race condition > ------------------------------------------ > > Key: CAMEL-15580 > URL: https://issues.apache.org/jira/browse/CAMEL-15580 > Project: Camel > Issue Type: Bug > Components: camel-sjms > Affects Versions: 3.4.3 > Reporter: Brad Harvey > Priority: Major > Attachments: potentialPatch.txt > > > There is a race condition between the SJMS Batch Consumer route start thread > and the batch consumption loop thread. When it triggers the batch > consumption loop exits early and the SJMS Batch Consumer does not read any > JMS messages. > In short: > * The AtomicBoolean running is used as a flag to shut down the batch > consumption loop > * The batch consumption loop is submitted to another thread and only after > that running is changed to true > * This means sometimes the batch consumption loop sees running as false > during startup > The easiest way to reproduce it is to add a sleep into > SJMSBatchConsumer$StartConsumerTask#run > > {code:java} > final List<AtomicBoolean> triggers = new ArrayList<>(); > for (int i = 0; i < consumerCount; i++) { > BatchConsumptionLoop loop = new BatchConsumptionLoop(); > loop.setKeepAliveDelay(keepAliveDelay); > triggers.add(loop.getCompletionTimeoutTrigger()); > /* > * Note: Batch consumption loop is submitted to another thread here > */ > jmsConsumerExecutors.submit(loop); > } > if (completionInterval > 0) { > // trigger completion based on interval > timeoutCheckerExecutorService.scheduleAtFixedRate(new > CompletionIntervalTask(triggers), completionInterval, completionInterval, > TimeUnit.MILLISECONDS); > } > if (attempt > 1) { > LOG.info("Successfully refreshed connection after {} attempts.", attempt); > } > /* > * Note: Add this sleep to reproduce the race condition, simulating > * this thread being pre-empted by other work > */ > Thread.sleep(100); > LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, > completionSize); > /* > * Note: running is only changed to true here but the batch consumption loop > * that reads this values was submitted to another thread earlier > */ > running.set(true); > return; > {code} > > The batch consumption loop checks the running flag like this: > {code:java} > private void consumeBatchesOnLoop(final Session session, final > MessageConsumer consumer) throws JMSException { > final boolean usingTimeout = completionTimeout > 0; > LOG.trace("BatchConsumptionTask +++ start +++"); > while (running.get()) { {code} > > Usually there's a second check that would cause everything to loop again - it > may see running as false but see isStarting() as true. > {code:java} > }while (running.get() || isStarting()); {code} > But with asyncStartListener enabled I think that isStarting() is likely to be > false as well. > > I believe this issue is causing fairly frequent intermittent test failures in > our CI environment (jenkins slaves in kubernetes, linux). But I've been > unable to reproduce it on my laptop (windows) without adding the artificial > delay on the main thread. > I've been able to get thread dumps from the CI environment showing the > executor waiting for a task instead of executing the batch consumption loop > {code:java} > "Camel (camel-8) thread #125 - SjmsBatchConsumer" > java.lang.Thread.State: WAITING > at sun.misc.Unsafe.park(Native Method) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) > at > java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > > Usually they should look like this: > {code:java} > "Camel (camel-8) thread #123 - SjmsBatchConsumer" > java.lang.Thread.State: TIMED_WAITING > at java.lang.Object.wait(Native Method) > at > org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74) > at > org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486) > at > org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653) > at > org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:429) > at > org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.access$1300(SjmsBatchConsumer.java:383) > at > org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:326) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) {code} > > I also get tracing logs where the batch consumption tasks starts & ends very > quickly. > {code:java} > Line 4377: 2020-09-24 03:16:41.567 DEBUG||| 4604 --- [artStopListener] > o.a.c.c.sjms.batch.SjmsBatchConsumer : Attempt #1. Starting 1 consumer(s) > for myqueue:300 > Line 4415: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] > o.a.c.c.sjms.batch.SjmsBatchConsumer : BatchConsumptionTask +++ start +++ > Line 4416: 2020-09-24 03:16:41.576 TRACE||| 4604 --- [msBatchConsumer] > o.a.c.c.sjms.batch.SjmsBatchConsumer : BatchConsumptionTask +++ end +++ > Line 4435: 2020-09-24 03:16:41.568 INFO ||| 4604 --- [artStopListener] > o.a.c.c.sjms.batch.SjmsBatchConsumer : Started 1 consumer(s) for > myqueue:300 {code} > > Side note: Could the queue name be added to the thread name? The JMS > component consumers do that. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)