Hi all! Using camel 2.12.5 problem is reproducible (same for latest 2.11, latest 2.13, latest 2.14, latest 2.15). Using camel 2.10.7 (i.e. latest 2.10) problem is NOT reproducible. Problem appears only if a LevelDB or HawtDB repo is used. In memory default aggregator works fine. Reproduced by unit test. Route and unit tests in the end.
Repo definition: HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat"); repo.setDeadLetterUri(dlq.getEndpointUri()); repo.setMaximumRedeliveries(3); Problems: 1. Wrong number of aggregated messages (configured to fire for completionSize=2 or timeout=10000 millis) 2. Error Handling of HawtDB/ LevelDB is not working (i.e. no retries, no message goes to DLQ) Any ideas?? Many thanks!!! --------------------------- Unit test output regarding (1): The unit test sends 4 messages and expects 2 aggregated messages (i.e. 2 incoming messages per group). But, aggregator outputs 4 messages. The last two are NOT correct and have EMPTY header "in.header.CamelAggregatedCompletedBy". INFO 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer with FILE REF=1 cancelled] DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Aggregation Completion reason=size] INFO 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer with FILE REF=2 cancelled] DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Aggregation Completion reason=size] DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Ignore - Aggregation Completion Reason Not Expected=] DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Ignore - Aggregation Completion Reason Not Expected=] ---------------------------- Route: LastMessageAggregationStrategy aggregationStratery = new LastMessageAggregationStrategy(); from(timerRouteFrom).routeId("timerRoute") .aggregate(header(TimerRoute.FILE_REF_HEADER), aggregationStratery).completionSize(2).completionTimeout(timeout).aggregationRepository(repo) .choice() .when(simple("${in.header.CamelAggregatedCompletedBy} contains 'timeout'")) .log(LoggingLevel.ERROR, timeoutMessage) .log(LoggingLevel.ERROR, "Timeout threshold (millis): " + timeout) .log(LoggingLevel.ERROR, "File Ref= ${in.header." + FILE_REF_HEADER +"}") .log(LoggingLevel.DEBUG, "Sending to: " + timeoutUri) .setHeader("contentType", constant("text/html")) .setHeader(TIMEOUT_THRESHOLD_HEADER, constant(timeout)) .to(freeMarkerTemplate).id("freemarkerId") .log(LoggingLevel.DEBUG, "Emailing Exchange body: ${body}") .to(timeoutUri) .when(simple("${in.header.CamelAggregatedCompletedBy} contains 'size'")) .log(LoggingLevel.INFO, "Timer with FILE REF=${in.header." + FILE_REF_HEADER +"} cancelled") .log(LoggingLevel.DEBUG, "Aggregation Completion reason=${in.header.CamelAggregatedCompletedBy}") .to(successUri) .otherwise() .log(LoggingLevel.DEBUG, "Ignore - Aggregation Completion Reason Not Expected=${in.header.CamelAggregatedCompletedBy}") .end() .end(); class LastMessageAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { return newExchange; } } ---------------------- Unit Test (aggregation size=2, so expecting 2 aggregated messages): public void happyPath() throws InterruptedException { successUri.expectedMessageCount(2); timeoutUri.expectedMessageCount(0); Map<String, Object> headers1 = new HashMap<String, Object>(); headers1.put(TimerRoute.FILE_REF_HEADER, "1"); Map<String, Object> headers2 = new HashMap<String, Object>(); headers2.put(TimerRoute.FILE_REF_HEADER, "2"); for(int i=0;i<2;i++) fromTemplate.sendBodyAndHeaders("dummy body1..", headers1); for(int i=0;i<2;i++) fromTemplate.sendBodyAndHeaders("dummy body2..", headers2); Thread.sleep(5000); assertMockEndpointsSatisfied(); } public void testAggregatorRetries() throws Exception { context.getRouteDefinition("timerRoute").adviceWith(context, new AdviceWithRouteBuilder() { @Override public void configure() throws Exception { interceptSendToEndpoint(timeoutUri.getEndpointUri()) .skipSendToOriginalEndpoint() .throwException(new IOException("This a simulated exception Timeout!")); interceptSendToEndpoint(successUri.getEndpointUri()) .skipSendToOriginalEndpoint() .throwException(new IOException("This a simulated exception Success!")); } }); successUri.expectedMessageCount(0); timeoutUri.expectedMessageCount(0); dlq.expectedMessageCount(1); Map<String, Object> headers1 = new HashMap<String, Object>(); headers1.put(TimerRoute.FILE_REF_HEADER, "1"); for(int i=0;i<2;i++) fromTemplate.sendBodyAndHeaders("dummy body1..", headers1); Thread.sleep(5000); assertMockEndpointsSatisfied(); } protected RouteBuilder createRouteBuilder() throws Exception { // create the repo HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat"); repo.setDeadLetterUri(dlq.getEndpointUri()); repo.setMaximumRedeliveries(3); // create the route that will be tested TimerRoute routePutToTest = new TimerRoute("direct:from", "Expired!!!!" , repo, 10 * 1000, successUri.getEndpointUri(), timeoutUri.getEndpointUri()); return routePutToTest; } -- View this message in context: http://camel.465427.n5.nabble.com/Aggregator-LevelDB-or-HawtDB-for-persistency-incorrect-behavior-tp5765524.html Sent from the Camel - Users mailing list archive at Nabble.com.