On 08/06/2020 8:00 pm, Attila Kun wrote:
Hi,
I’m using the qpid-proton C++ library and receiving messages via this
component:
https://qpid.apache.org/releases/qpid-proton-0.30.0/proton/cpp/api/classproton_1_1messaging__handler.html
In the on_container_start method, I used the idle_timeout connection option to
define some time after which the connection should be dropped if there was no
heartbeat messages transferred between my app and the broker.
The following scenario can happen:
1. My app is down.
2. Let’s say 2 messages accumulate in the broker: message A and message B.
3. My app comes online.
4. My app processes message A from the broker in its
proton::messaging_handler:: on_message override method. However, processing
this message takes longer than the idle_timeout defined above.
This suggests you are blocking the event thread with your message
processing, which is not advised.
5. My app processes message B from the broker.
6. Because of the long processing time of message A (point 4 above), the
on_transport_error method is invoked with an “amqp:resource-limit-exceeded:
local-idle-timeout expired” error.
7. Container execution stops because of the error above.
8. My app starts the container again.
9. My app receives messages A and B again. These are undesirable duplicates
that the app has already processed in points 4-5.
My aim is to eliminate the duplicate messages in point 9. I tried the following:
1. I could use a local cache where I store the message ids that my app has
already processed. I don’t really want to do this, because this cache might
grow arbitrarily large. In my scenario above, I used 2 messages (A and B) but
in practice an arbitrarily large number of duplicates may arrive. I would be
okay with a solution that implies a constant number of messages (preferably 0
or 1) is duplicated.
2. I could use at-most-once delivery mode. I can’t afford this, because
then I would lose messages if my app crashes while processing a message.
3. I could use exactly-once delivery-mode. I saw this mentioned in the
docs<https://qpid.apache.org/releases/qpid-proton-0.30.0/proton/cpp/api/overview_page.html>,
but I couldn’t exactly find how to set this on the API. The necessary option seems
to be missing from the relevant enum:
https://github.com/apache/qpid-proton/blob/master/cpp/include/proton/delivery_mode.hpp#L33
4. I tried using at-least-once delivery mode with explicitly calling
proton::delivery::accept() and proton::delivery::settle() in the
proton::messaging_handler:: on_message override. Unfortunately, this still
resulted in duplicates. This suggests that messages are not actually settled
synchronously when I’m calling accept() or settle(). I suspect that the attempt
to settle the message is placed in some sort of a queue whose processing is
then aborted due to the call to on_transport_error and the subsequent
container stop.
Correct these are asynchronous methods and the API is itself
asynchronous. The thread that runs the container should not be blocked
by other processing, or it will not be able to do what it needs to (e.g.
send heartbeats or acknowledgements as requested).
You can do the processing of messages on a separate thread,
communicating with the event thread using work_queues (see
https://qpid.apache.org/releases/qpid-proton-0.31.0/proton/cpp/api/mt_page.html
and
https://qpid.apache.org/releases/qpid-proton-0.31.0/proton/cpp/api/classproton_1_1work__queue.html)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]