I have a receiver that connects to an Apache Qpid queue (Java-based, I
believe, though that's probably not relevant). The issue I'm finding is
that the receiver gets the messages that are already in the queue when it
connects (there are often many), but any new messages that arrive in the
queue afterwards never reach the receiver. If I start a new receiver, it sees
the missed messages just fine - but again, any more that come into the queue
after the connection is established are not seen.
Here's the receiver:
class TimeoutException(Exception):
    """Raised by the SIGALRM handler to abort a blocking wait for messages."""
def sigalarm_handler(*args):
    """Signal handler that converts a delivered signal into TimeoutException.

    The positional arguments supplied by the ``signal`` machinery are
    accepted and ignored.

    Raises:
        TimeoutException: always.
    """
    raise TimeoutException
def set_sighandler():
    """Install ``sigalarm_handler`` as the process-wide handler for SIGALRM."""
    signal.signal(signal.SIGALRM, sigalarm_handler)
class MyReceiver(MessagingHandler):
    """Proton messaging handler that collects messages from one queue.

    Messages are accumulated in ``self.orders``. Polling ends either when
    ``num_messages`` messages have been received (if non-zero) or when no
    message arrives within ``timeout`` seconds, which is enforced with a
    SIGALRM alarm that is re-armed after every message.
    """

    def __init__(self, num_messages=0, delete=False, timeout=10):
        """Build the connection URL and reset the receive counters.

        Args:
            num_messages: Number of messages to collect before closing the
                receiver and connection; ``0`` means no limit (collect
                until the timeout fires).
            delete: When true the receiver is created with default options;
                otherwise it is created with the ``Copy()`` link option
                (presumably so messages stay on the queue -- confirm
                against the broker configuration).
            timeout: Seconds to wait for the next message before giving up.
                NOTE(review): the original code read an undefined bare
                name ``timeout`` here; the default of 10 is an assumption
                and should be confirmed.
        """
        super(MyReceiver, self).__init__()
        self.url = '{}:{}@{}/{}'.format(
            Config.XYZ_QUEUE_USER,
            Config.XYZ_QUEUE_PASS,
            Config.XYZ_QUEUE_URL,
            Config.XYZ_QUEUE_NAME)
        self.timeout = timeout
        self.delete = delete
        # Order message handling
        self.expected = num_messages
        self.received = 0
        self.orders = []

    def on_start(self, event):
        """Arm the idle timeout and open the receiver link."""
        # Set the sighandler, so we don't wait forever for incoming messages
        set_sighandler()
        signal.alarm(self.timeout)
        if self.delete:
            event.container.create_receiver(self.url)
        else:
            event.container.create_receiver(self.url, options=Copy())

    def on_message(self, event: proton.Event):
        """Store an incoming message and either finish or re-arm the timer."""
        # Cancel the pending alarm while this message is being handled.
        signal.alarm(0)
        if self.expected == 0 or self.received < self.expected:
            self.orders.append(event.message)
            self.received += 1
            if self.received == self.expected:
                # Quota reached: shut down without re-arming the alarm.
                event.receiver.close()
                event.connection.close()
                return
        # Restart the timer - more work to do!
        signal.alarm(self.timeout)

    def poll_for_messages(self):
        """Run the container until the quota is met or the timeout fires.

        Returns:
            The number of messages received.
        """
        try:
            Container(self).run()
        except TimeoutException:
            logger.info('Stopping poll after {} seconds'.format(self.timeout))
        finally:
            # Never leave an alarm armed once polling is over; otherwise a
            # stray SIGALRM could raise TimeoutException in unrelated code.
            signal.alarm(0)
        return self.received