Hmmm.
Maybe others can help with that if it's possible; I haven't had to wrestle
with it.

In my case it is actually a CXF service too, but it's asynchronous, and I send
the response once I have it, indicating either timeout or the actual response.
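
A minimal sketch of that idea in plain Java, outside Camel and CXF: each
pending request is parked under its correlation id and is later completed
with either the real response or a timeout marker. The class name
RequestResponseCorrelator, its methods, and the "TIMEOUT" marker are
hypothetical names invented for illustration, not anything from the routes
discussed in this thread; completeOnTimeout requires Java 9 or later.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Camel or CXF): pairs each asynchronous
// response with its pending request by correlation id.
class RequestResponseCorrelator {
    // Requests still waiting for a response, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register a request before sending it. The returned future completes with
    // the response, or with timeoutValue if nothing arrives in time.
    CompletableFuture<String> register(String correlationId, long timeoutMillis, String timeoutValue) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future
                .completeOnTimeout(timeoutValue, timeoutMillis, TimeUnit.MILLISECONDS)
                // Clean up the entry however the future finished.
                .whenComplete((result, error) -> pending.remove(correlationId, future));
    }

    // Called when a response arrives. Returns false for unknown or late ids.
    boolean onResponse(String correlationId, String response) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(response);
    }

    // Number of requests still waiting.
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        RequestResponseCorrelator correlator = new RequestResponseCorrelator();

        // Response arrives in time.
        CompletableFuture<String> answered = correlator.register("id-1", 5000, "TIMEOUT");
        correlator.onResponse("id-1", "pong");
        System.out.println(answered.join());

        // No response arrives, so the timeout marker is delivered instead.
        CompletableFuture<String> unanswered = correlator.register("id-2", 100, "TIMEOUT");
        System.out.println(unanswered.join());
    }
}
```

The caller (for example, whatever handles the web service request) would
hold on to the returned future and reply once it completes, so no thread
has to block while the remote server is working.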

Sorry I responded to your question without going back to see your other posts.

Taariq

On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:

> In my case, the originating request comes from CXF.  How do I send the
> aggregated response back to CXF?
> 
> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
>> The consumer that handles the aggregated/timed-out request or response.
>> 
>> If it's the request, I have to resend a few times; I simply feed it back
>> into "direct:socketRequestRoute" with the header for the number of retry
>> attempts incremented.
>> If it's the response, I can forward it to some process.
>> 
>> Taariq
>> 
>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>> 
>>> What's listening on the:
>>> 
>>> to("direct:requestResponse")
>>> 
>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>> Sure
>>>> 
>>>> You can of course solve what I've described many ways, but I'll
>>>> explain using 3 routes as that's what I used.
>>>> 
>>>> This first route is the main route I mentioned earlier, so you send
>>>> your socket messages here and it's multicast to both the aggregator
>>>> and to the socket.
>>>> 
>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>>  "someOutboundSocketEndpoint");
>>>> 
>>>> 
>>>> This next route will aggregate, both requests and responses are sent
>>>> here as you envisaged.
>>>> from("direct:requestResponseAggregator")
>>>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>>>                .completionSize(2)
>>>>                .completionTimeout(5000)
>>>>                // Here you can send the "aggregated" message. In my case it's
>>>>                // only the response I forward, unless there's a timeout; then
>>>>                // I forward the request of course.
>>>>                .to("direct:requestResponse");
>>>> 
>>>> Finally the route that consumes the socket responses.
>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>> // This headerEnricher doesn't have to be a processor; you have many
>>>> // options to add a header.
>>>> 
>>>> If that's not clear feel free to ask.
>>>> 
>>>> Taariq
>>>> 
>>>> 
>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>>> <ja...@carmanconsulting.com> wrote:
>>>>> Care to share an example?  I'm not picturing it.
>>>>> 
>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>>>> Hi James
>>>>>> 
>>>>>> I did that too for what it's worth.
>>>>>> I send the message to a route that forwards to both the aggregator and 
>>>>>> to the socket.
>>>>>> When the response comes in I use an enricher to add the ID to the 
>>>>>> headers and then forward to the aggregator.
>>>>>> 
>>>>>> Taariq
>>>>>> 
>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>>>>> 
>>>>>>> Willem,
>>>>>>> 
>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>>> responses together into one, it would just make sure it matches the
>>>>>>> correct response with its request.  Does this sound like a valid
>>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> James
>>>>>>> 
>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
>>>>>>>> Hi James,
>>>>>>>> 
>>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>>>> 
>>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>> 
>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>> 
>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com>
>>>>>>>>>  wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi James,
>>>>>>>>>> 
>>>>>>>>>> I hope I understand your scenario correctly. Here are a few
>>>>>>>>>> thoughts. I assume you want to use camel-netty [1] to send
>>>>>>>>>> messages to your server (if you have your own code that does
>>>>>>>>>> that, you can use it too, but you'd have to write your own
>>>>>>>>>> Processor or Component). If I understand correctly, your
>>>>>>>>>> scenario is converting a 2x in-only to a 1x in-out async MEP.
>>>>>>>>>> You should then treat your exchange as an async in-out and let
>>>>>>>>>> your framework (Camel) decompose it and compose it back again.
>>>>>>>>>> I would not keep threads blocked, so I believe your best bet is
>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the
>>>>>>>>>> examples using asyncSend* and asyncCallback*). The issue is
>>>>>>>>>> that Camel is stateless, so you'll need a correlationId, which
>>>>>>>>>> you must have already, and something to keep your state. A good
>>>>>>>>>> bet would be jms [3], or you could write your own. If you used
>>>>>>>>>> jms you would need to use both a correlationId and a replyTo
>>>>>>>>>> queue.
>>>>>>>>>> 
>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>>> fewer HTTP request threads) to do more of a continuation thing.
>>>>>>>>> 
>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>>> server process just echoes it back in the response message.
>>>>>>>>> 
>>>>>>>>>> You may have to play a bit with the correlationId and if you
>>>>>>>>>> cannot use the same you can do a second
>>>>>>>>>> transformation/correlation using a claim-check sort of pattern.
>>>>>>>>>> If you don't want to use jms you can implement your own (in
>>>>>>>>>> memory) persistence and correlation. You can also use a
>>>>>>>>>> resequencer [4] if you want to enforce the order. If you use
>>>>>>>>>> asyncCallback, you get the replies when they become available,
>>>>>>>>>> and you can control that.
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>>> it.
>>>>>>>>> 
>>>>>>>>>> It's an interesting scenario. I'll definitely give it more
>>>>>>>>>> thought, but I hope this helps.
>>>>>>>>>> Hadrian
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Willem
>>>>>>>> ----------------------------------
>>>>>>>> FuseSource
>>>>>>>> Web: http://www.fusesource.com
>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>>> Twitter: willemjiang
>>>>>>>> Weibo: willemjiang
>>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
