Hi All,
Did a bit of digging and the issue appears to be coming from upstream on the
producer. Here is the route configuration for it:
from("direct:archive")
    .log("Receiving ${body}")
    .aggregate(header("aggregationkey"))
        .aggregationStrategyRef(aggregationStrategy)
        .aggregationRepositoryRef(repoName)
        .completionSize(batchSize)
        .completionTimeout(batchTimeout)
    .log("Sending out ${body}")
    .to(archiveProcessor)
    .split(body())
        .log("processing Split: ${body}")
        .process(this::process)
        .choice()
            .when(body().regex(".*deliveryType:BASIC}"))
                .setHeader("rabbitmq.DELIVERY_MODE", simple("2"))
                .to(outputEndpoint)
                .to(outputBasickEndpoint)
                .to(notificationRouterEndpoint)
            .when(body().regex(".*deliveryType:PRIORITY}"))
                .to(outputEndpoint)
                .to(outputPriorityEndpoint)
                .to(priorityNotificationRouterEndpoint)
        .endChoice()
    .end();
Issue stems from the "notificationRouterEndpoint" receiving a message that has
had the DELIVERY_MODE header stripped out somewhere along the way. The other
headers I'm setting on the exchange seem to be still there when I get the
message directly from the RabbitMQ console but the Delivery Mode property seems
to be gone. The above is my last attempt at forcing it to stick to the message
but to no avail.
Oddly enough the header seems to be there after the split and before the
choice/when block:
private void process(Exchange exchange) {
    logger.info("RabbitMQ Persistence header after archiving: " +
        exchange.getIn().getHeader("rabbitmq.DELIVERY_MODE"));
}
Gives me this in the logs:
[2019-11-05 21:33:30,822 Camel (camel-1) thread #2 - AggregateTimeoutChecker]
INFO ie.vhi.cch.builders.BatchArchiveFeedbackRouteBuilder - RabbitMQ
Persistence header after archiving: 2
By the time it gets to the rabbit queue the value is null. This is the rabbit
queue config for notificationRouterEndpoint:
notificationRouterEndpoint:
rabbitmq://vm1/notificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=notificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=3&threadPoolSize=3&exchangePattern=InOnly&channelPoolMaxSize=3&addresses=vm1,vm2,vm3
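One way to narrow down where the header goes missing is to take a copy of the
headers at two points in the route and diff them. The following is only a
minimal sketch in plain Java: HeaderDiff and droppedHeaders are hypothetical
names, not part of Camel, and the two maps stand in for copies of
exchange.getIn().getHeaders() taken at different steps. The "batch-1" value is
made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not part of Camel): reports header keys that had a
// non-null value in an earlier snapshot but are missing or null in a later one.
final class HeaderDiff {

    static Set<String> droppedHeaders(Map<String, Object> before, Map<String, Object> after) {
        Set<String> dropped = new TreeSet<>();
        for (Map.Entry<String, Object> entry : before.entrySet()) {
            if (entry.getValue() != null && after.get(entry.getKey()) == null) {
                dropped.add(entry.getKey());
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Stand-in for a header copy taken right after the split.
        Map<String, Object> afterSplit = new LinkedHashMap<>();
        afterSplit.put("aggregationkey", "batch-1"); // illustrative value
        afterSplit.put("rabbitmq.DELIVERY_MODE", 2);

        // Stand-in for a header copy taken just before the notification endpoint.
        Map<String, Object> beforeNotification = new LinkedHashMap<>();
        beforeNotification.put("aggregationkey", "batch-1");

        // Prints the keys that disappeared between the two snapshots.
        System.out.println(droppedHeaders(afterSplit, beforeNotification));
    }
}
```

In a route, the snapshots could be taken in small processors placed before and
after each .to(...) call, which would show after which step the header is lost.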
The idea here is to send out notifications that something has been archived
(batching the archive items helps with the load on the Archive Service). The
splitting is there to facilitate individual notification processing for each
archived item (since they can go different ways and also fail in all sorts of
different ways).
Any help/advice/hint on the above would be greatly appreciated.
Thanks and Regards,
Valdis
From: Valdis Andersons
Sent: 05 November 2019 15:59
To: [email protected]
Subject: camel-rabbitmq sheduled route acks all messages when shutting down
Hi All,
Not sure I understand the issue fully, but I have a scheduled route with this
config (it's throttled as well):
CronScheduledRoutePolicy emailSchedulingPolicy = new CronScheduledRoutePolicy();
emailSchedulingPolicy.setRouteStartTime(emailNotificationsStartSchedule);
emailSchedulingPolicy.setRouteStopTime(emailNotificationsStopSchedule);

Processor exceptionLoggingProcessor = (exchange) ->
    logger.error("EmailNotificationRoute Error handled in camel.",
        exchange.getProperty(Exchange.EXCEPTION_CAUGHT));

setErrorHandlerBuilder(deadLetterChannel(deadLetterQueue)
    .onExceptionOccurred(exceptionLoggingProcessor));

onException(Exception.class)
    .logExhaustedMessageHistory(true)
    .useOriginalMessage();

from(emailNotificationInputEndpoint)
    .routeId(EMAIL_NOTIFICATIONS)
    .routePolicy(emailSchedulingPolicy)
    .autoStartup(autoStartup)
    .filter(notificationFilterPredicate)
    .throttle(maxEmailNotificationsThroughput)
        .timePeriodMillis(timePeriodMillisForThrottle)
    .process(emailNotificationProcessor)
    .to(emailNotificationOutEndpoint)
    .end();
The schedule is like this:
startSchedule: 0 0 10 ? * TUE-FRI,SUN *
stopSchedule: 0 0 15 ? * TUE-FRI,SUN *
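For anyone reading the expressions: these are Quartz-style cron strings, whose
fields are seconds, minutes, hours, day-of-month, month, day-of-week and an
optional year. A minimal plain-Java sketch that labels each field follows;
CronFields is a hypothetical helper for illustration, not a Camel or Quartz
class, and it does no validation beyond counting the fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: labels the fields of a Quartz-style cron expression.
final class CronFields {

    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"
    };

    static Map<String, String> label(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("Expected 6 or 7 fields, got " + parts.length);
        }
        Map<String, String> labelled = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            labelled.put(NAMES[i], parts[i]);
        }
        return labelled;
    }

    public static void main(String[] args) {
        // The start and stop schedules from the message.
        System.out.println(label("0 0 10 ? * TUE-FRI,SUN *"));
        System.out.println(label("0 0 15 ? * TUE-FRI,SUN *"));
    }
}
```

Read this way, the route starts at 10:00:00 and stops at 15:00:00 on Tuesday to
Friday and on Sunday, in any month and year.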
So at 3pm the thing did shut down as expected but there was an error thrown in
the logs (only thing I can see going wrong in the logs):
2019-11-05 15:00:11,358 [Camel (camel-1) thread #163 - ShutdownTask] ERROR
o.a.c.c.r.RabbitConsumer - Thread Interrupted!
The consumer config is this:
rabbitmq://vm1/emailnotificationExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=emailnotificationQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=9&threadPoolSize=9&channelPoolMaxSize=9&prefetchCount=1&prefetchEnabled=true&requestTimeout=90000&addresses=vm1,vm2,vm3
It's a clustered environment with vm3 being a federated node. When the
route shut down this happened:
All of the 2k+ messages somehow got ack'd even though I have autoAck=false, and
this doesn't happen on a normal shutdown of the application. Am I missing
something here? Would anyone be able to explain why that could be happening?
I'm a bit at a loss. I was expecting the remaining messages to just be left on
the queue after the route shut down (as prefetch is set to only 1).
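To put a rough number on that expectation: assuming the prefetch limit applies
per consumer (an assumption, not verified against the component), the upper
bound on unacknowledged messages would be concurrentConsumers multiplied by
prefetchCount. A minimal plain-Java sketch of that arithmetic, reading the two
options from an endpoint URI; InFlightEstimate is a hypothetical helper and the
URI in main is a shortened version of the consumer config above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: estimates the maximum number of unacknowledged messages,
// ASSUMING the prefetch limit is applied per consumer.
final class InFlightEstimate {

    // Splits the query part of an endpoint URI into key/value pairs.
    static Map<String, String> queryParams(String uri) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0) {
            return params;
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    static int maxUnacked(String uri) {
        Map<String, String> params = queryParams(uri);
        String consumers = params.get("concurrentConsumers");
        String prefetch = params.get("prefetchCount");
        if (consumers == null || prefetch == null) {
            throw new IllegalArgumentException("URI must set concurrentConsumers and prefetchCount");
        }
        return Integer.parseInt(consumers) * Integer.parseInt(prefetch);
    }

    public static void main(String[] args) {
        // Shortened version of the consumer URI; only the relevant options are kept.
        String uri = "rabbitmq://vm1/emailnotificationExchange"
            + "?autoAck=false&concurrentConsumers=9&prefetchCount=1&prefetchEnabled=true";
        System.out.println(maxUnacked(uri));
    }
}
```

Under that assumption, only a handful of the 2k+ messages should have been in
flight when the route stopped.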
Best Regards and Thanks,
Valdis