Hi All,

Did a bit of digging and the issue appears to be coming from upstream on the 
producer. Here is the route configuration for it:

from("direct:archive")
    .log("Receiving ${body}")
    .aggregate(header("aggregationkey"))
    .aggregationStrategyRef(aggregationStrategy)
    .aggregationRepositoryRef(repoName)
    .completionSize(batchSize)
    .completionTimeout(batchTimeout)
    .log("Sending out ${body}")
    .to(archiveProcessor)
    .split(body())
    .log("processing Split: ${body}")
    .process(this::process)
    .choice()
        .when(body().regex(".*deliveryType:BASIC}"))
            .setHeader("rabbitmq.DELIVERY_MODE", simple("2"))
            .to(outputEndpoint)
            .to(outputBasickEndpoint)
            .to(notificationRouterEndpoint)
        .when(body().regex(".*deliveryType:PRIORITY}"))
            .to(outputEndpoint)
            .to(outputPriorityEndpoint)
            .to(priorityNotificationRouterEndpoint)
    .endChoice()
.end();
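
A side note on the two when() predicates: as far as I understand, Camel's 
regex predicate requires the whole body to match, and "." does not cross line 
breaks by default. The standalone sketch below uses plain java.util.regex 
rather than Camel, and the sample bodies are made up (the real body format is 
not shown in this thread). The class and method names are hypothetical. It is 
only a quick way to check which branch a given body would take under that 
assumption.

```java
import java.util.regex.Pattern;

class DeliveryTypeRegexCheck {
    // Same patterns as in the route's when() clauses.
    static final Pattern BASIC = Pattern.compile(".*deliveryType:BASIC}");
    static final Pattern PRIORITY = Pattern.compile(".*deliveryType:PRIORITY}");

    // Assumption: full-match semantics, i.e. Matcher.matches() rather than find().
    static String branchFor(String body) {
        if (BASIC.matcher(body).matches()) {
            return "BASIC";
        }
        if (PRIORITY.matcher(body).matches()) {
            return "PRIORITY";
        }
        return "NONE";
    }

    public static void main(String[] args) {
        // Hypothetical sample bodies, for illustration only.
        System.out.println(branchFor("{id:1, deliveryType:BASIC}"));
        System.out.println(branchFor("{id:2, deliveryType:PRIORITY}"));
        // A line break before the marker, or trailing characters after the
        // closing brace, falls through both patterns.
        System.out.println(branchFor("{id:3,\n deliveryType:BASIC}"));
        System.out.println(branchFor("{id:4, deliveryType:BASIC} "));
    }
}
```

A body that falls through both patterns would skip both branches, so no 
setHeader and no to(...) call would run for it.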

The issue stems from "notificationRouterEndpoint" receiving a message that has 
had the DELIVERY_MODE header stripped out somewhere along the way. The other 
headers I'm setting on the exchange are still there when I get the message 
directly from the RabbitMQ console, but the Delivery Mode property is gone. 
The above is my last attempt at forcing it to stick to the message, but to no 
avail.
Oddly enough the header seems to be there after the split and before the 
choice/when block:

private void process(Exchange exchange) {
    logger.info("RabbitMQ Persistence header after archiving: "
            + exchange.getIn().getHeader("rabbitmq.DELIVERY_MODE"));
}

Gives me this in the logs:

[2019-11-05 21:33:30,822 Camel (camel-1) thread #2 - AggregateTimeoutChecker] INFO  ie.vhi.cch.builders.BatchArchiveFeedbackRouteBuilder - RabbitMQ Persistence header after archiving: 2
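
One way to narrow down where the header disappears would be to snapshot the 
header map at two points in the route and diff the keys. The sketch below is 
plain Java with hypothetical names (HeaderDiff, droppedKeys) and made-up header 
values. In a route, the two maps would be copies of 
exchange.getIn().getHeaders() taken before and after the step under suspicion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class HeaderDiff {
    // Returns the header names present in 'before' but absent from 'after'.
    static Set<String> droppedKeys(Map<String, Object> before, Map<String, Object> after) {
        Set<String> dropped = new TreeSet<>(before.keySet());
        dropped.removeAll(after.keySet());
        return dropped;
    }

    public static void main(String[] args) {
        // Hypothetical snapshots, for illustration only.
        Map<String, Object> before = new HashMap<>();
        before.put("rabbitmq.DELIVERY_MODE", "2");
        before.put("aggregationkey", "k1");

        Map<String, Object> after = new HashMap<>();
        after.put("aggregationkey", "k1");

        System.out.println("Dropped headers: " + droppedKeys(before, after));
    }
}
```

Taking a copy (for example with new HashMap<>(headers)) matters here, since the 
live header map can change as the exchange moves on.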

By the time it gets to the rabbit queue the value is null. This is the rabbit 
queue config for notificationRouterEndpoint:

notificationRouterEndpoint: rabbitmq://vm1/notificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=notificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&exchangePattern=InOnly&channelPoolMaxSize=3&addresses=vm1,vm2,vm3

The idea here is to send out notifications that something has been archived 
(batching the archive items helps with the load on the Archive Service). The 
splitting is there to facilitate individual notification processing for each 
archived item (since they can go different ways and also fail in all sorts of 
different ways).

Any help/advice/hint on the above would be greatly appreciated.


Thanks and Regards,
Valdis

From: Valdis Andersons
Sent: 05 November 2019 15:59
To: users@camel.apache.org
Subject: camel-rabbitmq sheduled route acks all messages when shutting down

Hi All,

I'm not sure I understand the issue fully, but I have a scheduled route with 
this config (it's throttled as well):

CronScheduledRoutePolicy emailSchedulingPolicy = new CronScheduledRoutePolicy();
emailSchedulingPolicy.setRouteStartTime(emailNotificationsStartSchedule);
emailSchedulingPolicy.setRouteStopTime(emailNotificationsStopSchedule);

Processor exceptionLoggingProcessor = (exchange) ->
        logger.error("EmailNotificationRoute Error handled in camel.",
                exchange.getProperty(Exchange.EXCEPTION_CAUGHT));

setErrorHandlerBuilder(deadLetterChannel(deadLetterQueue)
        .onExceptionOccurred(exceptionLoggingProcessor));

onException(Exception.class)
        .logExhaustedMessageHistory(true)
        .useOriginalMessage();

from(emailNotificationInputEndpoint)
        .routeId(EMAIL_NOTIFICATIONS)
        .routePolicy(emailSchedulingPolicy)
        .autoStartup(autoStartup)
        .filter(notificationFilterPredicate)
        .throttle(maxEmailNotificationsThroughput)
        .timePeriodMillis(timePeriodMillisForThrottle)
        .process(emailNotificationProcessor)
        .to(emailNotificationOutEndpoint)
        .end();

The schedule is like this:

    startSchedule: 0 0 10 ? * TUE-FRI,SUN *
    stopSchedule: 0 0 15 ? * TUE-FRI,SUN *
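
Read as Quartz-style cron expressions (seconds, minutes, hours, day of month, 
month, day of week, year), these start the route at 10:00:00 and stop it at 
15:00:00 on Tuesday to Friday and on Sunday. The sketch below is plain 
java.time rather than Camel or Quartz, with hypothetical class and method 
names. It only encodes that reading of the window and ignores autoStartup and 
application restarts.

```java
import java.time.DayOfWeek;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.EnumSet;
import java.util.Set;

class EmailRouteWindow {
    // Days named by TUE-FRI,SUN in the day-of-week field.
    static final Set<DayOfWeek> ACTIVE_DAYS = EnumSet.of(
            DayOfWeek.TUESDAY, DayOfWeek.WEDNESDAY, DayOfWeek.THURSDAY,
            DayOfWeek.FRIDAY, DayOfWeek.SUNDAY);
    static final LocalTime START = LocalTime.of(10, 0); // startSchedule fires
    static final LocalTime STOP = LocalTime.of(15, 0);  // stopSchedule fires

    // True when 'when' falls inside [10:00, 15:00) on an active day.
    static boolean isRouteExpectedRunning(LocalDateTime when) {
        LocalTime t = when.toLocalTime();
        return ACTIVE_DAYS.contains(when.getDayOfWeek())
                && !t.isBefore(START)
                && t.isBefore(STOP);
    }

    public static void main(String[] args) {
        // 2019-11-05 was a Tuesday.
        System.out.println(isRouteExpectedRunning(LocalDateTime.of(2019, 11, 5, 14, 59, 59))); // true
        System.out.println(isRouteExpectedRunning(LocalDateTime.of(2019, 11, 5, 15, 0, 11)));  // false
    }
}
```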

So at 3pm the thing did shut down as expected but there was an error thrown in 
the logs (only thing I can see going wrong in the logs):

2019-11-05 15:00:11,358 [Camel (camel-1) thread #163 - ShutdownTask] ERROR o.a.c.c.r.RabbitConsumer  - Thread Interrupted!

The consumer config is this:

rabbitmq://vm1/emailnotificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailnotificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=9&threadPoolSize=9&channelPoolMaxSize=9&prefetchCount=1&prefetchEnabled=true&requestTimeout=90000&addresses=vm1,vm2,vm3

It's a clustered environment with the vm3 being a federated node. When the 
route shut down this happened:

[inline image image001.png, not included in this archive]

All of the 2k+ messages somehow got ack'd even though I have autoAck=false, and 
this doesn't happen on a normal shutdown of the application. Am I missing 
something here? Would anyone be able to explain why that could be happening? 
I'm a bit at a loss. I was expecting the remaining messages to just be left on 
the queue after the route shut down (as prefetch is set to only 1).


Best Regards and Thanks,
Valdis






