Hi Ebe,

the aggregator as you defined it has no notion of completeness of a file. It can only aggregate using the correlation expression and the AggregationStrategy, and it will send out the aggregated exchange as soon as either the timeout or the completionSize is reached. So when you parse a small file the timeout will trigger, and when you parse a large file the completionSize will trigger. I guess what you want is to recombine the messages from the whole file into one exchange again. I just read in the splitter docs that the last exchange of a split is marked with the property "CamelSplitComplete" set to true. So this could help to know when the new exchange is complete.
I think you should be able to use the completionPredicate:

.aggregate(header("CamelFileName"), new MyAggregationStrategy())
    .completionPredicate(property("CamelSplitComplete"))

So the aggregator knows when the file is finished. (Not sure if my snippet is
correct ... I just typed it into the mail.)
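Putting the pieces together, the whole route might look roughly like this. This is an untested sketch that reuses the route and names from your mail below (threadPool, MyParser, MyAggregationStrategy); the only change is replacing completionSize with the completionPredicate:

```java
from("file:C:\\camelProject\\data\\inbox?fileName=someFile.txt&delete=true")
    .log("Starting to process big file: ${header.CamelFileName}")
    .split(body().tokenize("\n")).streaming()
        .executorService(threadPool)
        .bean(MyParser.class, "parseString")
    // correlate all lines of one file; complete when the splitter
    // marks the last line with CamelSplitComplete = true
    .aggregate(header("CamelFileName"), new MyAggregationStrategy())
        .completionPredicate(property("CamelSplitComplete"))
    .to("jms:queue:test_request_3");
```

Note that with a parallel executorService the exchange carrying CamelSplitComplete is not guaranteed to reach the aggregator last, so you may want to test this with and without the thread pool.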


Your implementation with the builder will not speed things up, as you convert the builder back to a String at the end of every aggregation. It should rather look like this:

        @Override
        public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
            if (exchange1 == null) {
                // first call: create a new exchange holding a StringBuilder
                exchange1 = new DefaultExchange(exchange2.getContext());
                exchange1.getIn().setBody(new StringBuilder());
            }
            StringBuilder builder = exchange1.getIn().getBody(StringBuilder.class);
            builder.append(exchange2.getIn().getBody(String.class));
            return exchange1;
        }

So the idea is that on the first call you create a new exchange with a StringBuilder in the body. Then you append each message body to this StringBuilder.
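To illustrate why this is faster outside of Camel: the builder is created once and mutated in place, instead of producing a new String on every aggregation. Here is a minimal stand-alone sketch of the same pattern; AggregateSketch and its aggregate() method are hypothetical stand-ins for the Exchange-based strategy, not Camel API:

```java
import java.util.List;

public class AggregateSketch {
    // Mirrors the strategy above: the "old body" accumulates lines
    // in a mutable StringBuilder instead of concatenating Strings.
    public static Object aggregate(Object oldBody, String newBody) {
        if (oldBody == null) {
            // first message: start a builder rather than keeping a String
            return new StringBuilder(newBody);
        }
        StringBuilder builder = (StringBuilder) oldBody;
        builder.append(newBody);
        return builder;
    }

    public static void main(String[] args) {
        Object body = null;
        for (String line : List.of("line1\n", "line2\n", "line3\n")) {
            body = aggregate(body, line);
        }
        // convert to String only once, at the very end
        System.out.print(body.toString());
    }
}
```

Each call is O(length of the appended line), whereas rebuilding a String per message makes the whole aggregation quadratic in the file size.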

Christian



On 28.10.2011 20:56, ebinsingh wrote:
Thanks a lot Christian.

I just noticed that adding an aggregator slows down the process, and by a huge
amount.
Without the aggregation, the time taken between parsing each line of data is
in nanoseconds (negligible), but as soon as I add the aggregation, the
time taken between parsing each line of data goes up to 30 milliseconds.

Also, if I do not have the executor statement below, the process terminates
in the middle of iterating through the file:
".executorService(threadPool)"


Not sure if I have something totally wrong. Please advise.

        
from("file:C:\\camelProject\\data\\inbox?fileName=someFile.txt&delete=true")
    .log("Starting to process big file: ${header.CamelFileName}")
    .split(body().tokenize("\n")).streaming()
    .executorService(threadPool)
    .bean(MyParser.class, "parseString")
    .aggregate(header("CamelFileName"), new MyAggregationStrategy())
        .completionSize(2000)
    .to("jms:queue:test_request_3");

As suggested, I am using a StringBuilder in my aggregation strategy.

        StringBuilder builder = new StringBuilder();

        @Override
        public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
            if (exchange1 == null) {
                return exchange2;
            }
            builder.append(exchange1.getIn().getBody(String.class))
                   .append(exchange2.getIn().getBody(String.class));
            exchange1.getIn().setBody(builder.toString());
            return exchange1;
        }


--
View this message in context: 
http://camel.465427.n5.nabble.com/Spliter-in-Camel-tp4940967p4946971.html
Sent from the Camel - Users mailing list archive at Nabble.com.


--

Christian Schneider
http://www.liquid-reality.de

Open Source Architect
Talend Application Integration Division http://www.talend.com
