Hi Steve,

Have you looked at the Python Proton examples?  There is a server.py in
there that does pretty much the same thing that you are trying to do here.
It might be helpful.

The error you are getting is a "link exception" with no description.  I
suspect there's some issue with the sending queue on whatever intermediary
you are using for this task.

Typically, if you are building a server that sends responses to requests,
you will use the reply-to header in the request as the destination for the
response.  You would also copy the correlation-id header from the request
to the response.

Another possible issue:  You are calling sender.send with "msg_bytes" and
not a Message object.  I don't know what msg_bytes is, but I think it needs
a send() method for it to work.   Maybe you meant
sender.send(Message(body=msg_bytes)).

-Ted

On Tue, Dec 31, 2024 at 6:37 PM Steve Huston <shus...@riverace.com> wrote:

> I am trying to write a python program that can receive a message, form a
> response, and send it back. I am getting errors on sending the response.
> Can someone help me see what I have wrong? The error appears to be -1 with
> no explanation. I'm hoping I just have some misunderstanding of how this is
> supposed to work.
>
> Thank you,
> -Steve
>
> The code is:
>
> import logging
>
> from proton.handlers import MessagingHandler
> from proton.reactor import Container
>
> from emp import EMP_msg
> from transport_echo_request import TransportEchoRequest
> from transport_echo_response import TransportEchoResponse
>
> logger = logging.getLogger("ricochet")
>
> class Plugin(MessagingHandler):
>     def __init__(self):
>         super(Plugin, self).__init__()
>
>     def setup(self, config):
>         self.broker = config['broker-url']
>         self.recv_queue = config['recv-queue']
>         self.send_queue = config['send-queue']
>         self.message_to_send = None
>
>     def on_start(self, event):
>         conn = event.container.connect(self.broker)
>         event.container.create_receiver(conn, self.recv_queue)
>         self.sender = event.container.create_sender(conn, self.send_queue)
>         logger.info("sender %r", self.sender)
>
>     def on_message(self, event):
>         bytes = event.message.body   # Should be transport echo request -
> translate?
>         transport =
> event.message.properties['TransportUsed'].tobytes().decode("utf-8")
>         logger.debug("body: %s", bytes.hex())
>
>         msg = EMP_msg()
>         ret = msg.decode(bytes)
>         logger.debug("EMP header decode: %s", ret)
>         if msg.msg_type_id != 4000 or msg.msg_ver != 1:
>             logger.error("Received message type %dv%d not supported.",
> msg.msg_type_id, msg.msg_ver)
>             return
>         request = TransportEchoRequest()
>         try:
>             request.decode(bytes)
>             request.decode_body()
>         except Exception as e:
>             logger.error("Decoding transport echo request: %s", e)
>             return
>         self.on_transport_echo_request(request, transport)
>         self.release(event.delivery)
>
>     def on_sendable(self, event):
>         logger.info("Sendable.")
>         if self.message_to_send:
>             event.sender.send(self.message_to_send.msg_bytes)
>             self.message_to_send = None
>         return super().on_sendable(event)
>
>     # These are local calls, not framework callbacks as the above are.
>     def on_transport_echo_request(self, request, transport):
>         logger.info("transport_echo from %s np %d on %s",
> request.src_addr, request.QoS_net_pref, transport)
>         response = TransportEchoResponse(transport, request)
>         response.encode_body()
>         response.encode()
>         self.sender.send(response.msg_bytes)
>         #self.message_to_send = response
>         #self.sender.offered(1)
>
> When I run this (and the other side that initiates a request) I get:
>
> INFO:ricochet:starting
> INFO:ricochet:sender <proton._endpoints.Sender 0x10ff88c20 ~
> 0x7fe6b88530a0>
> INFO:proton:Connecting to Url('amqp://localhost:8000')...
> INFO:proton:Connected to localhost
> INFO:ricochet:Sendable.
> INFO:ricochet:transport_echo from round-trip np 0 on att
> Traceback (most recent call last):
>   File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 87, in
> <module>
>     main()
>     ~~~~^^
>   File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 83, in
> main
>     asyncio.run(ricochet.run())
>     ~~~~~~~~~~~^^^^^^^^^^^^^^^^
>   File 
> "/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py",
> line 194, in run
>     return runner.run(main)
>            ~~~~~~~~~~^^^^^^
>   File 
> "/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py",
> line 118, in run
>     return self._loop.run_until_complete(task)
>            ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
>   File 
> "/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py",
> line 721, in run_until_complete
>     return future.result()
>            ~~~~~~~~~~~~~^^
>   File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 76, in
> run
>     await asyncio.to_thread(self.run_in_thread())
>                             ~~~~~~~~~~~~~~~~~~^^
>   File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 73, in
> run_in_thread
>     Container(self).run()
>     ~~~~~~~~~~~~~~~~~~~^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_reactor.py",
> line 197, in run
>     while self.process():
>           ~~~~~~~~~~~~^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_reactor.py",
> line 260, in process
>     event.dispatch(handler)
>     ~~~~~~~~~~~~~~^^^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line
> 160, in dispatch
>     self.dispatch(h, type)
>     ~~~~~~~~~~~~~^^^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line
> 160, in dispatch
>     self.dispatch(h, type)
>     ~~~~~~~~~~~~~^^^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line
> 157, in dispatch
>     _dispatch(handler, type.method, self)
>     ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line
> 128, in _dispatch
>     m(*args)
>     ~^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_handlers.py",
> line 256, in on_delivery
>     self.on_message(event)
>     ~~~~~~~~~~~~~~~^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_handlers.py",
> line 281, in on_message
>     _dispatch(self.delegate, 'on_message', event)
>     ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", line
> 128, in _dispatch
>     m(*args)
>     ~^^^^^^^
>   File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 52, in
> on_message
>     self.on_transport_echo_request(request, transport)
>     ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
>   File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 68, in
> on_transport_echo_request
>     self.sender.send(response.msg_bytes)
>     ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py",
> line 1180, in send
>     return self.stream(obj)
>            ~~~~~~~~~~~^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py",
> line 1163, in stream
>     return self._check(pn_link_send(self._impl, data))
>            ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
>   File
> "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py",
> line 739, in _check
>     raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
> proton._exceptions.LinkException: [-1]: None
>
> The sender code is below. It does send the first message, but never gets a
> response - the above is supposed to send it but the send fails.
>
> # Send one transport echo request and wait for a response
>
> import logging
> from proton import Message
> from proton.handlers import MessagingHandler
> from proton.reactor import Container
>
> from transport_echo_request import TransportEchoRequest
> from transport_echo_response import TransportEchoResponse
> import emp
>
> logger = logging.getLogger("round_trip")
>
> class RoundTrip(MessagingHandler):
>     def __init__(self, server, to_ricochet_address, from_ricochet_address):
>         super(RoundTrip, self).__init__()
>         self.server = server
>         self.send_address = to_ricochet_address
>         self.recv_address = from_ricochet_address
>         logger.info("Setting up - broker %s", server)
>
>     def on_start(self, event):
>         logger.debug("on_start")
>         conn = event.container.connect(self.server)
>         event.container.create_receiver(conn, self.recv_address)
>         event.container.create_sender(conn, self.send_address)
>
>     def on_sendable(self, event):
>         logger.debug("on_sendable")
>         req = TransportEchoRequest()
>         req.data_integrity_flag = emp.EMP_INTEG_NONE
>         req.src_addr = "round-trip"
>         req.dest_addr = "ricochet"
>         req.guid = bytes(16)
>         req.encode_body()
>         req_bytes = req.encode()
>         m = Message(memoryview(req_bytes))
>         # The 'TransportUsed' property type was discovered by
> experimentation - use itcm-ping to
>         # send a message to the ricochet plugin and print the class of the
> value.
>         m.properties = { 'TransportUsed': memoryview(b'att') }
>         event.sender.send(m)
>        # event.sender.close()
>
>     def on_message(self, event):
>         logger.debug("on_message")
>         msg = emp.EMP_msg()
>         msg.decode(event.message.body)
>         logger.debug("msg type %d, vers %d", msg.msg_type_id, msg.msg_ver)
>         #assert 4001 == msg.msg_type_id
>         #assert 1 == msg.msg_ver
>         resp = TransportEchoResponse()
>         resp.decode(event.message.body)
>         resp.decode_body()
>         print("Response from ", resp.src_addr, " to ", resp.dest_addr, "
> used ", resp.sending_transport)
>         event.receiver.close()
>         event.connection.close()
>
> logging.basicConfig(level=logging.DEBUG)
> Container(RoundTrip("localhost:8000", "in", "out")).run()
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> For additional commands, e-mail: users-h...@qpid.apache.org
>
>

Reply via email to