No worries!  Thank you for your help.  It helped me understand a bit
more about how these aggregators work..  However, I still don't
understand how to take care of my problem.  I guess I'm going to have
to roll my own processor or something.

On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote:
> Hmmm.
> Maybe others can help with that if it's possible, I haven't had to wrestle 
> with it.
>
> In my case it is actually a cxf service too, but it's asynchronous  and I 
> send the response once I have it, indicating either timeout or the actual 
> response.
>
> Sorry I responded to your question without going back to see your other posts.
>
> Taariq
>
> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:
>
>> In my case, the originating request comes from CXF.  How do I send the
>> aggregated response back to CXF?
>>
>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> The consumer that handles the aggregated/timed-out request or response.
>>>
>>> I have to resend a few times if it's the request, I simply feed it back 
>>> into "direct:socketRequestRoute" with the header for the number of retry 
>>> attempts incremented.
>>> If it's the response I can forward to some process.
>>>
>>> Taariq
>>>
>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> 
>>> wrote:
>>>
>>>> What's listening on the:
>>>>
>>>> to("direct:requestResponse")
>>>>
>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>>> Sure
>>>>>
>>>>> You can of course solve what I've described many ways, but I'll
>>>>> explain using 3 routes as that's what I used.
>>>>>
>>>>> This first route is the main route I mentioned earlier, so you send
>>>>> your socket messages here and it's multicast to both the aggregator
>>>>> and to the socket.
>>>>>
>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>>  "someOutboundSocketEndpoint");
>>>>>
>>>>>
>>>>> This next route will aggregate, both requests and responses are sent
>>>>> here as you envisaged.
>>>>> from("direct:requestResponseAggregator").
>>>>>                .aggregate(header("someCorrellationId"),
>>>>> requestResponseAggregator)
>>>>>                .completionSize(2)
>>>>>                .completionTimeout(5000)
>>>>>                .to("direct:requestResponse"); //Here you can send the
>>>>> "aggregated" message, in my case it's only the response I forward
>>>>> unless there's a timeout, then I forward the request of course.
>>>>>
>>>>> Finally the route that consumes the socket responses.
>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>>>   //this headerEnricher doesn't have to be a processor, you have many
>>>>> options to add a header.
>>>>>
>>>>> If that's not clear feel free to ask.
>>>>>
>>>>> Taariq
>>>>>
>>>>>
>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>>> <ja...@carmanconsulting.com> wrote:
>>>>>> Care to share an example?  I'm not picturing it.
>>>>>>
>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>>>>> Hi James
>>>>>>>
>>>>>>> I did that too for what it's worth.
>>>>>>> I send the message to a route that forwards to both the aggregator and 
>>>>>>> to the socket.
>>>>>>> When the response comes in I use an enricher to add the ID to the 
>>>>>>> headers and then forward to the aggregator.
>>>>>>>
>>>>>>> Taariq
>>>>>>>
>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Willem,
>>>>>>>>
>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> James
>>>>>>>>
>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> 
>>>>>>>> wrote:
>>>>>>>>> Hi James,
>>>>>>>>>
>>>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>>>>>
>>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>>>
>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>>>
>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com>
>>>>>>>>>>  wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi James,
>>>>>>>>>>>
>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few 
>>>>>>>>>>> thoughts. I
>>>>>>>>>>> assume want to use camel-netty [1] to send messages to your sever 
>>>>>>>>>>> (if you
>>>>>>>>>>> have your own code that does that, you can use it too, but you'd 
>>>>>>>>>>> have to
>>>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is 
>>>>>>>>>>> converting a
>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your 
>>>>>>>>>>> exchange as
>>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and 
>>>>>>>>>>> compose it
>>>>>>>>>>> back again. I would not keep threads blocked so I believe your best 
>>>>>>>>>>> bet is
>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the 
>>>>>>>>>>> examples using
>>>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is 
>>>>>>>>>>> stateless so
>>>>>>>>>>> you'll need a correlationId, which you must have already and 
>>>>>>>>>>> something to
>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could write 
>>>>>>>>>>> your own.
>>>>>>>>>>> If you used jms you would need to use both a correlationId and a 
>>>>>>>>>>> replyTo
>>>>>>>>>>> queue.
>>>>>>>>>>>
>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>>> from("netty:input).to("jms:replyTo-queue")
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Perhaps a bit more information might be appropriate here.  
>>>>>>>>>> Eventually,
>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>>> course).  So, I would need to either block the request thread, 
>>>>>>>>>> waiting
>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>>>> less http request threads) to do more of a continuation thing.
>>>>>>>>>>
>>>>>>>>>> We already have a correlation id.  The "protocol" requires one and 
>>>>>>>>>> the
>>>>>>>>>> server process just echos it back in the response message.
>>>>>>>>>>
>>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot 
>>>>>>>>>>> use
>>>>>>>>>>> the same you can do a second transformation/correlation using a 
>>>>>>>>>>> claim-check
>>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement 
>>>>>>>>>>> your own (in
>>>>>>>>>>> memory) persistence and correlation. You can also use a resequencer 
>>>>>>>>>>> [4] if
>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get 
>>>>>>>>>>> the replies
>>>>>>>>>>> when they become available, and you can control that.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>>> it.
>>>>>>>>>>
>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, 
>>>>>>>>>>> but I
>>>>>>>>>>> hope this helps.
>>>>>>>>>>> Hadrian
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Willem
>>>>>>>>> ----------------------------------
>>>>>>>>> FuseSource
>>>>>>>>> Web: http://www.fusesource.com
>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>>>> Twitter: willemjiang
>>>>>>>>> Weibo: willemjiang
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>

Reply via email to