[
https://issues.apache.org/jira/browse/CAMEL-23260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Federico Mariani resolved CAMEL-23260.
--------------------------------------
Resolution: Fixed
> ServiceBusComponent: RejectedExecutionException during graceful shutdown
> after successful exchange completion
> -------------------------------------------------------------------------------------------------------------
>
> Key: CAMEL-23260
> URL: https://issues.apache.org/jira/browse/CAMEL-23260
> Project: Camel
> Issue Type: Bug
> Components: camel-azure-servicebus
> Affects Versions: 4.17.0
> Environment: - Camel version: 4.17.0
> - Spring Boot 3.5.11 with camel-spring-boot
> - Azure Service Bus
> - Kubernetes (AKS), pods receive SIGTERM on rolling deployment / Local
> environment using InteliJ with ASB emulator container and kill -15 <PID>
> Reporter: Alex
> Assignee: Federico Mariani
> Priority: Major
> Fix For: 4.18.2, 4.19.0
>
>
> When a service receives SIGTERM while a message is actively being processed,
> the following errors appear in logs after the message is processed
> successfully:
> {code:java}
> 2026-03-27 13:39:22,907 traceId=- spanId=- user=- [app] WARN
> [receiverPump-4]
> org.apache.camel.component.azure.servicebus.ServiceBusConsumer: Error during
> processing exchange.. Exchange[6AC93CB3F0229D8-0000000000000001]. Caused by:
> [java.util.concurrent.RejectedExecutionException -
> null]java.util.concurrent.RejectedExecutionException at
> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:711)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:162) at
> org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:377)
> at
> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:353)
> at
> org.apache.camel.component.azure.servicebus.ServiceBusConsumer.processMessage(ServiceBusConsumer.java:85)
> at
> com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163)
> at
> com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$0(MessagePump.java:148)
> at
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176)
> at
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106)
> at
> com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141)
> at
> com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226)
> at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) at
> reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) at
> reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
> at
> reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
> at
> reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
> at reactor.core.publisher.Mono.subscribe(Mono.java:4576) at
> reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)
> at
> com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69)
> at
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99)
> at
> com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39)
> at
> com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68)
> at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
> at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
> at
> com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:481)
> at
> com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:410)
> at
> com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:890)
> at
> com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:734)
> at
> reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:446)
> at
> reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:533)
> at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) at
> reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) at
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> at java.base/java.lang.Thread.run(Thread.java:1583)2026-03-27
> 13:39:22,910 traceId=- spanId=- user=- [app] ERROR [receiverPump-4]
> com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient: Cannot perform
> operation 'abandoned' on a disposed receiver.2026-03-27 13:39:22,910
> traceId=- spanId=- user=- [app] WARN [receiverPump-4]
> org.apache.camel.support.UnitOfWorkHelper: Exception occurred during
> onCompletion. This exception will be ignored.java.lang.IllegalStateException:
> Cannot perform operation 'abandoned' on a disposed receiver. at
> com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.updateDisposition(ServiceBusReceiverAsyncClient.java:1528)
> at
> com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.abandon(ServiceBusReceiverAsyncClient.java:509)
> at
> com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.abandon(ServiceBusReceivedMessageContext.java:79)
> at
> org.apache.camel.component.azure.servicebus.ServiceBusConsumer$ConsumerOnCompletion.onFailure(ServiceBusConsumer.java:202)
> at
> org.apache.camel.support.UnitOfWorkHelper.doneSynchronization(UnitOfWorkHelper.java:85)
> at
> org.apache.camel.support.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:77)
> at
> org.apache.camel.impl.engine.DefaultUnitOfWork.done(DefaultUnitOfWork.java:269)
> at
> org.apache.camel.support.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:53)
> at
> org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:959)
> at
> org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:896)
> at
> org.apache.camel.impl.engine.AdviceIterator.runAfterTask(AdviceIterator.java:45)
> at
> org.apache.camel.impl.engine.AdviceIterator.runAfterTasks(AdviceIterator.java:39)
> at
> org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:255)
> at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44) at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
> at
> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:162) at
> org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:377)
> at
> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:353)
> at
> org.apache.camel.component.azure.servicebus.ServiceBusConsumer.processMessage(ServiceBusConsumer.java:85)
> at
> com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163)
> at
> com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$0(MessagePump.java:148)
> at
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176)
> at
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106)
> at
> com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141)
> at
> com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226)
> at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73) at
> reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32) at
> reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
> at
> reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
> at
> reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
> at reactor.core.publisher.Mono.subscribe(Mono.java:4576) at
> reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)
> at
> com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69)
> at
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99)
> at
> com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39)
> at
> com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68)
> at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
> at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
> at
> com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:481)
> at
> com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:410)
> at
> com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:890)
> at
> com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:734)
> {code}
> Also in logs we can see that camel correctly detected that there is one
> inflight message and started it's countdown for graceful shutdown timeout
> {code:java}
> inflight and pending exchanges to complete, timeout in 31 seconds. Inflights
> per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:17,393 traceId=- spanId=- user=- [app] INFO [Camel
> (camel-1) thread #1 - ShutdownTask]
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 30 seconds.
> Inflights per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:18,399 traceId=- spanId=- user=- [app] INFO [Camel
> (camel-1) thread #1 - ShutdownTask]
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 29 seconds.
> Inflights per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:19,402 traceId=- spanId=- user=- [app] INFO [Camel
> (camel-1) thread #1 - ShutdownTask]
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 28 seconds.
> Inflights per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:20,408 traceId=- spanId=- user=- [app] INFO [Camel
> (camel-1) thread #1 - ShutdownTask]
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 27 seconds.
> Inflights per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:21,413 traceId=- spanId=- user=- [app] INFO [Camel
> (camel-1) thread #1 - ShutdownTask]
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 26 seconds.
> Inflights per route: [route-transaction-from-queue-process = 1] {code}
> The message was processed successfully — the processor finished successfully,
> changes to database were comitted successfully, the downstream topic was
> published — but the ACK never reached ASB. The message is left unacknowledged
> and redelivered, causing duplicate processing.
>
> Assumption is that there is some issue with the shutdown orchestration of
> DefaultShutdownStrategy, which shuts down underlying ASB clients as soon as
> SIGTERM is received while it correctly detects that there are inflight
> messages and waits for all of them to finish. However, when they finish,
> since underlying ASB layer is already shut down, it can't send ACK to the ASB
> service, causing message to be redelivered once message lock expires.
>
> We were expecting that once SIGTERM signal is received, it should only
> prevent consumption of new messages, not block in-flight exchanges from
> completing their pipeline.
>
> Reproduction steps:
> # Ensure that spring application has graceful shutdown configured by setting
> **
> *server.shutdown=graceful* property
> # Send message to ASB queue
> # Ensure that processing of that message takes some time in camel processor,
> in our case we used Thread.sleep(30000) to simulate longer processing time
> # While message is processing, issue SIGTERM signal to the application,
> *kill -15 <PID>*
> # You should see that Camel started graceful shutdown process and it
> detected in flight message, it will start decreasing its shutdown timer which
> is by default 45s
> # Once message is processed (After 30s in this case), you should see above
> mentioned exception logged
--
This message was sent by Atlassian Jira
(v8.20.10#820010)