Hi

Using LevelDB as temporary storage here does not sound like a good design.

Instead, I would design it so that you aggregate in smaller batches and
send the results back to the JMS broker, then have a second route consume
from there to continue routing.
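
A rough sketch of that idea in the Java DSL is below. The queue names, the
completion size and timeout, and the number of concurrent consumers are all
placeholders chosen for illustration, not values from your setup, and the
"jms" component is assumed to be configured already.

```java
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;

public class TwoStageRoutes extends RouteBuilder {

    @Override
    public void configure() {
        // Stage 1: consume, keep only the latest message per group, and
        // hand each completed group back to the broker. No slow work here.
        from("jms:queue:incoming")                    // hypothetical queue name
            .aggregate(header("JMSXGroupID"), new UseLatestAggregationStrategy())
                .completionSize(100)                  // assumed small batch size
                .completionTimeout(60000)             // assumed: 1 min, not 30 min
            .to("jms:queue:aggregated");              // hypothetical queue name

        // Stage 2: a separate route does the slow processing at its own
        // pace, scaled with concurrent JMS consumers.
        from("jms:queue:aggregated?concurrentConsumers=10") // assumed value
            .log("Process message ${body}")
            .process(exchange -> {
                // placeholder for the slow per-message processing
            });
    }
}
```

With smaller batches a group can complete more than once, so the second
route may see several messages for the same JMSXGroupID and should tolerate
that. In exchange, the aggregated results wait on the broker rather than in
the aggregator, so the first route should not have to wait for the slow
processing.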

Which Camel version are you using?



On Sat, Sep 3, 2022 at 5:29 PM Sydney Henrard <shenr...@smartwavesa.com>
wrote:

> Hello,
>
> In my application I have a route that consumes a JMS queue, then aggregates
> on a header and finally processes. The issue is that the consumption of the
> queue is stopped until the processing is done. The queue contains 25
> million messages (600K unique messages based on the aggregation rule), and
> aggregation is configured to complete on timeout (30 min) with the
> UseLatestAggregationStrategy. Persistence is handled with LevelDB.
> Parallel processing is enabled.
>
> What I see is the consumption of 2-3 million messages during the 30 min,
> and then the consumption stops after the aggregator thread starts the
> processing. The consumption resumes after all exchanges have been
> processed. In my case the processing takes days, so there is no consumption
> of the queue during that time.
>
> I tried to reproduce it with a simpler unit test. Analysing the log, I find
> the same kind of issue:
>
> 13:15:00.852 [main] INFO route1 - Received message 0 - 0
>
> 13:15:03.861 [main] INFO route1 - Received message 0 - 174288
> 13:15:03.876 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 4
>
> 13:15:04.880 [main] INFO route1 - Received message 0 - 266292
> 13:15:04.987 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 10
>
> 13:15:06.159 [Camel (camel-1) thread #8 - Aggregator] INFO route1 - Process message 0 - 16858
> 13:15:06.159 [main] INFO route1 - Received message 0 - 266293  <----- 1s wait with last message 266292
>
> 13:15:07.178 [main] INFO route1 - Received message 0 - 368107
>
> 13:15:08.065 [Camel (camel-1) thread #6 - Aggregator] INFO route1 - Process message 0 - 17882
>
> 13:15:23.241 [Camel (camel-1) thread #12 - Aggregator] INFO route1 - Process message 0 - 199001
> 13:15:23.241 [main] INFO route1 - Received message 0 - 368108  <----- 16s wait with last message 368107
>
> Is this something that can be mitigated? Am I missing something in the
> route?
>
> public class AggregationTest extends CamelTestSupport {
>
>     @Test
>     public void testMock() throws Exception {
>         for (int duplicate = 0; duplicate < 2; duplicate++) {
>             for (int i = 0; i < 400000; i++) {
>                 template.sendBodyAndHeader("direct:start",
>                         String.format("%d - %d", duplicate, i),
>                         "JMSXGroupID", String.valueOf(i));
>             }
>         }
>         NotifyBuilder notify = new NotifyBuilder(context)
>                 .from("mock:result")
>                 .create();
>
>         boolean done = notify.matches(30, TimeUnit.SECONDS);
>     }
>
>     @Override
>     protected RoutesBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() {
>                 from("direct:start")
>                         .log("Received message ${body}")
>                         .aggregate(header("JMSXGroupID"), new UseLatestAggregationStrategy())
>                         .completionTimeout(3000)
>                         .parallelProcessing()
>                         .log("Process message ${body}")
>                         .to("mock:result");
>             }
>         };
>     }
> }
>
> Thanks
> Sydney
>


-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
