We wound up resolving this by essentially repurposing a second
splitter. We split the first time to do the parallel work and then
route to a second, custom splitter that maintains an internal map of
result objects. The map is keyed on the correlation ID, and each result
object holds the total split count for that correlation ID, the number
of results received so far, and an isForwarded flag. The first time a
correlation ID comes in, a result object is created in the map for it.
If any of the results indicates success, we set isForwarded to true and
return a message in our list of results indicating the success; however,
we don't remove the correlation ID from the map until ALL responses have
been received. It's not as pretty as an aggregator, but it does seem to
work for us. We've had to do that elsewhere as well, though, and
considering we seem to be using memory at a staggering rate, my worry is
that we're passing around too many maps in messages that get duplicated
when split, etc. Time to pull out the profiler, I guess :)
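
For anyone following along, here is a minimal sketch of the bookkeeping
described above. It is plain Java with no Camel dependency, and the names
(CorrelationTracker, Outcome, onResult, pendingCorrelations) are
hypothetical, made up for illustration; this is not our actual splitter
code nor any Camel API. It assumes the caller already knows the total
split count for each correlation ID.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical bookkeeping for the second, custom splitter; not a Camel API. */
final class CorrelationTracker {

    /** What the caller should emit for the result it just handed in. */
    public enum Outcome { NOTHING, FORWARD_SUCCESS, FORWARD_FAILURE }

    /** Per-correlation state: total split count, results so far, isForwarded. */
    private static final class Entry {
        final int totalSplitCount;
        int received;
        boolean isForwarded;

        Entry(int totalSplitCount) {
            this.totalSplitCount = totalSplitCount;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Records one split result; synchronized because results arrive in parallel. */
    public synchronized Outcome onResult(String correlationId,
                                         int totalSplitCount,
                                         boolean success) {
        // First result for this correlation ID creates the entry.
        Entry e = entries.computeIfAbsent(correlationId,
                id -> new Entry(totalSplitCount));
        e.received++;

        Outcome outcome = Outcome.NOTHING;
        if (success && !e.isForwarded) {
            // First success: forward immediately, ignore later successes.
            e.isForwarded = true;
            outcome = Outcome.FORWARD_SUCCESS;
        }
        if (e.received >= e.totalSplitCount) {
            // Only drop the entry once ALL responses are in.
            entries.remove(correlationId);
            if (!e.isForwarded) {
                // Nobody succeeded: emit the single negative result.
                outcome = Outcome.FORWARD_FAILURE;
            }
        }
        return outcome;
    }

    /** Number of correlation IDs still held in the map. */
    public synchronized int pendingCorrelations() {
        return entries.size();
    }
}
```

One thing the sketch makes visible: an entry is removed only when
received reaches totalSplitCount, so in a design like this a split item
that never reports back would leave its entry in the map indefinitely.
That may be worth checking alongside the duplicated maps.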



On Tue, 2010-03-02 at 07:03 +0100, Claus Ibsen wrote:

> On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <[email protected]> wrote:
> > When does 2.3 come out - sounds like what I want, just I'm pretty sure
> > we can't update to something that isn't released yet or at least very
> > close to release
> >
> 
> 2.2 was just recently released. I would think 2.3 is a couple of more
> months away.
> 
> If you want supported and more often released version of Camel then I
> can only recommend taking a look at the FUSE versions.
> 
> 
> >
> > On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
> >
> >> Hi
> >>
> >> Try with the new overhauled aggregator in 2.3
> >> http://camel.apache.org/aggregator2.html
> >>
> >> It works better with a completion trigger.
> >>
> >>
> >> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <[email protected]> wrote:
> >> > Hi there - with Claus's help I've been able to get most of the way to
> >> > where I need to be.   Right now I'm doing a proof of concept with string
> >> > payloads; however, in the end the payload will be an object.   Here's what
> >> > I'm attempting
> >> >
> >> >
> >> > I have an incoming message that contains an identifier as well as (N)
> >> > things to do against it.   The (N) things can be done in parallel.    So
> >> > what we are doing is splitting based on the (N) things.     Here's where
> >> > it gets tricky.
> >> > - The first of the (N) things to report success should be sent on while
> >> > the rest of them should be aborted.   We should then forward the success
> >> > on immediately not waiting for timeouts
> >> > - Further, in the event that none of them report success we should
> >> > aggregate until all (N) things have reported failure and then forward
> >> > that single negative result onward.
> >> > - As the (N) things inherently have timeouts built into them it would be
> >> > nice if I didn't have to deal with batchTimeout for the aggregator.
> >> >
> >> >
> >> >
> >> > What I'm seeing now with my prototype is that I can successfully split
> >> > and process the split things using a threadPoolExecutor.   I provided
> >> > to .aggregate(header("JMSCorrelationID"),new MyAggregationStrategy())
> >> >
> >> >
> >> > Assume each of the split items have a built-in timeout on their work
> >> > effort of 5 seconds
> >> > With that result and without a .batchTimeout(7000L)   I was seeing 2
> >> > results from aggregate,   - 1 almost immediately for the successful
> >> > result and then a second aggregated message that had all the failures
> >> > about 4.5 seconds later.     When I tacked .batchTimeout(7000L) onto
> >> > the .aggregate clause though I got 1 single message that had the success
> >> > and the failures all in one.      This is close, however I guess what
> >> > I'm asking is how can I control from inside the aggregation the decision
> >> > to move forward?    In the splitter I'm already planning on including in
> >> > each split object a sharedobject that can be used to abort any of the
> >> > sibling split objects so I think I have a handle on that.
> >> > Basically the reason I need the aggregate mechanism to control the
> >> > continuing on part of the process is that if we're going after say
> >> > 60,000 things then the ability to start work on the successful ones
> >> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
> >> > is significant.    But I still have to account for a totally negative
> >> > response in the event none of them are successful.
> >> >
> >> > I'm presently looking at creating my own AggregationCollection as it
> >> > seemed to allow me to figure out size of the aggregated collection and I
> >> > can somehow figure out the total number of items split versus how many
> >> > have been aggregated to determine I'm done.   (I thought that info was
> >> > supposed to be in the header somewhere but it doesn't seem to be there)
> >> >
> >> >
> >> > Any insights or redirects are appreciated.
> >> >
> >>
> >>
> >>
> >
> >
> >
> 
> 
> 

