I would really encourage you to take a look at the smx code for handling continuations. We've had quite a hard time handling race conditions, timeouts, etc., because the continuation has a timeout, and when the message is received back right around the moment that timeout expires, things can become a bit tricky.
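To make the timeout race a bit more concrete, here is a rough sketch. It is not the actual SMX code: the Pending and PendingRegistry names are made up for illustration, and a plain String stands in for the real exchange. The reply thread and the timeout path both try to claim the same entry in a map keyed by message id, and only the first one wins:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a suspended request (not a Jetty or SMX class).
final class Pending {
    private boolean done;    // guarded by "this"
    private String outcome;  // guarded by "this"

    // Only the first caller wins; a late reply or a late timeout gets false.
    synchronized boolean complete(String result) {
        if (done) {
            return false;
        }
        done = true;
        outcome = result;
        return true;
    }

    synchronized String outcome() {
        return outcome;
    }
}

// Hypothetical registry: message id -> suspended request.
final class PendingRegistry {
    private final ConcurrentMap<String, Pending> byMessageId =
            new ConcurrentHashMap<String, Pending>();

    // Register before suspending, so an early reply can still find the entry.
    Pending register(String messageId) {
        Pending pending = new Pending();
        byMessageId.put(messageId, pending);
        return pending;
    }

    // Called from the thread that receives the reply.
    boolean onReply(String messageId, String reply) {
        Pending pending = byMessageId.remove(messageId); // atomic claim
        return pending != null && pending.complete(reply);
    }

    // Called when the suspend timeout fires.
    boolean onTimeout(String messageId) {
        Pending pending = byMessageId.remove(messageId); // atomic claim
        return pending != null && pending.complete("TIMEOUT");
    }
}
```

Whichever of onReply() and onTimeout() runs second gets null back from the map and simply drops out, rather than touching a request that has already been completed. The synchronized methods on Pending cover code that still holds a direct reference to the object after it has left the map.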
https://svn.apache.org/repos/asf/servicemix/components/bindings/servicemix-http/trunk/src/main/java/org/apache/servicemix/http/endpoints/HttpConsumerEndpoint.java

We use one concurrent hash map to associate a message id with a continuation, and multiple synchronization blocks on the continuation itself. Also, the above code can be used with standard servlet servers (i.e. when the continuation is a blocking continuation), which is imho a good thing.

On Tue, Nov 11, 2008 at 6:51 PM, Sergey Beryozkin <[EMAIL PROTECTED]> wrote:
> Hi
>
>>>
>>> I have 10 threads involved, 5 control ones + 5 application ones, I see a
>>> loss of message approximately once in 5 cases. The fact that cont.resume()
>>> is done virtually immediately after cont.suspend() can explain it.
>>
>> Without seeing your code, I cannot really offer valid suggestions, but
>> I'll try.... :-)
>
> I guess having it all on a branch would be handy then :-)
>
>>
>> One thought was in the Continuation object, record if "resume()" has been
>> called and if it's been called by the time the stack unwinds back into the
>> Http transport, just re-dispatch immediately. Either that or have the
>> resume block until the http transport sets a "ready to resume" flag just
>> before it allows the exception to flow back into jetty.
>
> I have 2 tests.
>
> In one test an application server code interacts with a wrapper, both when
> getting a continuation instance and when calling suspend/resume on it (as
> suggested by yourself earlier in this thread). In this case, under the hood,
> an inbound message is associated with a continuation instance before
> suspend() is called on it. Thus even if the resulting exception does not
> reach JettyDestination in time before continuation.resume() is called by a
> control thread, the message is not lost when the HTTP request is resumed, as
> that HTTP request had this continuation instance associated with it at the
> time ContinuationSupport.getContinuation(request) was called.
>
> In the other test, which I believe better represents an integration scenario
> with SMX, an application server code calls Jetty
> ContinuationSupport.getContinuation(request) followed by
> continuation.suspend(). Now, in this case, before a (Jetty RetryRequest)
> runtime exception reaches a catch block in AbstractInvoker (where I try to
> associate a message with the continuation), one or two control threads manage
> to squeeze in and call resume() before the catch block has even been
> processed. So by the time the wrapped exception reaches JettyDestination a
> request with a resumed continuation has already come back...
>
> Does this explanation for the second case and the associated race condition
> sound reasonable ?
>
> Cheers, Sergey
>
>>
>> Dan
>>
>>>
>>> Cheers, Sergey
>>>
>>> > That said, I'm now trying to inject a message as a custom continuation
>>> > object (while preserving the original one if any, both ways) as early as
>>> > possible, in AbstractInvoker, so the time window in which the race
>>> > condition I talked about earlier can cause the loss of the original
>>> > message is extremely small: the time it takes for the
>>> > continuation.suspend() exception to reach a catch block in
>>> > AbstractInvoker.
>>> >
>>> > Cheers, Sergey
>>> >
>>> >> Hi,
>>> >>
>>> >> I did some system testing with Jetty continuations and it's going not
>>> >> too bad. Here's one issue which I've encountered which might or might
>>> >> not be a problem in cases where continuations are utilized directly
>>> >> (that is without our wrappers), as in case of say ServiceMix CXF
>>> >> binding component.
>>> >>
>>> >> The problem is that when continuation.suspend(timeout) has been called,
>>> >> a resulting RuntimeException might not reach CXF JettyDestination in
>>> >> time (such that the original message with its phase chain can be
>>> >> preserved until the request is resumed) if some other application
>>> >> thread calls continuation.resume() or the continuation suspend timeout
>>> >> expires.
>>> >>
>>> >> In case of ServiceMix the latter is a theoretical possibility at the
>>> >> least. I can see in its code this timeout is configured, but if this
>>> >> timeout is in the region of up to 1 sec or so then it's feasible that
>>> >> with a heavy workload the race condition described above might come to
>>> >> life.
>>> >>
>>> >> That said, as part of my test, I found that even when such a condition
>>> >> occurs, the 'worst' thing which can happen is that a new message and a
>>> >> new chain are created, that is, the request is not resumed from a
>>> >> 'suspended' ServiceInvokerInterceptor, but starts as if it was a new
>>> >> request altogether, but it all works nonetheless, as all the stack
>>> >> variables used in various interceptors in my given test at least are
>>> >> all obtained from a message. The only downside is that the work which
>>> >> has already been done earlier as part of handling the suspended request
>>> >> is repeated again by the interceptors. It can cause issues though in
>>> >> cases when some interceptors have side effects as part of handling a
>>> >> given input request, say modify a db, etc.
>>> >>
>>> >> Now, this race condition can be safely avoided if a wrapper proposed by
>>> >> Dan is used by a server application code, as the message can be
>>> >> preserved immediately at the point a user calls suspend on our wrapper,
>>> >> so without further doubts I've prototyped it too. It's not possible for
>>> >> SMX components though.
>>> >>
>>> >> Comments ?
>>> >>
>>> >> Cheers, Sergey
>>> >>
>>> >>> I guess my thinking was to tie the continuations directly to the
>>> >>> PhaseInterceptorChain (since that is going to need to know about them
>>> >>> anyway). However, I suppose it could easily be done with a new
>>> >>> interface. Probably the best thing to do is to stub out a sample
>>> >>> usecase. So here goes.....
>>> >>>
>>> >>> Let's take a "GreetMe" web service that in the greetMe method will call
>>> >>> off asynchronously to some JMS service to actually get the result.
>>> >>>
>>> >>> @Resource(name = "jmsClient")
>>> >>> Greeter jmsGreeter;
>>> >>> @Resource
>>> >>> WebServiceContext context;
>>> >>>
>>> >>> public String greetMe(String arg) {
>>> >>>     ContinuationSupport contSupport = (ContinuationSupport)
>>> >>>         context.getMessageContext().get(ContinuationSupport.class.getName());
>>> >>>     if (contSupport == null) {
>>> >>>         // continuations not supported, must wait
>>> >>>         return jmsGreeter.greetMe(arg);
>>> >>>     }
>>> >>>     Continuation cont = contSupport.getContinuation();
>>> >>>     if (cont.isResumed()) {
>>> >>>         // the handler passed to suspend() holds the response
>>> >>>         Handler handler = (Handler) cont.getObject();
>>> >>>         return handler.resp.getReturn();
>>> >>>     } else {
>>> >>>         Handler handler = new Handler(cont);
>>> >>>         jmsGreeter.greetMeAsync(arg, handler);
>>> >>>         cont.suspend(handler);
>>> >>>         // won't actually get here as suspend will throw a
>>> >>>         // ContinuationException
>>> >>>         return null;
>>> >>>     }
>>> >>> }
>>> >>>
>>> >>> The Handler would look something like:
>>> >>>
>>> >>> class Handler implements AsyncHandler<GreetMeResponse> {
>>> >>>     GreetMeResponse resp;
>>> >>>     Continuation cont;
>>> >>>     public Handler(Continuation cont) {
>>> >>>         this.cont = cont;
>>> >>>     }
>>> >>>     public void handleResponse(Response<GreetMeResponse> response) {
>>> >>>         // error handling for response.get() omitted
>>> >>>         resp = response.get();
>>> >>>         cont.resume();
>>> >>>     }
>>> >>> }
>>> >>>
>>> >>> Basically, the HTTP/Jetty transport could provide an implementation of
>>> >>> ContinuationSupport that wraps the jetty
>>> >>> stuff. JMS could provide one that's pretty much a null op. Transports
>>> >>> that cannot support it (like servlet) just wouldn't provide an
>>> >>> implementation.
>>> >>>
>>> >>> Does that make sense? Other ideas?
>>> >>>
>>> >>> Dan
>>> >>>
>>> >>> On Friday 24 October 2008 9:58:08 am Sergey Beryozkin wrote:
>>> >>>> > No. We don't want that. Whatever we do should work for other
>>> >>>> > transports as well like JMS. Thus, this shouldn't be tied to jetty
>>> >>>> > continuations directly.
>>> >>>>
>>> >>>> No, I'm not suggesting to tie it up to jetty continuations.
>>> >>>> Ex.
>>> >>>>
>>> >>>> try {
>>> >>>>     invoke(); // continuation.suspend() somehow by the code being invoked upon
>>> >>>> } catch (RuntimeException ex) {
>>> >>>>     if (ex.getClass().getName().equals("jetty.JettyContinuationException")) {
>>> >>>>         throw new SuspendedFault(ex);
>>> >>>>         // or PhaseInterceptorChain.suspend()
>>> >>>>     }
>>> >>>> }
>>> >>>>
>>> >>>> > Most likely, we could add a "suspend()" method to
>>> >>>> > PhaseInterceptorChain that would do something very similar and throw
>>> >>>> > a "SuspendException" or something in the same package as
>>> >>>> > PhaseInterceptorChain.
>>> >>>>
>>> >>>> When do we trigger this PhaseInterceptorChain.suspend() call though ?
>>> >>>>
>>> >>>> > That would get propagated
>>> >>>> > back to the JettyDestination that could then call the jetty things.
>>> >>>> > The JMS transport could just catch it and more or less ignore it.
>>> >>>> > We'd then have to add a "resume()" method to the chain which would
>>> >>>> > call back onto a listener that the transport provides. Jetty would
>>> >>>> > just call the jetty resume stuff. JMS would probably put a runnable
>>> >>>> > on the workqueue to restart the chain.
>>> >>>>
>>> >>>> ok
>>> >>>>
>>> >>>> > Also, suspend() would need to check if there is a listener.
>>> >>>> > If not, it should not throw the exception. Thus, the servlet
>>> >>>> > transport and CORBA stuff that couldn't do this would pretty much
>>> >>>> > just ignore it.
>>> >>>>
>>> >>>> ok, not sure I understand about the listener but I think I see what
>>> >>>> you mean...
>>> >>>>
>>> >>>> > Basically, this needs to be done in such a way that it CAN work for
>>> >>>> > the non-jetty cases. However, it also needs to be done in a way
>>> >>>> > that doesn't affect existing transports.
>>> >>>>
>>> >>>> +1
>>> >>>>
>>> >>>> Cheers, Sergey
>>> >>>>
>>> >>>> > Dan
>>> >>>> >
>>> >>>> >> 2. Now, if the above can be figured out, the next problem arises:
>>> >>>> >> when the "trigger" to wake up the continuation occurs
>>> >>>> >>
>>> >>>> >> I think we can do in JettyDestination something similar to what
>>> >>>> >> is done in SMX. When getting a SuspendedFault exception, we can
>>> >>>> >> extract from it the original continuation instance or else we can
>>> >>>> >> do ContinuationSupport.getContinuation(request) which should return
>>> >>>> >> us the instance. At this point we can use it as a key to store the
>>> >>>> >> current exchange plus all the other info we may need.
>>> >>>> >>
>>> >>>> >> When the user/application code does continuation.resume(), the
>>> >>>> >> Jetty thread will come back and we will use the
>>> >>>> >> ContinuationSupport.getContinuation(request) to get us the active
>>> >>>> >> continuation and use it to extract the suspended exchange and
>>> >>>> >> proceed from there, say we'll call PhaseInterceptorChain.resume(),
>>> >>>> >> etc, something along the lines you suggested
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> 3. Basically, to do this "right", we'd need to audit pretty much
>>> >>>> >> everything to make sure nothing is stored on the stack and is
>>> >>>> >> "resumable". Once that is done, the rest is relatively easy.
>>> >>>> >>
>>> >>>> >> Yea - probably can be quite challenging
>>> >>>> >>
>>> >>>> >>
>>> >>>> >> Thoughts ?
>>> >>>> >>
>>> >>>> >> Cheers, Sergey
>>> >>>> >>
>>> >>>> >> [1] http://docs.codehaus.org/display/JETTY/Continuations
>>> >>>> >> [2] https://issues.apache.org/jira/browse/CXF-1835
>>> >>>> >> [3] https://issues.apache.org/jira/browse/CXF-1835?focusedCommentId=12642361#action_12642361
>>> >>>> >
>>> >>>> > --
>>> >>>> > Daniel Kulp
>>> >>>> > [EMAIL PROTECTED]
>>> >>>> > http://dankulp.com/blog
>>> >>>
>>> >>> --
>>> >>> Daniel Kulp
>>> >>> [EMAIL PROTECTED]
>>> >>> http://dankulp.com/blog
>>
>> --
>> Daniel Kulp
>> [EMAIL PROTECTED]
>> http://dankulp.com/blog
>

--
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
------------------------
Open Source SOA
http://fusesource.com