I would really encourage you to take a look at the smx code for
handling continuations.
We've had quite a hard time handling race conditions, timeouts, etc.,
because the continuation has a timeout, and when the message is
received back right around the timeout, things can become a bit tricky.

https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

We use one concurrent hash map to associate a message id with a
continuation, and multiple synchronization blocks on the continuation
itself.
Also, the above code can be used with standard servlet servers (i.e.
when the continuation is a blocking continuation), which is imho a good
thing.
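
To make the pattern concrete, here is a minimal sketch of the idea. It
is not the actual HttpConsumerEndpoint code and does not use the Jetty
API: Pending, PendingExchanges and all method names are made up for
illustration. It only shows a concurrent map keyed by message id plus
synchronization on the per-request object, so that a reply and a
timeout arriving at about the same moment cannot both win.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a suspended request (NOT the Jetty Continuation API).
final class Pending {
    private boolean done;   // guarded by "this": set once by a reply or a timeout
    private String reply;   // guarded by "this": null if the timeout won

    // Returns true only for the first caller; whoever comes second lost the race.
    synchronized boolean finish(String replyOrNull) {
        if (done) {
            return false;
        }
        done = true;
        reply = replyOrNull;
        notifyAll();        // wakes a blocking-style waiter, if there is one
        return true;
    }

    // Blocking-style wait, as with a plain servlet container.
    // Returns the reply, or null if the timeout expired first.
    synchronized String await(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!done) {
            long left = deadline - System.currentTimeMillis();
            if (left <= 0) {
                done = true;    // timeout wins; a late reply is rejected by finish()
                break;
            }
            try {
                wait(left);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                done = true;    // treat an interrupt like a timeout in this sketch
                break;
            }
        }
        return reply;
    }
}

// Hypothetical registry: message id -> pending request.
final class PendingExchanges {
    private final Map<String, Pending> byMessageId = new ConcurrentHashMap<>();

    // Register BEFORE the request is sent out, so an early reply finds the entry.
    Pending register(String messageId) {
        Pending p = new Pending();
        byMessageId.put(messageId, p);
        return p;
    }

    // Called by the thread that receives the reply.
    boolean onReply(String messageId, String reply) {
        Pending p = byMessageId.remove(messageId);  // atomic: one remover gets it
        return p != null && p.finish(reply);
    }

    // Called by a timer thread when the timeout fires.
    boolean onTimeout(String messageId) {
        Pending p = byMessageId.remove(messageId);
        return p != null && p.finish(null);
    }
}
```

The important bit is the ordering: the entry is registered before
anything that can trigger a reply, so a reply that comes back "too
early" is recorded instead of lost, and finish() lets exactly one of
reply/timeout take effect.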

On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin
<[EMAIL PROTECTED]> wrote:
> Hi
>
>>>
>>> I have 10 threads involved, 5 control ones + 5 application ones, and I
>>> see a loss of message approximately once in 5 cases. The fact that
>>> cont.resume() is done virtually immediately after cont.suspend() can
>>> explain it.
>>
>> Without seeing your code, I cannot really offer valid suggestions, but
>> I'll try....   :-)
>
> I guess having it all on a branch would be handy then :-)
>
>>
>> One thought was, in the Continuation object, to record if "resume()" has
>> been called, and if it's been called by the time the stack unwinds back
>> into the Http transport, just re-dispatch immediately.   Either that or
>> have the resume block until the http transport sets a "ready to resume"
>> flag just before it allows the exception to flow back into jetty.
>
> I have 2 tests.
>
> In one test, application server code interacts with a wrapper, both when
> getting a continuation instance and when calling suspend/resume on it (as
> suggested by yourself earlier in this thread). In this case, under the hood,
> an inbound message is associated with a continuation instance before
> suspend() is called on it. Thus, even if the resulting exception does not
> reach JettyDestination before continuation.resume() is called by a
> control thread, the message is not lost when the HTTP request is resumed, as
> that HTTP request had this continuation instance associated with it at the
> time ContinuationSupport.getContinuation(request) was called.
>
> In the other test, which I believe better represents an integration scenario
> with SMX, application server code calls Jetty's
> ContinuationSupport.getContinuation(request) followed by
> continuation.suspend(). In this case, before the (Jetty RetryRequest)
> runtime exception reaches the catch block in AbstractInvoker (where I try to
> associate a message with the continuation), one or two control threads manage
> to squeeze in and call resume() before the catch block has even been
> processed. So by the time the wrapped exception reaches JettyDestination, a
> request with a resumed continuation has already come back...
>
> Does this explanation for the second case and the associated race condition
> sound reasonable?
>
> Cheers, Sergey
>
>
>
>
>
>>
>>
>> Dan
>>
>>>
>>> Cheers, Sergey
>>>
>>> > That said, I'm now trying to inject a message as a custom continuation
>>> > object (while preserving the original one if any, both ways) as early
>>> > as possible, in AbstractInvoker, so that the time window in which the
>>> > race condition I talked about earlier can cause the loss of the
>>> > original message is extremely small: the time it takes for the
>>> > continuation.suspend() exception to reach a catch block in
>>> > AbstractInvoker.
>>> >
>>> > Cheers, Sergey
>>> >
>>> >> Hi,
>>> >>
>>> >> I did some system testing with Jetty continuations and it's going not
>>> >> too bad. Here's one issue which I've encountered, which might or might
>>> >> not be a problem in cases where continuations are utilized directly
>>> >> (that is, without our wrappers), as in the case of, say, the ServiceMix
>>> >> CXF binding component.
>>> >>
>>> >> The problem is that when continuation.suspend(timeout) has been
>>> >> called, the resulting RuntimeException might not reach CXF
>>> >> JettyDestination (such that the original message with its phase chain
>>> >> can be preserved until the request is resumed) before some other
>>> >> application thread calls continuation.resume() or the continuation
>>> >> suspend timeout expires.
>>> >>
>>> >> In the case of ServiceMix, the latter is a theoretical possibility at
>>> >> the least. I can see in its code that this timeout is configured, but
>>> >> if this timeout is in the region of up to 1 sec or so, then it's
>>> >> feasible that under a heavy workload the race condition described
>>> >> above might come to life.
>>> >>
>>> >> That said, as part of my test, I found that even when such a condition
>>> >> occurs, the 'worst' thing which can happen is that a new message and a
>>> >> new chain are created; that is, the request is not resumed from a
>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it were a new
>>> >> request altogether. It all works nonetheless, as all the stack
>>> >> variables used in the various interceptors, in my given test at least,
>>> >> are obtained from a message. The only downside is that the work which
>>> >> has already been done earlier as part of handling the suspended
>>> >> request is repeated by the interceptors. It can cause issues,
>>> >> though, in cases where some interceptors have side effects as part of
>>> >> handling a given input request, say modifying a db, etc.
>>> >>
>>> >> Now, this race condition can be safely avoided if the wrapper proposed
>>> >> by Dan is used by the server application code, as the message can be
>>> >> preserved immediately at the point the user calls suspend on our
>>> >> wrapper, so without further doubts I've prototyped it too. It's not
>>> >> possible for SMX components, though.
>>> >>
>>> >> Comments ?
>>> >>
>>> >> Cheers, Sergey
>>> >>
>>> >>> I guess my thinking was to tie the continuations directly to the
>>> >>> PhaseInterceptorChain (since that is going to need to know about them
>>> >>> anyway).   However, I suppose it could easily be done with a new
>>> >>> interface. Probably the best thing to do is to stub out a sample
>>> >>> use case.   So here goes.....
>>> >>>
>>> >>> Let's take a "GreetMe" web service that in the greetMe method will
>>> >>> call off asynchronously to some JMS service to actually get the
>>> >>> result.
>>> >>>
>>> >>> @Resource(name = "jmsClient")
>>> >>> Greeter jmsGreeter;
>>> >>> @Resource
>>> >>> WebServiceContext context;
>>> >>> public String greetMe(String arg) {
>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>> >>>         context.getMessageContext().get(ContinuationSupport.class.getName());
>>> >>>     if (contSupport == null) {
>>> >>>         //continuations not supported, must wait
>>> >>>         return jmsGreeter.greetMe(arg);
>>> >>>     }
>>> >>>     Continuation cont = contSupport.getContinuation();
>>> >>>     if (cont.isResumed()) {
>>> >>>         //the Handler stored at suspend time holds the response
>>> >>>         Handler handler = (Handler) cont.getObject();
>>> >>>         return handler.resp.getReturn();
>>> >>>     } else {
>>> >>>         AsyncHandler<GreetMeResponse> handler = new Handler(cont);
>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>> >>>         cont.suspend(handler);
>>> >>>         //won't actually get here as suspend will throw a
>>> >>>         //ContinuationException
>>> >>>         return null;
>>> >>>     }
>>> >>> }
>>> >>>
>>> >>> The Handler would look something like:
>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>> >>>     GreetMeResponse resp;
>>> >>>     Continuation cont;
>>> >>>     public Handler(Continuation cont) {
>>> >>>         this.cont = cont;
>>> >>>     }
>>> >>>     public void handleResponse(Response<GreetMeResponse> response) {
>>> >>>         resp = response.get();
>>> >>>         cont.resume();
>>> >>>     }
>>> >>> }
>>> >>>
>>> >>> Basically, the HTTP/Jetty transport could provide an implementation
>>> >>> of ContinuationSupport that wraps the jetty stuff.    JMS could
>>> >>> provide one that's pretty much a null op.   Transports that cannot
>>> >>> support it (like servlet) just wouldn't provide an implementation.
>>> >>>
>>> >>>
>>> >>> Does that make sense?   Other ideas?
>>> >>>
>>> >>> Dan
>>> >>>
>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>> >>>> > No.   We don't want that.   Whatever we do should work for other
>>> >>>> > transports as well, like JMS.  Thus, this shouldn't be tied to
>>> >>>> > jetty continuations directly.
>>> >>>>
>>> >>>> No, I'm not suggesting tying it to jetty continuations.
>>> >>>> E.g.:
>>> >>>>
>>> >>>> try {
>>> >>>>     // continuation.suspend() is called somehow by the code being
>>> >>>>     // invoked
>>> >>>>     invoke();
>>> >>>> } catch (RuntimeException ex) {
>>> >>>>     if (ex.getClass().getName().equals("jetty.JettyContinuationException")) {
>>> >>>>         throw new SuspendedFault(ex);
>>> >>>>         // or PhaseInterceptorChain.suspend()
>>> >>>>     }
>>> >>>> }
>>> >>>>
>>> >>>> > Most likely, we could add a "suspend()" method to
>>> >>>> > PhaseInterceptorChain that would do something very similar and
>>> >>>> > throw a "SuspendException" or something in the same package as
>>> >>>> > PhaseInterceptorChain.
>>> >>>>
>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call, though?
>>> >>>>
>>> >>>> >  That would get propagated back to the JettyDestination, which
>>> >>>> > could then call the jetty things.  The JMS transport could just
>>> >>>> > catch it and more or less ignore it.  We'd then have to add a
>>> >>>> > "resume()" method to the chain which would call back onto a
>>> >>>> > listener that the transport provides.   Jetty would just call the
>>> >>>> > jetty resume stuff. JMS would probably put a runnable on the
>>> >>>> > workqueue to restart the chain.
>>> >>>>
>>> >>>> ok
>>> >>>>
>>> >>>> > Also, suspend() would need to check if there is a listener.  If
>>> >>>> > not, it should not throw the exception.   Thus, the servlet
>>> >>>> > transport and CORBA stuff that couldn't do this would pretty much
>>> >>>> > just ignore it.
>>> >>>>
>>> >>>> ok, not sure I understand about the listener but I think I see what
>>> >>>> you mean...
>>> >>>>
>>> >>>> > Basically, this needs to be done in such a way that it CAN work
>>> >>>> > for the non-jetty cases.   However, it also needs to be done in a
>>> >>>> > way that doesn't affect existing transports.
>>> >>>>
>>> >>>> +1
>>> >>>>
>>> >>>> Cheers, Sergey
>>> >>>>
>>> >>>> > Dan
>>> >>>> >
>>> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>> >>>> >>
>>> >>>> >> I think we can do in JettyDestination something similar to what
>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>> >>>> >> extract from it the original continuation instance, or else we can
>>> >>>> >> do ContinuationSupport.getContinuation(request), which should
>>> >>>> >> return us the instance. At this point we can use it as a key to
>>> >>>> >> store the current exchange plus all the other info we may need.
>>> >>>> >>
>>> >>>> >> When the user/application code does continuation.resume(), the
>>> >>>> >> Jetty thread will come back and we will use
>>> >>>> >> ContinuationSupport.getContinuation(request) to get the active
>>> >>>> >> continuation and use it to extract the suspended exchange and
>>> >>>> >> proceed from there, say we'll call
>>> >>>> >> PhaseInterceptorChain.resume(), etc., something along the lines
>>> >>>> >> you suggested
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>> >>>> >>
>>> >>>> >> Yea - this can probably be quite challenging
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> Thoughts ?
>>> >>>> >>
>>> >>>> >> Cheers, Sergey
>>> >>>> >>
>>> >>>> >>
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>> >>>> >> [3]
>>> >>>> >> https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#action_12642361
>>> >>>> >
>>> >>>> > --
>>> >>>> > Daniel Kulp
>>> >>>> > [EMAIL PROTECTED]
>>> >>>> > http://dankulp.com/blog
>>> >>>
>>> >>> --
>>> >>> Daniel Kulp
>>> >>> [EMAIL PROTECTED]
>>> >>> http://dankulp.com/blog
>>
>>
>>
>> --
>> Daniel Kulp
>> [EMAIL PROTECTED]
>> http://dankulp.com/blog
>
>



-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com
