This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch jms-inout in repository https://gitbox.apache.org/repos/asf/camel.git
commit 63b317e7731510742a679f409449489656219da6 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sat Mar 23 17:41:45 2024 +0100 camel-jms - Using in/out should use single atomic operation with the timeout map for the correlation id. Could help with org.apache.camel.component.jms.JmsProducerConcurrentWithReplyTest.testConcurrentProducers --- .../java/org/apache/camel/component/jms/reply/QueueReplyManager.java | 3 +-- .../org/apache/camel/component/jms/reply/ReplyManagerSupport.java | 4 ++-- .../apache/camel/component/jms/reply/TemporaryQueueReplyManager.java | 3 +-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java index c0697cf329d..000350941d2 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java @@ -70,13 +70,12 @@ public class QueueReplyManager extends ReplyManagerSupport { @Override protected void handleReplyMessage(String correlationID, Message message, Session session) { - ReplyHandler handler = correlation.get(correlationID); + ReplyHandler handler = correlation.remove(correlationID); if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) { handler = waitForProvisionCorrelationToBeUpdated(correlationID, message); } if (handler != null) { - correlation.remove(correlationID); handler.onReply(correlationID, message, session); } else { // we could not correlate the received reply message to a matching request and therefore diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index 8118c78ce27..b8e12e72078 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -17,6 +17,7 @@ package org.apache.camel.component.jms.reply; import java.time.Duration; +import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -250,12 +251,11 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl .build()) .build(); - return task.run(() -> getReplyHandler(correlationID), answer -> answer != null).orElse(null); + return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null); } private ReplyHandler getReplyHandler(String correlationID) { log.trace("Early reply not found handler. Waiting a bit longer."); - return correlation.get(correlationID); } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index fc715521c26..181463055d9 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -88,13 +88,12 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { @Override protected void handleReplyMessage(String correlationID, Message message, Session session) { - ReplyHandler handler = correlation.get(correlationID); + ReplyHandler handler = correlation.remove(correlationID); if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) { handler = waitForProvisionCorrelationToBeUpdated(correlationID, message); } if (handler != null) { - correlation.remove(correlationID); handler.onReply(correlationID, message, session); } else { // we could not correlate the received reply message to a matching request and therefore