Hi All,

I've searched this topic a fair bit now and I'm not getting a conclusive answer 
to my issue.

What we've noticed over the last while is that, in some cases under high enough 
load with multiple consumers on the Aggregator route, some messages get overwritten 
in the aggregation result by a following message. It seems like a very random and 
intermittent event, and I'm not quite sure how to replicate it with a test case, 
as I'm not 100% sure of the mechanics behind how the Aggregator treats multiple 
consumers. Since we started seeing this issue, a synchronized block has been put 
around the aggregation-result list modifications, and that seems to have done 
absolutely nothing in terms of reducing or eliminating the items overwriting each 
other from time to time.

Here is my setup:

RabbitMQ endpoint configuration.

inputEndpoint: 
rabbitmq://batchExchange?connectionFactory=rabbitConnectionFactory&autoDelete=false&queue=batchrouteQueue&exchangeType=topic&autoAck=false&bridgeEndpoint=true&concurrentConsumers=15&threadPoolSize=15&channelPoolMaxSize=15&prefetchCount=1&prefetchEnabled=true,vmsys301.vhihealthcare.net:5671

The route (goes through an abstract route config for retry and exception 
handling).

from("direct:" + getServiceName())
                                                                .log("Receiving 
${body}")
                                                                
.process(this::process) //sets the aggregation header
                                                                .choice()
                                                                                
.when(header(AGGREGATIONKEY).contains("ONLINE "))
                                                                                
.aggregate(header(AGGREGATIONKEY))
                                                                                
.aggregationStrategyRef(aggregationStrategy)
                                                                                
.aggregationRepositoryRef(batchOnlineRepo)
                                                                                
.completionSize(batchSize)
                                                                                
.completionTimeout(onlineBatchTimeout)
                                                                                
.log("Sending out Online ${body}")
                                                                
.to(outputEndpoint).endChoice()
                                                                                
.otherwise()
                                                                                
.aggregate(header(AGGREGATIONKEY))
                                                                                
.aggregationStrategyRef(aggregationStrategy)
                                                                                
.aggregationRepositoryRef(batchRepo)
                                                                                
.completionSize(batchSize)
                                                                                
.completionTimeout(batchTimeout)
                                                                                
.log("Sending out ${body}")
                                                                
.to(outputEndpoint).endChoice().end();

The AggregationStrategy

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy; // org.apache.camel.AggregationStrategy on Camel 3.x
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchAggregationStrategy implements AggregationStrategy {

    final Logger logger = LoggerFactory.getLogger(this.getClass());

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        String aggregationkey = newExchange.getIn().getHeader("aggregationkey", String.class);

        if (oldExchange == null) {
            Object documentPayload = newExchange.getIn().getBody(Object.class);
            if (documentPayload instanceof DocumentPayloadBatch) {
                logger.debug("Skip aggregation [" + aggregationkey
                        + "] (this is a recovered exchange, just return it): " + newExchange.getIn());
                return newExchange;
            } else {
                DocumentPayloadBatch documentPayloadList = new DocumentPayloadBatch();
                synchronized (documentPayloadList) {
                    documentPayloadList.add((DocumentPayload) documentPayload);
                    newExchange.getIn().setBody(documentPayloadList);
                    logger.debug("First item of aggregation [" + aggregationkey
                            + "] (old exchange = null): " + newExchange.getIn());
                }
                return newExchange;
            }
        }

        DocumentPayload documentPayload = newExchange.getIn().getBody(DocumentPayload.class);
        DocumentPayloadBatch documentPayloadList = oldExchange.getIn().getBody(DocumentPayloadBatch.class);
        synchronized (documentPayloadList) {
            documentPayloadList.add(documentPayload);
            logger.debug("Payload with uuid " + documentPayload.getUuid()
                    + " added to list for batch sending. Have "
                    + documentPayloadList.size() + " payloads in list [" + aggregationkey + "] now.");
        } // for some reason documentPayloadList doesn't get updated with the latest
          // entry sometimes (1 in 5000 requests or so on average)

        return oldExchange;
    }
}

Given the above aggregation strategy code, should I also be synchronizing the 
oldExchange.getIn().getBody(DocumentPayloadBatch.class) call (roughly as in the 
sketch below)? And is the Aggregator even compatible with multiple consumers in a 
setup like the above? I can reduce it down to a single consumer (it's a fairly 
quick aggregation anyway), though of course I would prefer to keep my 15 consumers.
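
To make that first question concrete, this is roughly what I mean by also 
synchronizing the read. The class name and the shared LOCK object are purely 
illustrative (nothing we have in place today), and the first-exchange handling 
is elided:

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Sketch only: the read of the batch from the old exchange and the add are guarded
// by one shared monitor, instead of synchronizing on the list instance after it has
// already been read.
public class SynchronizedReadAggregationStrategy implements AggregationStrategy {

    private static final Object LOCK = new Object();

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // first/recovered exchange handling would stay exactly as in BatchAggregationStrategy
            return newExchange;
        }
        DocumentPayload documentPayload = newExchange.getIn().getBody(DocumentPayload.class);
        synchronized (LOCK) {
            DocumentPayloadBatch documentPayloadList =
                    oldExchange.getIn().getBody(DocumentPayloadBatch.class);
            documentPayloadList.add(documentPayload);
        }
        return oldExchange;
    }
}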

Due to the sporadic and intermittent nature of the issue I'm leaning towards 
thinking it's down to the consumers racing each other to update the aggregation 
result, but then the resulting list updates are already inside synchronized blocks. 
The remaining part of the aggregation that sits outside those blocks is the exchange 
reads and updates themselves: the updated exchange with the new item hasn't been 
returned yet before another consumer grabs a version of it and starts its own update 
(not sure if that's even possible though).
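
Since I'm not sure how to replicate this with a test, the closest I've got to a 
plan is something along these lines: call the strategy from several threads against 
the same "old" exchange and check whether any adds go missing. This is only a sketch 
(it assumes Camel 2.x classes and a no-arg DocumentPayload constructor, and it 
bypasses the real Aggregator and repository entirely), so it may well not reproduce 
the actual failure mode:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultExchange;

public class BatchAggregationRaceCheck {

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        BatchAggregationStrategy strategy = new BatchAggregationStrategy();

        // Seed the "old" exchange the same way the first aggregation call would.
        Exchange seed = new DefaultExchange(context);
        seed.getIn().setHeader("aggregationkey", "ONLINE TEST");
        seed.getIn().setBody(new DocumentPayload()); // assumes a no-arg constructor, for the sketch only
        Exchange oldExchange = strategy.aggregate(null, seed);

        int messages = 5000;
        ExecutorService pool = Executors.newFixedThreadPool(15); // mirrors concurrentConsumers=15
        for (int i = 0; i < messages; i++) {
            pool.submit(() -> {
                Exchange incoming = new DefaultExchange(context);
                incoming.getIn().setHeader("aggregationkey", "ONLINE TEST");
                incoming.getIn().setBody(new DocumentPayload());
                strategy.aggregate(oldExchange, incoming);
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        DocumentPayloadBatch batch = oldExchange.getIn().getBody(DocumentPayloadBatch.class);
        // Expect messages + 1 (the seed); anything less means an add was lost.
        System.out.println("Expected " + (messages + 1) + ", got " + batch.size());
    }
}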


Thanks,
Valdis