Hello,
In my application I have a route that consumes a JMS queue, then aggregate on a
header and finally process. The issue is that the consumption of the queue is
stopped until the processing is done. The queue contains 25 millions messages
(600K unique messages based on aggregation rule), aggregation is configured to
complete on timeout (30 min) with the UseLatestAggregationStrategy. Persistence
is handled with LevelDB. Paralllel processing is enabled.
What I see is the consumption of 2-3 millions messages during the 30 min, and
then the consumption stops after the aggregator thread starts the processing.
The consumption resumes after all exchanges have been processed. In my case the
processing takes days, so no consumption of the queue.
I try to reproduce with a simpler unit test. By analysing the log I find the
same kind of issue:
13:15:00.852 [main] INFO route1 - Received message 0 - 0
13:15:03.861 [main] INFO route1 - Received message 0 - 174288
13:15:03.876 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process
message 0 - 4
13:15:04.880 [main] INFO route1 - Received message 0 - 266292
13:15:04.987 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process
message 0 - 10
13:15:06.159 [Camel (camel-1) thread #8 - Aggregator] INFO route1 - Process
message 0 - 16858
13:15:06.159 [main] INFO route1 - Received message 0 - 266293 <----- 1s wait
with last message 266292
13:15:07.178 [main] INFO route1 - Received message 0 - 368107
13:15:08.065 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process
message 0 - 17882
13:15:23.241 [Camel (camel-1) thread #12 - Aggregator] INFO route1 - Process
message 0 - 199001
13:15:23.241 [main] INFO route1 - Received message 0 - 368108 <----- 16s wait
with last message 368107
Is this something that can be mitigated? Am I missing something in the route?
public class AggregationTest extends CamelTestSupport {
@Test
public void testMock() throws Exception {
for (int duplicate = 0; duplicate < 2; duplicate++) {
for (int i = 0; i < 400000; i++) {
template.sendBodyAndHeader("direct:start",
String.format("%d - %d", duplicate, i),
"JMSXGroupID", String.valueOf(i));
}
}
NotifyBuilder notify = new NotifyBuilder(context)
.from("mock:result")
.create();
boolean done = notify.matches(30, TimeUnit.SECONDS);
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() {
from("direct:start")
.log("Received message ${body}")
.aggregate(header("JMSXGroupID"), new
UseLatestAggregationStrategy())
.completionTimeout(3000)
.parallelProcessing()
.log("Process message ${body}")
.to("mock:result");
}
};
}
}
Thanks
Sydney