Sure

You can of course solve what I've described in many ways, but I'll
explain using 3 routes, as that's what I used.

This first route is the main route I mentioned earlier: you send
your socket messages here and they're multicast to both the aggregator
and the socket.

from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
 "someOutboundSocketEndpoint");


This next route does the aggregation; both requests and responses are
sent here, as you envisaged.

// Forward the "aggregated" message. In my case that's only the response,
// unless there's a timeout, in which case I forward the request.
from("direct:requestResponseAggregator")
                .aggregate(header("someCorrelationId"), requestResponseAggregator)
                .completionSize(2)
                .completionTimeout(5000)
                .to("direct:requestResponse");

Finally the route that consumes the socket responses.
from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
// This headerEnricher doesn't have to be a processor; you have many
// options for adding a header.
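
In case it helps to picture what the aggregator is doing with the
request/response pairs, here is a rough plain-Java sketch of the pairing
logic, with no Camel involved. The class and method names are made up
for illustration and are not part of Camel or of my actual routes; in
the routes above the aggregator, completionSize(2) and
completionTimeout(5000) do this work for you.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: hypothetical names, not a Camel API.
class RequestResponseCorrelator {
    // Requests still waiting for a response, keyed by correlation id.
    private final Map<String, String> pendingRequests = new ConcurrentHashMap<>();

    // Called when a request is sent out (the aggregator branch of the multicast).
    void onRequest(String correlationId, String request) {
        pendingRequests.put(correlationId, request);
    }

    // Called when a response arrives with its correlation id attached.
    // Returns the response to forward if a request was waiting for it,
    // or null if no request is pending under that id.
    String onResponse(String correlationId, String response) {
        String request = pendingRequests.remove(correlationId);
        return request == null ? null : response;
    }

    // Called when the wait for a response expires.
    // Returns the unanswered request so it can be forwarded instead,
    // or null if the request was already answered.
    String onTimeout(String correlationId) {
        return pendingRequests.remove(correlationId);
    }
}
```

The sketch assumes the request is always registered before its response
arrives, and it leaves the timer itself to the caller.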

If that's not clear feel free to ask.

Taariq


On Tue, Aug 16, 2011 at 9:30 PM, James Carman
<ja...@carmanconsulting.com> wrote:
> Care to share an example?  I'm not picturing it.
>
> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>> Hi James
>>
>> I did that too for what it's worth.
>> I send the message to a route that forwards to both the aggregator and to 
>> the socket.
>> When the response comes in I use an enricher to add the ID to the headers 
>> and then forward to the aggregator.
>>
>> Taariq
>>
>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>
>>> Willem,
>>>
>>> Thank you for your help.  I don't think this is doing exactly what I
>>> need, though.  The real trick here is the asynchronous nature of the
>>> "server" on the other end of this situation.  I thought about using an
>>> aggregator to make sure the response gets matched up with the request
>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>> responses together into one, it would just make sure it matches the
>>> correct response with its request.  Does this sound like a valid
>>> approach?  If so, how the heck do I go about it? :)
>>>
>>> Thanks,
>>>
>>> James
>>>
>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
>>>> Hi James,
>>>>
>>>> Camel async process engine already provides the way that you want.
>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>
>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>
>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>
>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com>
>>>>>  wrote:
>>>>>>
>>>>>> Hi James,
>>>>>>
>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>>>>> assume you want to use camel-netty [1] to send messages to your server (if
>>>>>> you have your own code that does that, you can use it too, but you'd have
>>>>>> to write your own Processor or Component). Iiuic, your scenario is
>>>>>> converting a 2x in-only to a 1x in-out async mep. You should then treat
>>>>>> your exchange as an async in-out and let your framework (Camel) decompose
>>>>>> it and compose it back again. I would not keep threads blocked, so I
>>>>>> believe your best bet is using the Camel async messaging [2] and Futures
>>>>>> (look at the examples using asyncSend* and asyncCallback*). The issue is
>>>>>> that Camel is stateless, so you'll need a correlationId, which you must
>>>>>> have already, and something to keep your state. A good bet would be jms
>>>>>> [3], or you could write your own. If you used jms you would need to use
>>>>>> both a correlationId and a replyTo queue.
>>>>>>
>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>
>>>>>
>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>> course).  So, I would need to either block the request thread, waiting
>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>> less http request threads) to do more of a continuation thing.
>>>>>
>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>> server process just echos it back in the response message.
>>>>>
>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>>>>> the same you can do a second transformation/correlation using a
>>>>>> claim-check sort of pattern. If you don't want to use jms you can
>>>>>> implement your own (in memory) persistence and correlation. You can also
>>>>>> use a resequencer [4] if you want to enforce the order. If you use
>>>>>> asyncCallback, you get the replies when they become available, and you
>>>>>> can control that.
>>>>>>
>>>>>
>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>> message comes in after another, but it can be processed faster, so be
>>>>> it.
>>>>>
>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>>>>> hope this helps.
>>>>>> Hadrian
>>>>>>
>>>>>
>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>
>>>>
>>>>
>>>> --
>>>> Willem
>>>> ----------------------------------
>>>> FuseSource
>>>> Web: http://www.fusesource.com
>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>         http://jnn.javaeye.com (Chinese)
>>>> Twitter: willemjiang
>>>> Weibo: willemjiang
>>>>
>>
>
