Alex created CAMEL-23260:
----------------------------
Summary: ServiceBusComponent: RejectedExecutionException during
graceful shutdown after successful exchange completion
Key: CAMEL-23260
URL: https://issues.apache.org/jira/browse/CAMEL-23260
Project: Camel
Issue Type: Bug
Components: camel-azure-servicebus
Affects Versions: 4.17.0
Environment: - Camel version: 4.17.0
- Spring Boot 3.5.11 with camel-spring-boot
- Azure Service Bus
- Kubernetes (AKS), pods receive SIGTERM on rolling deployment / Local
environment using IntelliJ with ASB emulator container and kill -15 <PID>
Reporter: Alex
When a service receives SIGTERM while a message is actively being processed,
the following errors appear in logs after the message is processed successfully:
{code:java}
2026-03-27 13:39:22,907 traceId=- spanId=- user=- [app] WARN [receiverPump-4]
org.apache.camel.component.azure.servicebus.ServiceBusConsumer: Error during
processing exchange.. Exchange[6AC93CB3F0229D8-0000000000000001]. Caused by:
[java.util.concurrent.RejectedExecutionException - null]
java.util.concurrent.RejectedExecutionException at
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:711)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:162) at
org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:377)
at
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:353)
at
org.apache.camel.component.azure.servicebus.ServiceBusConsumer.processMessage(ServiceBusConsumer.java:85)
at
com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163)
at
com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$0(MessagePump.java:148)
at
com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176)
at
com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106)
at
com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141)
at
com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226)
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) at
reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) at
reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
at
reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
at
reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576) at
reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430) at
com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69)
at
com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99)
at
com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39)
at
com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at
com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:481)
at
com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:410)
at
com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:890)
at
com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:734)
at
reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:446)
at
reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:533)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) at
reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) at
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
2026-03-27 13:39:22,910 traceId=- spanId=- user=- [app] ERROR [receiverPump-4]
com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient: Cannot perform
operation 'abandoned' on a disposed receiver.
2026-03-27 13:39:22,910 traceId=- spanId=- user=- [app] WARN [receiverPump-4]
org.apache.camel.support.UnitOfWorkHelper: Exception occurred during
onCompletion. This exception will be ignored.
java.lang.IllegalStateException: Cannot perform operation 'abandoned' on a
disposed receiver. at
com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.updateDisposition(ServiceBusReceiverAsyncClient.java:1528)
at
com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.abandon(ServiceBusReceiverAsyncClient.java:509)
at
com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.abandon(ServiceBusReceivedMessageContext.java:79)
at
org.apache.camel.component.azure.servicebus.ServiceBusConsumer$ConsumerOnCompletion.onFailure(ServiceBusConsumer.java:202)
at
org.apache.camel.support.UnitOfWorkHelper.doneSynchronization(UnitOfWorkHelper.java:85)
at
org.apache.camel.support.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:77)
at
org.apache.camel.impl.engine.DefaultUnitOfWork.done(DefaultUnitOfWork.java:269)
at
org.apache.camel.support.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:53) at
org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:959)
at
org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:896)
at
org.apache.camel.impl.engine.AdviceIterator.runAfterTask(AdviceIterator.java:45)
at
org.apache.camel.impl.engine.AdviceIterator.runAfterTasks(AdviceIterator.java:39)
at
org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:255)
at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44) at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:162) at
org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:377)
at
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:353)
at
org.apache.camel.component.azure.servicebus.ServiceBusConsumer.processMessage(ServiceBusConsumer.java:85)
at
com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163)
at
com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$0(MessagePump.java:148)
at
com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176)
at
com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106)
at
com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141)
at
com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226)
at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) at
reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) at
reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
at
reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
at
reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576) at
reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430) at
com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69)
at
com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99)
at
com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39)
at
com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at
com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:481)
at
com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:410)
at
com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:890)
at
com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:734)
{code}
The logs also show that Camel correctly detected one in-flight message and
started its countdown for the graceful shutdown timeout:
{code:java}
inflight and pending exchanges to complete, timeout in 31 seconds. Inflights
per route: [route-transaction-from-queue-process = 1]
2026-03-27 13:39:17,393 traceId=- spanId=- user=- [app] INFO [Camel (camel-1)
thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy:
Waiting as there are still 1 inflight and pending exchanges to complete,
timeout in 30 seconds. Inflights per route:
[route-transaction-from-queue-process = 1]
2026-03-27 13:39:18,399 traceId=- spanId=- user=- [app] INFO [Camel (camel-1)
thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy:
Waiting as there are still 1 inflight and pending exchanges to complete,
timeout in 29 seconds. Inflights per route:
[route-transaction-from-queue-process = 1]
2026-03-27 13:39:19,402 traceId=- spanId=- user=- [app] INFO [Camel (camel-1)
thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy:
Waiting as there are still 1 inflight and pending exchanges to complete,
timeout in 28 seconds. Inflights per route:
[route-transaction-from-queue-process = 1]
2026-03-27 13:39:20,408 traceId=- spanId=- user=- [app] INFO [Camel (camel-1)
thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy:
Waiting as there are still 1 inflight and pending exchanges to complete,
timeout in 27 seconds. Inflights per route:
[route-transaction-from-queue-process = 1]
2026-03-27 13:39:21,413 traceId=- spanId=- user=- [app] INFO [Camel (camel-1)
thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy:
Waiting as there are still 1 inflight and pending exchanges to complete,
timeout in 26 seconds. Inflights per route:
[route-transaction-from-queue-process = 1] {code}
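As a rough cross-check of the timestamps above, the following sketch estimates how much of the shutdown timer was left when the exception was logged. It assumes the default 45-second shutdown timeout mentioned in the reproduction steps; the class and method names are hypothetical and exist only for this illustration, and the two timestamps are copied from the log excerpts in this report.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper for reading the log timeline; not part of Camel.
final class ShutdownTimelineSketch {

    // Assumption: the default shutdown timeout of 45 seconds is in effect.
    static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(45);

    /** Estimates when shutdown began from a "timeout in N seconds" log line. */
    static LocalTime shutdownStart(LocalTime logTime, long secondsRemaining) {
        Duration elapsed = SHUTDOWN_TIMEOUT.minusSeconds(secondsRemaining);
        return logTime.minus(elapsed);
    }

    /** Time left on the shutdown timer at the given instant. */
    static Duration remainingAt(LocalTime start, LocalTime instant) {
        return SHUTDOWN_TIMEOUT.minus(Duration.between(start, instant));
    }

    public static void main(String[] args) {
        // "timeout in 30 seconds" was logged at 13:39:17,393
        LocalTime countdownLine = LocalTime.parse("13:39:17.393");
        // The RejectedExecutionException was logged at 13:39:22,907
        LocalTime failureLine = LocalTime.parse("13:39:22.907");

        LocalTime start = shutdownStart(countdownLine, 30);
        Duration remaining = remainingAt(start, failureLine);

        System.out.println("Estimated shutdown start: " + start);
        System.out.println("Timer remaining at failure: " + remaining.toMillis() + " ms");
    }
}
```

Because the countdown is logged in whole seconds, the estimate is only accurate to about a second. Even so, roughly 24 seconds were still left on the timer when the exception occurred, so the failure does not appear to be caused by the shutdown timeout expiring.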
The message was processed successfully (the processor finished, the changes to
the database were committed, and the downstream topic was published), but the
ACK never reached ASB. The message is left unacknowledged and redelivered,
causing duplicate processing.
Our assumption is that there is an issue with the shutdown orchestration of
DefaultShutdownStrategy: it shuts down the underlying ASB clients as soon as
SIGTERM is received, even though it correctly detects that there are in-flight
messages and waits for all of them to finish. When they do finish, the
underlying ASB layer is already shut down, so the ACK cannot be sent to the ASB
service and the message is redelivered once its lock expires.
We expected that receiving SIGTERM would only prevent consumption of new
messages, not block in-flight exchanges from completing their pipeline.
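To make the suspected ordering problem concrete, here is a minimal, self-contained simulation using only the JDK. It is not Camel or Azure SDK code: FakeReceiver and ShutdownOrderingSketch are hypothetical stand-ins, and the sketch only models the assumption described above, namely that a receiver closed before the in-flight exchange finishes can no longer settle the message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a receiver that refuses settlement once closed. */
final class FakeReceiver {
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    private final AtomicBoolean settled = new AtomicBoolean(false);

    void complete() {
        if (disposed.get()) {
            throw new IllegalStateException("Cannot perform operation on a disposed receiver.");
        }
        settled.set(true);
    }

    void close() { disposed.set(true); }

    boolean isSettled() { return settled.get(); }
}

final class ShutdownOrderingSketch {

    /** Runs one in-flight "exchange", shuts down in the given order, and reports whether it was settled. */
    static boolean run(boolean closeReceiverFirst) {
        FakeReceiver receiver = new FakeReceiver();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch mayFinish = new CountDownLatch(1);

        worker.submit(() -> {
            started.countDown();
            try {
                mayFinish.await();      // simulated long-running processing
                receiver.complete();    // settlement happens after processing
            } catch (IllegalStateException e) {
                // Settlement rejected: the message would be redelivered later.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            started.await();            // the exchange is in flight; "SIGTERM" arrives now
            if (closeReceiverFirst) {
                receiver.close();       // suspected ordering: client closed immediately
            }
            worker.shutdown();          // stop accepting new work; in-flight task continues
            mayFinish.countDown();      // let the in-flight exchange finish
            worker.awaitTermination(5, TimeUnit.SECONDS);
            receiver.close();           // expected ordering: close only after draining
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for shutdown", e);
        }
        return receiver.isSettled();
    }

    public static void main(String[] args) {
        System.out.println("close receiver first  -> settled: " + run(true));
        System.out.println("drain in-flight first -> settled: " + run(false));
    }
}
```

In this model the message is settled only when the receiver is closed after the in-flight work has drained, which is the behaviour we expected from the component during graceful shutdown.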
Reproduction steps:
# Ensure that the Spring application has graceful shutdown configured by
setting the *server.shutdown=graceful* property
# Send a message to the ASB queue
# Ensure that processing of that message takes some time in the Camel processor;
in our case we used Thread.sleep(30000) to simulate a longer processing time
# While the message is being processed, send SIGTERM to the application: *kill
-15 <PID>*
# You should see that Camel starts the graceful shutdown process and detects the
in-flight message; it then starts counting down its shutdown timer, which is
45s by default
# Once the message is processed (after 30s in this case), you should see the
exception shown above in the logs
--
This message was sent by Atlassian Jira
(v8.20.10#820010)