I have a receiver that connects to an Apache Qpid queue (Java-based, I
believe, though that's probably not relevant). The issue I'm finding is
that the receiver gets the messages that are already in the queue when it
connects (there are often many), but any new messages that arrive in the
queue afterwards never reach the receiver. If I start a new receiver, it sees
the missed messages just fine - but again, any that come into the queue after
the connection is established are not seen.

Here's the receiver:

class TimeoutException(Exception):
    """Signals that the polling alarm went off before work completed."""

def sigalarm_handler(*args):
    """Signal callback: abort whatever is running with TimeoutException.

    Any positional arguments supplied by the caller are accepted and ignored.
    """
    raise TimeoutException()

def set_sighandler():
    """Register sigalarm_handler as the process handler for SIGALRM."""
    alarm_signal = signal.SIGALRM
    signal.signal(alarm_signal, sigalarm_handler)

class MyReceiver(MessagingHandler):
    """Messaging handler that collects messages from the configured queue.

    Incoming messages are appended to ``self.orders``. Polling stops either
    when ``num_messages`` messages have been received or when no message has
    arrived for ``timeout`` seconds (enforced with a SIGALRM alarm).
    """

    def __init__(self, num_messages=0, delete=False, timeout=10):
        """Store the connection URL and polling settings.

        Args:
            num_messages: Number of messages to collect before closing the
                link; 0 means "no limit" (keep collecting until the timeout).
            delete: When true the receiver is created with default options;
                otherwise it is created with ``options=Copy()``.
                NOTE(review): presumably Copy() requests a non-destructive
                (browsing) receive -- confirm against the Proton docs.
            timeout: Idle time in seconds after which polling is abandoned.
                Passed to ``signal.alarm``, so it should be a whole number.
        """
        super(MyReceiver, self).__init__()

        self.url = '{}:{}@{}/{}'.format(
            Config.XYZ_QUEUE_USER,
            Config.XYZ_QUEUE_PASS,
            Config.XYZ_QUEUE_URL,
            Config.XYZ_QUEUE_NAME)
        # Previously read an undefined name `timeout`; it is now a real
        # keyword parameter so construction no longer raises NameError.
        self.timeout = timeout
        self.delete = delete

        # Order message handling
        self.expected = num_messages
        self.received = 0
        self.orders = []

    def on_start(self, event):
        """Arm the idle alarm and open the receiving link."""
        # Set the sighandler, so we don't wait forever for incoming messages
        set_sighandler()
        signal.alarm(self.timeout)

        if self.delete:
            event.container.create_receiver(self.url)
        else:
            event.container.create_receiver(self.url, options=Copy())

    def on_message(self, event: proton.Event):
        """Record one message and either finish or re-arm the idle alarm."""
        # Pause the alarm while this message is being handled.
        signal.alarm(0)

        if self.expected == 0 or self.received < self.expected:
            self.orders.append(event.message)
            self.received += 1
            if self.received == self.expected:
                # Quota reached: shut the link down and leave the alarm off.
                event.receiver.close()
                event.connection.close()
                return

        # Restart the timer - more work to do!
        signal.alarm(self.timeout)

    def poll_for_messages(self):
        """Run the container until done or timed out.

        Returns:
            The number of messages received.
        """
        try:
            Container(self).run()
        except TimeoutException:
            # The original literal was split across two lines (unterminated
            # string); it is rejoined here into the intended single message.
            logger.info('Stopping poll after {} seconds'.format(self.timeout))
        finally:
            # Never leave a pending alarm behind, whichever way run() exited.
            signal.alarm(0)

        return self.received

Reply via email to