Whoever is facing this problem and cannot use async schedulling because the
endpoint is transacted, I've came up with a workaround, in my case I'm using
RabbitMQ as the from Endpoint:

This is my route configuration:

        from(fromEndpoint) // AMQP INPUT QUEUE
            .routeId(consumerRouteId)
            .to(DIRECT_SEND_DESTINATION);

        from(retryEndpoint) //AMQP RETRY QUEUE
            .errorHandler(deadLetterChannel) // a Dead Letter Channel with
Redelivery configuration
            .routeId(retryRouteid)
            .setHeader(MESSAGE_RETRY_HEADER, constant(true))
            .to(DIRECT_SEND_DESTINATION);

        from(DIRECT_SEND_DESTINATION) // Direct endpoints that work as a
funnel between the Input and Retry queue
            .doTry()
                ...
                .process(sendToDestinationProcessor) //If this throws an
exception we will decide what to do in the catch
                ...
            .doCatch(Exception.class)
                .process(redeliveryExceptionRethrowProcessor)
                .to(retryEndpoint) // Is the first time we hear about this
message, so we send (the original) to the retry queue
            .end();

And the redeliveryExceptionRethrowProcessor will decide if it needs to
rethrow the exception or not based on a header that is present just in the
Retry Messages. Note that the retryEndpoint route has the errorHandler
configuration, so it will handle redelivery from that queue only, not from
the Input queue.
This way, redelivery attempts are not blocking anymore, and we don't use an
async scheduler where messages can be lost in case of an outage.

public class RedeliveryExceptionRethrowProcessor implements Processor {

    private final String header;

    public RedeliveryExceptionRethrowProcessor(String header) {
        this.header = header;
    }

    @Override
    public void process(Exchange exchange) throws Exception {
        Boolean rethrowException = exchange.getIn().getHeader(header,
Boolean.class);
        if (rethrowException != null && rethrowException) {
            throw exchange.getProperty(Exchange.EXCEPTION_CAUGHT,
Exception.class);
        }
        exchange.setIn(exchange.getUnitOfWork().getOriginalInMessage());
    }

}

//If we are not rethrowing the exception, is the first time we see this
message, so it's going to be sent to the retry queue.
//For it to work, we need to use the original message, so any changes in the
exchange while trying to process it would be ignored



--
View this message in context: 
http://camel.465427.n5.nabble.com/Message-blocks-route-until-all-redelivery-attempts-are-exhausted-tp472319p5784827.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to