Hi all!

The problem is reproducible with Camel 2.12.5, and also with the latest
2.11, 2.13, 2.14 and 2.15 releases.
It is NOT reproducible with Camel 2.10.7 (i.e. the latest 2.10).
It appears only when a LevelDB or HawtDB repository is used. The default
in-memory aggregation repository works fine.
I have reproduced it with unit tests. The route and the unit tests are at
the end of this message.

Repo definition:

HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
repo.setDeadLetterUri(dlq.getEndpointUri());
repo.setMaximumRedeliveries(3);

Problems:
1. Wrong number of aggregated messages (the aggregator is configured to
complete on completionSize=2 or completionTimeout=10000 millis).
2. Error handling of the HawtDB/LevelDB repository is not working (i.e. no
retries, and no message goes to the DLQ).

Any ideas??

Many thanks!!!

---------------------------

Unit test output regarding (1):

The unit test sends 4 messages and expects 2 aggregated messages (i.e. 2
incoming messages per group). However, the aggregator outputs 4 messages.
The last two are NOT correct and have an EMPTY
"CamelAggregatedCompletedBy" header.
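
To make the expected count concrete, here is a minimal plain-Java sketch of
completion-by-size grouping. It only illustrates the arithmetic (4 messages,
2 correlation keys, completionSize=2) and is not Camel's aggregator. The
class and method names (CompletionSizeSketch, countCompletedBySize) are made
up for this example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompletionSizeSketch {

    // Hypothetical helper: counts how many groups complete by size
    // for a sequence of correlation keys (one key per incoming message).
    public static int countCompletedBySize(List<String> correlationKeys, int completionSize) {
        Map<String, Integer> pending = new HashMap<String, Integer>();
        int completed = 0;
        for (String key : correlationKeys) {
            // Increment the number of messages seen so far for this key.
            int count = pending.merge(key, 1, Integer::sum);
            if (count == completionSize) {
                // The group is complete: emit one aggregate and reset the key.
                pending.remove(key);
                completed++;
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        // Same input as the unit test: two messages per FILE_REF value.
        List<String> fileRefs = Arrays.asList("1", "1", "2", "2");
        System.out.println(countCompletedBySize(fileRefs, 2)); // prints 2
    }
}
```

Under this model the four test messages produce exactly two aggregates, both
completed by size, which is what the test asserts on successUri.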

INFO  09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer with FILE REF=1 cancelled]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Aggregation Completion reason=size]
INFO  09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer with FILE REF=2 cancelled]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Aggregation Completion reason=size]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Ignore - Aggregation Completion Reason Not Expected=]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72] [Ignore - Aggregation Completion Reason Not Expected=]

----------------------------

Route:

                LastMessageAggregationStrategy aggregationStrategy = new LastMessageAggregationStrategy();

                from(timerRouteFrom).routeId("timerRoute")
                    // group by file reference; complete on 2 messages or on timeout
                    .aggregate(header(TimerRoute.FILE_REF_HEADER), aggregationStrategy)
                        .completionSize(2)
                        .completionTimeout(timeout)
                        .aggregationRepository(repo)
                    .choice()
                        .when(simple("${in.header.CamelAggregatedCompletedBy} contains 'timeout'"))
                            .log(LoggingLevel.ERROR, timeoutMessage)
                            .log(LoggingLevel.ERROR, "Timeout threshold (millis): " + timeout)
                            .log(LoggingLevel.ERROR, "File Ref= ${in.header." + FILE_REF_HEADER + "}")
                            .log(LoggingLevel.DEBUG, "Sending to: " + timeoutUri)
                            .setHeader("contentType", constant("text/html"))
                            .setHeader(TIMEOUT_THRESHOLD_HEADER, constant(timeout))
                            .to(freeMarkerTemplate).id("freemarkerId")
                            .log(LoggingLevel.DEBUG, "Emailing Exchange body: ${body}")
                            .to(timeoutUri)
                        .when(simple("${in.header.CamelAggregatedCompletedBy} contains 'size'"))
                            .log(LoggingLevel.INFO, "Timer with FILE REF=${in.header." + FILE_REF_HEADER + "} cancelled")
                            .log(LoggingLevel.DEBUG, "Aggregation Completion reason=${in.header.CamelAggregatedCompletedBy}")
                            .to(successUri)
                        .otherwise()
                            .log(LoggingLevel.DEBUG, "Ignore - Aggregation Completion Reason Not Expected=${in.header.CamelAggregatedCompletedBy}")
                        .end()
                .end();


        class LastMessageAggregationStrategy implements AggregationStrategy {
         
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                return newExchange;
            }
        }

----------------------

Unit Test (aggregation size=2, so expecting 2 aggregated messages):

    public void happyPath() throws InterruptedException {
        
        successUri.expectedMessageCount(2);
        timeoutUri.expectedMessageCount(0);
        
        Map<String, Object> headers1 = new HashMap<String, Object>();
        headers1.put(TimerRoute.FILE_REF_HEADER, "1");
        
        Map<String, Object> headers2 = new HashMap<String, Object>();
        headers2.put(TimerRoute.FILE_REF_HEADER, "2");
        
        for(int i=0;i<2;i++) 
                fromTemplate.sendBodyAndHeaders("dummy body1..", headers1);
        
        for(int i=0;i<2;i++) 
                fromTemplate.sendBodyAndHeaders("dummy body2..", headers2);

        Thread.sleep(5000);
        
        assertMockEndpointsSatisfied();
        
    }

    public void testAggregatorRetries() throws Exception {
                
        context.getRouteDefinition("timerRoute").adviceWith(context, new AdviceWithRouteBuilder() {
            @Override
            public void configure() throws Exception {

                interceptSendToEndpoint(timeoutUri.getEndpointUri())
                        .skipSendToOriginalEndpoint()
                        .throwException(new IOException("This a simulated exception Timeout!"));

                interceptSendToEndpoint(successUri.getEndpointUri())
                        .skipSendToOriginalEndpoint()
                        .throwException(new IOException("This a simulated exception Success!"));
            }
        });
                
        successUri.expectedMessageCount(0);
        timeoutUri.expectedMessageCount(0);
        dlq.expectedMessageCount(1);
        
        Map<String, Object> headers1 = new HashMap<String, Object>();
        headers1.put(TimerRoute.FILE_REF_HEADER, "1");
        
        for(int i=0;i<2;i++) 
                fromTemplate.sendBodyAndHeaders("dummy body1..", headers1);
        
        Thread.sleep(5000);
        
        assertMockEndpointsSatisfied(); 
    }

        protected RouteBuilder createRouteBuilder() throws Exception {
                
                // create the repo
                HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
                repo.setDeadLetterUri(dlq.getEndpointUri());
                repo.setMaximumRedeliveries(3);
                // create the route that will be tested
                TimerRoute routePutToTest = new TimerRoute("direct:from", "Expired!!!!", repo, 10 * 1000,
                                successUri.getEndpointUri(), timeoutUri.getEndpointUri());
                
                return routePutToTest;  
        }

--
View this message in context: 
http://camel.465427.n5.nabble.com/Aggregator-LevelDB-or-HawtDB-for-persistency-incorrect-behavior-tp5765524.html
Sent from the Camel - Users mailing list archive at Nabble.com.
