In case it's still useful, an example of creating an anonymous sender (used here for the response part of a request/response interaction):
https://github.com/amqphub/equipage/blob/master/qpid-proton-python/respond.py#L44 (define the anonymous sender) https://github.com/amqphub/equipage/blob/master/qpid-proton-python/respond.py#L62 (set the address on the message before sending) I have two other thoughts, basically extensions of what Robbie said: - You might try constraining the queue of events you use for inter-thread communication. You could do that by having a size-limited queue for the messages to send, and then use the ApplicationEvent to tell the other thread that "there are some messages to send". - There are some known performance issues when Proton's internal data structures get large. That could happen in the scenario you are describing. If this is the case, it will help a lot to process smaller batches of data. https://issues.apache.org/jira/browse/PROTON-659 https://issues.apache.org/jira/browse/PROTON-2151 On Thu, Jul 30, 2020 at 8:49 AM Francesco Raviglione < [email protected]> wrote: > Dear Robbie, > I'm very sorry for the late reply, but I had a few very busy days at work. > > Thank you very much for your reply. > > I will try to narrow down the issue and perform more tests, looking, > especially, if "high latency" peaks are always followed by "low latency" > measurements, as if transfers were batched. > In general, from what I observed so far, the "high latency" was typically > assuming the form of "peaks" during the execution of each test with a > producer and a consumer, more or less suddenly followed by "normal", lower, > latency values, but more investigation could be surely useful to try to > better understand the problem. > > Thank you very much also for the information about the possibility > of opening a single "anonymous" sender. I wasn't aware about this feature, > which I will surely test soon. > If the broker I'm using is supporting it (but I really think so) and > everything is working well in my use case, I will update and optimize my > code by relying on a single sender. > > Thank you very much, > Francesco > > > Il giorno lun 20 lug 2020 alle ore 12:23 Robbie Gemmell < > [email protected]> ha scritto: > > > I'd wonder whether if you are piling up lots of events for different > > senders then it's potentially (again, I haven't used the Injector > > bits, and Python isn't particularly my thing) just that many such > > events are all piled up, and all processed before the resulting IO can > > then be, effectively batching things up. That might be observed as > > 'higher latency' for the earlier sends, tapering to 'lower latency' > > for the later ones in a sequence processed together. When you switch > > to using a BlockingConnection, you presumably change it to processing > > a single case at a time in full and see 'lower latency', by virtue of > > doing and measuring rather different things. > > > > Or it could be something completely different. Or there could be some > > problem/issue, potentially even on the server too, hard to speculate. > > If you think there is one, try to narrow it down and supply a > > reproducer someone could look at. > > > > As an aside: rather than having hundreds of senders, if the server you > > are using supports it and your overall use case suits it, you can also > > open a single sender to the 'anonymous relay' by omitting an address > > when creating it (I believe by using None in Python) and then have > > each message specifically carry its destination address (I believe > > passing address=<value> when creating the Message object in Python) > > > > Robbie > > > > On Fri, 17 Jul 2020 at 19:21, Francesco Raviglione > > <[email protected]> wrote: > > > > > > Dear Robbie, dear Gordon, > > > Thank you very much for your replies and for your very useful and > helpful > > > suggestions. > > > Looking better at the "db_send.py" and "tx_recv_interactive.py" > > examples, I > > > was finally able to code a working example by using the same loop as > > > before, but adding an EventInjector with: > > > data_event=EventInjector() > > > amqp_container=Container(AMQPHandler(arg1,arg2)) > > > amqp_container.selectable(ue_data_event) > > > > > > Then, by defining a new event inside the event loop (the AMQPHandler > > class): > > > def on_data_event: > > > <tab> # Manage event... > > > > > > And then by "provoking" the event from another loop, which waits from > > data > > > from a pipe and calls "data_event.trigger()" when new data is available > > to > > > be sent via AMQP: > > > while True: > > > <tab># Read from pipe > > > <tab>data_to_be_sent_via_AMQP=pipe_end.recv() # blocking recv() > > > <tab># ... > > > > > > <tab>data_event.trigger(ApplicationEvent("data_event",subject=data_to_be_sent_via_AMQP)) > > > > > > Everything seems to work fine now. > > > > > > I noticed that, however, especially when managing more senders (even > > quite > > > a lot of them, reaching > 100) inside the same event loop (inside the > > same > > > AMQPHandler class, using the same connection), the per-message delay > > > between when data is produced and when it can be consumed/it is > > > successfully consumed by an external consumer, is significantly higher, > > > sometimes, that what I can achieve with a Python's BlockingConnection. > > > In particular, sometimes it is even higher than 2 or 3 seconds. > > > Is this due to the event loop being "too busy" managing a lot of > senders, > > > and thus delaying the actual transmission of data? > > > > > > As I needed to write a client application trying to ensure the lowest > > > producer-to-consumer latency possible, I ended up in trying another > (much > > > less regular) solution, with BlockingConnection(), that I just reported > > as > > > a reply to the other thread ( > > > > > > http://qpid.2158936.n2.nabble.com/Getting-local-idle-timeout-expired-on-blocking-connection-possible-bug-td7692985.html > > > ). > > > > > > Thank you very much again, > > > Francesco > > > > > > Il giorno ven 17 lug 2020 alle ore 12:14 Robbie Gemmell < > > > [email protected]> ha scritto: > > > > > > > On Thu, 16 Jul 2020 at 14:36, Francesco Raviglione > > > > <[email protected]> wrote: > > > > > > > > > > Dear Adrian, > > > > > Thank you very much for your reply. > > > > > > > > > > I tried following your suggestion, by attempting to find a way to > > call an > > > > > external function reading the data from an external source, and > > returning > > > > > it when available. > > > > > > > > > > However, unfortunately, it seems to be slightly easier to control > the > > > > > behaviour of the event loop in C than in Pyhton. > > > > > In particular, in Python, if I call any blocking function (even a > > > > > time.sleep(0.1), sleeping for just 100 ms) inside "on_sendable", > the > > > > > messages are only buffered and never sent (do you know why? Is the > > > > > "on_sendable" handler supposed to be executed as fast as possible > > without > > > > > blocking to have the messages being sent immediately?). > > > > > > > > There is only one thread, it is used for running the callbacks and > > > > performing the IO. Anynew IO is processed after the handlers return, > > > > so if you block the handlers, you block the IO thread, meaning it > > > > can't do any work and thus cant send anything. > > > > > > > > > If, instead, I just do not block and send the data only if > available > > (for > > > > > instance by implementing a mechanism equivalent to a non-blocking > > read > > > > from > > > > > a pipe, with poll(), letting "on_sendable" finish without sending > > > > anything, > > > > > if there is no data available), only one "on_sendable" event is > > generated > > > > > and the event loop does not seem to return any other event which I > > could > > > > > use to send the data, which may be now available (for example, the > > > > > equivalent of C's PN_DELIVERY does not seem to exist). > > > > > > > > > > > > > The on_sendable is generated when credit to send initially arrives > > > > from the server, or does again later. I believe it's also > > > > 'regenerated' locally when you send, thus using some of the credit > (so > > > > long as some credit remains). As neither occurred, no more > on_sendable > > > > calls. > > > > > > > > I believe the EventInjector was the intended route for things like > > > > this, performing/provoking work from outwith the container thread: > > > > > > > > > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/docs/proton.reactor.html#proton.reactor.EventInjector > > > > > > > > Its used in these examples: > > > > > > > > > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_send.py.html > > > > > > > > > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_recv.py.html > > > > > > > > > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/tx_recv_interactive.py.html > > > > > > > > You can also provoke your own callbacks by scheduling, so an > > > > alternative might be scheduling (while using the container thread, > not > > > > your own) a task of your own to periodically run. Example: > > > > > > > > > > > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/recurring_timer.py.html > > > > > > > > > > > > > Do you know if there are other ways to solve this issue? > > > > > When you were "yielding" the control of the main loop to an > external > > > > > function, was it blocking in some way, waiting for data to be > > available? > > > > > > > > > > Thank you very much, > > > > > Francesco > > > > > > > > > > > > > > > Il giorno mer 15 lug 2020 alle ore 18:40 Adrian Florea < > > > > [email protected]> > > > > > ha scritto: > > > > > > > > > > > Hi Francesco, > > > > > > > > > > > > I achieved similar behavior but using Qpid Proton-C (AMQP > send-only > > > > > > program). > > > > > > > > > > > > After initialiazing proton and getting the first FLOW event, I > > simply > > > > > > "yield" main event loop control to an external/looping function > > that > > > > can > > > > > > read from a different source and then send the same message to an > > AMQP > > > > > > destination. > > > > > > > > > > > > It may seem hard in the beggining but once you get a hold on how > to > > > > > > integrate proton events loop into your program loop, it has good > > > > chances to > > > > > > work. > > > > > > > > > > > > Adrian > > > > > > > > > > > > > > > > > > On Wed, Jul 15, 2020, 11:22 AM Francesco Raviglione < > > > > > > [email protected]> wrote: > > > > > > > > > > > > > Dear all, > > > > > > > I'm trying to use the Python version of Qpid Proton to send > data > > to > > > > an > > > > > > AMQP > > > > > > > 1.0 broker. > > > > > > > I've seen that several examples are sending data which was > > > > > > > available/defined before the AMQP event loop is started. > > > > > > > However, I need to send data only when it becomes available > from > > an > > > > > > > external process (for example by waiting for it on a Pipe, with > > > > > > > self.amqp_pipe_end.recv()), and then send it immediately to the > > > > broker. > > > > > > > If I try to wait on a pipe inside "on_sendable", no message is > > > > actually > > > > > > > transferred to the broker, as the event loop is "blocked" > waiting > > > > for new > > > > > > > data and cannot manage properly the AMQP 1.0 connection (is > this > > > > > > correct?). > > > > > > > How could I achieve the desired result? How can I make the loop > > wait > > > > for > > > > > > > external data and then immediately send it? > > > > > > > > > > > > > > Thank you very much in advance, > > > > > > > Francesco Raviglione > > > > > > > > > > > > > > > --------------------------------------------------------------------- > > > > To unsubscribe, e-mail: [email protected] > > > > For additional commands, e-mail: [email protected] > > > > > > > > > > > > --------------------------------------------------------------------- > > To unsubscribe, e-mail: [email protected] > > For additional commands, e-mail: [email protected] > > > > >
