This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.18.x by this push:
new 5adfcc66e136 CAMEL-23260: fix graceful shutdown for ServiceBus consumer (#22444)
5adfcc66e136 is described below
commit 5adfcc66e136d7d26174a17712c0b9fdfcc9021f
Author: Federico Mariani <[email protected]>
AuthorDate: Wed Apr 8 08:45:44 2026 +0200
CAMEL-23260: fix graceful shutdown for ServiceBus consumer (#22444)
Implement ShutdownAware in ServiceBusConsumer to defer shutdown
until in-flight exchanges complete their ACK/NACK to Azure Service Bus.
Previously, doStop() immediately closed the client, causing
RejectedExecutionException and message redelivery when SIGTERM
was received during message processing.
---
.../azure/servicebus/ServiceBusConsumer.java | 93 ++++++++++++++++------
.../azure/servicebus/ServiceBusConsumerTest.java | 73 +++++++++++++++++
2 files changed, 142 insertions(+), 24 deletions(-)
diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
index 51197fa47d41..c4b153288f5a 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.azure.servicebus;
import java.util.Arrays;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.azure.messaging.servicebus.ServiceBusErrorContext;
@@ -31,17 +32,20 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServiceBusConsumer extends DefaultConsumer {
+public class ServiceBusConsumer extends DefaultConsumer implements
ShutdownAware {
private static final Logger LOG =
LoggerFactory.getLogger(ServiceBusConsumer.class);
private ServiceBusProcessorClient client;
+ private final AtomicInteger pendingExchanges = new AtomicInteger();
public ServiceBusConsumer(final ServiceBusEndpoint endpoint, final
Processor processor) {
super(endpoint, processor);
@@ -75,6 +79,7 @@ public class ServiceBusConsumer extends DefaultConsumer {
}
private void processMessage(ServiceBusReceivedMessageContext
messageContext) {
+ pendingExchanges.incrementAndGet();
final ServiceBusReceivedMessage message = messageContext.getMessage();
final Exchange exchange = createServiceBusExchange(message);
final ConsumerOnCompletion onCompletion = new
ConsumerOnCompletion(messageContext);
@@ -98,14 +103,45 @@ public class ServiceBusConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
if (client != null) {
- // shutdown the client
- client.close();
+ // stop accepting new messages but keep the connection open
+ // so that in-flight exchanges can still complete/abandon messages
+ client.stop();
}
// shutdown camel consumer
super.doStop();
}
+ @Override
+ protected void doShutdown() throws Exception {
+ if (client != null) {
+ // close the client after all in-flight exchanges have completed
+ client.close();
+ }
+
+ super.doShutdown();
+ }
+
+ @Override
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ // stop accepting new messages but keep the connection open
+ // so that in-flight exchanges can still complete/abandon messages
+ if (client != null) {
+ client.stop();
+ }
+ return true;
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ return pendingExchanges.get();
+ }
+
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // noop
+ }
+
public ServiceBusConfiguration getConfiguration() {
return getEndpoint().getConfiguration();
}
@@ -171,36 +207,45 @@ public class ServiceBusConsumer extends DefaultConsumer {
@Override
public void onComplete(Exchange exchange) {
- super.onComplete(exchange);
- if (getConfiguration().getServiceBusReceiveMode() ==
ServiceBusReceiveMode.PEEK_LOCK) {
- messageContext.complete();
+ try {
+ super.onComplete(exchange);
+ if (getConfiguration().getServiceBusReceiveMode() ==
ServiceBusReceiveMode.PEEK_LOCK) {
+ messageContext.complete();
+ }
+ } finally {
+ pendingExchanges.decrementAndGet();
}
}
@Override
public void onFailure(Exchange exchange) {
- final Exception cause = exchange.getException();
- if (cause != null) {
- getExceptionHandler().handleException("Error during processing
exchange.", exchange, cause);
- }
+ try {
+ final Exception cause = exchange.getException();
+ if (cause != null) {
+ getExceptionHandler().handleException("Error during
processing exchange.", exchange, cause);
+ }
- if (getConfiguration().getServiceBusReceiveMode() ==
ServiceBusReceiveMode.PEEK_LOCK) {
- if (getConfiguration().isEnableDeadLettering() &&
(ObjectHelper.isEmpty(getConfiguration().getSubQueue())
- ||
ObjectHelper.equal(getConfiguration().getSubQueue(), SubQueue.NONE))) {
- DeadLetterOptions deadLetterOptions = new
DeadLetterOptions();
- if (cause != null) {
- deadLetterOptions
- .setDeadLetterReason(String.format("%s: %s",
cause.getClass().getName(), cause.getMessage()));
-
deadLetterOptions.setDeadLetterErrorDescription(Arrays.stream(cause.getStackTrace())
- .map(StackTraceElement::toString)
- .collect(Collectors.joining("\n")));
- messageContext.deadLetter(deadLetterOptions);
+ if (getConfiguration().getServiceBusReceiveMode() ==
ServiceBusReceiveMode.PEEK_LOCK) {
+ if (getConfiguration().isEnableDeadLettering() &&
(ObjectHelper.isEmpty(getConfiguration().getSubQueue())
+ ||
ObjectHelper.equal(getConfiguration().getSubQueue(), SubQueue.NONE))) {
+ DeadLetterOptions deadLetterOptions = new
DeadLetterOptions();
+ if (cause != null) {
+ deadLetterOptions
+ .setDeadLetterReason(
+ String.format("%s: %s",
cause.getClass().getName(), cause.getMessage()));
+
deadLetterOptions.setDeadLetterErrorDescription(Arrays.stream(cause.getStackTrace())
+ .map(StackTraceElement::toString)
+ .collect(Collectors.joining("\n")));
+ messageContext.deadLetter(deadLetterOptions);
+ } else {
+ messageContext.deadLetter();
+ }
} else {
- messageContext.deadLetter();
+ messageContext.abandon();
}
- } else {
- messageContext.abandon();
}
+ } finally {
+ pendingExchanges.decrementAndGet();
}
}
}
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
index 914573ff0de9..0965859c715f 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
@@ -517,6 +517,79 @@ public class ServiceBusConsumerTest {
}
}
+ @Test
+ void deferShutdownStopsClientAndReturnsTrue() throws Exception {
+ try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint,
processor)) {
+ consumer.doStart();
+
+ boolean deferred =
consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks);
+
+ assertThat(deferred).isTrue();
+ verify(client).stop();
+ }
+ }
+
+ @Test
+ void pendingExchangesSizeTracksInflightMessages() throws Exception {
+ try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint,
processor)) {
+
when(configuration.getServiceBusReceiveMode()).thenReturn(ServiceBusReceiveMode.PEEK_LOCK);
+ consumer.doStart();
+
+ assertThat(consumer.getPendingExchangesSize()).isZero();
+
+ when(messageContext.getMessage()).thenReturn(message);
+ processMessageCaptor.getValue().accept(messageContext);
+
+ assertThat(consumer.getPendingExchangesSize()).isEqualTo(1);
+
+ Exchange exchange = exchangeCaptor.getValue();
+ Synchronization synchronization =
exchange.getExchangeExtension().handoverCompletions().get(0);
+ synchronization.onComplete(exchange);
+
+ assertThat(consumer.getPendingExchangesSize()).isZero();
+ }
+ }
+
+ @Test
+ void pendingExchangesSizeDecrementsOnFailure() throws Exception {
+ try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint,
processor)) {
+
when(configuration.getServiceBusReceiveMode()).thenReturn(ServiceBusReceiveMode.PEEK_LOCK);
+ consumer.doStart();
+
+ when(messageContext.getMessage()).thenReturn(message);
+ processMessageCaptor.getValue().accept(messageContext);
+
+ assertThat(consumer.getPendingExchangesSize()).isEqualTo(1);
+
+ Exchange exchange = exchangeCaptor.getValue();
+ Synchronization synchronization =
exchange.getExchangeExtension().handoverCompletions().get(0);
+ synchronization.onFailure(exchange);
+
+ assertThat(consumer.getPendingExchangesSize()).isZero();
+ }
+ }
+
+ @Test
+ void doStopStopsClientWithoutClosing() throws Exception {
+ ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint,
processor);
+ consumer.doStart();
+
+ consumer.doStop();
+
+ verify(client).stop();
+ verify(client, never()).close();
+ }
+
+ @Test
+ void doShutdownClosesClient() throws Exception {
+ ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint,
processor);
+ consumer.doStart();
+
+ consumer.doShutdown();
+
+ verify(client).close();
+ }
+
private void configureMockMessage() {
when(message.getApplicationProperties()).thenReturn(new HashMap<>());
when(message.getBody()).thenReturn(BinaryData.fromBytes(MESSAGE_BODY.getBytes()));