Hello Cliff,
After spending more time debugging, I think we have found where the real
problem comes from.
I have attached a simplified use case in Main.cpp and Broker.hpp.
When we call delivery.work_queue().add() (line 35 of Main.cpp, in
acceptDelivery()), we get a random segfault. If we replace it with
connection.work_queue().add(), everything goes back to normal.
I have tried to work out what the source of the problem might be, but I
haven't come up with any ideas yet...
Best regards,
Rabih
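
One difference between the two variants that may be worth ruling out (an
assumption, not a confirmed diagnosis): with
m_deliveries.front().work_queue().add(...), the m_deliveries.front() call
runs on the thread that calls acceptDelivery(), while a lambda queued by an
earlier call may be running m_deliveries.pop() on the container thread. With
m_connection.work_queue().add(...), m_deliveries is only touched inside the
queued lambda. The sketch below models that second pattern using only the
standard library. SerialQueue and drain_on_worker are hypothetical names, not
Proton API; SerialQueue is just a stand-in for a work queue that runs tasks
one at a time on a single thread.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for a work queue: tasks are added from any thread
// and executed one at a time on a single worker thread.
class SerialQueue {
public:
    SerialQueue() : worker_([this] { run(); }) {}

    ~SerialQueue() {
        add([this] { stop_ = true; });  // runs after all earlier tasks
        worker_.join();
    }

    void add(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

private:
    void run() {
        while (!stop_) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return !tasks_.empty(); });
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // executed outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
    bool stop_ = false;    // only read and written on the worker thread
    std::thread worker_;   // declared last so the other members exist first
};

// Queues `count` "accept" tasks. The caller never calls front() or pop() on
// `pending` itself; only the worker thread touches the container.
int drain_on_worker(std::queue<int>& pending, int count) {
    int accepted = 0;
    {
        SerialQueue wq;
        for (int i = 0; i < count; ++i) {
            wq.add([&pending, &accepted] {
                if (pending.empty()) return;
                pending.front();  // stands in for m_deliveries.front().accept()
                pending.pop();
                ++accepted;
            });
        }
    }  // the destructor joins the worker, so every task has run by here
    return accepted;
}
```

Calling drain_on_worker(q, 5) on a queue holding five items should return 5
and leave q empty, because every front()/pop() pair runs on the worker thread
in submission order.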
On Fri, Apr 26, 2019 at 9:38 AM Cliff Jansen <[email protected]> wrote:
> Hi Jeremy,
>
> I haven't had a chance to parse your overall description to arrive at
> a reproducer of your segfault problem.
>
> However, the DeliverySettleTest.cpp code you provided runs correctly.
> The client view of the delivery is settled via
>
> d.accept(); // line 34
>
> which results in the disposition frame including "settled=true" sent
> to the server (class Broker). This also means that the client locally
> has determined that (in AMQP terminology) it is now "safe to forget"
> the disposition. No changes to the disposition are legal after that,
> and no callbacks about the disposition would be expected either.
>
> On the server side, in both on_tracker_accept() and
> on_tracker_settle() you would find that t.settled() is true, as you
> would expect. The call to
>
> t.settle(); // line 147 Broker.hpp
>
> is important to release memory since auto_settle has been turned off.
> But proton knows that the peer has settled and forgotten the delivery.
> So it sensibly declines to send a disposition frame (about a forgotten
> entity) that would be ignored by the client as per the AMQP spec.
>
> Note that the above explanation is C++ specific. If you use lower
> level C primitives, it is possible to accept a delivery without
> settling it until later and get closer to what you appear to be
> striving for in this test.
>
> However, I can't help thinking that relying on two wire transfers,
> which could be arbitrarily slow, is not the mechanism you want to pursue
> to avoid the (lifecycle?) segfault you are trying to address.
>
> Perhaps you could post the actual code with the segfault problem for
> me to look at.
>
> On Thu, Apr 25, 2019 at 6:42 AM jeremy <[email protected]> wrote:
>
> > Hello,
> >
> > In the client API we're writing, we let a consumer consume messages
> > asynchronously, receiving a future of a delivery. In the receiver
> > handler, we keep track of deliveries in a list, as such:
> >
> > https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/include/proton/imperative/Receiver.hpp#L59
> >
> > The user can then get the delivery and accept/reject/release it:
> >
> > https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/src/Delivery.cpp#L7
> >
> > The proton delivery operations (accept/reject/release) are asynchronous
> > (added on the work queue:
> > https://github.com/apache/qpid-proton/blob/7c0a3387a5096d86541dbddfeb55f36eb0b85dd8/c/src/core/engine.c#L732
> > ) unless I'm mistaken. In the above delivery code, removing the delivery
> > object from the list as soon as we go out of scope
> > (https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/src/Delivery.cpp#L11)
> > results in a random segfault, since at the time the accept is called on
> > the delivery in the work queue, the delivery would have been already
> > removed from the list.
> >
> > As a solution, we thought of implementing on_delivery_settle on the
> > receiver's side, and removing the delivery there. However, we noticed
> > that on_delivery_settle is never called (delivery mode set to none on
> > both sender and receiver sides, sender auto_settle set to false,
> > receiver auto ack set to false). I tested with all delivery modes on
> > both sides, and on_delivery_settle was never called. I attached the
> > corresponding code.
> >
> > Got the following log:
> >
> > broker on_container_start
> > [0x1021a70]: -> AMQP
> > [0x1021a70]:0 -> @open(16) [container-id="b7cdef05-f195-4760-b262-655d538f0419", hostname="127.0.0.1", channel-max=32767]
> > [0x7f719c0032d0]: <- AMQP
> > [0x7f719c0032d0]:0 <- @open(16) [container-id="b7cdef05-f195-4760-b262-655d538f0419", hostname="127.0.0.1", channel-max=32767]
> > broker on_connection_open
> > [0x7f719c0032d0]: -> AMQP
> > [0x7f719c0032d0]:0 -> @open(16) [container-id="64a68948-0d32-4c19-89c7-9d62c408e248", channel-max=32767]
> > [0x1021a70]: <- AMQP
> > [0x1021a70]:0 <- @open(16) [container-id="64a68948-0d32-4c19-89c7-9d62c408e248", channel-max=32767]
> > [0x1021a70]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
> > [0x1021a70]:0 -> @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
> > [0x1021a70]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
> > [0x7f719c0032d0]:0 <- @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
> > [0x7f719c0032d0]:0 <- @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
> > [0x7f719c0032d0]:0 <- @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
> > broker on_sendable
> > broker on_sendable
> > [0x7f719c0032d0]:0 -> @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
> > [0x7f719c0032d0]:0 -> @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
> > [0x7f719c0032d0]:0 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0] (23) "\x00SpE\x00SsE\x00Sw\xa1\x0amy message"
> > broker on_sendable
> > [0x1021a70]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
> > [0x1021a70]:0 <- @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
> > [0x1021a70]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0] (23) "\x00SpE\x00SsE\x00Sw\xa1\x0amy message"
> > my message
> > on_message settled:0
> > [0x1021a70]:0 -> @flow(19) [next-incoming-id=1, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=1, link-credit=10, drain=false]
> > [0x1021a70]:0 -> @disposition(21) [role=true, first=0, settled=true, state=@accepted(36) []]
> > [0x7f719c0032d0]:0 <- @flow(19) [next-incoming-id=1, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=1, link-credit=10, drain=false]
> > [0x7f719c0032d0]:0 <- @disposition(21) [role=true, first=0, settled=true, state=@accepted(36) []]
> > broker on_sendable
> > broker on_tracker_accept
> > broker on_tracker_settle
> >
> >
> > DeliverySettleTest.cpp
> > <http://qpid.2158936.n2.nabble.com/file/t396337/DeliverySettleTest.cpp>
> > Broker.hpp <http://qpid.2158936.n2.nabble.com/file/t396337/Broker.hpp>
> >
> > Thanks,
> > Jeremy
> >
> >
> >
> > -----
> > Cheers,
> > Jeremy
> > --
> > Sent from:
> > http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: [email protected]
> > For additional commands, e-mail: [email protected]
> >
> >
>
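
On the lifetime issue described in the original message (a delivery being
removed from the tracking list before the queued accept has run), one common
C++ approach is to let the queued closure own its own copy of the handle, so
that untracking it early cannot leave the closure with a dangling reference.
The sketch below shows the idea with the standard library only. FakeDelivery
and accept_after_untracking are hypothetical names, the vector of closures
merely stands in for a work queue, and whether this maps cleanly onto
proton::delivery is an assumption that would need checking.

```cpp
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for a delivery handle.
struct FakeDelivery {
    std::string tag;
    bool accepted = false;
};

// Queues an "accept" for each tracked delivery, drops the tracking list, and
// only then runs the queued work. Returns the tags in the order accepted.
std::vector<std::string> accept_after_untracking(
        const std::vector<std::string>& tags) {
    std::list<std::shared_ptr<FakeDelivery>> tracked;
    for (const auto& t : tags) {
        auto d = std::make_shared<FakeDelivery>();
        d->tag = t;
        tracked.push_back(d);
    }

    std::vector<std::function<void()>> deferred;  // stands in for a work queue
    std::vector<std::string> accepted;
    for (const auto& d : tracked) {
        // Capture the shared_ptr by value: the closure co-owns the object.
        deferred.push_back([d, &accepted] {
            d->accepted = true;
            accepted.push_back(d->tag);
        });
    }

    tracked.clear();  // the list forgets the deliveries first...
    for (auto& task : deferred) {
        task();       // ...and the queued work still sees live objects
    }
    return accepted;
}
```

Capturing by reference (`[&]`) instead would tie the closure to whatever the
list holds at execution time, which is the situation the original message
describes.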
// Main.cpp
#include <proton/connection.hpp>
#include <proton/connection_options.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/session.hpp>
#include <proton/session_options.hpp>
#include <proton/work_queue.hpp>
#include <Broker.hpp>
#include <exception>
#include <future>
#include <iostream>
#include <queue>
#include <string>
#include <thread>
#include <vector>

struct ReceiverHandler : public proton::messaging_handler {
    std::string conn_url_;
    std::string addr_;
    proton::connection m_connection;
    static const int max = 5;
    int m_countDownLatch;                      // messages still expected
    std::promise<void> m_messageReceived;      // fulfilled after `max` messages
    std::queue<proton::delivery> m_deliveries; // deliveries not yet accepted

    ReceiverHandler(const std::string& u, const std::string& a) :
        conn_url_(u), addr_(a), m_countDownLatch(max)
    {}

    // Called from the main thread, once per received message.
    void acceptDelivery() {
        /* !!!!!! with the following line we encounter random segfaults !!!!!!
           if we use: m_connection.work_queue().add([&]{ instead it will pass
           correctly */
        m_deliveries.front().work_queue().add([&]{
            m_deliveries.front().accept();
            m_deliveries.pop();
        });
    }

    void on_container_start(proton::container& c) override {
        c.connect(conn_url_);
    }

    void on_connection_open(proton::connection& c) override {
        c.open_receiver(addr_,
            proton::receiver_options()
                .delivery_mode(proton::delivery_mode::NONE)
                .auto_accept(false));
        m_connection = c;
    }

    void on_message(proton::delivery& d, proton::message& m) override {
        std::cout << m.body() << std::endl;
        m_deliveries.emplace(d);
        if (--m_countDownLatch == 0) {
            m_messageReceived.set_value();
        }
    }
};

int main() {
    try {
        const std::string url("//127.0.0.1:5676");
        const std::string dest("examples");
        Broker brk(url, dest);
        std::vector<proton::message> msgs(ReceiverHandler::max,
                                          proton::message("my message"));
        brk.injectMessages(msgs);

        ReceiverHandler hw(url, dest);
        // Run the client container on its own thread.
        auto protonContainerClosed = std::async(std::launch::async, [&] {
            proton::container(hw).run();
        });

        // Wait until all messages have arrived, then accept each one.
        hw.m_messageReceived.get_future().get();
        for (int i = 0; i < ReceiverHandler::max; ++i) {
            hw.acceptDelivery();
        }

        hw.m_connection.work_queue().add([&]{
            hw.m_connection.close();
        });
        protonContainerClosed.get();
    } catch (const std::exception& e) {
        std::cout << e.what() << std::endl;
    }
    return 0;
}
// Broker.hpp
#ifndef TEST_BROKER_HPP
#define TEST_BROKER_HPP

#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/error_condition.hpp>
#include <proton/listen_handler.hpp>
#include <proton/listener.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
#include <proton/imperative/ThreadRAII.hpp>

#include <algorithm>
#include <deque>
#include <exception>
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Signals, through a future, when the listener is open or has failed.
class listenerHandler : public proton::listen_handler
{
public:
    std::future<void> getStartedFuture()
    {
        return m_containerStarted.get_future();
    }

private:
    void on_open(proton::listener&) override
    {
        m_containerStarted.set_value();
    }

    void on_error(proton::listener&, const std::string& s) override
    {
        m_containerStarted.set_exception(
            std::make_exception_ptr(std::runtime_error(s)));
    }

    std::promise<void> m_containerStarted;
};

// Minimal in-process broker used by the test: listens on url/destination and
// sends the injected messages to whichever receiver attaches.
class Broker : public proton::messaging_handler
{
public:
    Broker(const std::string& url, const std::string& destination)
        : m_url(url + "/" + destination)
    {
        m_brokerThread = std::thread([&]() {
            try
            {
                proton::container(*this).run();
            }
            catch (const std::exception& e) {
                std::cerr << "Broker threw exception: " << e.what() << std::endl;
            }
        });
        // Wait for the container to start
        m_listenerHandler.getStartedFuture().get();
    }

    ~Broker()
    {
        if (!m_isClosed) {
            m_listener.stop();
        }
    }

    void injectMessages(std::vector<proton::message> messages)
    {
        m_messages.insert(m_messages.end(), messages.begin(), messages.end());
    }

    int m_acceptedMsgs = 0;
    int m_rejectedMsgs = 0;
    int m_releasedMsgs = 0;

private:
    void on_container_start(proton::container& c) override
    {
        std::cout << "broker on_container_start" << std::endl;
        c.receiver_options(proton::receiver_options());
        m_listener = c.listen(m_url, m_listenerHandler);
    }

    void on_connection_open(proton::connection& c) override
    {
        std::cout << "broker on_connection_open" << std::endl;
        m_connection = c;
        c.open();
    }

    void on_connection_close(proton::connection&) override
    {
        std::cout << "broker on_connection_close" << std::endl;
        m_listener.stop();
    }

    void on_transport_error(proton::transport& t) override
    {
        std::cout << "broker on_transport_error" << std::endl;
        std::cerr << "Broker::on_transport_error: " << t.error().what()
                  << std::endl;
        m_listener.stop();
    }

    void on_error(const proton::error_condition& c) override
    {
        std::cout << "broker on_error" << std::endl;
        std::cerr << "Broker::on_error: " << c.what() << std::endl;
        m_isClosed = true;
        m_listener.stop();
    }

    void on_message(proton::delivery& delivery, proton::message& message) override
    {
        std::cout << "broker on_message" << std::endl;
        m_currentDelivery = { message, delivery };
    }

    // Send as many pending messages as the sender's credit allows.
    void sendMessages(proton::sender& sender)
    {
        size_t numberOfMessagesToSend =
            std::min(static_cast<size_t>(sender.credit()), m_messages.size());
        for (auto count = numberOfMessagesToSend; count > 0; --count)
        {
            auto message = m_messages.front();
            m_messages.pop_front();
            sender.send(message);
        }
    }

    void on_sendable(proton::sender& sender) override
    {
        std::cout << "broker on_sendable" << std::endl;
        sendMessages(sender);
    }

    void on_tracker_accept(proton::tracker&) override
    {
        ++m_acceptedMsgs;
    }

    void on_tracker_reject(proton::tracker&) override
    {
        ++m_rejectedMsgs;
    }

    void on_tracker_release(proton::tracker&) override
    {
        ++m_releasedMsgs;
    }

    std::string m_url;
    bool m_isClosed = false;
    proton::ThreadRAII m_brokerThread;
    std::deque<proton::message> m_messages;
    std::pair<proton::message, proton::delivery> m_currentDelivery;
    listenerHandler m_listenerHandler;
    proton::listener m_listener;
    proton::connection m_connection;
};

#endif
#0  0x00007f9948014d80 in ?? ()
#1  0x00007f994f370200 in pn_class_decref (clazz=0x7f9948004c80, object=0x7f994800efe0) at /proton/qpid-proton-0.27.1/c/src/core/object/object.c:91
#2  0x00000000004adb2c in ReceiverHandler::~ReceiverHandler() ()
#3  0x00000000004a5f7f in DeliveryTest_rabih_Test::TestBody() ()
#4  0x00000000004d406e in void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) ()
#5  0x00000000004cea82 in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) ()
#6  0x00000000004b52b3 in testing::Test::Run() ()
#7  0x00000000004b5ab1 in testing::TestInfo::Run() ()
#8  0x00000000004b60ba in testing::TestCase::Run() ()
#9  0x00000000004bc7cd in testing::internal::UnitTestImpl::RunAllTests() ()
#10 0x00000000004d4ead in bool testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) ()
#11 0x00000000004cf7b0 in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) ()
#12 0x00000000004bb55f in testing::UnitTest::Run() ()
#13 0x00000000004e2b00 in RUN_ALL_TESTS() ()
#14 0x00000000004e2a9b in main ()