Hi Claus!

Sorry but I do not understand. My correlation expression is simply returning 
the value of "key" (see class "CorrelationExpression" below of my initial 
mail). Debugging I see the first list contains two elements as expected. As 
soon the correlation key changes camel correctly starts with a fresh list, but 
I assume camel keeps all lists in memory until the completionPredicate returns 
"true" (?).

But as I do not have any state my completionPredicate only sees the current 
exchange which is also the first item in the "group exchange" property (after 
correlation key changed), so there is no chance to detect aggregation of the 
previous block is completed.

Using eagerCheckCompletion I see the message before any correlation/aggegation, 
so again no chance to detect a correlation key change as the "group-exchange" 
property is never set.

A correlationTimeout causes unpredictable results, too, because a timeout can 
occur any time even in the middle of aggregating hugh blocks (this is not 
theoretical, I encounterred this behavior already while testing large block 
sizes).

The correlation/aggregation works as expected from my point of view, but 
completing aggregated items is the issue without making use of a timeout.

Thanks for your answer

Jens


Von meinem iPhone gesendet

> Am 07.12.2014 um 08:55 schrieb Claus Ibsen <claus.ib...@gmail.com>:
> 
> Hi
> 
> What is your correlation expression?
> 
> Sounds like you should use an expression that matches the key from the
> map, eg so you have one group for 1. And another group for 2.
> 
> Then the aggregate is just to add the bodies into a list.
> 
> 
>> On Sun, Dec 7, 2014 at 1:38 AM, Jens Breitenstein <mailingl...@j-b-s.de> 
>> wrote:
>> Hi Camel Users!
>> 
>> I am struggling with aggregation in general and hopefully you can give me a
>> hint:
>> 
>> Basically my message body contains a Map<String, String>. Now I need to
>> aggregate messages to List<Map<String, String>> based on a certain map key
>> value.
>> So lets assume we have
>> 
>> message 1: map { key = "1", val = "A" }
>> message 2: map { key = "1", val = "B" }
>> message 3: map { key = "2", val = "C" }
>> 
>> as result I need two messages:
>> 
>> message 1: List<Map<String, String>> { { key = "1", val = "A" }, { key =
>> "1", val = "B" } }
>> message 2: List<Map<String, String>> { { key = "2", val = "C" } }
>> 
>> Therefore I wrote a simple class to handle my correlation key:
>> 
>> public class CorrellationExpression
>> implements Expression
>> {
>>    @Override public <T> T evaluate(final Exchange exchange, final Class<T>
>> type)
>>    {
>> final Map<String, String> entity = (Map<String,
>> String>)exchange.getIn().getBody();
>> // this cast is ugly, use camel type-converter instead
>> final T t = (T) (null == entity ? "-" : entity.get("key").toString());
>> 
>> return t;
>>    }
>> }
>> 
>> When debugging my code, camel identifies the correlation key and the
>> aggregated list is created correctly (checked via
>> "message.getProperty(Exchange.GROUPED_EXCHANGE)").
>> Unfortunately camel does not send out the aggregated message after the
>> correlation key changed, but the List property only contains items belonging
>> to the correlation key.
>> I also tried "eagerCheckCompletion", but this is my message before
>> aggregation thus the "grouped_exchange" property is not set. So regardless
>> what I try it's not possible for me to decide if completion is complete.
>> Using "timeout" is no solution as I noticed a timeout can also occur in the
>> middle of aggregation which leads to an invalid aggregation result (same
>> keys spread accross two messages).
>> 
>> Any idea to handle aggregation without timeout is welcome.
>> 
>> Thanks in advance
>> 
>> Jens
> 
> 
> 
> -- 
> Claus Ibsen
> -----------------
> Red Hat, Inc.
> Email: cib...@redhat.com
> Twitter: davsclaus
> Blog: http://davsclaus.com
> Author of Camel in Action: http://www.manning.com/ibsen
> hawtio: http://hawt.io/
> fabric8: http://fabric8.io/

Reply via email to