Sure.

You can of course solve what I've described in many ways, but I'll explain using three routes, as that's what I used.
The first route is the main route I mentioned earlier: you send your socket messages here, and they are multicast to both the aggregator and the socket.

    from("direct:socketRequestRoute")
        .multicast()
        .to("direct:requestResponseAggregator", "someOutboundSocketEndpoint");

The next route does the aggregation; both requests and responses are sent here, as you envisaged.

    from("direct:requestResponseAggregator")
        .aggregate(header("someCorrellationId"), requestResponseAggregator)
        .completionSize(2)
        .completionTimeout(5000)
        // Send the "aggregated" message on from here. In my case I forward only
        // the response, unless there's a timeout, in which case I forward the request.
        .to("direct:requestResponse");

Finally, the route that consumes the socket responses.

    // The headerEnricher doesn't have to be a processor; you have many options to add a header.
    from(someInboundSocketEndpoint)
        .processRef("headerEnricher")
        .to("direct:requestResponseAggregator");

If that's not clear, feel free to ask.

Taariq

On Tue, Aug 16, 2011 at 9:30 PM, James Carman <ja...@carmanconsulting.com> wrote:
> Care to share an example? I'm not picturing it.
>
> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>> Hi James
>>
>> I did that too for what it's worth.
>> I send the message to a route that forwards to both the aggregator and to
>> the socket.
>> When the response comes in I use an enricher to add the ID to the headers
>> and then forward to the aggregator.
>>
>> Taariq
>>
>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>
>>> Willem,
>>>
>>> Thank you for your help. I don't think this is doing exactly what I
>>> need, though. The real trick here is the asynchronous nature of the
>>> "server" on the other end of this situation. I thought about using an
>>> aggregator to make sure the response gets matched up with the request
>>> using a correlation id.
>>> The aggregator wouldn't aggregate multiple
>>> responses together into one, it would just make sure it matches the
>>> correct response with its request. Does this sound like a valid
>>> approach? If so, how the heck do I go about it? :)
>>>
>>> Thanks,
>>>
>>> James
>>>
>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
>>>> Hi James,
>>>>
>>>> Camel async process engine already provides the way that you want.
>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>
>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>
>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>
>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> Hi James,
>>>>>>
>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>> assume want to use camel-netty [1] to send messages to your sever (if you
>>>>>> have your own code that does that, you can use it too, but you'd have to
>>>>>> write your own Processor or Component). Iiuic, your scenario is converting
>>>>>> a 2x in-only to a 1x in-out async mep. You should then treat your exchange
>>>>>> as an async in-out and let your framework (Camel) decompose it and compose
>>>>>> it back again. I would not keep threads blocked so I believe your best bet
>>>>>> is using the Camel async messaging [2] and Futures (look at the examples
>>>>>> using asyncSend* and asyncCallback*). The issue is that Camel is stateless
>>>>>> so you'll need a correlationId, which you must have already and something
>>>>>> to keep your state. A good bet would be jms [3], or you could write your
>>>>>> own.
>>>>>> If you used jms you would need to use both a correlationId and a replyTo
>>>>>> queue.
>>>>>>
>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>> from("netty:input).to("jms:replyTo-queue")
>>>>>>
>>>>>
>>>>> Perhaps a bit more information might be appropriate here. Eventually,
>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>> course). So, I would need to either block the request thread, waiting
>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>> less http request threads) to do more of a continuation thing.
>>>>>
>>>>> We already have a correlation id. The "protocol" requires one and the
>>>>> server process just echos it back in the response message.
>>>>>
>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>> the same you can do a second transformation/correlation using a
>>>>>> claim-check sort of pattern. If you don't want to use jms you can
>>>>>> implement your own (in memory) persistence and correlation. You can also
>>>>>> use a resequencer [4] if you want to enforce the order. If you use
>>>>>> asyncCallback, you get the replies when they become available, and you
>>>>>> can control that.
>>>>>>
>>>>>
>>>>> I don't think a resequencer is necessary. I don't want to guarantee
>>>>> the ordering. I'm mostly interested in throughput here. So, if a
>>>>> message comes in after another, but it can be processed faster, so be
>>>>> it.
>>>>>
>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>> hope this helps.
>>>>>> Hadrian
>>>>>>
>>>>>
>>>>> You have been very helpful. Thank you for taking the time!
>>>>>
>>>>
>>>>
>>>> --
>>>> Willem
>>>> ----------------------------------
>>>> FuseSource
>>>> Web: http://www.fusesource.com
>>>> Blog: http://willemjiang.blogspot.com (English)
>>>>       http://jnn.javaeye.com (Chinese)
>>>> Twitter: willemjiang
>>>> Weibo: willemjiang
>>>>
>>
>
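
The matching behaviour described for the aggregator at the top of this message (forward only the response once it arrives, or fall back to the request if the timeout fires first) can be sketched in plain Java. This is only an illustration under assumptions, not the code used in the routes above: it deliberately uses no Camel API, and RequestResponseMatcher and its method names are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical, Camel-free illustration of pairing a request with its
 * response by correlation id. Names are made up for this sketch.
 */
final class RequestResponseMatcher {

    // Requests that have been sent but not yet answered, keyed by correlation id.
    private final Map<String, String> pendingRequests = new ConcurrentHashMap<>();

    /** Record a request at the moment it is sent to the socket. */
    void onRequest(String correlationId, String request) {
        pendingRequests.put(correlationId, request);
    }

    /**
     * A response arrived. If a request with the same id is pending, the pair
     * is complete and only the response is forwarded; otherwise nothing is.
     */
    Optional<String> onResponse(String correlationId, String response) {
        return pendingRequests.remove(correlationId) != null
                ? Optional.of(response)
                : Optional.empty();
    }

    /**
     * The timeout fired. If the request is still unanswered, forward the
     * request itself so the caller can see what went unanswered.
     */
    Optional<String> onTimeout(String correlationId) {
        return Optional.ofNullable(pendingRequests.remove(correlationId));
    }
}
```

Calling onRequest("42", "ping") followed by onResponse("42", "pong") yields the response, while a later onTimeout("42") yields nothing because the pair has already completed. A response whose id was never registered is dropped rather than forwarded.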