I don't think you have looked at the multithreaded examples closely enough.
That's exactly what it says it's doing, and what it looks to me to do (well,
it also has a third non-container thread for processing received messages).
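
A minimal sketch of that hand-off, using only the standard library. This is a
model of the idea, not Proton code: loop_model, its add()/run()/stop() methods
and run_demo() are hypothetical names standing in for the container thread and
its proton::work_queue. In the real example, as far as I can tell, the handler
keeps hold of the sender's work queue once the sender is open, and other
threads call add() on it with a function that performs the send.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the container thread plus its work queue.
// NOT the Proton API; it only models how work is handed between threads.
class loop_model {
  public:
    // Callable from any thread: queue a function for the loop thread to run.
    // Returns false once the loop has been told to stop.
    bool add(std::function<void()> work) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (stopped_) return false;
        pending_.push(std::move(work));
        ready_.notify_one();
        return true;
    }

    // Callable from any thread: ask the loop to finish after draining.
    void stop() {
        std::lock_guard<std::mutex> lock(mutex_);
        stopped_ = true;
        ready_.notify_one();
    }

    // Runs on the loop thread. It sleeps until work arrives instead of
    // blocking inside a handler, then runs each item outside the lock.
    void run() {
        for (;;) {
            std::function<void()> work;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopped_ || !pending_.empty(); });
                if (pending_.empty()) return;  // stopped and fully drained
                work = std::move(pending_.front());
                pending_.pop();
            }
            work();
        }
    }

  private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> pending_;
    bool stopped_ = false;
};

// An external producer thread injects three "send" operations; only the loop
// thread ever touches `sent`, which stands in for the sender's state.
std::vector<std::string> run_demo() {
    loop_model loop;
    std::vector<std::string> sent;
    std::thread loop_thread([&loop] { loop.run(); });

    std::thread producer([&loop, &sent] {
        for (int i = 0; i < 3; ++i) {
            std::string payload = "chunk-" + std::to_string(i);
            bool accepted = loop.add([&sent, payload] { sent.push_back(payload); });
            assert(accepted);  // the loop is not stopped until the producer is done
        }
    });

    producer.join();
    loop.stop();
    loop_thread.join();  // after this join, reading `sent` here is safe
    return sent;
}
```

The point of the pattern is that the producer never touches sender-side state
directly. It only posts functions, so the loop thread is never blocked waiting
for data and, in a real container, would remain free to do IO in between.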

On Thu, 26 Nov 2020 at 14:30, Francesco Raviglione
<francescorav.es...@gmail.com> wrote:
>
> Dear Robbie,
> First of all, sorry for the very late reply.
>
> Thank you very much for your reply and for the references to the C++ Work
> Queues.
> I have been quite busy in the past days with other projects, but I will
> definitely look more in detail into Work Queues and their usage with Qpid
> Proton C++.
>
> Looking at the examples, however, they do not seem to tackle the case in
> which work is added to the work queue from an external, non-Qpid Proton,
> thread.
> I assume I will need to obtain the work queue from the sender with
> "&s.work_queue()" when the sender is opened (in "on_sender_open()", like
> in the examples), and then make the proton::work_queue object available
> outside the Qpid Proton class so that work can be "injected" from other
> external threads. Would making the "proton::work_queue" public work and,
> at the same time, ensure thread safety?
>
> Thank you very much,
> Francesco Raviglione
>
>
>
> On Wed, 18 Nov 2020 at 11:25, Robbie Gemmell <
> robbie.gemm...@gmail.com> wrote:
>
> > On Tue, 17 Nov 2020 at 19:59, Francesco Raviglione
> > <francescorav.es...@gmail.com> wrote:
> > >
> > > Dear all,
> > > I'm experiencing some issues in writing an AMQP client with Qpid Proton
> > > C++.
> > > My client should only send messages to a particular queue on an ActiveMQ
> > > broker and it is not supposed to receive any message over that
> > > connection.
> > > The client should not send the messages as soon as it is started (with
> > > "proton::container(AMQP_client).run();"), but it should wait for the data
> > > to be provided by an external thread, which may become available even
> > > after some minutes (I cannot tell in advance when the data will be
> > > available, and there may be even a long time between two consecutive
> > > chunks of available data).
> > >
> > > If I try to supply the AMQP client loop with new data through a pipe (on
> > > which data is written by the external thread), I can write an
> > > "AMQP_client" class like the following:
> > >
> > > void AMQP_client::on_container_start(proton::container& c) {
> > >      c.connect(broker_address);
> > > }
> > >
> > > void AMQP_client::on_connection_open(proton::connection& c) {
> > >      c.open_sender(queue_name);
> > > }
> > >
> > > void AMQP_client::on_sendable(proton::sender &s) {
> > >      uint8_t buffer[1024];
> > >      int bufsize;
> > >      proton::message amqp_msg;
> > >
> > >      // Wait for new data to be sent (wait for data to be written on the pipe)
> > >      if((bufsize=read(pipe_read_end,&buffer,1024))==-1) {
> > >          perror("read() error");
> > >          return;
> > >      }
> > >
> > >      amqp_msg.body(proton::binary(buffer,buffer+bufsize));
> > >
> > >      s.send(amqp_msg);
> > > }
> > >
> > > In this case, however, "on_sendable" blocks on the read() operation and,
> > > if the data becomes available a few minutes later, the broker closes the
> > > connection, as the client loop is completely blocked and cannot even send
> > > the heartbeat messages.
> > >
> >
> > Yes, as the container thread is also responsible for performing the
> > IO. By blocking it, you simply stop it doing anything at all for the
> > connection (and any others in the container): both processing of
> > [not-]arriving data and sending any more, such as heartbeats if
> > not actual messaging work. So when the thread is eventually unblocked,
> > it's likely going to find either that it needs to disconnect the peer
> > for not sending the client heartbeats (if requested to) or live traffic
> > in time to satisfy the client's timeout, or that the client has itself
> > already been disconnected by the peer for not sending the peer
> > heartbeats (if requested to) or live traffic in time to satisfy the
> > peer's timeout (the idle timeouts operate independently in each
> > direction).
> >
> >
> > > If, instead, I do not block on the read() operation (for instance I
> > > read() with a timeout, by using poll()), "on_sendable" is triggered only
> > > once and I cannot find any other event to trigger the transmission of a
> > > message when data becomes available.
> > >
> > > I know that, in Python, I could solve this issue for instance by relying
> > > on "EventInjector", but I'm unable to find a similar solution with the
> > > C++ version of the library (I would prefer to stick with C++, in this
> > > case, and not to fall back to Qpid Proton C).
> > >
> > > Do you know how I can solve this problem? Is there a way to "inject"
> > > external aperiodic events/data to be sent via AMQP?
> > >
> >
> > Hopefully those with more/any clue about the C++ bits can
> > provide a better answer, but...
> >
> > I believe that is what
> >
> > http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/api/classproton_1_1work__queue.html
> > is aimed at. An example with multiple threads using it is at
> >
> > http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/multithreaded_client.cpp.html
> > and
> > http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/scheduled_send.cpp.html
> > also makes use of it, though only from the single container thread
> > with some scheduling.
> >
> >
> >
> >
> > > Thank you very much in advance,
> > > Francesco Raviglione
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> > For additional commands, e-mail: users-h...@qpid.apache.org
> >
> >

