[ 
https://issues.apache.org/jira/browse/CAMEL-23260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Mariani reassigned CAMEL-23260:
----------------------------------------

    Assignee: Federico Mariani

> ServiceBusComponent: RejectedExecutionException during graceful shutdown 
> after successful exchange completion
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-23260
>                 URL: https://issues.apache.org/jira/browse/CAMEL-23260
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-azure-servicebus
>    Affects Versions: 4.17.0
>         Environment: - Camel version: 4.17.0
> - Spring Boot 3.5.11 with camel-spring-boot
> - Azure Service Bus
> - Kubernetes (AKS), pods receive SIGTERM on rolling deployment / Local 
> environment using InteliJ with ASB emulator container and kill -15 <PID>
>            Reporter: Alex
>            Assignee: Federico Mariani
>            Priority: Major
>
> When a service receives SIGTERM while a message is actively being processed, 
> the following errors appear in logs after the message is processed 
> successfully:
> {code:java}
> 2026-03-27 13:39:22,907 traceId=- spanId=- user=- [app] WARN  
> [receiverPump-4] 
> org.apache.camel.component.azure.servicebus.ServiceBusConsumer: Error during 
> processing exchange.. Exchange[6AC93CB3F0229D8-0000000000000001]. Caused by: 
> [java.util.concurrent.RejectedExecutionException - 
> null]java.util.concurrent.RejectedExecutionException      at 
> org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$RedeliveryTask.run(RedeliveryErrorHandler.java:711)
>    at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
>   at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
>     at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
>  at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
>        at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
>    at org.apache.camel.processor.Pipeline.process(Pipeline.java:162)       at 
> org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:377)
>     at 
> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:353)
>  at 
> org.apache.camel.component.azure.servicebus.ServiceBusConsumer.processMessage(ServiceBusConsumer.java:85)
>     at 
> com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163)
>        at 
> com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$0(MessagePump.java:148)
>       at 
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176)
>      at 
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106)
>         at 
> com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141)
>        at 
> com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226)
>   at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)       at 
> reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)       at 
> reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
>  at 
> reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
>        at 
> reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
>     at reactor.core.publisher.Mono.subscribe(Mono.java:4576)        at 
> reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)  
> at 
> com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69)
>   at 
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99)
>  at 
> com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39)
>  at 
> com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68)
>    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)   
>      at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) 
>        at 
> com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:481)
>     at 
> com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:410)
>         at 
> com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:890)
>   at 
> com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:734)
>   at 
> reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:446)
>     at 
> reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:533)
>  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)   at 
> reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)   at 
> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)   at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>      at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>     at java.base/java.lang.Thread.run(Thread.java:1583)2026-03-27 
> 13:39:22,910 traceId=- spanId=- user=- [app] ERROR [receiverPump-4] 
> com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient: Cannot perform 
> operation 'abandoned' on a disposed receiver.2026-03-27 13:39:22,910 
> traceId=- spanId=- user=- [app] WARN  [receiverPump-4] 
> org.apache.camel.support.UnitOfWorkHelper: Exception occurred during 
> onCompletion. This exception will be ignored.java.lang.IllegalStateException: 
> Cannot perform operation 'abandoned' on a disposed receiver.      at 
> com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.updateDisposition(ServiceBusReceiverAsyncClient.java:1528)
>       at 
> com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient.abandon(ServiceBusReceiverAsyncClient.java:509)
>  at 
> com.azure.messaging.servicebus.ServiceBusReceivedMessageContext.abandon(ServiceBusReceivedMessageContext.java:79)
>     at 
> org.apache.camel.component.azure.servicebus.ServiceBusConsumer$ConsumerOnCompletion.onFailure(ServiceBusConsumer.java:202)
>    at 
> org.apache.camel.support.UnitOfWorkHelper.doneSynchronization(UnitOfWorkHelper.java:85)
>       at 
> org.apache.camel.support.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:77)
>      at 
> org.apache.camel.impl.engine.DefaultUnitOfWork.done(DefaultUnitOfWork.java:269)
>       at 
> org.apache.camel.support.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:53)  
> at 
> org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:959)
>  at 
> org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:896)
>  at 
> org.apache.camel.impl.engine.AdviceIterator.runAfterTask(AdviceIterator.java:45)
>      at 
> org.apache.camel.impl.engine.AdviceIterator.runAfterTasks(AdviceIterator.java:39)
>     at 
> org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:255)
>      at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44)    at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.doRun(DefaultReactiveExecutor.java:199)
>   at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:189)
>     at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.tryExecuteReactiveWork(DefaultReactiveExecutor.java:166)
>  at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:148)
>        at 
> org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
>    at org.apache.camel.processor.Pipeline.process(Pipeline.java:162)       at 
> org.apache.camel.impl.engine.CamelInternalProcessor.processNonTransacted(CamelInternalProcessor.java:377)
>     at 
> org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:353)
>  at 
> org.apache.camel.component.azure.servicebus.ServiceBusConsumer.processMessage(ServiceBusConsumer.java:85)
>     at 
> com.azure.messaging.servicebus.MessagePump.notifyMessage(MessagePump.java:163)
>        at 
> com.azure.messaging.servicebus.MessagePump.lambda$handleMessage$0(MessagePump.java:148)
>       at 
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.wrap(ServiceBusReceiverInstrumentation.java:176)
>      at 
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:106)
>         at 
> com.azure.messaging.servicebus.MessagePump.handleMessage(MessagePump.java:141)
>        at 
> com.azure.messaging.servicebus.MessagePump$RunOnWorker.lambda$apply$0(MessagePump.java:226)
>   at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)       at 
> reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)       at 
> reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:228)
>  at 
> reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:52)
>        at 
> reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
>     at reactor.core.publisher.Mono.subscribe(Mono.java:4576)        at 
> reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)  
> at 
> com.azure.messaging.servicebus.TracingFluxOperator.lambda$hookOnNext$0(TracingFluxOperator.java:69)
>   at 
> com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation.instrumentProcess(ServiceBusReceiverInstrumentation.java:99)
>  at 
> com.azure.messaging.servicebus.TracingFluxOperator$1.lambda$subscribe$0(TracingFluxOperator.java:39)
>  at 
> com.azure.messaging.servicebus.TracingFluxOperator.hookOnNext(TracingFluxOperator.java:68)
>    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)   
>      at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) 
>        at 
> com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drainLoop(MessageFlux.java:481)
>     at 
> com.azure.core.amqp.implementation.MessageFlux$RecoverableReactorReceiver.drain(MessageFlux.java:410)
>         at 
> com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:890)
>   at 
> com.azure.core.amqp.implementation.MessageFlux$ReactorReceiverMediator.onNext(MessageFlux.java:734)
>  {code}
> Also in logs we can see that camel correctly detected that there is one 
> inflight message and started it's countdown for graceful shutdown timeout
> {code:java}
> inflight and pending exchanges to complete, timeout in 31 seconds. Inflights 
> per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:17,393 traceId=- spanId=- user=- [app] INFO  [Camel 
> (camel-1) thread #1 - ShutdownTask] 
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are 
> still 1 inflight and pending exchanges to complete, timeout in 30 seconds. 
> Inflights per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:18,399 traceId=- spanId=- user=- [app] INFO  [Camel 
> (camel-1) thread #1 - ShutdownTask] 
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are 
> still 1 inflight and pending exchanges to complete, timeout in 29 seconds. 
> Inflights per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:19,402 traceId=- spanId=- user=- [app] INFO  [Camel 
> (camel-1) thread #1 - ShutdownTask] 
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are 
> still 1 inflight and pending exchanges to complete, timeout in 28 seconds. 
> Inflights per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:20,408 traceId=- spanId=- user=- [app] INFO  [Camel 
> (camel-1) thread #1 - ShutdownTask] 
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are 
> still 1 inflight and pending exchanges to complete, timeout in 27 seconds. 
> Inflights per route: [route-transaction-from-queue-process = 1]
> 2026-03-27 13:39:21,413 traceId=- spanId=- user=- [app] INFO  [Camel 
> (camel-1) thread #1 - ShutdownTask] 
> org.apache.camel.impl.engine.DefaultShutdownStrategy: Waiting as there are 
> still 1 inflight and pending exchanges to complete, timeout in 26 seconds. 
> Inflights per route: [route-transaction-from-queue-process = 1] {code}
> The message was processed successfully — the processor finished successfully, 
> changes to database were comitted successfully, the downstream topic was 
> published — but the ACK never reached ASB. The message is left unacknowledged 
> and redelivered, causing duplicate processing.
>  
> Assumption is that there is some issue with the shutdown orchestration of 
> DefaultShutdownStrategy, which shuts down underlying ASB clients as soon as 
> SIGTERM is received while it correctly detects that there are inflight 
> messages and waits for all of them to finish. However, when they finish, 
> since underlying ASB layer is already shut down, it can't send ACK to the ASB 
> service, causing message to be redelivered once message lock expires. 
>  
> We were expecting that once SIGTERM signal is received, it should only 
> prevent consumption of new messages, not block in-flight exchanges from 
> completing their pipeline.
>  
> Reproduction steps:
>  # Ensure that spring application has graceful shutdown configured by setting 
> **
> *server.shutdown=graceful* property
>  # Send message to ASB queue
>  # Ensure that processing of that message takes some time in camel processor, 
> in our case we used Thread.sleep(30000) to simulate longer processing time
>  # While message is processing, issue SIGTERM signal to the application, 
> *kill -15 <PID>*
>  # You should see that Camel started graceful shutdown process and it 
> detected in flight message, it will start decreasing its shutdown timer which 
> is by default 45s
>  # Once message is processed (After 30s in this case), you should see above 
> mentioned exception logged



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to