In case it's still useful, an example of creating an anonymous sender (used
here for the response part of a request/response interaction):

https://github.com/amqphub/equipage/blob/master/qpid-proton-python/respond.py#L44
(define the anonymous sender)
https://github.com/amqphub/equipage/blob/master/qpid-proton-python/respond.py#L62
(set the address on the message before sending)

I have two other thoughts, basically extensions of what Robbie said:

- You might try constraining the queue of events you use for inter-thread
communication.  You could do that by having a size-limited queue for the
messages to send, and then using an ApplicationEvent to tell the other
thread that "there are some messages to send".

- There are some known performance issues when Proton's internal data
structures get large.  That could happen in the scenario you are
describing.  If this is the case, it will help a lot to process smaller
batches of data.

https://issues.apache.org/jira/browse/PROTON-659
https://issues.apache.org/jira/browse/PROTON-2151
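To illustrate the first idea, here is a stdlib-only sketch of the bounded-queue pattern (proton itself is left out; in the real application the producer would also fire an ApplicationEvent on an EventInjector after each put(), and the drain would happen in the corresponding handler):

```python
import queue
import threading

# A size-limited queue bounds how far the producer can run ahead
outbox = queue.Queue(maxsize=100)

def produce(n):
    """Producer thread: enqueue messages; put() blocks when full."""
    for i in range(n):
        outbox.put("msg-%d" % i)
        # real app: injector.trigger(ApplicationEvent("data_event"))

def on_data_event():
    """Handler side: drain everything currently queued and 'send' it."""
    batch = []
    while True:
        try:
            batch.append(outbox.get_nowait())
        except queue.Empty:
            return batch

t = threading.Thread(target=produce, args=(5,))
t.start()
t.join()
print(on_data_event())
```

Because put() blocks once maxsize is reached, the producer can never flood the event thread with an unbounded backlog.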


On Thu, Jul 30, 2020 at 8:49 AM Francesco Raviglione <
[email protected]> wrote:

> Dear Robbie,
> I'm very sorry for the late reply, but I had a few very busy days at work.
>
> Thank you very much for your reply.
>
> I will try to narrow down the issue and perform more tests, checking in
> particular whether "high latency" peaks are always followed by "low
> latency" measurements, as if transfers were batched.
> In general, from what I have observed so far, the "high latency" typically
> took the form of peaks during each test with a producer and a consumer,
> more or less suddenly followed by "normal", lower latency values, but more
> investigation would surely be useful to better understand the problem.
>
> Thank you very much also for the information about the possibility
> of opening a single "anonymous" sender. I wasn't aware of this feature,
> which I will surely test soon.
> If the broker I'm using supports it (and I really think it does), and
> everything works well in my use case, I will update and optimize my
> code to rely on a single sender.
>
> Thank you very much,
> Francesco
>
>
> On Mon, 20 Jul 2020 at 12:23, Robbie Gemmell <
> [email protected]> wrote:
>
> > I'd wonder whether, if you are piling up lots of events for different
> > senders, it's potentially (again, I haven't used the Injector bits,
> > and Python isn't particularly my thing) just that many such events
> > are all piled up, and all processed before the resulting IO can then
> > be, effectively batching things up. That might be observed as 'higher
> > latency' for the earlier sends, tapering to 'lower latency' for the
> > later ones in a sequence processed together. When you switch to using
> > a BlockingConnection, you presumably change it to processing a single
> > case at a time in full and see 'lower latency', by virtue of doing
> > and measuring rather different things.
> >
> > Or it could be something completely different. Or there could be some
> > problem/issue, potentially even on the server too, hard to speculate.
> > If you think there is one, try to narrow it down and supply a
> > reproducer someone could look at.
> >
> > As an aside: rather than having hundreds of senders, if the server you
> > are using supports it and your overall use case suits it, you can also
> > open a single sender to the 'anonymous relay' by omitting an address
> > when creating it (I believe by using None in Python) and then have
> > each message specifically carry its destination address (I believe
> > passing address=<value> when creating the Message object in Python).
> >
> > Robbie
> >
> > On Fri, 17 Jul 2020 at 19:21, Francesco Raviglione
> > <[email protected]> wrote:
> > >
> > > Dear Robbie, dear Gordon,
> > > Thank you very much for your replies and for your very useful and
> > > helpful suggestions.
> > > Looking more closely at the "db_send.py" and "tx_recv_interactive.py"
> > > examples, I was finally able to code a working example by using the
> > > same loop as before, but adding an EventInjector with:
> > >
> > > data_event=EventInjector()
> > > amqp_container=Container(AMQPHandler(arg1,arg2))
> > > amqp_container.selectable(data_event)
> > >
> > > Then, by defining a new event handler inside the event loop (the
> > > AMQPHandler class):
> > >
> > > def on_data_event(self, event):
> > >     # Manage event...
> > >
> > > And then by triggering the event from another loop, which waits for
> > > data from a pipe and calls "data_event.trigger()" when new data is
> > > available to be sent via AMQP:
> > >
> > > while True:
> > >     # Read from pipe
> > >     data_to_be_sent_via_AMQP=pipe_end.recv() # blocking recv()
> > >     # ...
> > >     data_event.trigger(ApplicationEvent("data_event",subject=data_to_be_sent_via_AMQP))
> > >
> > > Everything seems to work fine now.
> > >
> > > I noticed, however, that especially when managing many senders (even
> > > quite a lot of them, more than 100) inside the same event loop (inside
> > > the same AMQPHandler class, using the same connection), the
> > > per-message delay between when data is produced and when it is
> > > successfully consumed by an external consumer is sometimes
> > > significantly higher than what I can achieve with Python's
> > > BlockingConnection.
> > > In particular, sometimes it is even higher than 2 or 3 seconds.
> > > Is this due to the event loop being "too busy" managing a lot of
> > > senders, and thus delaying the actual transmission of data?
> > >
> > > As I needed to write a client application trying to ensure the lowest
> > > producer-to-consumer latency possible, I ended up trying another
> > > (much less clean) solution, with BlockingConnection(), which I just
> > > reported as a reply to the other thread (
> > > http://qpid.2158936.n2.nabble.com/Getting-local-idle-timeout-expired-on-blocking-connection-possible-bug-td7692985.html
> > > ).
> > >
> > > Thank you very much again,
> > > Francesco
> > >
> > > On Fri, 17 Jul 2020 at 12:14, Robbie Gemmell <
> > > [email protected]> wrote:
> > >
> > > > On Thu, 16 Jul 2020 at 14:36, Francesco Raviglione
> > > > <[email protected]> wrote:
> > > > >
> > > > > Dear Adrian,
> > > > > Thank you very much for your reply.
> > > > >
> > > > > I tried following your suggestion, by attempting to find a way to
> > > > > call an external function reading the data from an external
> > > > > source, and returning it when available.
> > > > >
> > > > > However, unfortunately, it seems to be slightly easier to control
> > > > > the behaviour of the event loop in C than in Python.
> > > > > In particular, in Python, if I call any blocking function (even a
> > > > > time.sleep(0.1), sleeping for just 100 ms) inside "on_sendable",
> > > > > the messages are only buffered and never sent (do you know why? Is
> > > > > the "on_sendable" handler supposed to return as fast as possible,
> > > > > without blocking, so that messages are sent immediately?).
> > > >
> > > > There is only one thread; it is used for running the callbacks and
> > > > performing the IO. Any new IO is processed after the handlers
> > > > return, so if you block the handlers, you block the IO thread,
> > > > meaning it can't do any work and thus can't send anything.
> > > >
> > > > > If, instead, I just do not block and only send the data when it
> > > > > is available (for instance by implementing a mechanism equivalent
> > > > > to a non-blocking read from a pipe, with poll(), letting
> > > > > "on_sendable" finish without sending anything if there is no data
> > > > > available), only one "on_sendable" event is generated, and the
> > > > > event loop does not seem to return any other event which I could
> > > > > use to send the data that may now be available (for example, the
> > > > > equivalent of C's PN_DELIVERY does not seem to exist).
> > > > >
> > > >
> > > > The on_sendable is generated when credit to send initially arrives
> > > > from the server, or when it arrives again later. I believe it's
> > > > also 'regenerated' locally when you send, thus using some of the
> > > > credit (so long as some credit remains). As neither occurred, there
> > > > were no more on_sendable calls.
> > > >
> > > > I believe the EventInjector was the intended route for things like
> > > > this, performing/provoking work from outwith the container thread:
> > > >
> > > >
> > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/docs/proton.reactor.html#proton.reactor.EventInjector
> > > >
> > > > It's used in these examples:
> > > >
> > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_send.py.html
> > > >
> > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_recv.py.html
> > > >
> > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/tx_recv_interactive.py.html
> > > >
> > > > You can also provoke your own callbacks by scheduling, so an
> > > > alternative might be scheduling a task of your own to run
> > > > periodically (on the container thread, not your own). Example:
> > > >
> > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/recurring_timer.py.html
> > > >
> > > >
> > > > > Do you know if there are other ways to solve this issue?
> > > > > When you were "yielding" control of the main loop to an external
> > > > > function, was it blocking in some way, waiting for data to be
> > > > > available?
> > > > >
> > > > > Thank you very much,
> > > > > Francesco
> > > > >
> > > > >
> > > > > On Wed, 15 Jul 2020 at 18:40, Adrian Florea <
> > > > > [email protected]> wrote:
> > > > >
> > > > > > Hi Francesco,
> > > > > >
> > > > > > I achieved similar behavior, but using Qpid Proton-C (an AMQP
> > > > > > send-only program).
> > > > > >
> > > > > > After initializing proton and getting the first FLOW event, I
> > > > > > simply "yield" main event loop control to an external/looping
> > > > > > function that can read from a different source and then send
> > > > > > the same message to an AMQP destination.
> > > > > >
> > > > > > It may seem hard in the beginning, but once you get the hang of
> > > > > > integrating proton's event loop into your program loop, it has
> > > > > > a good chance of working.
> > > > > >
> > > > > > Adrian
> > > > > >
> > > > > >
> > > > > > On Wed, Jul 15, 2020, 11:22 AM Francesco Raviglione <
> > > > > > [email protected]> wrote:
> > > > > >
> > > > > > > Dear all,
> > > > > > > I'm trying to use the Python version of Qpid Proton to send
> > > > > > > data to an AMQP 1.0 broker.
> > > > > > > I've seen that several examples send data which was
> > > > > > > available/defined before the AMQP event loop is started.
> > > > > > > However, I need to send data only when it becomes available
> > > > > > > from an external process (for example by waiting for it on a
> > > > > > > Pipe, with self.amqp_pipe_end.recv()), and then send it
> > > > > > > immediately to the broker.
> > > > > > > If I try to wait on a pipe inside "on_sendable", no message is
> > > > > > > actually transferred to the broker, as the event loop is
> > > > > > > "blocked" waiting for new data and cannot properly manage the
> > > > > > > AMQP 1.0 connection (is this correct?).
> > > > > > > How could I achieve the desired result? How can I make the
> > > > > > > loop wait for external data and then immediately send it?
> > > > > > >
> > > > > > > Thank you very much in advance,
> > > > > > > Francesco Raviglione
> > > > > > >
> > > >
> > > > ---------------------------------------------------------------------
> > > > To unsubscribe, e-mail: [email protected]
> > > > For additional commands, e-mail: [email protected]
> > > >
> > > >
> >
>
