This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.18.x by this push:
     new 5adfcc66e136 CAMEL-23260: fix graceful shutdown for ServiceBus consumer (#22444)
5adfcc66e136 is described below

commit 5adfcc66e136d7d26174a17712c0b9fdfcc9021f
Author: Federico Mariani <[email protected]>
AuthorDate: Wed Apr 8 08:45:44 2026 +0200

    CAMEL-23260: fix graceful shutdown for ServiceBus consumer (#22444)
    
    Implement ShutdownAware in ServiceBusConsumer to defer shutdown
    until in-flight exchanges complete their ACK/NACK to Azure Service Bus.
    Previously, doStop() immediately closed the client, causing
    RejectedExecutionException and message redelivery when SIGTERM
    was received during message processing.
---
 .../azure/servicebus/ServiceBusConsumer.java       | 93 ++++++++++++++++------
 .../azure/servicebus/ServiceBusConsumerTest.java   | 73 +++++++++++++++++
 2 files changed, 142 insertions(+), 24 deletions(-)

diff --git a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
index 51197fa47d41..c4b153288f5a 100644
--- a/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
+++ b/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.azure.servicebus;
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import com.azure.messaging.servicebus.ServiceBusErrorContext;
@@ -31,17 +32,20 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ServiceBusConsumer extends DefaultConsumer {
+public class ServiceBusConsumer extends DefaultConsumer implements 
ShutdownAware {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ServiceBusConsumer.class);
     private ServiceBusProcessorClient client;
+    private final AtomicInteger pendingExchanges = new AtomicInteger();
 
     public ServiceBusConsumer(final ServiceBusEndpoint endpoint, final 
Processor processor) {
         super(endpoint, processor);
@@ -75,6 +79,7 @@ public class ServiceBusConsumer extends DefaultConsumer {
     }
 
     private void processMessage(ServiceBusReceivedMessageContext 
messageContext) {
+        pendingExchanges.incrementAndGet();
         final ServiceBusReceivedMessage message = messageContext.getMessage();
         final Exchange exchange = createServiceBusExchange(message);
         final ConsumerOnCompletion onCompletion = new 
ConsumerOnCompletion(messageContext);
@@ -98,14 +103,45 @@ public class ServiceBusConsumer extends DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         if (client != null) {
-            // shutdown the client
-            client.close();
+            // stop accepting new messages but keep the connection open
+            // so that in-flight exchanges can still complete/abandon messages
+            client.stop();
         }
 
         // shutdown camel consumer
         super.doStop();
     }
 
+    @Override
+    protected void doShutdown() throws Exception {
+        if (client != null) {
+            // close the client after all in-flight exchanges have completed
+            client.close();
+        }
+
+        super.doShutdown();
+    }
+
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // stop accepting new messages but keep the connection open
+        // so that in-flight exchanges can still complete/abandon messages
+        if (client != null) {
+            client.stop();
+        }
+        return true;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        return pendingExchanges.get();
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // noop
+    }
+
     public ServiceBusConfiguration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
@@ -171,36 +207,45 @@ public class ServiceBusConsumer extends DefaultConsumer {
 
         @Override
         public void onComplete(Exchange exchange) {
-            super.onComplete(exchange);
-            if (getConfiguration().getServiceBusReceiveMode() == 
ServiceBusReceiveMode.PEEK_LOCK) {
-                messageContext.complete();
+            try {
+                super.onComplete(exchange);
+                if (getConfiguration().getServiceBusReceiveMode() == 
ServiceBusReceiveMode.PEEK_LOCK) {
+                    messageContext.complete();
+                }
+            } finally {
+                pendingExchanges.decrementAndGet();
             }
         }
 
         @Override
         public void onFailure(Exchange exchange) {
-            final Exception cause = exchange.getException();
-            if (cause != null) {
-                getExceptionHandler().handleException("Error during processing 
exchange.", exchange, cause);
-            }
+            try {
+                final Exception cause = exchange.getException();
+                if (cause != null) {
+                    getExceptionHandler().handleException("Error during 
processing exchange.", exchange, cause);
+                }
 
-            if (getConfiguration().getServiceBusReceiveMode() == 
ServiceBusReceiveMode.PEEK_LOCK) {
-                if (getConfiguration().isEnableDeadLettering() && 
(ObjectHelper.isEmpty(getConfiguration().getSubQueue())
-                        || 
ObjectHelper.equal(getConfiguration().getSubQueue(), SubQueue.NONE))) {
-                    DeadLetterOptions deadLetterOptions = new 
DeadLetterOptions();
-                    if (cause != null) {
-                        deadLetterOptions
-                                .setDeadLetterReason(String.format("%s: %s", 
cause.getClass().getName(), cause.getMessage()));
-                        
deadLetterOptions.setDeadLetterErrorDescription(Arrays.stream(cause.getStackTrace())
-                                .map(StackTraceElement::toString)
-                                .collect(Collectors.joining("\n")));
-                        messageContext.deadLetter(deadLetterOptions);
+                if (getConfiguration().getServiceBusReceiveMode() == 
ServiceBusReceiveMode.PEEK_LOCK) {
+                    if (getConfiguration().isEnableDeadLettering() && 
(ObjectHelper.isEmpty(getConfiguration().getSubQueue())
+                            || 
ObjectHelper.equal(getConfiguration().getSubQueue(), SubQueue.NONE))) {
+                        DeadLetterOptions deadLetterOptions = new 
DeadLetterOptions();
+                        if (cause != null) {
+                            deadLetterOptions
+                                    .setDeadLetterReason(
+                                            String.format("%s: %s", 
cause.getClass().getName(), cause.getMessage()));
+                            
deadLetterOptions.setDeadLetterErrorDescription(Arrays.stream(cause.getStackTrace())
+                                    .map(StackTraceElement::toString)
+                                    .collect(Collectors.joining("\n")));
+                            messageContext.deadLetter(deadLetterOptions);
+                        } else {
+                            messageContext.deadLetter();
+                        }
                     } else {
-                        messageContext.deadLetter();
+                        messageContext.abandon();
                     }
-                } else {
-                    messageContext.abandon();
                 }
+            } finally {
+                pendingExchanges.decrementAndGet();
             }
         }
     }
diff --git a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
index 914573ff0de9..0965859c715f 100644
--- a/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
+++ b/components/camel-azure/camel-azure-servicebus/src/test/java/org/apache/camel/component/azure/servicebus/ServiceBusConsumerTest.java
@@ -517,6 +517,79 @@ public class ServiceBusConsumerTest {
         }
     }
 
+    @Test
+    void deferShutdownStopsClientAndReturnsTrue() throws Exception {
+        try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, 
processor)) {
+            consumer.doStart();
+
+            boolean deferred = 
consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks);
+
+            assertThat(deferred).isTrue();
+            verify(client).stop();
+        }
+    }
+
+    @Test
+    void pendingExchangesSizeTracksInflightMessages() throws Exception {
+        try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, 
processor)) {
+            
when(configuration.getServiceBusReceiveMode()).thenReturn(ServiceBusReceiveMode.PEEK_LOCK);
+            consumer.doStart();
+
+            assertThat(consumer.getPendingExchangesSize()).isZero();
+
+            when(messageContext.getMessage()).thenReturn(message);
+            processMessageCaptor.getValue().accept(messageContext);
+
+            assertThat(consumer.getPendingExchangesSize()).isEqualTo(1);
+
+            Exchange exchange = exchangeCaptor.getValue();
+            Synchronization synchronization = 
exchange.getExchangeExtension().handoverCompletions().get(0);
+            synchronization.onComplete(exchange);
+
+            assertThat(consumer.getPendingExchangesSize()).isZero();
+        }
+    }
+
+    @Test
+    void pendingExchangesSizeDecrementsOnFailure() throws Exception {
+        try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, 
processor)) {
+            
when(configuration.getServiceBusReceiveMode()).thenReturn(ServiceBusReceiveMode.PEEK_LOCK);
+            consumer.doStart();
+
+            when(messageContext.getMessage()).thenReturn(message);
+            processMessageCaptor.getValue().accept(messageContext);
+
+            assertThat(consumer.getPendingExchangesSize()).isEqualTo(1);
+
+            Exchange exchange = exchangeCaptor.getValue();
+            Synchronization synchronization = 
exchange.getExchangeExtension().handoverCompletions().get(0);
+            synchronization.onFailure(exchange);
+
+            assertThat(consumer.getPendingExchangesSize()).isZero();
+        }
+    }
+
+    @Test
+    void doStopStopsClientWithoutClosing() throws Exception {
+        ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, 
processor);
+        consumer.doStart();
+
+        consumer.doStop();
+
+        verify(client).stop();
+        verify(client, never()).close();
+    }
+
+    @Test
+    void doShutdownClosesClient() throws Exception {
+        ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, 
processor);
+        consumer.doStart();
+
+        consumer.doShutdown();
+
+        verify(client).close();
+    }
+
     private void configureMockMessage() {
         when(message.getApplicationProperties()).thenReturn(new HashMap<>());
         
when(message.getBody()).thenReturn(BinaryData.fromBytes(MESSAGE_BODY.getBytes()));

Reply via email to