Manuel Teira escribió:
I'm sad to confirm this behaviour with the last changes:

1.-Start the remote broker.
2.-Start the activemq broker with a queued bridge.
3.-Send a message to the bridged queue: The message is bridged correctly.
4.-Stop the remote broker.
5.-Send a message to the bridged queue. It fails on QueueBridge.sendMessage as the producer is closed.
6.-Send a new message to the bridged queue. After the last changes, it tries to call 'restartProducer' but it fails, because the remote broker is down.
7.-Start the remote broker.
8.-Send a new message to the bridged queue. restartProducer is called again, the producer and its connection are successfully recreated. But ONLY the new message reaches the remote broker. I don't see any attempt to send the old ones. In the JMX console, I can see, for this queue:

ConsumerCount: 1
DequeueCount: 4
EnqueueCount: 4
QueueSize: 0

I think that perhaps this problem was created by  the attempt to implement  reconnections:

To be able to reconnect to the remote broker, DestinationBridge is capturing any exception that could be produced during message delivering (onMessage). Tipically RuntimeExceptions like IllegalStateException (if the consumer is closed) or a NullPointerException in a recently corrected bug.
I think that before those changes, these RuntimeExceptions probably were reaching the code in ActiveMQMessageConsumer.dispatch method (upper in the stack), where  we can see:

try {
    listener.onMessage(message);
    afterMessageIsConsumed(md, false);
} catch (RuntimeException e) {
    if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
        // Redeliver the message
    } else {
        // Transacted or Client ack: Deliver the next message.
        afterMessageIsConsumed(md, false);
    }
    log.warn("Exception while processing message: " + e, e);
}


As far as the exception is not reaching this level, message delivering is assumed successful. I think that this is what is happening as far as I never see the log "Exception while processing message..." when the bridged remote broker is down and I send a message.

So, I've added a dirty and fast fix to DestinationBridge.onMessage(), just rethrowing any catched exception as a RuntimeException:

catch (Exception e) {
    log.error("failed to forward message on attempt: " + (++attempt) + " reason: " + e + " message: " + message, e);
    if (maximumRetries > 0 && attempt >= maximumRetries) {
        try {
            stop();
        }
        catch (Exception e1) {
            log.warn("Failed to stop cleanly", e1);
        }
  }
  throw new RuntimeException(e);
}

Now, the exception is reaching this level (I see the exception log now) but the message is not redelivered, I think that it's caused because the condition:

session.isDupsOkAcknowledge() || session.isAutoAcknowledge()

is not reached.

So, my question is:

Why the session with the remote broker is not reaching these conditions? Do this mean that for a remote bridged broker, message send failures are never retried?


Regards.


Reply via email to