Alex created CAMEL-23260:
----------------------------

             Summary: ServiceBusComponent: RejectedExecutionException during 
graceful shutdown after successful exchange completion
                 Key: CAMEL-23260
                 URL: https://issues.apache.org/jira/browse/CAMEL-23260
             Project: Camel
          Issue Type: Bug
          Components: camel-azure-servicebus
    Affects Versions: 4.17.0
         Environment: - Camel version: 4.17.0
- Spring Boot 3.5.11 with camel-spring-boot
- Azure Service Bus
- Kubernetes (AKS), pods receive SIGTERM on rolling deployment / Local 
environment using IntelliJ with ASB emulator container and kill -15 <PID>
            Reporter: Alex


When a service receives SIGTERM while a message is actively being processed, 
the following errors appear in logs after the message is processed successfully:
{code:java}
2026-03-27 13:39:22,907 traceId=- spanId=- user=- [app] WARN  [receiverPump-4] org.apache.camel.component.azure.servicebus.ServiceBusConsumer: Error during processing exchange.. Exchange[6AC93CB3F0229D8-0000000000000001]. Caused by: [java.util.concurrent.RejectedExecutionException - null]
java.util.concurrent.RejectedExecutionException
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:711)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:162)
    at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:377)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:353)
    at org.apache.camel.component.azure.servicebus.ServiceBusConsumer.processMessage(ServiceBusConsumer.java:85)
    at com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163)
    at com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$0(MessagePump.java:148)
    at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176)
    at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106)
    at com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141)
    at com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226)
    at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
    at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
    at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
    at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
    at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)
    at com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69)
    at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99)
    at com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39)
    at com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
    at com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:481)
    at com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:410)
    at com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:890)
    at com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:734)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:446)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:533)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
2026-03-27 13:39:22,910 traceId=- spanId=- user=- [app] ERROR [receiverPump-4] com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient: Cannot perform operation 'abandoned' on a disposed receiver.
2026-03-27 13:39:22,910 traceId=- spanId=- user=- [app] WARN  [receiverPump-4] org.apache.camel.support.UnitOfWorkHelper: Exception occurred during onCompletion. This exception will be ignored.
java.lang.IllegalStateException: Cannot perform operation 'abandoned' on a disposed receiver.
    at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.updateDisposition(ServiceBusReceiverAsyncClient.java:1528)
    at com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.abandon(ServiceBusReceiverAsyncClient.java:509)
    at com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.abandon(ServiceBusReceivedMessageContext.java:79)
    at org.apache.camel.component.azure.servicebus.ServiceBusConsumer$ConsumerOnCompletion.onFailure(ServiceBusConsumer.java:202)
    at org.apache.camel.support.UnitOfWorkHelper.doneSynchronization(UnitOfWorkHelper.java:85)
    at org.apache.camel.support.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:77)
    at org.apache.camel.impl.engine.DefaultUnitOfWork.done(DefaultUnitOfWork.java:269)
    at org.apache.camel.support.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:53)
    at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:959)
    at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:896)
    at org.apache.camel.impl.engine.AdviceIterator.runAfterTask(AdviceIterator.java:45)
    at org.apache.camel.impl.engine.AdviceIterator.runAfterTasks(AdviceIterator.java:39)
    at org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:255)
    at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:162)
    at org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:377)
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:353)
    at org.apache.camel.component.azure.servicebus.ServiceBusConsumer.processMessage(ServiceBusConsumer.java:85)
    at com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163)
    at com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$0(MessagePump.java:148)
    at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176)
    at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106)
    at com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141)
    at com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226)
    at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
    at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
    at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
    at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
    at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)
    at com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69)
    at com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99)
    at com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39)
    at com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
    at com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:481)
    at com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:410)
    at com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:890)
    at com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:734)
 {code}
The logs also show that Camel correctly detected one in-flight message and 
started its countdown for the graceful shutdown timeout:
{code:java}
inflight and pending exchanges to complete, timeout in 31 seconds. Inflights per route: [route-transaction-from-queue-process = 1]
2026-03-27 13:39:17,393 traceId=- spanId=- user=- [app] INFO  [Camel (camel-1) thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 30 seconds. Inflights per route: [route-transaction-from-queue-process = 1]
2026-03-27 13:39:18,399 traceId=- spanId=- user=- [app] INFO  [Camel (camel-1) thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 29 seconds. Inflights per route: [route-transaction-from-queue-process = 1]
2026-03-27 13:39:19,402 traceId=- spanId=- user=- [app] INFO  [Camel (camel-1) thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 28 seconds. Inflights per route: [route-transaction-from-queue-process = 1]
2026-03-27 13:39:20,408 traceId=- spanId=- user=- [app] INFO  [Camel (camel-1) thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 27 seconds. Inflights per route: [route-transaction-from-queue-process = 1]
2026-03-27 13:39:21,413 traceId=- spanId=- user=- [app] INFO  [Camel (camel-1) thread #1 - ShutdownTask] org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 26 seconds. Inflights per route: [route-transaction-from-queue-process = 1]
{code}
The message was processed successfully: the processor finished, the changes 
to the database were committed, and the message to the downstream topic was 
published. However, the ACK never reached ASB. The message is left 
unacknowledged and is redelivered, causing duplicate processing.

 

Our assumption is that there is an issue with the shutdown orchestration in 
DefaultShutdownStrategy: it shuts down the underlying ASB clients as soon as 
SIGTERM is received, even though it correctly detects that there are in-flight 
messages and waits for all of them to finish. When they do finish, the 
underlying ASB layer is already shut down, so the ACK cannot be sent to the 
ASB service, and the message is redelivered once its lock expires.
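
To make the suspected ordering concrete, here is a minimal, self-contained sketch in plain Java. It does not use Camel or the Azure SDK; FakeReceiver, EarlyDisposeSketch and the error text are hypothetical stand-ins. It only models the sequence we believe is happening: the receiver is disposed while a message is still being processed, so the later settlement call fails.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a broker receiver; this is NOT the Azure SDK class. */
class FakeReceiver {
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    void dispose() {
        disposed.set(true);
    }

    /** Mimics a settlement (ACK) call that fails once the receiver is disposed. */
    void complete(String messageId) {
        if (disposed.get()) {
            throw new IllegalStateException("receiver already disposed, cannot settle " + messageId);
        }
    }
}

class EarlyDisposeSketch {
    /** Returns the settlement error message, or null if settlement succeeded. */
    static String run() throws Exception {
        FakeReceiver receiver = new FakeReceiver();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch processingStarted = new CountDownLatch(1);
        CountDownLatch receiverDisposed = new CountDownLatch(1);

        // In-flight exchange: "processing" lasts until the receiver has been disposed.
        Future<String> inflight = worker.submit(() -> {
            processingStarted.countDown();
            receiverDisposed.await(); // stands in for the long-running processor
            try {
                receiver.complete("msg-1"); // the ACK attempt after processing
                return null;
            } catch (IllegalStateException e) {
                return e.getMessage();
            }
        });

        processingStarted.await();
        receiver.dispose();           // assumed behaviour: client closed right at SIGTERM
        receiverDisposed.countDown(); // processing now finishes
        worker.shutdown();            // the in-flight task is still allowed to complete
        return inflight.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("settlement error: " + run());
    }
}
```

In this model the processing itself succeeds and only the settlement fails, which matches what we observe: the work is done, but the broker never learns about it.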

 

We expected that receiving SIGTERM would only prevent consumption of new 
messages, and would not block in-flight exchanges from completing their 
pipeline, including the final ACK.
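
The ordering we expected can be sketched as follows, again in plain Java with hypothetical names (DrainThenDisposeSketch, QueueConsumer); it is not how Camel or the Azure SDK is implemented, only an illustration of "stop intake, drain in-flight work, then dispose".

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class DrainThenDisposeSketch {

    /** Hypothetical consumer; names are illustrative, not Camel or Azure SDK APIs. */
    static final class QueueConsumer {
        private final AtomicBoolean accepting = new AtomicBoolean(true);
        private final AtomicBoolean disposed = new AtomicBoolean(false);
        private final ExecutorService workers = Executors.newFixedThreadPool(2);
        final List<String> acked = new CopyOnWriteArrayList<>();

        /** Returns false when the message is refused because shutdown has begun. */
        boolean onMessage(String id, long processingMillis) {
            if (!accepting.get()) {
                return false;
            }
            try {
                workers.submit(() -> {
                    try {
                        Thread.sleep(processingMillis); // simulated processing time
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    ack(id);
                });
                return true;
            } catch (RejectedExecutionException e) {
                return false; // shutdown raced with this delivery
            }
        }

        private void ack(String id) {
            if (disposed.get()) {
                throw new IllegalStateException("receiver already disposed");
            }
            acked.add(id);
        }

        /** Returns true if all in-flight work finished within the timeout. */
        boolean shutdown(long timeout, TimeUnit unit) throws InterruptedException {
            accepting.set(false);  // 1. stop taking new messages
            workers.shutdown();    // 2. let in-flight tasks run to completion
            boolean drained = workers.awaitTermination(timeout, unit);
            disposed.set(true);    // 3. only now dispose the receiver
            return drained;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueConsumer consumer = new QueueConsumer();
        consumer.onMessage("msg-1", 500);
        boolean drained = consumer.shutdown(5, TimeUnit.SECONDS);
        System.out.println("drained=" + drained + ", acked=" + consumer.acked);
    }
}
```

The point of the sketch is step 3: the receiver stays usable until the in-flight work has drained or the timeout has elapsed, so a message that finishes within the shutdown timeout can still be acknowledged.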

 

Reproduction steps:
 # Ensure that the Spring application has graceful shutdown configured by 
setting the *server.shutdown=graceful* property
 # Send a message to the ASB queue
 # Ensure that processing of that message takes some time in the Camel 
processor; in our case we used Thread.sleep(30000) to simulate a longer 
processing time
 # While the message is being processed, send SIGTERM to the application: *kill 
-15 <PID>*
 # You should see that Camel starts the graceful shutdown process and detects 
the in-flight message; it then starts counting down its shutdown timeout, 
which is 45s by default
 # Once the message is processed (after 30s in this case), you should see the 
exceptions shown above in the logs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
