Hello Cliff,

After spending more time debugging, I think we found where the real problem
comes from:
I attached a simplified use case in the Main.cpp, Broker.hpp.
When we call delivery.work_queue().add() line 35 of Main.cpp, we get an
random segfault and if we replace it instead by a
connection.work_queue().add() everything comes back to normal.

I tried to think about what might be the source of the problem but i can't
find any ideas yet...

Best regards,
Rabih



On Fri, Apr 26, 2019 at 9:38 AM Cliff Jansen <cliffjan...@gmail.com> wrote:

> Hi Jeremy,
>
> I haven't had a chance to parse your overall description to arrive at
> a reproducer of your segfault problem.
>
> However, the DeliverySettleTest.cpp code you provided runs correctly.
> The client view of the delivery is settled via
>
>   d.accept();  // line 34
>
> which results in the disposition frame including "settled=true" sent
> to the server (class Broker).  This also means that the client locally
> has determined that (in AMQP terminology) it is now "safe to forget"
> the disposition.  No changes to the disposition are legal after that,
> and no callbacks about the disposition would be expected either.
>
> On the server side, in both on_tracker_accept() and
> on_tracker_settle() you would find that t.settled() is true, as you
> would expect.  The call to
>
>   t.settle();  // line 147 Broker.hpp
>
> is important to release memory since auto_settle has been turned off.
> But proton knows that the peer has settled and forgotten the delivery.
> So it sensibly declines to send a disposition frame (about a forgotten
> entity) that would be ignored by the client as per the AMQP spec.
>
> Note that the above explanation is C++ specific.  If you use lower
> level C primitives, it is possible to accept a delivery without
> settling it until later and get closer to what you appear to be
> striving for in this test.
>
> However, I can't help thinking that relying on two wire transfers,
> which could be arbitrarily slow, is the mechanism you want to pursue
> to avoid the (lifecycle?) segfault you are trying to address.
>
> Perhaps you could post the actual code with the segfault problem for
> me to look at.
>
> On Thu, Apr 25, 2019 at 6:42 AM jeremy <jeremyao...@gmail.com> wrote:
>
> > Hello,
> >
> > In the client API we're writing, we let a consumer consume messages
> > asynchronously, receiving a future of a delivery. In the receiver
> handler,
> > we keep track of deliveries in a list, as such:
> >
> >
> https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/include/proton/imperative/Receiver.hpp#L59
> > The user can then get the delivery and accept/reject/release it:
> >
> >
> https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/src/Delivery.cpp#L7
> >
> > The proton delivery operations (accept/reject/release) are asynchronous
> > (added on the work queue:
> >
> >
> https://github.com/apache/qpid-proton/blob/7c0a3387a5096d86541dbddfeb55f36eb0b85dd8/c/src/core/engine.c#L732
> > )
> > unless I'm mistaken. In the above delivery code, removing the delivery
> > object from the list as soon as we go out of scope
> > (
> >
> https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/src/Delivery.cpp#L11
> > )
> > results in a random segfault, since at the time the accept is called on
> the
> > delivery in the work queue, the delivery would have been already removed
> > from the list.
> > As a solution, we thought of implementing on_delivery_settle on the
> > receiver's side, and removing the delivery there. However, we noticed
> that
> > on_delivery_settle is never called (delivery mode set to none on both
> > sender
> > and receiver sides, sender auto_settle set to false, receiver auto ack
> set
> > to false). I tested with all delivery modes on both sides, and the
> > on_delivery_settle was never called. I attached the corresponding code.
> >
> > Got the following log:
> >
> > broker on_container_start
> > [0x1021a70]:  -> AMQP
> > [0x1021a70]:0 -> @open(16)
> > [container-id="b7cdef05-f195-4760-b262-655d538f0419",
> hostname="127.0.0.1",
> > channel-max=32767]
> > [0x7f719c0032d0]:  <- AMQP
> > [0x7f719c0032d0]:0 <- @open(16)
> > [container-id="b7cdef05-f195-4760-b262-655d538f0419",
> hostname="127.0.0.1",
> > channel-max=32767]
> > broker on_connection_open
> > [0x7f719c0032d0]:  -> AMQP
> > [0x7f719c0032d0]:0 -> @open(16)
> > [container-id="64a68948-0d32-4c19-89c7-9d62c408e248", channel-max=32767]
> > [0x1021a70]:  <- AMQP
> > [0x1021a70]:0 <- @open(16)
> > [container-id="64a68948-0d32-4c19-89c7-9d62c408e248", channel-max=32767]
> > [0x1021a70]:0 -> @begin(17) [next-outgoing-id=0,
> > incoming-window=2147483647,
> > outgoing-window=2147483647]
> > [0x1021a70]:0 -> @attach(18)
> [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25",
> > handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0,
> > source=@source(40) [address="examples", durable=0, timeout=0,
> > dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false],
> > initial-delivery-count=0, max-message-size=0]
> > [0x1021a70]:0 -> @flow(19) [incoming-window=2147483647,
> next-outgoing-id=0,
> > outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10,
> > drain=false]
> > [0x7f719c0032d0]:0 <- @begin(17) [next-outgoing-id=0,
> > incoming-window=2147483647, outgoing-window=2147483647]
> > [0x7f719c0032d0]:0 <- @attach(18)
> > [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=true,
> > snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
> > [address="examples", durable=0, timeout=0, dynamic=false],
> > target=@target(41) [durable=0, timeout=0, dynamic=false],
> > initial-delivery-count=0, max-message-size=0]
> > [0x7f719c0032d0]:0 <- @flow(19) [incoming-window=2147483647,
> > next-outgoing-id=0, outgoing-window=2147483647, handle=0,
> delivery-count=0,
> > link-credit=10, drain=false]
> > broker on_sendable
> > broker on_sendable
> > [0x7f719c0032d0]:0 -> @begin(17) [remote-channel=0, next-outgoing-id=0,
> > incoming-window=2147483647, outgoing-window=2147483647]
> > [0x7f719c0032d0]:0 -> @attach(18)
> > [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=false,
> > snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
> > [address="examples", durable=0, timeout=0, dynamic=false],
> > target=@target(41) [durable=0, timeout=0, dynamic=false],
> > initial-delivery-count=0, max-message-size=0]
> > [0x7f719c0032d0]:0 -> @transfer(20) [handle=0, delivery-id=0,
> > delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0] (23)
> > "\x00SpE\x00SsE\x00Sw\xa1\x0amy message"
> > broker on_sendable
> > [0x1021a70]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0,
> > incoming-window=2147483647, outgoing-window=2147483647]
> > [0x1021a70]:0 <- @attach(18)
> [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25",
> > handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0,
> > source=@source(40) [address="examples", durable=0, timeout=0,
> > dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false],
> > initial-delivery-count=0, max-message-size=0]
> > [0x1021a70]:0 <- @transfer(20) [handle=0, delivery-id=0,
> > delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0] (23)
> > "\x00SpE\x00SsE\x00Sw\xa1\x0amy message"
> > my message
> > on_message settled:0
> > [0x1021a70]:0 -> @flow(19) [next-incoming-id=1,
> incoming-window=2147483647,
> > next-outgoing-id=0, outgoing-window=2147483647, handle=0,
> delivery-count=1,
> > link-credit=10, drain=false]
> > [0x1021a70]:0 -> @disposition(21) [role=true, first=0, settled=true,
> > state=@accepted(36) []]
> > [0x7f719c0032d0]:0 <- @flow(19) [next-incoming-id=1,
> > incoming-window=2147483647, next-outgoing-id=0,
> outgoing-window=2147483647,
> > handle=0, delivery-count=1, link-credit=10, drain=false]
> > [0x7f719c0032d0]:0 <- @disposition(21) [role=true, first=0, settled=true,
> > state=@accepted(36) []]
> > broker on_sendable
> > broker on_tracker_accept
> > broker on_tracker_settle
> >
> >
> > DeliverySettleTest.cpp
> > <http://qpid.2158936.n2.nabble.com/file/t396337/DeliverySettleTest.cpp>
> > Broker.hpp <http://qpid.2158936.n2.nabble.com/file/t396337/Broker.hpp>
> >
> > Thanks,
> > Jeremy
> >
> >
> >
> > -----
> > Cheers,
> > Jeremy
> > --
> > Sent from:
> > http://qpid.2158936.n2.nabble.com/Apache-Qpid-users-f2158936.html
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> > For additional commands, e-mail: users-h...@qpid.apache.org
> >
> >
>
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/session.hpp>
#include <proton/work_queue.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/connection_options.hpp>
#include <proton/session_options.hpp>
#include <proton/delivery.hpp>

#include <Broker.hpp>

#include <iostream>
#include <thread>
#include <future>
#include <string>

struct ReceiverHandler : public proton::messaging_handler {
    std::string conn_url_;
    std::string addr_;
    proton::connection m_connection;

    static const int max = 5;

    int m_countDownLatch;
    std::promise<void> m_messageReceived;
    std::queue<proton::delivery> m_deliveries;

    ReceiverHandler(const std::string& u, const std::string& a) :
        conn_url_(u), addr_(a), m_countDownLatch(max)
    {}

    void acceptDelivery() {
        /* !!!!!! with the following line we encounter random segfaults !!!!!!
        if we use: m_connection.work_queue().add([&]{ instead it will pass 
correctly */
        m_deliveries.front().work_queue().add([&]{
            m_deliveries.front().accept();
            m_deliveries.pop();
        });
    }

    void on_container_start(proton::container& c) override {
        c.connect(conn_url_);
    }

    void on_connection_open(proton::connection& c) override {
        c.open_receiver(addr_, 
proton::receiver_options().delivery_mode(proton::delivery_mode::NONE).auto_accept(false));
        m_connection = c;
    }

    void on_message(proton::delivery &d, proton::message &m) override {
        std::cout << m.body() << std::endl;
        m_deliveries.emplace(d);
        if(--m_countDownLatch == 0) {
            m_messageReceived.set_value();
        }
    }
};

int main() {
{
    try {
        const std::string url("//127.0.0.1:5676");
        const std::string dest("examples");
        Broker brk(url, dest);
        std::vector<proton::message> msgs(ReceiverHandler::max, 
proton::message("my message"));
        brk.injectMessages(msgs);

        ReceiverHandler hw(url, dest);

        auto protonContainerClosed = std::async(std::launch::async, [&] {
            proton::container(hw).run();
        });

        hw.m_messageReceived.get_future().get();

        for(int i = 0; i < ReceiverHandler::max; ++i) {
            hw.acceptDelivery();
        }

        hw.m_connection.work_queue().add([&]{
            hw.m_connection.close();
        });

        protonContainerClosed.get();

    } catch (const std::exception& e) {
        std::cout << e.what() << std::endl;
    }
    return 0;
}
#ifndef TEST_BROKER_HPP
#define TEST_BROKER_HPP

#include <proton/listener.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/listen_handler.hpp>

#include <proton/imperative/ThreadRAII.hpp>

#include <string>
#include <iostream>
#include <thread>
#include <future>

class listenerHandler : public proton::listen_handler
{
public:
   std::future<void> getStartedFuture()
   {
      return m_containerStarted.get_future();
   }

private:
   void on_open(proton::listener&) override
   {
      m_containerStarted.set_value();
   }

   void on_error(proton::listener&, const std::string& s) override
   {
      
m_containerStarted.set_exception(std::make_exception_ptr(std::runtime_error(s)));
   }

   std::promise<void> m_containerStarted;
};

class Broker : public proton::messaging_handler
{
public:
   Broker(const std::string& url, const std::string& destination)
      :m_url(url + "/" + destination)
   {
      m_brokerThread = std::thread([&]() {
         try
         {
            proton::container(*this).run();
         }
         catch (const std::exception& e) {
            std::cerr << "Broker threw exception: " << e.what() << std::endl;
         }});

      // Wait for the container to start
      m_listenerHandler.getStartedFuture().get();
   }

   ~Broker()
   {
      if (!m_isClosed) {
         m_listener.stop();
      }
   }

   void injectMessages(std::vector<proton::message> messages)
   {
      m_messages.insert(m_messages.end(), messages.begin(), messages.end());
   }

   int m_acceptedMsgs = 0;
   int m_rejectedMsgs = 0;
   int m_releasedMsgs = 0;

private:
   void on_container_start(proton::container &c) override
   {
      std::cout << "broker on_container_start" << std::endl;
      c.receiver_options(proton::receiver_options());
      m_listener = c.listen(m_url, m_listenerHandler);
   }

   void on_connection_open(proton::connection &c) override
   {
      std::cout << "broker on_connection_open" << std::endl;
      m_connection = c;
      c.open();
   }

   void on_connection_close(proton::connection&) override
   {
      std::cout << "broker on_connection_close" << std::endl;
      m_listener.stop();
   }

   void on_transport_error(proton::transport &t) override
   {
      std::cout << "broker on_transport_error" << std::endl;
      std::cerr << "Broker::on_transport_error: " << t.error().what() << 
std::endl;
      m_listener.stop();
   }

   void on_error(const proton::error_condition &c) override
   {
      std::cout << "broker on_error" << std::endl;
      std::cerr << "Broker::on_error: " << c.what() << std::endl;

      m_isClosed = true;
      m_listener.stop();
   }

   void on_message(proton::delivery& delivery, proton::message& message) 
override
   {
      std::cout << "broker on_message" << std::endl;
      m_currentDelivery = { message, delivery };
   }

   void sendMessages(proton::sender& sender)
   {
      size_t numberOfMessagesToSend = 
std::min(static_cast<size_t>(sender.credit()), m_messages.size());

      for (auto count = numberOfMessagesToSend; count > 0; --count)
      {
         auto message = m_messages.front();
         m_messages.pop_front();
         sender.send(message);
      }
   }

   void on_sendable(proton::sender& sender) override
   {
      std::cout << "broker on_sendable" << std::endl;
      sendMessages(sender);
   }

   void on_tracker_accept(proton::tracker& ) override
   {
      ++m_acceptedMsgs;
   }

   void on_tracker_reject(proton::tracker&) override
   {
      ++m_rejectedMsgs;
   }

   void on_tracker_release(proton::tracker&) override
   {
      ++m_releasedMsgs;
   }

private:
   std::string m_url;
   bool m_isClosed = false;
   proton::ThreadRAII m_brokerThread;
   std::deque<proton::message> m_messages;
   std::pair<proton::message, proton::delivery> m_currentDelivery;
   listenerHandler m_listenerHandler;

   proton::listener m_listener;
   proton::connection m_connection;
};

#endif
#0  0x00007f9948014d80 in ?? ()
#1  0x00007f994f370200 in pn_class_decref (clazz=0x7f9948004c80, 
object=0x7f994800efe0)
    at /proton/qpid-proton-0.27.1/c/src/core/object/object.c:91
#2  0x00000000004adb2c in ReceiverHandler::~ReceiverHandler() ()
#3  0x00000000004a5f7f in DeliveryTest_rabih_Test::TestBody() ()
#4  0x00000000004d406e in void 
testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, 
void>(testing::Test*, void (testing::Test::*)(), char const*) ()
#5  0x00000000004cea82 in void 
testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, 
void>(testing::Test*, void (testing::Test::*)(), char const*) ()
#6  0x00000000004b52b3 in testing::Test::Run() ()
#7  0x00000000004b5ab1 in testing::TestInfo::Run() ()
#8  0x00000000004b60ba in testing::TestCase::Run() ()
#9  0x00000000004bc7cd in testing::internal::UnitTestImpl::RunAllTests() ()
#10 0x00000000004d4ead in bool 
testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl,
 bool>(testing::internal::UnitTestImpl*, bool 
(testing::internal::UnitTestImpl::*)(), char const*) ()
#11 0x00000000004cf7b0 in bool 
testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl,
 bool>(testing::internal::UnitTestImpl*, bool 
(testing::internal::UnitTestImpl::*)(), char const*) ()
#12 0x00000000004bb55f in testing::UnitTest::Run() ()
#13 0x00000000004e2b00 in RUN_ALL_TESTS() ()
#14 0x00000000004e2a9b in main ()
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to