Hello Cliff,

After spending more time debugging, I think we have found where the real problem comes from. I attached a simplified use case in Main.cpp and Broker.hpp. When we call delivery.work_queue().add() (line 35 of Main.cpp, in acceptDelivery()), we get a random segfault; if we replace it with connection.work_queue().add(), everything goes back to normal.

I have tried to work out what the source of the problem might be, but I have no ideas yet...

Best regards,
Rabih

On Fri, Apr 26, 2019 at 9:38 AM Cliff Jansen <cliffjan...@gmail.com> wrote:

> Hi Jeremy,
>
> I haven't had a chance to parse your overall description to arrive at
> a reproducer of your segfault problem.
>
> However, the DeliverySettleTest.cpp code you provided runs correctly.
> The client view of the delivery is settled via
>
>     d.accept();  // line 34
>
> which results in the disposition frame including "settled=true" sent
> to the server (class Broker). This also means that the client locally
> has determined that (in AMQP terminology) it is now "safe to forget"
> the disposition. No changes to the disposition are legal after that,
> and no callbacks about the disposition would be expected either.
>
> On the server side, in both on_tracker_accept() and
> on_tracker_settle() you would find that t.settled() is true, as you
> would expect. The call to
>
>     t.settle();  // line 147 Broker.hpp
>
> is important to release memory since auto_settle has been turned off.
> But proton knows that the peer has settled and forgotten the delivery.
> So it sensibly declines to send a disposition frame (about a forgotten
> entity) that would be ignored by the client as per the AMQP spec.
>
> Note that the above explanation is C++ specific. If you use lower
> level C primitives, it is possible to accept a delivery without
> settling it until later and get closer to what you appear to be
> striving for in this test.
>
> However, I can't help thinking that relying on two wire transfers,
> which could be arbitrarily slow, is the mechanism you want to pursue
> to avoid the (lifecycle?) segfault you are trying to address.
>
> Perhaps you could post the actual code with the segfault problem for
> me to look at.
>
> On Thu, Apr 25, 2019 at 6:42 AM jeremy <jeremyao...@gmail.com> wrote:
>
> > Hello,
> >
> > In the client API we're writing, we let a consumer consume messages
> > asynchronously, receiving a future of a delivery. In the receiver handler,
> > we keep track of deliveries in a list, as such:
> >
> > https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/include/proton/imperative/Receiver.hpp#L59
> >
> > The user can then get the delivery and accept/reject/release it:
> >
> > https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/src/Delivery.cpp#L7
> >
> > The proton delivery operations (accept/reject/release) are asynchronous
> > (added on the work queue:
> > https://github.com/apache/qpid-proton/blob/7c0a3387a5096d86541dbddfeb55f36eb0b85dd8/c/src/core/engine.c#L732
> > ) unless I'm mistaken. In the above delivery code, removing the delivery
> > object from the list as soon as we go out of scope
> > ( https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/src/Delivery.cpp#L11 )
> > results in a random segfault, since at the time the accept is called on the
> > delivery in the work queue, the delivery would have been already removed
> > from the list.
> >
> > As a solution, we thought of implementing on_delivery_settle on the
> > receiver's side, and removing the delivery there. However, we noticed that
> > on_delivery_settle is never called (delivery mode set to none on both sender
> > and receiver sides, sender auto_settle set to false, receiver auto ack set
> > to false). I tested with all delivery modes on both sides, and the
> > on_delivery_settle was never called. I attached the corresponding code.
> >
> > Got the following log:
> >
> > broker on_container_start
> > [0x1021a70]: -> AMQP
> > [0x1021a70]:0 -> @open(16) [container-id="b7cdef05-f195-4760-b262-655d538f0419", hostname="127.0.0.1", channel-max=32767]
> > [0x7f719c0032d0]: <- AMQP
> > [0x7f719c0032d0]:0 <- @open(16) [container-id="b7cdef05-f195-4760-b262-655d538f0419", hostname="127.0.0.1", channel-max=32767]
> > broker on_connection_open
> > [0x7f719c0032d0]: -> AMQP
> > [0x7f719c0032d0]:0 -> @open(16) [container-id="64a68948-0d32-4c19-89c7-9d62c408e248", channel-max=32767]
> > [0x1021a70]: <- AMQP
> > [0x1021a70]:0 <- @open(16) [container-id="64a68948-0d32-4c19-89c7-9d62c408e248", channel-max=32767]
> > [0x1021a70]:0 -> @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
> > [0x1021a70]:0 -> @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
> > [0x1021a70]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
> > [0x7f719c0032d0]:0 <- @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
> > [0x7f719c0032d0]:0 <- @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
> > [0x7f719c0032d0]:0 <- @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10, drain=false]
> > broker on_sendable
> > broker on_sendable
> > [0x7f719c0032d0]:0 -> @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
> > [0x7f719c0032d0]:0 -> @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
> > [0x7f719c0032d0]:0 -> @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0] (23) "\x00SpE\x00SsE\x00Sw\xa1\x0amy message"
> > broker on_sendable
> > [0x1021a70]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0, incoming-window=2147483647, outgoing-window=2147483647]
> > [0x1021a70]:0 <- @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address="examples", durable=0, timeout=0, dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false], initial-delivery-count=0, max-message-size=0]
> > [0x1021a70]:0 <- @transfer(20) [handle=0, delivery-id=0, delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0] (23) "\x00SpE\x00SsE\x00Sw\xa1\x0amy message"
> > my message
> > on_message settled:0
> > [0x1021a70]:0 -> @flow(19) [next-incoming-id=1, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=1, link-credit=10, drain=false]
> > [0x1021a70]:0 -> @disposition(21) [role=true, first=0, settled=true, state=@accepted(36) []]
> > [0x7f719c0032d0]:0 <- @flow(19) [next-incoming-id=1, incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=1, link-credit=10, drain=false]
> > [0x7f719c0032d0]:0 <- @disposition(21) [role=true, first=0, settled=true, state=@accepted(36) []]
> > broker on_sendable
> > broker on_tracker_accept
> > broker on_tracker_settle
> >
> > DeliverySettleTest.cpp
> > <http://qpid.2158936.n2.nabble.com/file/t396337/DeliverySettleTest.cpp>
> > Broker.hpp <http://qpid.2158936.n2.nabble.com/file/t396337/Broker.hpp>
> >
> > Thanks,
> > Jeremy
> >
> > -----
> > Cheers,
> > Jeremy
> > --
> > Sent from:
> > http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> > For additional commands, e-mail: users-h...@qpid.apache.org
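
The lifetime problem described in Jeremy's message (an accept is queued for later, while the delivery is removed from the list straight away) can be modelled without proton. The sketch below is only an illustration under stated assumptions: ModelDelivery, the work vector and deferred_accept_by_value() are hypothetical names, and std::shared_ptr stands in for a handle that keeps its target alive when copied. Whether the delivery object in the imperative client behaves this way is not confirmed anywhere in this thread.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a delivery; not a proton type.
struct ModelDelivery {
    explicit ModelDelivery(std::string t) : tag(std::move(t)) {}
    std::string tag;
    bool accepted = false;
};

// Queues an accept for later, removes the delivery from the list at once,
// then runs the queued work. Returns a short log of what happened.
std::string deferred_accept_by_value() {
    std::list<std::shared_ptr<ModelDelivery>> pending;
    std::vector<std::function<void()>> work;  // stands in for a work queue
    std::string log;

    pending.push_back(std::make_shared<ModelDelivery>("d0"));

    // Copy the handle into the task instead of referring to the list element.
    std::shared_ptr<ModelDelivery> d = pending.front();
    work.push_back([d, &log] {
        d->accepted = true;
        log += d->tag + " accepted";
    });

    // Removing the list entry is now harmless: the task owns its own copy.
    pending.pop_front();
    assert(pending.empty());

    for (auto& task : work) task();
    return log;
}
```

Calling deferred_accept_by_value() returns "d0 accepted" even though the list entry was removed before the queued task ran. Capturing a reference or iterator to the list element instead would leave the task with a dangling reference.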
// Main.cpp (attachment): simplified reproducer

#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/session.hpp>
#include <proton/work_queue.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/connection_options.hpp>
#include <proton/session_options.hpp>
#include <proton/receiver_options.hpp>
#include <proton/message.hpp>
#include <proton/delivery.hpp>

#include <Broker.hpp>

#include <exception>
#include <future>
#include <iostream>
#include <queue>
#include <string>
#include <thread>
#include <vector>

struct ReceiverHandler : public proton::messaging_handler {
    std::string conn_url_;
    std::string addr_;
    proton::connection m_connection;
    static const int max = 5;
    int m_countDownLatch;
    std::promise<void> m_messageReceived;
    std::queue<proton::delivery> m_deliveries;

    ReceiverHandler(const std::string& u, const std::string& a)
        : conn_url_(u), addr_(a), m_countDownLatch(max) {}

    // Called from the main thread (see main() below), not from a proton callback.
    void acceptDelivery() {
        /* !!!!!!
           with the following line we encounter random segfaults
           !!!!!!
           if we use: m_connection.work_queue().add([&]{ instead it will pass correctly
        */
        m_deliveries.front().work_queue().add([&]{
            m_deliveries.front().accept();
            m_deliveries.pop();
        });
    }

    void on_container_start(proton::container& c) override {
        c.connect(conn_url_);
    }

    void on_connection_open(proton::connection& c) override {
        c.open_receiver(addr_, proton::receiver_options()
                                   .delivery_mode(proton::delivery_mode::NONE)
                                   .auto_accept(false));
        m_connection = c;
    }

    void on_message(proton::delivery& d, proton::message& m) override {
        std::cout << m.body() << std::endl;
        m_deliveries.emplace(d);
        // Unblock main() once all expected messages have arrived.
        if (--m_countDownLatch == 0) {
            m_messageReceived.set_value();
        }
    }
};

int main() {
    try {
        const std::string url("//127.0.0.1:5676");
        const std::string dest("examples");
        Broker brk(url, dest);
        std::vector<proton::message> msgs(ReceiverHandler::max, proton::message("my message"));
        brk.injectMessages(msgs);

        ReceiverHandler hw(url, dest);
        // Run the client container on its own thread.
        auto protonContainerClosed = std::async(std::launch::async, [&] {
            proton::container(hw).run();
        });

        // Wait until all messages have been received, then accept them one by one.
        hw.m_messageReceived.get_future().get();
        for (int i = 0; i < ReceiverHandler::max; ++i) {
            hw.acceptDelivery();
        }

        hw.m_connection.work_queue().add([&]{ hw.m_connection.close(); });
        protonContainerClosed.get();
    } catch (const std::exception& e) {
        std::cout << e.what() << std::endl;
    }
    return 0;
}
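
One thing that can be read off the reproducer: acceptDelivery() runs on the main thread, so m_deliveries.front() is evaluated there, while a lambda queued by an earlier call may be running accept() and pop() on the container thread at the same moment. With m_connection.work_queue().add(...) the main thread no longer touches m_deliveries at all. Whether this is what actually causes the segfault is an assumption, not something confirmed in this thread. The sketch below models the safer pattern without proton; ModelWorkQueue and accept_all_on_loop_thread() are hypothetical names, not proton API.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in for a work queue: add() may be called from any
// thread, the queued functions run only on the thread that calls run().
class ModelWorkQueue {
public:
    void add(std::function<void()> fn) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push_back(std::move(fn));
        }
        cv_.notify_one();
    }

    // Runs queued tasks until stop() has been called and the queue is drained.
    void run() {
        for (;;) {
            std::function<void()> fn;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [&] { return stopped_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopped and nothing left to do
                fn = std::move(tasks_.front());
                tasks_.pop_front();
            }
            fn();
        }
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cv_.notify_one();
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool stopped_ = false;
};

// Accepts `count` model deliveries and returns how many were accepted.
int accept_all_on_loop_thread(int count) {
    ModelWorkQueue loop;
    std::queue<int> deliveries;  // filled before the loop thread starts
    for (int i = 0; i < count; ++i) deliveries.push(i);
    int accepted = 0;

    std::thread loop_thread([&] { loop.run(); });
    for (int i = 0; i < count; ++i) {
        // The calling thread only touches the thread-safe queue;
        // front()/pop() on `deliveries` happen inside the task.
        loop.add([&] {
            if (!deliveries.empty()) {
                ++accepted;
                deliveries.pop();
            }
        });
    }
    loop.stop();
    loop_thread.join();  // after join, reading `accepted` here is safe

    assert(deliveries.empty());
    return accepted;
}
```

In this model, accept_all_on_loop_thread(5) returns 5. The point is that the shared container is read and modified by one thread only, and the other thread's sole interaction is through add().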
// Broker.hpp (attachment): minimal in-process test broker

#ifndef TEST_BROKER_HPP
#define TEST_BROKER_HPP

#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/error_condition.hpp>
#include <proton/listener.hpp>
#include <proton/listen_handler.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
#include <proton/imperative/ThreadRAII.hpp>

#include <algorithm>
#include <deque>
#include <exception>
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Signals, through a future, when the listener is open (or failed to open).
class listenerHandler : public proton::listen_handler {
public:
    std::future<void> getStartedFuture() { return m_containerStarted.get_future(); }

private:
    void on_open(proton::listener&) override {
        m_containerStarted.set_value();
    }

    void on_error(proton::listener&, const std::string& s) override {
        m_containerStarted.set_exception(std::make_exception_ptr(std::runtime_error(s)));
    }

    std::promise<void> m_containerStarted;
};

class Broker : public proton::messaging_handler {
public:
    Broker(const std::string& url, const std::string& destination)
        : m_url(url + "/" + destination) {
        m_brokerThread = std::thread([&]() {
            try {
                proton::container(*this).run();
            } catch (const std::exception& e) {
                std::cerr << "Broker threw exception: " << e.what() << std::endl;
            }
        });
        // Wait for the container to start
        m_listenerHandler.getStartedFuture().get();
    }

    ~Broker() {
        if (!m_isClosed) {
            m_listener.stop();
        }
    }

    // Queues messages to be sent as soon as a receiver grants credit.
    void injectMessages(std::vector<proton::message> messages) {
        m_messages.insert(m_messages.end(), messages.begin(), messages.end());
    }

    int m_acceptedMsgs = 0;
    int m_rejectedMsgs = 0;
    int m_releasedMsgs = 0;

private:
    void on_container_start(proton::container& c) override {
        std::cout << "broker on_container_start" << std::endl;
        c.receiver_options(proton::receiver_options());
        m_listener = c.listen(m_url, m_listenerHandler);
    }

    void on_connection_open(proton::connection& c) override {
        std::cout << "broker on_connection_open" << std::endl;
        m_connection = c;
        c.open();
    }

    void on_connection_close(proton::connection&) override {
        std::cout << "broker on_connection_close" << std::endl;
        m_listener.stop();
    }

    void on_transport_error(proton::transport& t) override {
        std::cout << "broker on_transport_error" << std::endl;
        std::cerr << "Broker::on_transport_error: " << t.error().what() << std::endl;
        m_listener.stop();
    }

    void on_error(const proton::error_condition& c) override {
        std::cout << "broker on_error" << std::endl;
        std::cerr << "Broker::on_error: " << c.what() << std::endl;
        m_isClosed = true;
        m_listener.stop();
    }

    void on_message(proton::delivery& delivery, proton::message& message) override {
        std::cout << "broker on_message" << std::endl;
        m_currentDelivery = { message, delivery };
    }

    // Sends as many queued messages as the link credit allows.
    void sendMessages(proton::sender& sender) {
        size_t numberOfMessagesToSend =
            std::min(static_cast<size_t>(sender.credit()), m_messages.size());
        for (auto count = numberOfMessagesToSend; count > 0; --count) {
            auto message = m_messages.front();
            m_messages.pop_front();
            sender.send(message);
        }
    }

    void on_sendable(proton::sender& sender) override {
        std::cout << "broker on_sendable" << std::endl;
        sendMessages(sender);
    }

    void on_tracker_accept(proton::tracker&) override {
        ++m_acceptedMsgs;
    }

    void on_tracker_reject(proton::tracker&) override {
        ++m_rejectedMsgs;
    }

    void on_tracker_release(proton::tracker&) override {
        ++m_releasedMsgs;
    }

private:
    std::string m_url;
    bool m_isClosed = false;
    proton::ThreadRAII m_brokerThread;
    std::deque<proton::message> m_messages;
    std::pair<proton::message, proton::delivery> m_currentDelivery;
    listenerHandler m_listenerHandler;
    proton::listener m_listener;
    proton::connection m_connection;
};

#endif
#0  0x00007f9948014d80 in ?? ()
#1  0x00007f994f370200 in pn_class_decref (clazz=0x7f9948004c80, object=0x7f994800efe0) at /proton/qpid-proton-0.27.1/c/src/core/object/object.c:91
#2  0x00000000004adb2c in ReceiverHandler::~ReceiverHandler() ()
#3  0x00000000004a5f7f in DeliveryTest_rabih_Test::TestBody() ()
#4  0x00000000004d406e in void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) ()
#5  0x00000000004cea82 in void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) ()
#6  0x00000000004b52b3 in testing::Test::Run() ()
#7  0x00000000004b5ab1 in testing::TestInfo::Run() ()
#8  0x00000000004b60ba in testing::TestCase::Run() ()
#9  0x00000000004bc7cd in testing::internal::UnitTestImpl::RunAllTests() ()
#10 0x00000000004d4ead in bool testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) ()
#11 0x00000000004cf7b0 in bool testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>(testing::internal::UnitTestImpl*, bool (testing::internal::UnitTestImpl::*)(), char const*) ()
#12 0x00000000004bb55f in testing::UnitTest::Run() ()
#13 0x00000000004e2b00 in RUN_ALL_TESTS() ()
#14 0x00000000004e2a9b in main ()