This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch fix/CAMEL-23291 in repository https://gitbox.apache.org/repos/asf/camel.git
commit ae66f915908c9a88be442c525a243222fb60e2db Author: Claus Ibsen <[email protected]> AuthorDate: Sat Jun 6 10:48:58 2026 +0200 CAMEL-23291: Add ShutdownAware support to EventHubs and Google PubSub consumers Co-Authored-By: Claude <[email protected]> Signed-off-by: Claus Ibsen <[email protected]> --- .../azure/eventhubs/EventHubsConsumer.java | 58 +++++++-- .../eventhubs/EventHubsConsumerShutdownTest.java | 145 +++++++++++++++++++++ .../google/pubsub/GooglePubsubConsumer.java | 76 ++++++++--- .../pubsub/consumer/CamelMessageReceiver.java | 34 +++-- .../pubsub/GooglePubsubConsumerShutdownTest.java | 83 ++++++++++++ 5 files changed, 355 insertions(+), 41 deletions(-) diff --git a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java index 58dc787dd88c..f465326fbe86 100644 --- a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java +++ b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java @@ -28,7 +28,9 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.ShutdownRunningTask; import org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory; +import org.apache.camel.spi.ShutdownAware; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultConsumer; import org.slf4j.Logger; @@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory; import static org.apache.camel.component.azure.eventhubs.EventHubsConstants.COMPLETED_BY_SIZE; import static org.apache.camel.component.azure.eventhubs.EventHubsConstants.COMPLETED_BY_TIMEOUT; -public class EventHubsConsumer extends DefaultConsumer { +public class EventHubsConsumer extends DefaultConsumer implements ShutdownAware { private static final Logger LOG = LoggerFactory.getLogger(EventHubsConsumer.class); @@ -45,6 +47,7 @@ public class EventHubsConsumer extends DefaultConsumer { private EventProcessorClient processorClient; private final AtomicInteger processedEvents; + private final AtomicInteger pendingExchanges = new AtomicInteger(); private ScheduledExecutorService scheduledExecutorService; private ScheduledFuture<?> lastScheduledTask; private EventHubsCheckpointUpdaterTask lastTask; @@ -74,19 +77,44 @@ public class EventHubsConsumer extends DefaultConsumer { @Override protected void doStop() throws Exception { if (processorClient != null) { - // shutdown the client + // stop accepting new messages but keep the connection open + // so that in-flight exchanges can still complete processorClient.stop(); - processorClient = null; } - // shutdown scheduled executor + // shutdown camel consumer + super.doStop(); + } + + @Override + protected void doShutdown() throws Exception { + processorClient = null; + + // shutdown scheduled executor after all in-flight exchanges have completed if (scheduledExecutorService != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(scheduledExecutorService); scheduledExecutorService = null; } - // shutdown camel consumer - super.doStop(); + super.doShutdown(); + } + + @Override + public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { + if (processorClient != null) { + processorClient.stop(); + } + return true; + } + + @Override + public int getPendingExchangesSize() { + return pendingExchanges.get(); + } + + @Override + public void prepareShutdown(boolean suspendOnly, boolean forced) { + // noop } public EventHubsConfiguration getConfiguration() { @@ -133,20 +161,30 @@ public class EventHubsConsumer extends DefaultConsumer { } private void onEventListener(final EventContext eventContext) { + pendingExchanges.incrementAndGet(); + final Exchange exchange = createAzureEventHubExchange(eventContext); // add exchange callback exchange.getExchangeExtension().addOnCompletion(new Synchronization() { @Override public void onComplete(Exchange exchange) { - // we update the consumer offsets - processCommit(exchange, eventContext); + try { + // we update the consumer offsets + processCommit(exchange, eventContext); + } finally { + pendingExchanges.decrementAndGet(); + } } @Override public void onFailure(Exchange exchange) { - // we do nothing here - processRollback(exchange); + try { + // we do nothing here + processRollback(exchange); + } finally { + pendingExchanges.decrementAndGet(); + } } }); // use default consumer callback diff --git a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java new file mode 100644 index 000000000000..b52c7054ff2c --- /dev/null +++ b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.eventhubs; + +import java.lang.reflect.Field; + +import com.azure.messaging.eventhubs.EventProcessorClient; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class EventHubsConsumerShutdownTest { + + private final EventHubsEndpoint endpoint = mock(); + private final EventHubsConfiguration configuration = mock(); + private final AsyncProcessor processor = mock(); + private final CamelContext context = mock(); + private final ExtendedCamelContext ecc = mock(); + private final ExchangeFactory ef = mock(); + private final EventProcessorClient processorClient = mock(); + + @BeforeEach + void setUp() { + when(endpoint.getCamelContext()).thenReturn(context); + when(context.getCamelContextExtension()).thenReturn(ecc); + when(ecc.getExchangeFactory()).thenReturn(ef); + when(ef.newExchangeFactory(any())).thenReturn(ef); + when(ef.create(any(Endpoint.class), anyBoolean())) + .thenAnswer(invocation -> DefaultExchange.newFromEndpoint(invocation.getArgument(0))); + when(endpoint.getConfiguration()).thenReturn(configuration); + } + + @Test + void deferShutdownReturnsTrueAndStopsClient() throws Exception { + EventHubsConsumer consumer = new EventHubsConsumer(endpoint, processor); + setProcessorClient(consumer, processorClient); + + boolean deferred = consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks); + + assertTrue(deferred); + verify(processorClient).stop(); + } + + @Test + void pendingExchangesSizeIsZeroInitially() { + EventHubsConsumer consumer = new EventHubsConsumer(endpoint, processor); + + assertEquals(0, consumer.getPendingExchangesSize()); + } + + @Test + void pendingExchangesSizeTracksInflightMessages() throws Exception { + EventHubsConsumer consumer = new EventHubsConsumer(endpoint, processor); + setProcessorClient(consumer, processorClient); + + assertEquals(0, consumer.getPendingExchangesSize()); + + // simulate onEventListener by getting the completion callback and invoking it + Exchange exchange = DefaultExchange.newFromEndpoint(endpoint); + // manually increment to simulate what onEventListener does + incrementPendingExchanges(consumer); + + assertEquals(1, consumer.getPendingExchangesSize()); + + // simulate completion + decrementPendingExchanges(consumer); + + assertEquals(0, consumer.getPendingExchangesSize()); + } + + @Test + void doStopStopsClientWithoutNullingIt() throws Exception { + EventHubsConsumer consumer = new EventHubsConsumer(endpoint, processor); + setProcessorClient(consumer, processorClient); + + consumer.doStop(); + + verify(processorClient).stop(); + // processorClient should still be available for in-flight exchanges + assertEquals(processorClient, getProcessorClient(consumer)); + } + + @Test + void doShutdownNullsClient() throws Exception { + EventHubsConsumer consumer = new EventHubsConsumer(endpoint, processor); + setProcessorClient(consumer, processorClient); + + consumer.doShutdown(); + + assertEquals(null, getProcessorClient(consumer)); + } + + private static void setProcessorClient(EventHubsConsumer consumer, EventProcessorClient client) throws Exception { + Field field = EventHubsConsumer.class.getDeclaredField("processorClient"); + field.setAccessible(true); + field.set(consumer, client); + } + + private static EventProcessorClient getProcessorClient(EventHubsConsumer consumer) throws Exception { + Field field = EventHubsConsumer.class.getDeclaredField("processorClient"); + field.setAccessible(true); + return (EventProcessorClient) field.get(consumer); + } + + private static void incrementPendingExchanges(EventHubsConsumer consumer) throws Exception { + Field field = EventHubsConsumer.class.getDeclaredField("pendingExchanges"); + field.setAccessible(true); + ((java.util.concurrent.atomic.AtomicInteger) field.get(consumer)).incrementAndGet(); + } + + private static void decrementPendingExchanges(EventHubsConsumer consumer) throws Exception { + Field field = EventHubsConsumer.class.getDeclaredField("pendingExchanges"); + field.setAccessible(true); + ((java.util.concurrent.atomic.AtomicInteger) field.get(consumer)).decrementAndGet(); + } +} diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java index 3835f525d621..49cc2ebccf2c 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import com.google.api.core.AbstractApiService; import com.google.api.core.ApiFuture; @@ -43,11 +44,13 @@ import com.google.pubsub.v1.ReceivedMessage; import com.google.pubsub.v1.Subscription; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.ShutdownRunningTask; import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion; import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync; import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver; import org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge; import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.ShutdownAware; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.DefaultConsumer; import org.apache.camel.support.UnitOfWorkHelper; @@ -56,12 +59,13 @@ import org.apache.camel.support.task.budget.Budgets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GooglePubsubConsumer extends DefaultConsumer { +public class GooglePubsubConsumer extends DefaultConsumer implements ShutdownAware { private final Logger localLog; private final GooglePubsubEndpoint endpoint; private final Processor processor; + private final AtomicInteger pendingExchanges = new AtomicInteger(); private ExecutorService executor; private final List<Subscriber> subscribers; private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses; @@ -113,6 +117,11 @@ public class GooglePubsubConsumer extends DefaultConsumer { safeCancelSynchronousPullResponses(); + super.doStop(); + } + + @Override + protected void doShutdown() throws Exception { if (executor != null) { if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor); @@ -122,7 +131,26 @@ public class GooglePubsubConsumer extends DefaultConsumer { } executor = null; - super.doStop(); + super.doShutdown(); + } + + @Override + public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { + if (!subscribers.isEmpty()) { + subscribers.forEach(AbstractApiService::stopAsync); + } + safeCancelSynchronousPullResponses(); + return true; + } + + @Override + public int getPendingExchangesSize() { + return pendingExchanges.get(); + } + + @Override + public void prepareShutdown(boolean suspendOnly, boolean forced) { + // noop } private void safeCancelSynchronousPullResponses() { @@ -170,6 +198,14 @@ public class GooglePubsubConsumer extends DefaultConsumer { return resolvedMaxDeliveryAttempts; } + public void incrementPendingExchanges() { + pendingExchanges.incrementAndGet(); + } + + public void decrementPendingExchanges() { + pendingExchanges.decrementAndGet(); + } + private class SubscriberWrapper implements Runnable { private final String subscriptionName; @@ -333,24 +369,30 @@ public class GooglePubsubConsumer extends DefaultConsumer { exchange.getIn().setHeader(pubSubHeader, value); } + pendingExchanges.incrementAndGet(); try { - processor.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } + try { + processor.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } - // Handle exception if one occurred - if (exchange.getException() != null) { - getExceptionHandler().handleException("Error processing exchange", exchange, - exchange.getException()); - } + // Handle exception if one occurred + if (exchange.getException() != null) { + getExceptionHandler().handleException("Error processing exchange", exchange, + exchange.getException()); + } - // Execute synchronization callbacks (ACK/NACK) based on exchange status - // This is required because we are directly calling processor.process() outside of the normal - // Camel routing engine, so we must manually trigger the OnCompletion callbacks - if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { - List<Synchronization> synchronizations = exchange.getExchangeExtension().handoverCompletions(); - UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations); + // Execute synchronization callbacks (ACK/NACK) based on exchange status + // This is required because we are directly calling processor.process() outside of the normal + // Camel routing engine, so we must manually trigger the OnCompletion callbacks + if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { + List<Synchronization> synchronizations + = exchange.getExchangeExtension().handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations); + } + } finally { + pendingExchanges.decrementAndGet(); } } } catch (CancellationException e) { diff --git a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java index c2240b31c24b..eab17b95cc86 100644 --- a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java +++ b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java @@ -101,23 +101,29 @@ public class CamelMessageReceiver implements MessageReceiver { exchange.getIn().setHeader(pubSubHeader, value); } + consumer.incrementPendingExchanges(); try { - processor.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } + try { + processor.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } - // Handle exception if one occurred - if (exchange.getException() != null) { - consumer.getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); - } + // Handle exception if one occurred + if (exchange.getException() != null) { + consumer.getExceptionHandler().handleException("Error processing exchange", exchange, + exchange.getException()); + } - // Execute synchronization callbacks (ACK/NACK) based on exchange status - // This is required because we are directly calling processor.process() outside of the normal - // Camel routing engine, so we must manually trigger the OnCompletion callbacks - if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { - List<Synchronization> synchronizations = exchange.getExchangeExtension().handoverCompletions(); - UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations); + // Execute synchronization callbacks (ACK/NACK) based on exchange status + // This is required because we are directly calling processor.process() outside of the normal + // Camel routing engine, so we must manually trigger the OnCompletion callbacks + if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) { + List<Synchronization> synchronizations = exchange.getExchangeExtension().handoverCompletions(); + UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations); + } + } finally { + consumer.decrementPendingExchanges(); } } diff --git a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java new file mode 100644 index 000000000000..ad2e16abc99d --- /dev/null +++ b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.pubsub; + +import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.Processor; +import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.spi.ExchangeFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class GooglePubsubConsumerShutdownTest { + + private final GooglePubsubEndpoint endpoint = mock(); + private final Processor processor = mock(); + private final CamelContext context = mock(); + private final ExtendedCamelContext ecc = mock(); + private final ExchangeFactory ef = mock(); + + @BeforeEach + void setUp() { + when(endpoint.getCamelContext()).thenReturn(context); + when(context.getCamelContextExtension()).thenReturn(ecc); + when(ecc.getExchangeFactory()).thenReturn(ef); + when(ef.newExchangeFactory(any())).thenReturn(ef); + } + + @Test + void deferShutdownReturnsTrue() { + GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint, processor); + + boolean deferred = consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks); + + assertTrue(deferred); + } + + @Test + void pendingExchangesSizeIsZeroInitially() { + GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint, processor); + + assertEquals(0, consumer.getPendingExchangesSize()); + } + + @Test + void pendingExchangesSizeTracksInflightMessages() { + GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint, processor); + + assertEquals(0, consumer.getPendingExchangesSize()); + + consumer.incrementPendingExchanges(); + assertEquals(1, consumer.getPendingExchangesSize()); + + consumer.incrementPendingExchanges(); + assertEquals(2, consumer.getPendingExchangesSize()); + + consumer.decrementPendingExchanges(); + assertEquals(1, consumer.getPendingExchangesSize()); + + consumer.decrementPendingExchanges(); + assertEquals(0, consumer.getPendingExchangesSize()); + } +}
