[ https://issues.apache.org/jira/browse/CAMEL-10229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piero Cangianiello updated CAMEL-10229: --------------------------------------- Description: Run the following code and hit enter while one message is in unacked state (see RabbitMQ console): {code:java} public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { @Override public void configure() { from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1") .delayer(5000) .setHeader("rabbitmq.ROUTING_KEY", constant("destinationQueue")) .to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false") .routeId("myRoute"); } }); context.start(); new BufferedReader(new InputStreamReader(System.in)).readLine(); context.stop(); } {code} you get the following exception: {noformat} com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e (amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1): com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303) at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043) at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} I think that this is caused by a race condition between the main thread that runs channel.close(); immediately after channel.basicCancel(tag); (see org.apache.camel.component.rabbitmq.RabbitConsumer) without waiting the channel.basicAck(deliveryTag, false); in handleDelivery(). Another bad side effect is that you'll find a duplicate of a message on the destinationQueue. For example if you have 10 initial messages in sourceQueue and you hit enter while it's processing the third one, you'll get 7 messages in sourceQueue and 4 messages in destinationQueue. The correct behaviour should be the following: 1) Stop consumer: channel.basicCancel(tag) 2) Wait if there is a running consumer 3) The consumer acks the previous message 4) Close the channel was: Run the following code and hit enter while one message is in unacked state (see RabbitMQ console): {code:java} public static void main(String[] args) throws Exception { CamelContext context = new DefaultCamelContext(); createEndpoints(context); context.addRoutes(new RouteBuilder() { @Override public void configure() { from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1") .delayer(5000) .setHeader("rabbitmq.ROUTING_KEY", constant("destinationQueue")) .to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false") .routeId("myRoute"); } }); context.start(); new BufferedReader(new InputStreamReader(System.in)).readLine(); context.stop(); } {code} you get the following exception: {noformat} com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e (amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1) threw an exception for channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1): com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0) at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309) at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303) at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043) at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108) at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144) at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} I think that this is caused by a race condition between the main thread that runs channel.close(); immediately after channel.basicCancel(tag); (see org.apache.camel.component.rabbitmq.RabbitConsumer) without waiting the channel.basicAck(deliveryTag, false); in handleDelivery(). Another bad side effect is that you'll find a duplicate of a message on the destinationQueue. For example if you have 10 initial messages in sourceQueue and you hit enter while it's processing the third one, you'll get 7 messages in sourceQueue and 4 messages in destinationQueue. The correct behaviour should be the following: 1) Stop consumer: channel.basicCancel(tag) 2) Wait if there is a running consumer 3) The consumer acks the previous message 4) Close the channel > Race condition when stopping context with autoack=false > ------------------------------------------------------- > > Key: CAMEL-10229 > URL: https://issues.apache.org/jira/browse/CAMEL-10229 > Project: Camel > Issue Type: Bug > Components: camel-rabbitmq > Affects Versions: 2.17.3 > Reporter: Piero Cangianiello > Labels: autoack, rabbitmq, stop > > Run the following code and hit enter while one message is in unacked state > (see RabbitMQ console): > {code:java} > public static void main(String[] args) throws Exception { > CamelContext context = new DefaultCamelContext(); > context.addRoutes(new RouteBuilder() { > @Override > public void configure() { > > from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1") > .delayer(5000) > .setHeader("rabbitmq.ROUTING_KEY", > constant("destinationQueue")) > > .to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false") > .routeId("myRoute"); > } > }); > context.start(); > new BufferedReader(new InputStreamReader(System.in)).readLine(); > context.stop(); > } > {code} > you get the following exception: > {noformat} > com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer > org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e > (amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel > AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1) threw an exception for > channel AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1): > com.rabbitmq.client.AlreadyClosedException: channel is already closed due to > clean channel shutdown; protocol method: > #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0) > at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195) > at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309) > at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303) > at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043) > at > org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108) > at > com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144) > at > com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {noformat} > I think that this is caused by a race condition between the main thread that > runs channel.close(); immediately after channel.basicCancel(tag); (see > org.apache.camel.component.rabbitmq.RabbitConsumer) without waiting the > channel.basicAck(deliveryTag, false); in handleDelivery(). > Another bad side effect is that you'll find a duplicate of a message on the > destinationQueue. For example if you have 10 initial messages in sourceQueue > and you hit enter while it's processing the third one, you'll get 7 messages > in sourceQueue and 4 messages in destinationQueue. > The correct behaviour should be the following: > 1) Stop consumer: channel.basicCancel(tag) > 2) Wait if there is a running consumer > 3) The consumer acks the previous message > 4) Close the channel -- This message was sent by Atlassian JIRA (v6.3.4#6332)