Hi All,

I've been digging through some of the Camel-RabbitMQ code today and can now 
confirm that the temp queue is created with the main endpoint's relevant 
properties (prefetch, autoAck and some others). The ack mode is set when the 
consumer gets bound to the channel - TemporaryQueueReplyManager line 139:

        /**
         * Bind consumer to channel
         */
        private void start() throws IOException {
            tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this);
        }

What I haven't been able to dig out is where the TemporaryQueueReplyManager or 
TemporaryQueueReplyHandler (handleReplyMessage, onReply) or any other temp 
queue related logic would be sending the manual ack to RabbitMQ 
(channel.basicAck) in case the original endpoint, that the settings are taken 
from, had autoAck set to false.

My suspicion is that either the handler class or the manager class should be 
responsible for sending the manual ack, either in TemporaryQueueReplyManager 
lines 66-67:

        if (handler != null) {
            correlation.remove(correlationID);
            handler.onReply(correlationID, properties, message);
        }

Or in TemporaryQueueReplyHandler lines 56-59:

    public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) {
        // create holder object with the reply
        log.info("in onReply with correlationId= {}", correlationId);
        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply);
        // process the reply
        replyManager.processReply(holder);
    }

The processReply method in ReplyManagerSupport doesn't seem to do it either, 
so I'm really lost on this one.
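
To make concrete what I was expecting to find somewhere in the reply path, 
here is a minimal sketch. It is not Camel code: AckChannel is a made-up 
stand-in for com.rabbitmq.client.Channel (reduced to basicAck so it runs 
without a broker), and ReplyConsumerSketch and RecordingChannel are 
hypothetical names. It only assumes the consumer knows the delivery tag and 
the endpoint's autoAck flag:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for com.rabbitmq.client.Channel, reduced to the one call of interest.
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
}

// Hypothetical reply consumer; NOT Camel's TemporaryQueueReplyManager.
class ReplyConsumerSketch {
    private final AckChannel channel;
    private final boolean autoAck;

    ReplyConsumerSketch(AckChannel channel, boolean autoAck) {
        this.channel = channel;
        this.autoAck = autoAck;
    }

    // Same idea as a consumer's delivery callback: process the reply, then ack if needed.
    void handleDelivery(long deliveryTag, byte[] body) throws IOException {
        process(body);
        if (!autoAck) {
            // With autoAck=false the broker keeps the message un-acked until this call.
            channel.basicAck(deliveryTag, false);
        }
    }

    private void process(byte[] body) {
        // Placeholder for handing the reply back to the waiting exchange.
    }
}

// Records acked delivery tags so the behaviour can be checked without a broker.
class RecordingChannel implements AckChannel {
    final List<Long> acked = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        acked.add(deliveryTag);
    }
}
```

With autoAck=false the sketch acks each delivery tag exactly once after 
processing; with autoAck=true it never calls basicAck. That explicit call is 
the piece I cannot find in the temp queue classes.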

Could someone please point me to the place in the code where it's done? It 
might give me an idea or two of what I'm missing in my routing setup and how 
to get this working, or at least how to work around the issue.


Thanks,
Valdis

From: Valdis Andersons
Sent: 16 August 2018 16:10
To: users@camel.apache.org
Subject: Camel with Rabbitmq: messages in temp reply queue not being acked

Hi All,

Hopefully someone here can educate me on the below matter.

Here is a rather long-winded description of the issue: 
https://stackoverflow.com/questions/51875646/apache-camel-with-rabbitmq-messages-in-temp-reply-queue-not-being-acked-when-au

The essence of it is that in an InOut route with autoAck=false on the main 
queue, the messages on the temporary reply queue are not being acked. They are 
clearly being fetched, as shown by the prefetch setting and the number of 
messages in an un-acked state on the temp queues.
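
As a toy illustration of why missing acks matter with a prefetch limit, here 
is a small model. It is not RabbitMQ or Camel code; PrefetchModel and its 
methods are made-up names, and it assumes only the general rule that no more 
than the prefetch count of un-acked messages is delivered to a consumer:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a queue with a consumer prefetch limit; not RabbitMQ client code.
class PrefetchModel {
    private final int prefetch;
    private final Deque<String> ready = new ArrayDeque<>();
    private int unacked = 0;

    PrefetchModel(int prefetch) {
        this.prefetch = prefetch;
    }

    void publish(String message) {
        ready.addLast(message);
    }

    // Delivers the next message only while the un-acked count is below the prefetch limit.
    String deliver() {
        if (unacked >= prefetch || ready.isEmpty()) {
            return null; // consumer is stalled, or there is nothing to deliver
        }
        unacked++;
        return ready.removeFirst();
    }

    // An ack frees one prefetch slot so the next message can be delivered.
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }

    int unackedCount() {
        return unacked;
    }

    int readyCount() {
        return ready.size();
    }
}
```

In this model, once the un-acked count reaches the prefetch value, deliver() 
returns nothing until ack() is called, which is the same pattern I see on the 
temp queues.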

If I set autoAck=true then it works fine, but in that case I risk losing 
messages and that's not what I want (we switched from SEDA to RabbitMQ for 
this reason among others).
If I set the exchange pattern to InOnly then it works fine as well, as the 
route then doesn't need the temp queue for the reply. Unfortunately that 
causes a race condition in this scenario (outputEmailEndpoint and 
outputArchiveEndpoint get the message at the same time, but the email route 
needs to finish before the message can be archived):

rest(restEndpoint).post(postEndpoint)
                  .type(MyRequest.class)
                  .route()
                  .startupOrder(Integer.MAX_VALUE - 2)
                  .process(this::process)
                  .choice()
                  .when(header(DELIVERYSTATUS_HEADER).isEqualTo(Status.COMPLETED))
                      .to(outputEmailEndpoint, outputArchiveEndpoint)

I'd also like to send the correct response status to the REST client calling 
this service. At the moment that doesn't work well: either the un-acked 
messages start blocking the route, or the response is sent before the email 
route has finished.

I'm new to Camel, so I might be missing something very obvious here, but the 
last few days of searching around and trying various config options haven't 
got me any further.


Thanks and Best Regards,
Valdis
