I am trying to write a python program that can receive a message, form a
response, and send it back. I am getting errors on sending the response. Can
someone help me see what I have wrong? The error appears to be -1 with no
explanation. I'm hoping I just have some misunderstanding of how this is
supposed to work.
Thank you,
-Steve
The code is:
import logging
from proton.handlers import MessagingHandler
from proton.reactor import Container
from emp import EMP_msg
from transport_echo_request import TransportEchoRequest
from transport_echo_response import TransportEchoResponse
logger = logging.getLogger("ricochet")
class Plugin(MessagingHandler):
def __init__(self):
super(Plugin, self).__init__()
def setup(self, config):
self.broker = config['broker-url']
self.recv_queue = config['recv-queue']
self.send_queue = config['send-queue']
self.message_to_send = None
def on_start(self, event):
conn = event.container.connect(self.broker)
event.container.create_receiver(conn, self.recv_queue)
self.sender = event.container.create_sender(conn, self.send_queue)
logger.info("sender %r", self.sender)
def on_message(self, event):
bytes = event.message.body # Should be transport echo request -
translate?
transport =
event.message.properties['TransportUsed'].tobytes().decode("utf-8")
logger.debug("body: %s", bytes.hex())
msg = EMP_msg()
ret = msg.decode(bytes)
logger.debug("EMP header decode: %s", ret)
if msg.msg_type_id != 4000 or msg.msg_ver != 1:
logger.error("Received message type %dv%d not supported.",
msg.msg_type_id, msg.msg_ver)
return
request = TransportEchoRequest()
try:
request.decode(bytes)
request.decode_body()
except Exception as e:
logger.error("Decoding transport echo request: %s", e)
return
self.on_transport_echo_request(request, transport)
self.release(event.delivery)
def on_sendable(self, event):
logger.info("Sendable.")
if self.message_to_send:
event.sender.send(self.message_to_send.msg_bytes)
self.message_to_send = None
return super().on_sendable(event)
# These are local calls, not framework callbacks as the above are.
def on_transport_echo_request(self, request, transport):
logger.info("transport_echo from %s np %d on %s", request.src_addr,
request.QoS_net_pref, transport)
response = TransportEchoResponse(transport, request)
response.encode_body()
response.encode()
self.sender.send(response.msg_bytes)
#self.message_to_send = response
#self.sender.offered(1)
When I run this (and the other side that initiates a request) I get:
INFO:ricochet:starting
INFO:ricochet:sender <proton._endpoints.Sender 0x10ff88c20 ~ 0x7fe6b88530a0>
INFO:proton:Connecting to Url('amqp://localhost:8000')...
INFO:proton:Connected to localhost
INFO:ricochet:Sendable.
INFO:ricochet:transport_echo from round-trip np 0 on att
Traceback (most recent call last):
File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 87, in
<module>
main()
~~~~^^
File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 83, in main
asyncio.run(ricochet.run())
~~~~~~~~~~~^^^^^^^^^^^^^^^^
File
"/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py",
line 194, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File
"/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py",
line 118, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File
"/usr/local/Cellar/[email protected]/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py",
line 721, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 76, in run
await asyncio.to_thread(self.run_in_thread())
~~~~~~~~~~~~~~~~~~^^
File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 73, in
run_in_thread
Container(self).run()
~~~~~~~~~~~~~~~~~~~^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_reactor.py",
line 197, in run
while self.process():
~~~~~~~~~~~~^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_reactor.py",
line 260, in process
event.dispatch(handler)
~~~~~~~~~~~~~~^^^^^^^^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py",
line 160, in dispatch
self.dispatch(h, type)
~~~~~~~~~~~~~^^^^^^^^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py",
line 160, in dispatch
self.dispatch(h, type)
~~~~~~~~~~~~~^^^^^^^^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py",
line 157, in dispatch
_dispatch(handler, type.method, self)
~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py",
line 128, in _dispatch
m(*args)
~^^^^^^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_handlers.py",
line 256, in on_delivery
self.on_message(event)
~~~~~~~~~~~~~~~^^^^^^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_handlers.py",
line 281, in on_message
_dispatch(self.delegate, 'on_message', event)
~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py",
line 128, in _dispatch
m(*args)
~^^^^^^^
File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 52, in
on_message
self.on_transport_echo_request(request, transport)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 68, in
on_transport_echo_request
self.sender.send(response.msg_bytes)
~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
File
"/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", line
1180, in send
return self.stream(obj)
~~~~~~~~~~~^^^^^
File
"/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", line
1163, in stream
return self._check(pn_link_send(self._impl, data))
~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File
"/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", line
739, in _check
raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
proton._exceptions.LinkException: [-1]: None
The sender code is below. It does send the first message, but never gets a
response - the above is supposed to send it but the send fails.
# Send one transport echo request and wait for a response
import logging
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
from transport_echo_request import TransportEchoRequest
from transport_echo_response import TransportEchoResponse
import emp
logger = logging.getLogger("round_trip")
class RoundTrip(MessagingHandler):
def __init__(self, server, to_ricochet_address, from_ricochet_address):
super(RoundTrip, self).__init__()
self.server = server
self.send_address = to_ricochet_address
self.recv_address = from_ricochet_address
logger.info("Setting up - broker %s", server)
def on_start(self, event):
logger.debug("on_start")
conn = event.container.connect(self.server)
event.container.create_receiver(conn, self.recv_address)
event.container.create_sender(conn, self.send_address)
def on_sendable(self, event):
logger.debug("on_sendable")
req = TransportEchoRequest()
req.data_integrity_flag = emp.EMP_INTEG_NONE
req.src_addr = "round-trip"
req.dest_addr = "ricochet"
req.guid = bytes(16)
req.encode_body()
req_bytes = req.encode()
m = Message(memoryview(req_bytes))
# The 'TransportUsed' property type was discovered by experimentation -
use itcm-ping to
# send a message to the ricochet plugin and print the class of the
value.
m.properties = { 'TransportUsed': memoryview(b'att') }
event.sender.send(m)
# event.sender.close()
def on_message(self, event):
logger.debug("on_message")
msg = emp.EMP_msg()
msg.decode(event.message.body)
logger.debug("msg type %d, vers %d", msg.msg_type_id, msg.msg_ver)
#assert 4001 == msg.msg_type_id
#assert 1 == msg.msg_ver
resp = TransportEchoResponse()
resp.decode(event.message.body)
resp.decode_body()
print("Response from ", resp.src_addr, " to ", resp.dest_addr, " used
", resp.sending_transport)
event.receiver.close()
event.connection.close()
logging.basicConfig(level=logging.DEBUG)
Container(RoundTrip("localhost:8000", "in", "out")).run()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]