I would like to add a bit of context. My C++ client has a method called
"receiveTextMessage()". This method is supposed to simulate what
JMS.receiveMessage() does, which is to block until a message is received and
settled.
So I am using a wait/notify mechanism to unblock my consumer once the message is
settled and no longer available on the broker.
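A minimal sketch of such a wait/notify hand-off, using only the C++ standard library. It is not the actual client code: the class name SettledMessageSlot, the method notifySettled(), and the helper demoBlockingReceive() are hypothetical, and the sketch assumes one message in flight at a time (matching add_credit(1) in the handler quoted further down).

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical helper (not part of Proton): hands one settled message
// from the handler thread to a blocked consumer thread.
class SettledMessageSlot {
public:
    // Called on the handler thread once the delivery has been settled.
    void notifySettled(const std::string& body) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!settled_);  // assumption: one message in flight at a time
            body_ = body;
            settled_ = true;
        }
        cond_.notify_one();
    }

    // Called on the consumer thread; blocks until notifySettled() has run.
    std::string receiveTextMessage() {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate guards against spurious wakeups and against a
        // notification that arrives before the consumer starts waiting.
        cond_.wait(lock, [this] { return settled_; });
        settled_ = false;  // re-arm for the next message
        return body_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::string body_;
    bool settled_ = false;
};

// Simulates a handler thread settling one message shortly after start.
inline std::string demoBlockingReceive() {
    SettledMessageSlot slot;
    std::thread handler([&slot] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        slot.notifySettled("hello");
    });
    std::string body = slot.receiveTextMessage();
    handler.join();
    return body;
}
```

The open question is where the handler side should call notifySettled(): calling it from on_message would unblock the consumer before the delivery has been accepted.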
From the code, it seems that when auto_accept is set to true (the default
behavior), there is no way to be notified when the delivery is settled; only
on_message is called (proton::messaging_adapter.cpp).
In that case, between the time MyHandler::on_message is called and the time
the message is settled (d.accept() in the code below), couldn't something go
wrong?
PS: Because the event_loop has performance issues, I am using timerTask and
schedule for the interaction between the consumer's main thread and the handler
thread.
messaging_adapter.cpp: code block that MyHandler reaches
-----------------------------------------------------------------------------
delegate_.on_message(d, msg);
if (lctx.auto_accept && !d.settled())
    d.accept(); // This is reached as well
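
The ordering in those two lines can be made concrete with a small stand-alone simulation. It does not use Proton: FakeDelivery and dispatchWithAutoAccept() are hypothetical stand-ins that only mirror the order of the quoted calls, assuming the delivery was not already settled when the handler runs.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for proton::delivery: it only records calls.
struct FakeDelivery {
    bool presettled;                // assumption: settled before dispatch?
    std::vector<std::string>* log;  // shared event log
    bool settled() const { return presettled; }
    void accept() {
        assert(log != nullptr);
        log->push_back("accept");
    }
};

// Runs a stand-in handler and then the auto-accept check, in the same
// order as the adapter lines quoted above, and returns the event log.
inline std::vector<std::string> dispatchWithAutoAccept(bool auto_accept) {
    std::vector<std::string> log;
    FakeDelivery d = {false, &log};

    // Stand-in for MyHandler::on_message.
    std::function<void(FakeDelivery&)> on_message =
        [&log](FakeDelivery&) { log.push_back("on_message"); };

    on_message(d);
    if (auto_accept && !d.settled())
        d.accept();
    return log;
}
```

With auto_accept enabled, the log holds "on_message" first and "accept" second, so anything triggered from inside on_message (such as unblocking the consumer) runs before the accept.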
messaging_adapter.cpp: full code block
-----------------------------------------------------------------------------
if (pn_link_is_receiver(lnk)) {
    delivery d(make_wrapper<delivery>(dlv));
    if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) {
        // generate on_message
        pn_connection_t *pnc = pn_session_connection(pn_link_session(lnk));
        connection_context& ctx = connection_context::get(pnc);
        // Reusable per-connection message.
        // Avoid expensive heap malloc/free overhead.
        // See PROTON-998
        class message &msg(ctx.event_message);
        msg.decode(d);
        if (pn_link_state(lnk) & PN_LOCAL_CLOSED) {
            if (lctx.auto_accept)
                d.release();
        } else {
            delegate_.on_message(d, msg);
            if (lctx.auto_accept && !d.settled())
                d.accept();
            if (lctx.draining && !pn_link_credit(lnk)) {
                lctx.draining = false;
                receiver r(make_wrapper<receiver>(lnk));
                delegate_.on_receiver_drain_finish(r);
            }
        }
    }
    else if (pn_delivery_updated(dlv) && d.settled()) {
        delegate_.on_delivery_settle(d);
    }
    if (lctx.draining && pn_link_credit(lnk) == 0) {
        lctx.draining = false;
        pn_link_set_drain(lnk, false);
        receiver r(make_wrapper<receiver>(lnk));
        delegate_.on_receiver_drain_finish(r);
        if (lctx.pending_credit) {
            pn_link_flow(lnk, lctx.pending_credit);
            lctx.pending_credit = 0;
        }
    }
    credit_topup(lnk);
}
________________________________
From: Adel Boutros <[email protected]>
Sent: Wednesday, November 16, 2016 7:08:56 PM
To: [email protected]
Subject: [Proton-c] [C++ bindings][0.14.0] How do I know a message was settled
by the receiver?
Hello,
I have a simple C++ client which connects to a Qpid Java Broker (6.0.4) and
receives messages.
With the default behavior of the C++ client, how can I know when a delivery has
been settled?
It seems there is no way for me to know that.
I have debugged a bit, and when on_message is called, the message moves to the
"Acquired" state on the broker. Later on, somewhere I haven't found yet,
the message is settled and disappears from the broker. However, none of the
other functions I implemented below is called.
Can you please assist?
Console output
-----------------------
on_message
Code
-----------------------
class MyHandler : public proton::messaging_handler
{
public:
    virtual void on_container_start(proton::container &c)
    {
        proton::url url("localhost:5672");
        proton::connection connection = c.connect(url);
        proton::receiver_options receiverOptions(
            proton::receiver_options().credit_window(0));
        connection.open_receiver("queue.name", receiverOptions);
    }

    virtual void on_receiver_open(proton::receiver& l)
    {
        l.add_credit(1);
    }

    virtual void on_message(proton::delivery &d, proton::message &m)
    {
        std::cout << "on_message" << std::endl;
    }

    virtual void on_delivery_settle(proton::delivery &d)
    {
        std::cout << "on_delivery_settle" << std::endl;
    }

    virtual void on_tracker_accept(proton::tracker &d)
    {
        std::cout << "on_tracker_accept" << std::endl;
    }

    virtual void on_tracker_reject(proton::tracker &d)
    {
        std::cout << "on_tracker_reject" << std::endl;
    }

    virtual void on_tracker_release(proton::tracker &d)
    {
        std::cout << "on_tracker_release" << std::endl;
    }

    virtual void on_tracker_settle(proton::tracker &d)
    {
        std::cout << "on_tracker_settle" << std::endl;
    }
};
Regards,
Adel