This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.20.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.20.x by this push: new da1fe9177e2 [CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered. (#10604) da1fe9177e2 is described below commit da1fe9177e2de8377f1e416b457c34abf4fb0777 Author: Nikunj Kumar Gupta <nikunj.gupta...@gmail.com> AuthorDate: Fri Jul 7 11:38:25 2023 +0530 [CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered. (#10604) * close all consumers concurrently Closing all consumers in RabbitMqConsumer concurrently * RabbitMQConsumer to suspend Consumer when suspending. * cancelling all RabbitConsumers in RabbitMQConsumer before trying to close them --- .../org/apache/camel/component/rabbitmq/RabbitConsumer.java | 13 ++++++++++++- .../apache/camel/component/rabbitmq/RabbitMQConsumer.java | 7 +++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java index 13115cd4536..82385124190 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -48,6 +48,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu private Channel channel; private String tag; private volatile String consumerTag; + private boolean cancelled; private final Semaphore lock = new Semaphore(1); @@ -208,12 +209,22 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu consumer.getEndpoint().isExclusiveConsumer(), null, this); } + protected void cancelChannel() throws Exception { + if (channel == null) { + return; + } + if (tag != null && isChannelOpen() && !cancelled) { + channel.basicCancel(tag); + cancelled = true; + } + } + @Override protected void doStop() throws Exception { if (channel == null) { return; } - if (tag != null && isChannelOpen()) { + if (tag != null && isChannelOpen() && !cancelled) { channel.basicCancel(tag); } try { diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 330ec59a590..e2d446ea3aa 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -155,6 +155,13 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { if (startConsumerCallable != null) { startConsumerCallable.stop(); } + for (RabbitConsumer consumer : this.consumers) { + try { + consumer.cancelChannel(); + } catch (Exception e) { + LOG.warn("Error occurred while cancelling consumer. This exception is ignored", e); + } + } for (RabbitConsumer consumer : this.consumers) { try { ServiceHelper.stopAndShutdownService(consumer);