Hi Guys,

I'm having a confusing problem with split aggregate.

An exchange comes in from a SEDA endpoint.
I then set a property.
Then split the body (a list of keys) and use an ArrayListAggregator.
After the split-aggregate, I then retrieve the property (which happens to be a 
map that I use to cross reference against the body elements).

Now this works just fine with a single thread, but when I increase the 
concurrentConsumers some of the exchanges are throwing NPE because somehow the 
next exchange consumed by the seda endpoint is having it’s property set on the 
other exchange. I’m not sure how this is happening as I thought such changes on 
the exchanges would be thread local

Here is my route:



fromF("seda:%s?concurrentConsumers=%s", jobName, concurrentConsumers)
.choice()
.when(hasResultPredicate)
   .to(mainQuery)
    .endChoice()
    .end()
.process(exchange -> {
Map<Integer, PlayerDetailsDTO> playersMap = new HashMap<Integer, 
PlayerDetailsDTO>();
List<Integer> playerIds = new ArrayList<Integer>();


List<PlayerDetailsDTO> playerDetailsList = exchange.getIn().getBody(List.class);

//message.setJobRecords(playerDetailsList);

for(PlayerDetailsDTO playerDetailsDto : playerDetailsList) {

playersMap.put(playerDetailsDto.getPlayerKey(), playerDetailsDto);
playerIds.add(playerDetailsDto.getPlayerKey());

}
exchange.setProperty(PLAYERS_MAP, playersMap);
exchange.getIn().setBody(playerIds);


})
.choice()
.when(hasResultPredicate)
.split(body())
.to(supplementaryQuery)
.aggregate(constant(true), new ArrayListAggregationStrategy())
.completionPredicate(splitSizeBatchPredicate)
.to("direct:suppProcessing")
.process(exchange -> {
Map<Integer,PlayerDetailsDTO> playersMap =
exchange.getProperty(RudJobResultProcessor.PLAYERS_MAP, Map.class);


List<List<PlayerStatusHistoryDTO>> statusHistoryListOfList =
exchange.getIn().getBody(List.class);


statusHistoryListOfList.stream().forEach(statusHistoryList -> {
if(!statusHistoryList.isEmpty()) {
PlayerDetailsDTO player = 
playersMap.get(statusHistoryList.get(0).getPlayerKey());
player.setPlayerStatusHistory(statusHistoryList);// <—- NPE occurring here when 
2 consumers as player == null
}
});})
.end()
.endChoice()
.end()
.to(SqlJobControllerRoute.POST_MESSAGE_SEDA);


I inlined the processors for completeness. Any suggestions would be great.


Many thanks,
Naseem

p.s. I sent a message earlier about db isolation levels but in fact the problem 
is during the split so disregard it.


Get Outlook for Android<https://aka.ms/ghei36>

Reply via email to