Piero Cangianiello created CAMEL-10229:
------------------------------------------

             Summary: Race condition when stopping context with autoack=false
                 Key: CAMEL-10229
                 URL: https://issues.apache.org/jira/browse/CAMEL-10229
             Project: Camel
          Issue Type: Bug
          Components: camel-rabbitmq
    Affects Versions: 2.17.3
            Reporter: Piero Cangianiello


Run the following code and hit enter while one message is in unacked state (see 
RabbitMQ console):

{code:java}
public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        createEndpoints(context);

        context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() {
                        
from("rabbitmq://localhost/?queue=sourceQueue&skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false&prefetchEnabled=true&prefetchCount=1")
                                        .delayer(5000)
                                        .setHeader("rabbitmq.ROUTING_KEY", 
constant("destinationQueue"))
                                        
.to("rabbitmq://localhost/?skipExchangeDeclare=true&skipQueueDeclare=true&autoAck=false")
                                        .routeId("myRoute");
                }
        });
        context.start();
        new BufferedReader(new InputStreamReader(System.in)).readLine();
        context.stop();
}
{code}

you get the following exception:

{noformat}
com.rabbitmq.client.impl.DefaultExceptionHandler: Consumer 
org.apache.camel.component.rabbitmq.RabbitConsumer@4c57777e 
(amq.ctag-dWpQw46flmamv0dM_Fa_Qg) method handleDelivery for channel 
AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1) threw an exception for channel 
AMQChannel(amqp://rabbit_user@127.0.0.1:5672/,1):
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to 
clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, 
reply-text=OK, class-id=0, method-id=0)
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:195)
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:309)
        at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303)
        at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1043)
        at 
org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:108)
        at 
com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
        at 
com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

I think that this is caused by a race condition between the main thread that 
runs channel.close(); immediately after channel.basicCancel(tag); (see 
org.apache.camel.component.rabbitmq.RabbitConsumer) without waiting the 
channel.basicAck(deliveryTag, false); in handleDelivery().

Another bad side effect is that you'll find a duplicate of a message on the 
destinationQueue. For example if you have 10 initial messages in sourceQueue 
and you hit enter while it's processing the third one, you'll get 7 messages in 
sourceQueue and 4 messages in destinationQueue.

The correct behaviour should be the following:
1) Stop consumer: channel.basicCancel(tag)
2) Wait if there is a running consumer
3) The consumer acks the previous message
4) Close the channel



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to