On 08/06/2020 8:00 pm, Attila Kun wrote:
Hi,

I’m using the qpid-proton C++ library and receiving messages via this 
component: 
https://qpid.apache.org/releases/qpid-proton-0.30.0/proton/cpp/api/classproton_1_1messaging__handler.html
In the on_container_start method, I used the idle_timeout connection option to 
define a period after which the connection should be dropped if no heartbeat 
messages were transferred between my app and the broker.
The following scenario can happen:

   1.  My app is down.
   2.  Let’s say 2 messages accumulate in the broker: message A and message B.
   3.  My app comes online.
   4.  My app processes message A from the broker in its 
proton::messaging_handler::on_message override method. However, processing 
this message takes longer than the idle_timeout defined above.

This suggests you are blocking the event thread with your message processing, which is not advised.

   5.  My app processes message B from the broker.
   6.  Because of the long processing time of message A (point 4 above), the 
on_transport_error method is invoked with an “amqp:resource-limit-exceeded: 
local-idle-timeout expired” error.
   7.  Container execution stops because of the error above.
   8.  My app starts the container again.
   9.  My app receives messages A and B again. These are undesirable duplicates 
that the app has already processed in points 4-5.

My aim is to eliminate the duplicate messages in point 9. I tried the following:

   1.  I could use a local cache where I store the message ids that my app has 
already processed. I don’t really want to do this, because this cache might 
grow arbitrarily large. In my scenario above, I used 2 messages (A and B) but 
in practice an arbitrarily large number of duplicates may arrive. I would be 
okay with a solution that implies a constant number of messages (preferably 0 
or 1) is duplicated.
   2.  I could use at-most-once delivery mode. I can’t afford this, because 
then I would lose messages if my app crashes while processing a message.
   3.  I could use exactly-once delivery mode. I saw this mentioned in the 
docs (https://qpid.apache.org/releases/qpid-proton-0.30.0/proton/cpp/api/overview_page.html), 
but I couldn’t find how to set this in the API. The necessary option seems 
to be missing from the relevant enum: 
https://github.com/apache/qpid-proton/blob/master/cpp/include/proton/delivery_mode.hpp#L33
   4.  I tried using at-least-once delivery mode, explicitly calling 
proton::delivery::accept() and proton::delivery::settle() in the 
proton::messaging_handler::on_message override. Unfortunately, this still 
resulted in duplicates. This suggests that messages are not actually settled 
synchronously when I call accept() or settle(). I suspect that the attempt 
to settle the message is placed in some sort of queue whose processing is 
then aborted by the call to on_transport_error and the subsequent 
container stop.

Correct, these are asynchronous methods, and the API itself is asynchronous. The thread that runs the container should not be blocked by other processing, or it will not be able to do what it needs to (e.g. send heartbeats or acknowledgements as requested).

You can do the processing of messages on a separate thread, communicating with the event thread using work_queues (see https://qpid.apache.org/releases/qpid-proton-0.31.0/proton/cpp/api/mt_page.html and https://qpid.apache.org/releases/qpid-proton-0.31.0/proton/cpp/api/classproton_1_1work__queue.html).
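
To illustrate the shape of that hand-off, here is a rough, library-free sketch using only standard C++. It is not Proton code: TaskQueue, Result and run_simulation are hypothetical names invented for this example, and the two queues only stand in for the role a proton::work_queue plays. The calling thread plays the event thread: it hands each message to a worker, stays free to do its periodic work (counted here as "heartbeats"), and records the acknowledgement only when the worker posts it back. In a real handler, the posted-back step would presumably be a call to proton::delivery::accept() scheduled with proton::work_queue::add(), as described in the pages linked above.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical thread-safe FIFO of callbacks (a stand-in, not a Proton type).
class TaskQueue {
public:
    void add(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push_back(std::move(task));
        }
        ready_.notify_one();
    }

    // Waits up to `timeout` for a task and runs it on the calling thread.
    // Returns false if no task arrived in time.
    bool run_one(std::chrono::milliseconds timeout) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if (!ready_.wait_for(lock, timeout, [this] { return !tasks_.empty(); }))
                return false;
            task = std::move(tasks_.front());
            tasks_.pop_front();
        }
        task();  // run outside the lock so the task may call add()
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::function<void()>> tasks_;
};

struct Result {
    std::vector<std::string> accepted;  // acknowledgements issued on the event thread
    int heartbeats = 0;                 // idle ticks during which the event thread was free
};

// The calling thread acts as the event thread; a second thread does the slow work.
Result run_simulation(const std::vector<std::string>& messages,
                      std::chrono::milliseconds work_time) {
    TaskQueue to_worker;  // event thread -> worker: messages to process
    TaskQueue to_event;   // worker -> event thread: acknowledgements to issue
    Result result;

    // The "on_message" step: hand the message over and return immediately.
    for (const std::string& msg : messages) {
        to_worker.add([&to_event, &result, msg, work_time] {
            std::this_thread::sleep_for(work_time);  // slow processing, off the event thread
            to_event.add([&result, msg] { result.accepted.push_back(msg); });
        });
    }

    std::thread worker([&to_worker, &messages] {
        for (std::size_t i = 0; i < messages.size(); ++i)
            while (!to_worker.run_one(std::chrono::milliseconds(100))) {}
    });

    // Event loop: never blocked by processing, so it keeps ticking.
    while (result.accepted.size() < messages.size()) {
        if (!to_event.run_one(std::chrono::milliseconds(10)))
            ++result.heartbeats;  // a real event thread would send a heartbeat frame here
    }

    worker.join();
    return result;
}
```

Calling run_simulation({"A", "B"}, std::chrono::milliseconds(50)) should accept A and then B while still recording idle ticks, which is the behaviour the blocked on_message handler could not provide. Note that the acknowledgement is only ever touched on the event thread; the worker just asks for it to happen.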

