Hi Steve, Have you looked at the Python Proton examples? There is a server.py in there that does pretty much the same thing that you are trying to do here. It might be helpful.
The error you are getting is a "link exception" with no description. I suspect there's some issue with the sending queue on whatever intermediary you are using for this task. Typically, if you are building a server that sends responses to requests, you will use the reply-to header in the request as the destination for the response. You would also copy the correlation-id header from the request to the response. Another possible issue: You are calling sender.send with "msg_bytes" and not a Message object. I don't know what msg_bytes is, but I think it needs a send() method for it to work. Maybe you meant sender.send(Message(body=msg_bytes)). -Ted On Tue, Dec 31, 2024 at 6:37 PM Steve Huston <shus...@riverace.com> wrote: > I am trying to write a python program that can receive a message, form a > response, and send it back. I am getting errors on sending the response. > Can someone help me see what I have wrong? The error appears to be -1 with > no explanation. I'm hoping I just have some misunderstanding of how this is > supposed to work. > > Thank you, > -Steve > > The code is: > > import logging > > from proton.handlers import MessagingHandler > from proton.reactor import Container > > from emp import EMP_msg > from transport_echo_request import TransportEchoRequest > from transport_echo_response import TransportEchoResponse > > logger = logging.getLogger("ricochet") > > class Plugin(MessagingHandler): > def __init__(self): > super(Plugin, self).__init__() > > def setup(self, config): > self.broker = config['broker-url'] > self.recv_queue = config['recv-queue'] > self.send_queue = config['send-queue'] > self.message_to_send = None > > def on_start(self, event): > conn = event.container.connect(self.broker) > event.container.create_receiver(conn, self.recv_queue) > self.sender = event.container.create_sender(conn, self.send_queue) > logger.info("sender %r", self.sender) > > def on_message(self, event): > bytes = event.message.body # Should be transport echo request - > translate? > transport = > event.message.properties['TransportUsed'].tobytes().decode("utf-8") > logger.debug("body: %s", bytes.hex()) > > msg = EMP_msg() > ret = msg.decode(bytes) > logger.debug("EMP header decode: %s", ret) > if msg.msg_type_id != 4000 or msg.msg_ver != 1: > logger.error("Received message type %dv%d not supported.", > msg.msg_type_id, msg.msg_ver) > return > request = TransportEchoRequest() > try: > request.decode(bytes) > request.decode_body() > except Exception as e: > logger.error("Decoding transport echo request: %s", e) > return > self.on_transport_echo_request(request, transport) > self.release(event.delivery) > > def on_sendable(self, event): > logger.info("Sendable.") > if self.message_to_send: > event.sender.send(self.message_to_send.msg_bytes) > self.message_to_send = None > return super().on_sendable(event) > > # These are local calls, not framework callbacks as the above are. > def on_transport_echo_request(self, request, transport): > logger.info("transport_echo from %s np %d on %s", > request.src_addr, request.QoS_net_pref, transport) > response = TransportEchoResponse(transport, request) > response.encode_body() > response.encode() > self.sender.send(response.msg_bytes) > #self.message_to_send = response > #self.sender.offered(1) > > When I run this (and the other side that initiates a request) I get: > > INFO:ricochet:starting > INFO:ricochet:sender <proton._endpoints.Sender 0x10ff88c20 ~ > 0x7fe6b88530a0> > INFO:proton:Connecting to Url('amqp://localhost:8000')... > INFO:proton:Connected to localhost > INFO:ricochet:Sendable. > INFO:ricochet:transport_echo from round-trip np 0 on att > Traceback (most recent call last): > File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 87, in > <module> > main() > ~~~~^^ > File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 83, in > main > asyncio.run(ricochet.run()) > ~~~~~~~~~~~^^^^^^^^^^^^^^^^ > File > "/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", > line 194, in run > return runner.run(main) > ~~~~~~~~~~^^^^^^ > File > "/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", > line 118, in run > return self._loop.run_until_complete(task) > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^ > File > "/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", > line 721, in run_until_complete > return future.result() > ~~~~~~~~~~~~~^^ > File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 76, in > run > await asyncio.to_thread(self.run_in_thread()) > ~~~~~~~~~~~~~~~~~~^^ > File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 73, in > run_in_thread > Container(self).run() > ~~~~~~~~~~~~~~~~~~~^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_reactor.py", > line 197, in run > while self.process(): > ~~~~~~~~~~~~^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_reactor.py", > line 260, in process > event.dispatch(handler) > ~~~~~~~~~~~~~~^^^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line > 160, in dispatch > self.dispatch(h, type) > ~~~~~~~~~~~~~^^^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line > 160, in dispatch > self.dispatch(h, type) > ~~~~~~~~~~~~~^^^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line > 157, in dispatch > _dispatch(handler, type.method, self) > ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line > 128, in _dispatch > m(*args) > ~^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_handlers.py", > line 256, in on_delivery > self.on_message(event) > ~~~~~~~~~~~~~~~^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_handlers.py", > line 281, in on_message > _dispatch(self.delegate, 'on_message', event) > ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line > 128, in _dispatch > m(*args) > ~^^^^^^^ > File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 52, in > on_message > self.on_transport_echo_request(request, transport) > ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^ > File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 68, in > on_transport_echo_request > self.sender.send(response.msg_bytes) > ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", > line 1180, in send > return self.stream(obj) > ~~~~~~~~~~~^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", > line 1163, in stream > return self._check(pn_link_send(self._impl, data)) > ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File > "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", > line 739, in _check > raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl)))) > proton._exceptions.LinkException: [-1]: None > > The sender code is below. It does send the first message, but never gets a > response - the above is supposed to send it but the send fails. > > # Send one transport echo request and wait for a response > > import logging > from proton import Message > from proton.handlers import MessagingHandler > from proton.reactor import Container > > from transport_echo_request import TransportEchoRequest > from transport_echo_response import TransportEchoResponse > import emp > > logger = logging.getLogger("round_trip") > > class RoundTrip(MessagingHandler): > def __init__(self, server, to_ricochet_address, from_ricochet_address): > super(RoundTrip, self).__init__() > self.server = server > self.send_address = to_ricochet_address > self.recv_address = from_ricochet_address > logger.info("Setting up - broker %s", server) > > def on_start(self, event): > logger.debug("on_start") > conn = event.container.connect(self.server) > event.container.create_receiver(conn, self.recv_address) > event.container.create_sender(conn, self.send_address) > > def on_sendable(self, event): > logger.debug("on_sendable") > req = TransportEchoRequest() > req.data_integrity_flag = emp.EMP_INTEG_NONE > req.src_addr = "round-trip" > req.dest_addr = "ricochet" > req.guid = bytes(16) > req.encode_body() > req_bytes = req.encode() > m = Message(memoryview(req_bytes)) > # The 'TransportUsed' property type was discovered by > experimentation - use itcm-ping to > # send a message to the ricochet plugin and print the class of the > value. > m.properties = { 'TransportUsed': memoryview(b'att') } > event.sender.send(m) > # event.sender.close() > > def on_message(self, event): > logger.debug("on_message") > msg = emp.EMP_msg() > msg.decode(event.message.body) > logger.debug("msg type %d, vers %d", msg.msg_type_id, msg.msg_ver) > #assert 4001 == msg.msg_type_id > #assert 1 == msg.msg_ver > resp = TransportEchoResponse() > resp.decode(event.message.body) > resp.decode_body() > print("Response from ", resp.src_addr, " to ", resp.dest_addr, " > used ", resp.sending_transport) > event.receiver.close() > event.connection.close() > > logging.basicConfig(level=logging.DEBUG) > Container(RoundTrip("localhost:8000", "in", "out")).run() > > --------------------------------------------------------------------- > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org > For additional commands, e-mail: users-h...@qpid.apache.org > >