No worries! Thank you for your help. It helped me understand a bit more about how these aggregators work.. However, I still don't understand how to take care of my problem. I guess I'm going to have to roll my own processor or something.
On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote: > Hmmm. > Maybe others can help with that if it's possible, I haven't had to wrestle > with it. > > In my case it is actually a cxf service too, but it's asynchronous and I > send the response once I have it, indicating either timeout or the actual > response. > > Sorry I responded to your question without going back to see your other posts. > > Taariq > > On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote: > >> In my case, the originating request comes from CXF. How do I send the >> aggregated response back to CXF? >> >> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote: >>> The consumer that handles the aggregated/timed-out request or response. >>> >>> I have to resend a few times if it's the request, I simply feed it back >>> into "direct:socketRequestRoute" with the header for the number of retry >>> attempts incremented. >>> If it's the response I can forward to some process. >>> >>> Taariq >>> >>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> >>> wrote: >>> >>>> What's listening on the: >>>> >>>> to("direct:requestResponse") >>>> >>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote: >>>>> Sure >>>>> >>>>> You can of course solve what I've described many ways, but I'll >>>>> explain using 3 routes as that's what I used. >>>>> >>>>> This first route is the main route I mentioned earlier, so you send >>>>> your socket messages here and it's multicast to both the aggregator >>>>> and to the socket. >>>>> >>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator", >>>>> "someOutboundSocketEndpoint"); >>>>> >>>>> >>>>> This next route will aggregate, both requests and responses are sent >>>>> here as you envisaged. >>>>> from("direct:requestResponseAggregator"). >>>>> .aggregate(header("someCorrellationId"), >>>>> requestResponseAggregator) >>>>> .completionSize(2) >>>>> .completionTimeout(5000) >>>>> .to("direct:requestResponse"); //Here you can send the >>>>> "aggregated" message, in my case it's only the response I forward >>>>> unless there's a timeout, then I forward the request of course. >>>>> >>>>> Finally the route that consumes the socket responses. >>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator"); >>>>> //this headerEnricher doesn't have to be a processor, you have many >>>>> options to add a header. >>>>> >>>>> If that's not clear feel free to ask. >>>>> >>>>> Taariq >>>>> >>>>> >>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman >>>>> <ja...@carmanconsulting.com> wrote: >>>>>> Care to share an example? I'm not picturing it. >>>>>> >>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote: >>>>>>> Hi James >>>>>>> >>>>>>> I did that too for what it's worth. >>>>>>> I send the message to a route that forwards to both the aggregator and >>>>>>> to the socket. >>>>>>> When the response comes in I use an enricher to add the ID to the >>>>>>> headers and then forward to the aggregator. >>>>>>> >>>>>>> Taariq >>>>>>> >>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Willem, >>>>>>>> >>>>>>>> Thank you for your help. I don't think this is doing exactly what I >>>>>>>> need, though. The real trick here is the asynchronous nature of the >>>>>>>> "server" on the other end of this situation. I thought about using an >>>>>>>> aggregator to make sure the response gets matched up with the request >>>>>>>> using a correlation id. The aggregator wouldn't aggregate multiple >>>>>>>> responses together into one, it would just make sure it matches the >>>>>>>> correct response with its request. Does this sound like a valid >>>>>>>> approach? If so, how the heck do I go about it? :) >>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> James >>>>>>>> >>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> >>>>>>>> wrote: >>>>>>>>> Hi James, >>>>>>>>> >>>>>>>>> Camel async process engine already provides the way that you want. >>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example. >>>>>>>>> >>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup >>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup >>>>>>>>> >>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote: >>>>>>>>>> >>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Hi James, >>>>>>>>>>> >>>>>>>>>>> I hope I understand your scenario correctly. Here are a few >>>>>>>>>>> thoughts. I >>>>>>>>>>> assume want to use camel-netty [1] to send messages to your sever >>>>>>>>>>> (if you >>>>>>>>>>> have your own code that does that, you can use it too, but you'd >>>>>>>>>>> have to >>>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is >>>>>>>>>>> converting a >>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your >>>>>>>>>>> exchange as >>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and >>>>>>>>>>> compose it >>>>>>>>>>> back again. I would not keep threads blocked so I believe your best >>>>>>>>>>> bet is >>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the >>>>>>>>>>> examples using >>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is >>>>>>>>>>> stateless so >>>>>>>>>>> you'll need a correlationId, which you must have already and >>>>>>>>>>> something to >>>>>>>>>>> keep your state. A good bet would be jms [3], or you could write >>>>>>>>>>> your own. >>>>>>>>>>> If you used jms you would need to use both a correlationId and a >>>>>>>>>>> replyTo >>>>>>>>>>> queue. >>>>>>>>>>> >>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId"); >>>>>>>>>>> from("netty:input).to("jms:replyTo-queue") >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Perhaps a bit more information might be appropriate here. >>>>>>>>>> Eventually, >>>>>>>>>> I'd like to "expose" this route via web services (using CXF of >>>>>>>>>> course). So, I would need to either block the request thread, >>>>>>>>>> waiting >>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous >>>>>>>>>> processing stuff (I'm thinking this might help us get more done with >>>>>>>>>> less http request threads) to do more of a continuation thing. >>>>>>>>>> >>>>>>>>>> We already have a correlation id. The "protocol" requires one and >>>>>>>>>> the >>>>>>>>>> server process just echos it back in the response message. >>>>>>>>>> >>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot >>>>>>>>>>> use >>>>>>>>>>> the same you can do a second transformation/correlation using a >>>>>>>>>>> claim-check >>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement >>>>>>>>>>> your own (in >>>>>>>>>>> memory) persistence and correlation. You can also use a resequencer >>>>>>>>>>> [4] if >>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get >>>>>>>>>>> the replies >>>>>>>>>>> when they become available, and you can control that. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> I don't think a resequencer is necessary. I don't want to guarantee >>>>>>>>>> the ordering. I'm mostly interested in throughput here. So, if a >>>>>>>>>> message comes in after another, but it can be processed faster, so be >>>>>>>>>> it. >>>>>>>>>> >>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, >>>>>>>>>>> but I >>>>>>>>>>> hope this helps. >>>>>>>>>>> Hadrian >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> You have been very helpful. Thank you for taking the time! >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Willem >>>>>>>>> ---------------------------------- >>>>>>>>> FuseSource >>>>>>>>> Web: http://www.fusesource.com >>>>>>>>> Blog: http://willemjiang.blogspot.com (English) >>>>>>>>> http://jnn.javaeye.com (Chinese) >>>>>>>>> Twitter: willemjiang >>>>>>>>> Weibo: willemjiang >>>>>>>>> >>>>>>> >>>>>> >>>>> >>> >