Hi Rabih,

Thank you for the concise test case.

I believe the problem is that you are using

  proton::delivery::work_queue()

from the exterior thread, and that call is not thread safe.

If you look at the multithreaded client examples, you will see that
the work_queue is acquired and saved into a member variable called
"work_queue_".  This is done in the Proton callback thread, where it
is thread safe.  The fact that the example is using an accessor also
called "work_queue()" perhaps distracts from its message that

  proton::work_queue::add()

is the thread safety mechanism.

I would also note that proton::connection::work_queue() has fewer
thread-unsafe moving parts and is perhaps very difficult to make go awry
in the way you use it (as a counterexample in Main.cpp), but it is not
thread safe either.
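
To make the pattern concrete, here is a minimal sketch using only the
standard library.  toy_work_queue and toy_client are hypothetical
stand-ins of my own, not Proton classes, and the counter stands in for a
call such as delivery.accept().  The point is only the shape: the queue
pointer is captured once on the callback thread, and the exterior thread
touches nothing except that saved pointer and add().

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical stand-in for proton::work_queue. Only add() may be called
// from a foreign thread; run() and stop() belong to the owning thread.
class toy_work_queue {
  public:
    void add(std::function<void()> fn) {
        std::lock_guard<std::mutex> l(lock_);
        work_.push_back(std::move(fn));
        ready_.notify_one();
    }

    // Drain work on the calling thread until a work item calls stop().
    void run() {
        while (true) {
            std::function<void()> fn;
            {
                std::unique_lock<std::mutex> l(lock_);
                ready_.wait(l, [this] { return !work_.empty(); });
                fn = std::move(work_.front());
                work_.pop_front();
            }
            fn();  // executed outside the lock, on the owning thread
            if (stopped_) return;
        }
    }

    void stop() { stopped_ = true; }  // call only from inside a work item

  private:
    std::mutex lock_;
    std::condition_variable ready_;
    std::deque<std::function<void()>> work_;
    bool stopped_ = false;  // touched only by the owning thread
};

// Hypothetical handler: saves the queue pointer on the callback thread.
class toy_client {
  public:
    // Runs on the callback thread, the one place where looking up the
    // queue is safe. Exterior threads only ever see the saved pointer.
    void on_open(toy_work_queue& q) {
        std::lock_guard<std::mutex> l(lock_);
        callback_thread_ = std::this_thread::get_id();
        work_queue_ = &q;
        ready_.notify_all();
    }

    // Callable from any thread: uses only the saved pointer and add().
    void accept_later() {
        work_queue()->add([this] {
            assert(std::this_thread::get_id() == callback_thread_);
            ++accepted_;  // stands in for delivery.accept()
        });
    }

    void close_later() {
        toy_work_queue* q = work_queue();
        q->add([q] { q->stop(); });
    }

    // Read only after the callback thread has been joined.
    int accepted() const { return accepted_; }

  private:
    // Blocks until on_open() has stored the pointer.
    toy_work_queue* work_queue() {
        std::unique_lock<std::mutex> l(lock_);
        ready_.wait(l, [this] { return work_queue_ != nullptr; });
        return work_queue_;
    }

    std::mutex lock_;
    std::condition_variable ready_;
    toy_work_queue* work_queue_ = nullptr;
    std::thread::id callback_thread_;
    int accepted_ = 0;
};

// The exterior thread requests three accepts; returns how many ran.
int demo_accepts_from_exterior_thread() {
    toy_work_queue queue;
    toy_client client;
    std::thread callback_thread([&] {
        client.on_open(queue);  // capture the queue on its own thread
        queue.run();
    });
    for (int i = 0; i < 3; ++i) client.accept_later();
    client.close_later();
    callback_thread.join();
    return client.accepted();
}
```

Calling demo_accepts_from_exterior_thread() returns 3, with every accept
having run on the callback thread.  In real Proton code the saved pointer
would, as far as I recall, come from a call like sender.work_queue() made
inside a handler callback, as the multithreaded client examples do.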

Cliff

On Tue, Apr 30, 2019 at 4:55 AM Rabih M <rabih.prom...@gmail.com> wrote:
>
> Hello Cliff,
>
> After spending more time debugging, I think we found where the real problem
> comes from.
> I attached a simplified use case in Main.cpp and Broker.hpp.
> When we call delivery.work_queue().add() on line 35 of Main.cpp, we get a
> random segfault, and if we replace it with
> connection.work_queue().add(), everything goes back to normal.
>
> I tried to think about what might be the source of the problem, but I can't
> come up with any ideas yet...
>
> Best regards,
> Rabih
>
>
>
> On Fri, Apr 26, 2019 at 9:38 AM Cliff Jansen <cliffjan...@gmail.com> wrote:
>>
>> Hi Jeremy,
>>
>> I haven't had a chance to parse your overall description to arrive at
>> a reproducer of your segfault problem.
>>
>> However, the DeliverySettleTest.cpp code you provided runs correctly.
>> The client view of the delivery is settled via
>>
>>   d.accept();  // line 34
>>
>> which results in a disposition frame with "settled=true" being sent
>> to the server (class Broker).  This also means that the client locally
>> has determined that (in AMQP terminology) it is now "safe to forget"
>> the disposition.  No changes to the disposition are legal after that,
>> and no callbacks about the disposition would be expected either.
>>
>> On the server side, in both on_tracker_accept() and
>> on_tracker_settle() you would find that t.settled() is true, as you
>> would expect.  The call to
>>
>>   t.settle();  // line 147 Broker.hpp
>>
>> is important to release memory since auto_settle has been turned off.
>> But proton knows that the peer has settled and forgotten the delivery.
>> So it sensibly declines to send a disposition frame (about a forgotten
>> entity) that would be ignored by the client as per the AMQP spec.
>>
>> Note that the above explanation is C++ specific.  If you use lower
>> level C primitives, it is possible to accept a delivery without
>> settling it until later and get closer to what you appear to be
>> striving for in this test.
>>
>> However, I can't help thinking that relying on two wire transfers,
>> which could be arbitrarily slow, is not the mechanism you want to pursue
>> to avoid the (lifecycle?) segfault you are trying to address.
>>
>> Perhaps you could post the actual code with the segfault problem for
>> me to look at.
>>
>> On Thu, Apr 25, 2019 at 6:42 AM jeremy <jeremyao...@gmail.com> wrote:
>>
>> > Hello,
>> >
>> > In the client API we're writing, we let a consumer consume messages
>> > asynchronously, receiving a future of a delivery. In the receiver handler,
>> > we keep track of deliveries in a list, as such:
>> >
>> > https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/include/proton/imperative/Receiver.hpp#L59
>> > The user can then get the delivery and accept/reject/release it:
>> >
>> > https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/src/Delivery.cpp#L7
>> >
>> > The proton delivery operations (accept/reject/release) are asynchronous
>> > (added on the work queue:
>> >
>> > https://github.com/apache/qpid-proton/blob/7c0a3387a5096d86541dbddfeb55f36eb0b85dd8/c/src/core/engine.c#L732
>> > )
>> > unless I'm mistaken. In the above delivery code, removing the delivery
>> > object from the list as soon as we go out of scope
>> > (
>> > https://github.com/rabih-mourad/qpid-imperative-proton/blob/master/src/Delivery.cpp#L11
>> > )
>> > results in a random segfault, since at the time the accept is called on the
>> > delivery in the work queue, the delivery would have been already removed
>> > from the list.
>> > As a solution, we thought of implementing on_delivery_settle on the
>> > receiver's side, and removing the delivery there. However, we noticed that
>> > on_delivery_settle is never called (delivery mode set to none on both
>> > sender and receiver sides, sender auto_settle set to false, receiver
>> > auto ack set to false). I tested with all delivery modes on both sides,
>> > and on_delivery_settle was never called. I attached the corresponding code.
>> >
>> > Got the following log:
>> >
>> > broker on_container_start
>> > [0x1021a70]:  -> AMQP
>> > [0x1021a70]:0 -> @open(16)
>> > [container-id="b7cdef05-f195-4760-b262-655d538f0419", hostname="127.0.0.1",
>> > channel-max=32767]
>> > [0x7f719c0032d0]:  <- AMQP
>> > [0x7f719c0032d0]:0 <- @open(16)
>> > [container-id="b7cdef05-f195-4760-b262-655d538f0419", hostname="127.0.0.1",
>> > channel-max=32767]
>> > broker on_connection_open
>> > [0x7f719c0032d0]:  -> AMQP
>> > [0x7f719c0032d0]:0 -> @open(16)
>> > [container-id="64a68948-0d32-4c19-89c7-9d62c408e248", channel-max=32767]
>> > [0x1021a70]:  <- AMQP
>> > [0x1021a70]:0 <- @open(16)
>> > [container-id="64a68948-0d32-4c19-89c7-9d62c408e248", channel-max=32767]
>> > [0x1021a70]:0 -> @begin(17) [next-outgoing-id=0,
>> > incoming-window=2147483647,
>> > outgoing-window=2147483647]
>> > [0x1021a70]:0 -> @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25",
>> > handle=0, role=true, snd-settle-mode=2, rcv-settle-mode=0,
>> > source=@source(40) [address="examples", durable=0, timeout=0,
>> > dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false],
>> > initial-delivery-count=0, max-message-size=0]
>> > [0x1021a70]:0 -> @flow(19) [incoming-window=2147483647, next-outgoing-id=0,
>> > outgoing-window=2147483647, handle=0, delivery-count=0, link-credit=10,
>> > drain=false]
>> > [0x7f719c0032d0]:0 <- @begin(17) [next-outgoing-id=0,
>> > incoming-window=2147483647, outgoing-window=2147483647]
>> > [0x7f719c0032d0]:0 <- @attach(18)
>> > [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=true,
>> > snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
>> > [address="examples", durable=0, timeout=0, dynamic=false],
>> > target=@target(41) [durable=0, timeout=0, dynamic=false],
>> > initial-delivery-count=0, max-message-size=0]
>> > [0x7f719c0032d0]:0 <- @flow(19) [incoming-window=2147483647,
>> > next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=0,
>> > link-credit=10, drain=false]
>> > broker on_sendable
>> > broker on_sendable
>> > [0x7f719c0032d0]:0 -> @begin(17) [remote-channel=0, next-outgoing-id=0,
>> > incoming-window=2147483647, outgoing-window=2147483647]
>> > [0x7f719c0032d0]:0 -> @attach(18)
>> > [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25", handle=0, role=false,
>> > snd-settle-mode=2, rcv-settle-mode=0, source=@source(40)
>> > [address="examples", durable=0, timeout=0, dynamic=false],
>> > target=@target(41) [durable=0, timeout=0, dynamic=false],
>> > initial-delivery-count=0, max-message-size=0]
>> > [0x7f719c0032d0]:0 -> @transfer(20) [handle=0, delivery-id=0,
>> > delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0] (23)
>> > "\x00SpE\x00SsE\x00Sw\xa1\x0amy message"
>> > broker on_sendable
>> > [0x1021a70]:0 <- @begin(17) [remote-channel=0, next-outgoing-id=0,
>> > incoming-window=2147483647, outgoing-window=2147483647]
>> > [0x1021a70]:0 <- @attach(18) [name="aacbdc5d-0b8e-474c-a9e7-bf0c5a1bbd25",
>> > handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0,
>> > source=@source(40) [address="examples", durable=0, timeout=0,
>> > dynamic=false], target=@target(41) [durable=0, timeout=0, dynamic=false],
>> > initial-delivery-count=0, max-message-size=0]
>> > [0x1021a70]:0 <- @transfer(20) [handle=0, delivery-id=0,
>> > delivery-tag=b"\x01\x00\x00\x00\x00\x00\x00\x00", message-format=0] (23)
>> > "\x00SpE\x00SsE\x00Sw\xa1\x0amy message"
>> > my message
>> > on_message settled:0
>> > [0x1021a70]:0 -> @flow(19) [next-incoming-id=1, incoming-window=2147483647,
>> > next-outgoing-id=0, outgoing-window=2147483647, handle=0, delivery-count=1,
>> > link-credit=10, drain=false]
>> > [0x1021a70]:0 -> @disposition(21) [role=true, first=0, settled=true,
>> > state=@accepted(36) []]
>> > [0x7f719c0032d0]:0 <- @flow(19) [next-incoming-id=1,
>> > incoming-window=2147483647, next-outgoing-id=0, outgoing-window=2147483647,
>> > handle=0, delivery-count=1, link-credit=10, drain=false]
>> > [0x7f719c0032d0]:0 <- @disposition(21) [role=true, first=0, settled=true,
>> > state=@accepted(36) []]
>> > broker on_sendable
>> > broker on_tracker_accept
>> > broker on_tracker_settle
>> >
>> >
>> > DeliverySettleTest.cpp
>> > <http://qpid.2158936.n2.nabble.com/file/t396337/DeliverySettleTest.cpp>
>> > Broker.hpp <http://qpid.2158936.n2.nabble.com/file/t396337/Broker.hpp>
>> >
>> > Thanks,
>> > Jeremy
>> >
>> >
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
>> > For additional commands, e-mail: users-h...@qpid.apache.org
>> >
>> >
>
>