This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch fix/CAMEL-23291
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ae66f915908c9a88be442c525a243222fb60e2db
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Jun 6 10:48:58 2026 +0200

    CAMEL-23291: Add ShutdownAware support to EventHubs and Google PubSub consumers
    
    Co-Authored-By: Claude <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
---
 .../azure/eventhubs/EventHubsConsumer.java         |  58 +++++++--
 .../eventhubs/EventHubsConsumerShutdownTest.java   | 145 +++++++++++++++++++++
 .../google/pubsub/GooglePubsubConsumer.java        |  76 ++++++++---
 .../pubsub/consumer/CamelMessageReceiver.java      |  34 +++--
 .../pubsub/GooglePubsubConsumerShutdownTest.java   |  83 ++++++++++++
 5 files changed, 355 insertions(+), 41 deletions(-)

diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index 58dc787dd88c..f465326fbe86 100644
--- 
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -28,7 +28,9 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
 import 
org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
+import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
 import org.slf4j.Logger;
@@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory;
 import static 
org.apache.camel.component.azure.eventhubs.EventHubsConstants.COMPLETED_BY_SIZE;
 import static 
org.apache.camel.component.azure.eventhubs.EventHubsConstants.COMPLETED_BY_TIMEOUT;
 
-public class EventHubsConsumer extends DefaultConsumer {
+public class EventHubsConsumer extends DefaultConsumer implements 
ShutdownAware {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(EventHubsConsumer.class);
 
@@ -45,6 +47,7 @@ public class EventHubsConsumer extends DefaultConsumer {
     private EventProcessorClient processorClient;
 
     private final AtomicInteger processedEvents;
+    private final AtomicInteger pendingExchanges = new AtomicInteger();
     private ScheduledExecutorService scheduledExecutorService;
     private ScheduledFuture<?> lastScheduledTask;
     private EventHubsCheckpointUpdaterTask lastTask;
@@ -74,19 +77,44 @@ public class EventHubsConsumer extends DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         if (processorClient != null) {
-            // shutdown the client
+            // stop accepting new messages but keep the connection open
+            // so that in-flight exchanges can still complete
             processorClient.stop();
-            processorClient = null;
         }
 
-        // shutdown scheduled executor
+        // shutdown camel consumer
+        super.doStop();
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        processorClient = null;
+
+        // shutdown scheduled executor after all in-flight exchanges have completed
         if (scheduledExecutorService != null) {
             
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(scheduledExecutorService);
             scheduledExecutorService = null;
         }
 
-        // shutdown camel consumer
-        super.doStop();
+        super.doShutdown();
+    }
+
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        if (processorClient != null) {
+            processorClient.stop();
+        }
+        return true;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        return pendingExchanges.get();
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // noop
     }
 
     public EventHubsConfiguration getConfiguration() {
@@ -133,20 +161,30 @@ public class EventHubsConsumer extends DefaultConsumer {
     }
 
     private void onEventListener(final EventContext eventContext) {
+        pendingExchanges.incrementAndGet();
+
         final Exchange exchange = createAzureEventHubExchange(eventContext);
 
         // add exchange callback
         exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
             @Override
             public void onComplete(Exchange exchange) {
-                // we update the consumer offsets
-                processCommit(exchange, eventContext);
+                try {
+                    // we update the consumer offsets
+                    processCommit(exchange, eventContext);
+                } finally {
+                    pendingExchanges.decrementAndGet();
+                }
             }
 
             @Override
             public void onFailure(Exchange exchange) {
-                // we do nothing here
-                processRollback(exchange);
+                try {
+                    // offsets are not committed on failure; delegate to processRollback
+                    processRollback(exchange);
+                } finally {
+                    pendingExchanges.decrementAndGet();
+                }
             }
         });
         // use default consumer callback
diff --git 
a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java
 
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java
new file mode 100644
index 000000000000..b52c7054ff2c
--- /dev/null
+++ 
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import java.lang.reflect.Field;
+
+import com.azure.messaging.eventhubs.EventProcessorClient;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class EventHubsConsumerShutdownTest {
+
+    private final EventHubsEndpoint endpoint = mock();
+    private final EventHubsConfiguration configuration = mock();
+    private final AsyncProcessor processor = mock();
+    private final CamelContext context = mock();
+    private final ExtendedCamelContext ecc = mock();
+    private final ExchangeFactory ef = mock();
+    private final EventProcessorClient processorClient = mock();
+
+    @BeforeEach
+    void setUp() {
+        when(endpoint.getCamelContext()).thenReturn(context);
+        when(context.getCamelContextExtension()).thenReturn(ecc);
+        when(ecc.getExchangeFactory()).thenReturn(ef);
+        when(ef.newExchangeFactory(any())).thenReturn(ef);
+        when(ef.create(any(Endpoint.class), anyBoolean()))
+                .thenAnswer(invocation -> 
DefaultExchange.newFromEndpoint(invocation.getArgument(0)));
+        when(endpoint.getConfiguration()).thenReturn(configuration);
+    }
+
+    @Test
+    void deferShutdownReturnsTrueAndStopsClient() throws Exception {
+        EventHubsConsumer consumer = new EventHubsConsumer(endpoint, 
processor);
+        setProcessorClient(consumer, processorClient);
+
+        boolean deferred = 
consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks);
+
+        assertTrue(deferred);
+        verify(processorClient).stop();
+    }
+
+    @Test
+    void pendingExchangesSizeIsZeroInitially() {
+        EventHubsConsumer consumer = new EventHubsConsumer(endpoint, 
processor);
+
+        assertEquals(0, consumer.getPendingExchangesSize());
+    }
+
+    @Test
+    void pendingExchangesSizeTracksInflightMessages() throws Exception {
+        EventHubsConsumer consumer = new EventHubsConsumer(endpoint, 
processor);
+        setProcessorClient(consumer, processorClient);
+
+        assertEquals(0, consumer.getPendingExchangesSize());
+
+        // onEventListener is private, so the counter is driven via reflection below
+        Exchange exchange = DefaultExchange.newFromEndpoint(endpoint);
+        // manually increment to simulate what onEventListener does
+        incrementPendingExchanges(consumer);
+
+        assertEquals(1, consumer.getPendingExchangesSize());
+
+        // simulate completion
+        decrementPendingExchanges(consumer);
+
+        assertEquals(0, consumer.getPendingExchangesSize());
+    }
+
+    @Test
+    void doStopStopsClientWithoutNullingIt() throws Exception {
+        EventHubsConsumer consumer = new EventHubsConsumer(endpoint, 
processor);
+        setProcessorClient(consumer, processorClient);
+
+        consumer.doStop();
+
+        verify(processorClient).stop();
+        // processorClient should still be available for in-flight exchanges
+        assertEquals(processorClient, getProcessorClient(consumer));
+    }
+
+    @Test
+    void doShutdownNullsClient() throws Exception {
+        EventHubsConsumer consumer = new EventHubsConsumer(endpoint, 
processor);
+        setProcessorClient(consumer, processorClient);
+
+        consumer.doShutdown();
+
+        assertEquals(null, getProcessorClient(consumer));
+    }
+
+    private static void setProcessorClient(EventHubsConsumer consumer, 
EventProcessorClient client) throws Exception {
+        Field field = 
EventHubsConsumer.class.getDeclaredField("processorClient");
+        field.setAccessible(true);
+        field.set(consumer, client);
+    }
+
+    private static EventProcessorClient getProcessorClient(EventHubsConsumer 
consumer) throws Exception {
+        Field field = 
EventHubsConsumer.class.getDeclaredField("processorClient");
+        field.setAccessible(true);
+        return (EventProcessorClient) field.get(consumer);
+    }
+
+    private static void incrementPendingExchanges(EventHubsConsumer consumer) 
throws Exception {
+        Field field = 
EventHubsConsumer.class.getDeclaredField("pendingExchanges");
+        field.setAccessible(true);
+        ((java.util.concurrent.atomic.AtomicInteger) 
field.get(consumer)).incrementAndGet();
+    }
+
+    private static void decrementPendingExchanges(EventHubsConsumer consumer) 
throws Exception {
+        Field field = 
EventHubsConsumer.class.getDeclaredField("pendingExchanges");
+        field.setAccessible(true);
+        ((java.util.concurrent.atomic.AtomicInteger) 
field.get(consumer)).decrementAndGet();
+    }
+}
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 3835f525d621..49cc2ebccf2c 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.api.core.AbstractApiService;
 import com.google.api.core.ApiFuture;
@@ -43,11 +44,13 @@ import com.google.pubsub.v1.ReceivedMessage;
 import com.google.pubsub.v1.Subscription;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion;
 import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
 import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
 import 
org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.UnitOfWorkHelper;
@@ -56,12 +59,13 @@ import org.apache.camel.support.task.budget.Budgets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GooglePubsubConsumer extends DefaultConsumer {
+public class GooglePubsubConsumer extends DefaultConsumer implements 
ShutdownAware {
 
     private final Logger localLog;
 
     private final GooglePubsubEndpoint endpoint;
     private final Processor processor;
+    private final AtomicInteger pendingExchanges = new AtomicInteger();
     private ExecutorService executor;
     private final List<Subscriber> subscribers;
     private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses;
@@ -113,6 +117,11 @@ public class GooglePubsubConsumer extends DefaultConsumer {
 
         safeCancelSynchronousPullResponses();
 
+        super.doStop();
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
                 
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
@@ -122,7 +131,26 @@ public class GooglePubsubConsumer extends DefaultConsumer {
         }
         executor = null;
 
-        super.doStop();
+        super.doShutdown();
+    }
+
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        if (!subscribers.isEmpty()) {
+            subscribers.forEach(AbstractApiService::stopAsync);
+        }
+        safeCancelSynchronousPullResponses();
+        return true;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        return pendingExchanges.get();
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // noop
     }
 
     private void safeCancelSynchronousPullResponses() {
@@ -170,6 +198,14 @@ public class GooglePubsubConsumer extends DefaultConsumer {
         return resolvedMaxDeliveryAttempts;
     }
 
+    public void incrementPendingExchanges() {
+        pendingExchanges.incrementAndGet();
+    }
+
+    public void decrementPendingExchanges() {
+        pendingExchanges.decrementAndGet();
+    }
+
     private class SubscriberWrapper implements Runnable {
 
         private final String subscriptionName;
@@ -333,24 +369,30 @@ public class GooglePubsubConsumer extends DefaultConsumer 
{
                             exchange.getIn().setHeader(pubSubHeader, value);
                         }
 
+                        pendingExchanges.incrementAndGet();
                         try {
-                            processor.process(exchange);
-                        } catch (Exception e) {
-                            exchange.setException(e);
-                        }
+                            try {
+                                processor.process(exchange);
+                            } catch (Exception e) {
+                                exchange.setException(e);
+                            }
 
-                        // Handle exception if one occurred
-                        if (exchange.getException() != null) {
-                            getExceptionHandler().handleException("Error 
processing exchange", exchange,
-                                    exchange.getException());
-                        }
+                            // Handle exception if one occurred
+                            if (exchange.getException() != null) {
+                                getExceptionHandler().handleException("Error 
processing exchange", exchange,
+                                        exchange.getException());
+                            }
 
-                        // Execute synchronization callbacks (ACK/NACK) based 
on exchange status
-                        // This is required because we are directly calling 
processor.process() outside of the normal
-                        // Camel routing engine, so we must manually trigger 
the OnCompletion callbacks
-                        if (endpoint.getAckMode() != 
GooglePubsubConstants.AckMode.NONE) {
-                            List<Synchronization> synchronizations = 
exchange.getExchangeExtension().handoverCompletions();
-                            UnitOfWorkHelper.doneSynchronizations(exchange, 
synchronizations);
+                            // Execute synchronization callbacks (ACK/NACK) 
based on exchange status
+                            // This is required because we are directly 
calling processor.process() outside of the normal
+                            // Camel routing engine, so we must manually 
trigger the OnCompletion callbacks
+                            if (endpoint.getAckMode() != 
GooglePubsubConstants.AckMode.NONE) {
+                                List<Synchronization> synchronizations
+                                        = 
exchange.getExchangeExtension().handoverCompletions();
+                                
UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations);
+                            }
+                        } finally {
+                            pendingExchanges.decrementAndGet();
                         }
                     }
                 } catch (CancellationException e) {
diff --git 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index c2240b31c24b..eab17b95cc86 100644
--- 
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++ 
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -101,23 +101,29 @@ public class CamelMessageReceiver implements 
MessageReceiver {
             exchange.getIn().setHeader(pubSubHeader, value);
         }
 
+        consumer.incrementPendingExchanges();
         try {
-            processor.process(exchange);
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
+            try {
+                processor.process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
 
-        // Handle exception if one occurred
-        if (exchange.getException() != null) {
-            consumer.getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
-        }
+            // Handle exception if one occurred
+            if (exchange.getException() != null) {
+                consumer.getExceptionHandler().handleException("Error 
processing exchange", exchange,
+                        exchange.getException());
+            }
 
-        // Execute synchronization callbacks (ACK/NACK) based on exchange 
status
-        // This is required because we are directly calling 
processor.process() outside of the normal
-        // Camel routing engine, so we must manually trigger the OnCompletion 
callbacks
-        if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
-            List<Synchronization> synchronizations = 
exchange.getExchangeExtension().handoverCompletions();
-            UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations);
+            // Execute synchronization callbacks (ACK/NACK) based on exchange 
status
+            // This is required because we are directly calling 
processor.process() outside of the normal
+            // Camel routing engine, so we must manually trigger the 
OnCompletion callbacks
+            if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
+                List<Synchronization> synchronizations = 
exchange.getExchangeExtension().handoverCompletions();
+                UnitOfWorkHelper.doneSynchronizations(exchange, 
synchronizations);
+            }
+        } finally {
+            consumer.decrementPendingExchanges();
         }
     }
 
diff --git 
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java
 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java
new file mode 100644
index 000000000000..ad2e16abc99d
--- /dev/null
+++ 
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.spi.ExchangeFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GooglePubsubConsumerShutdownTest {
+
+    private final GooglePubsubEndpoint endpoint = mock();
+    private final Processor processor = mock();
+    private final CamelContext context = mock();
+    private final ExtendedCamelContext ecc = mock();
+    private final ExchangeFactory ef = mock();
+
+    @BeforeEach
+    void setUp() {
+        when(endpoint.getCamelContext()).thenReturn(context);
+        when(context.getCamelContextExtension()).thenReturn(ecc);
+        when(ecc.getExchangeFactory()).thenReturn(ef);
+        when(ef.newExchangeFactory(any())).thenReturn(ef);
+    }
+
+    @Test
+    void deferShutdownReturnsTrue() {
+        GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint, 
processor);
+
+        boolean deferred = 
consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks);
+
+        assertTrue(deferred);
+    }
+
+    @Test
+    void pendingExchangesSizeIsZeroInitially() {
+        GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint, 
processor);
+
+        assertEquals(0, consumer.getPendingExchangesSize());
+    }
+
+    @Test
+    void pendingExchangesSizeTracksInflightMessages() {
+        GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint, 
processor);
+
+        assertEquals(0, consumer.getPendingExchangesSize());
+
+        consumer.incrementPendingExchanges();
+        assertEquals(1, consumer.getPendingExchangesSize());
+
+        consumer.incrementPendingExchanges();
+        assertEquals(2, consumer.getPendingExchangesSize());
+
+        consumer.decrementPendingExchanges();
+        assertEquals(1, consumer.getPendingExchangesSize());
+
+        consumer.decrementPendingExchanges();
+        assertEquals(0, consumer.getPendingExchangesSize());
+    }
+}

Reply via email to