On Tue, Mar 2, 2010 at 4:09 PM, Andrew Chandler <[email protected]> wrote:
> We wound up resolving this by basically totally subverting a secondary
> splitter - we split the first time to do the parallel work and then
> route to a second custom splitter that maintains an internal map of
> result objects - the map is based on the correlation key and the object
> includes properties for totalsplit count for that correlation key,
> results received so far, and isForwarded.     Basically the first time a
> correlation comes in a result is created into the map for the
> correlationid.   If any of the results indicate success we set the
> isForwarded to true and return a message in our list of results
> indicating the success, however we don't remove the correlation id from
> the map until ALL responses are received.   It's not as pretty as an
> aggregator but it does seem to work for us.    We've had to do that
> elsewhere as well though and considering we seem to be using memory at a
> staggering rate my worry is that we're passing around too many maps in
> messages that are duplicated when split etc.    Time to pull out the
> profiler I guess :)
>

Thanks for sharing your solution.

Where do you store those Maps? I would assume the copy is cheap as its
just a reference copy, and not a deep copy.


>
>
> On Tue, 2010-03-02 at 07:03 +0100, Claus Ibsen wrote:
>
>> On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <[email protected]> wrote:
>> > When does 2.3 come out - sounds like what I want, just I'm pretty sure
>> > we can't update to something that isn't released yet or at least very
>> > close to release
>> >
>>
>> 2.2 was just recently released. I would think 2.3 is a couple of more
>> months away.
>>
>> If you want supported and more often released version of Camel then I
>> can only recommend taking a look at the FUSE versions.
>>
>>
>> >
>> > On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
>> >
>> >> Hi
>> >>
>> >> Try with the new overhauled aggreagtor in 2.3
>> >> http://camel.apache.org/aggregator2.html
>> >>
>> >> It works bette with completion trigger.
>> >>
>> >>
>> >> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <[email protected]> wrote:
>> >> > Hi there - with Clause help I've been able to get most of the way to
>> >> > where I need to be.   Right now I'm doing a proof of concept with string
>> >> > payloads,however in the end the payload will be an object.   Here's what
>> >> > I'm attempting
>> >> >
>> >> >
>> >> > I have an incoming message that contains an identifier as well as (N)
>> >> > things to do against it.   The (N) things can be done in parallel.    So
>> >> > what we are doing is splitting based on the (N) things.     Here's where
>> >> > it gets tricky.
>> >> > - The first of the (N) things to report success should be sent on while
>> >> > the rest of them should be aborted.   We should then forward the success
>> >> > on immediately not waiting for timeouts
>> >> > - Further, in the event that none of them report success we should
>> >> > aggregate until all (N) things have reported failure and then forward
>> >> > that single negative result onward.
>> >> > - As the (N) things inherently have timeouts built into them it would be
>> >> > nice if I didn't have to deal with batchTimeout for the aggregator.
>> >> >
>> >> >
>> >> >
>> >> > What I'm seeing now with my prototype is that I can successfully spit
>> >> > and process the split things using a threadPoolExecutor.   I provided
>> >> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
>> >> >
>> >> >
>> >> > Assume each of the split items have a built-in timeout on their work
>> >> > effort of 5 seconds
>> >> > With that result and without a .batchTimeout(7000L)   I was seeing 2
>> >> > results from aggregate,   - 1 almost immediately for the successful
>> >> > result and then a second aggregated message that had all the falures
>> >> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
>> >> > the .aggregate clause though I got 1 single message that had the success
>> >> > and the failures all in one.      This is close, however I guess what
>> >> > I'm asking is how can I control from inside the aggregation the decision
>> >> > to move forward?    In the splitter I'm already planning on including in
>> >> > each split object a sharedobject that can be used to abort any of the
>> >> > sibling split objects so I trhink I have a handle on that.
>> >> > Basically the reason I need the aggregate mechanism to control the
>> >> > continuing on part of the process is that if we're going after say
>> >> > 60,000 things then the ability to start work on the successful ones
>> >> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
>> >> > is significant.    But I still have to account for a totally negative
>> >> > response in the event none of them are successful.
>> >> >
>> >> > I'm presently looking at creating my own AggregationCollection as it
>> >> > seemed to allow me to figure out size of the aggregated collection and I
>> >> > can somehow figure out the total number of items split versus how many
>> >> > have been aggregated to determine I'm done.   (I thought that info was
>> >> > supposed to be in the header somewhere but it doesn't seem to be there)
>> >> >
>> >> >
>> >> > Any insights or redirects are appreciated.
>> >> >
>> >>
>> >>
>> >>
>> >
>> >
>> >
>>
>>
>>
>
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Reply via email to