Hi

Great catch.

Btw, there is a typo in the name of the attribute constant: EXCHANGE_ATRRIBUTE_NAME should be EXCHANGE_ATTRIBUTE_NAME :)


On Thu, Jul 22, 2010 at 9:24 PM,  <krass...@apache.org> wrote:
> Author: krasserm
> Date: Thu Jul 22 19:24:24 2010
> New Revision: 966815
>
> URL: http://svn.apache.org/viewvc?rev=966815&view=rev
> Log:
> Closes CAMEL-2986: IllegalStateException in CamelContinuationServlet under 
> heavy load
>
> Added:
>    
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
> Modified:
>    
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
>    
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
>
> Modified: 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=966815&r1=966814&r2=966815&view=diff
> ==============================================================================
> --- 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
>  (original)
> +++ 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
>  Thu Jul 22 19:24:24 2010
> @@ -38,6 +38,8 @@ import org.eclipse.jetty.continuation.Co
>  */
>  public class CamelContinuationServlet extends CamelServlet {
>
> +    static final String EXCHANGE_ATRRIBUTE_NAME = "CamelExchange";
> +
>     private static final long serialVersionUID = 1L;
>
>     @Override
> @@ -50,15 +52,15 @@ public class CamelContinuationServlet ex
>                 return;
>             }
>
> -            // are we suspended?
> -            if (consumer.isSuspended()) {
> +            final Continuation continuation = 
> ContinuationSupport.getContinuation(request);
> +
> +            // are we suspended and a request is dispatched initially?
> +            if (consumer.isSuspended() && continuation.isInitial()) {
>                 
> response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
>                 return;
>             }
>
> -            final Continuation continuation = 
> ContinuationSupport.getContinuation(request);
>             if (continuation.isInitial()) {
> -
>                 // a new request so create an exchange
>                 final Exchange exchange = new 
> DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
>                 if (consumer.getEndpoint().isBridgeEndpoint()) {
> @@ -69,44 +71,28 @@ public class CamelContinuationServlet ex
>                 }
>                 exchange.setIn(new HttpMessage(exchange, request, response));
>
> +                if (log.isTraceEnabled()) {
> +                    log.trace("Suspending continuation of exchangeId: " + 
> exchange.getExchangeId());
> +                }
> +                continuation.suspend();
> +
>                 // use the asynchronous API to process the exchange
> -                boolean sync = 
> consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
> +                consumer.getAsyncProcessor().process(exchange, new 
> AsyncCallback() {
>                     public void done(boolean doneSync) {
> -                        // we only have to handle async completion
> -                        if (doneSync) {
> -                            return;
> -                        }
> -
> -                        // we should resume the continuation now that we are 
> done asynchronously
>                         if (log.isTraceEnabled()) {
>                             log.trace("Resuming continuation of exchangeId: " 
> + exchange.getExchangeId());
>                         }
> -                        continuation.setAttribute("CamelExchange", exchange);
> +                        // resume processing after both, sync and async 
> callbacks
> +                        continuation.setAttribute(EXCHANGE_ATRRIBUTE_NAME, 
> exchange);
>                         continuation.resume();
>                     }
>                 });
> -
> -                if (!sync) {
> -                    // wait for the exchange to get processed.
> -                    // this might block until it completes or it might 
> return via an exception and
> -                    // then this method is re-invoked once the the exchange 
> has finished processing
> -                    if (log.isTraceEnabled()) {
> -                        log.trace("Suspending continuation of exchangeId: " 
> + exchange.getExchangeId());
> -                    }
> -                    continuation.suspend(response);
> -                    return;
> -                }
> -
> -                // now lets output to the response
> -                if (log.isTraceEnabled()) {
> -                    log.trace("Writing response of exchangeId: " + 
> exchange.getExchangeId());
> -                }
> -                consumer.getBinding().writeResponse(exchange, response);
>                 return;
>             }
>
>             if (continuation.isResumed()) {
> -                Exchange exchange = (Exchange) 
> continuation.getAttribute("CamelExchange");
> +                // a re-dispatched request containing the processing result
> +                Exchange exchange = (Exchange) 
> continuation.getAttribute(EXCHANGE_ATRRIBUTE_NAME);
>                 if (log.isTraceEnabled()) {
>                     log.trace("Resuming continuation of exchangeId: " + 
> exchange.getExchangeId());
>                 }
> @@ -116,8 +102,10 @@ public class CamelContinuationServlet ex
>                     log.trace("Writing response of exchangeId: " + 
> exchange.getExchangeId());
>                 }
>                 consumer.getBinding().writeResponse(exchange, response);
> -                return;
>             }
> +        } catch (IOException e) {
> +            log.error("Error processing request", e);
> +            throw e;
>         } catch (Exception e) {
>             log.error("Error processing request", e);
>             throw new ServletException(e);
>
> Added: 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java?rev=966815&view=auto
> ==============================================================================
> --- 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
>  (added)
> +++ 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
>  Thu Jul 22 19:24:24 2010
> @@ -0,0 +1,43 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.component.jetty;
> +
> +import java.io.IOException;
> +
> +import javax.servlet.FilterChain;
> +import javax.servlet.ServletException;
> +import javax.servlet.ServletRequest;
> +import javax.servlet.ServletResponse;
> +
> +import org.eclipse.jetty.servlets.MultiPartFilter;
> +
> +/**
> + * A multipart filter that processes only initially dispatched requests.
> + * Re-dispatched requests are ignored.
> + */
> +class CamelMultipartFilter extends MultiPartFilter {
> +
> +    @Override
> +    public void doFilter(ServletRequest request, ServletResponse response, 
> FilterChain chain) throws IOException, ServletException {
> +        if 
> (request.getAttribute(CamelContinuationServlet.EXCHANGE_ATRRIBUTE_NAME) == 
> null) {
> +            super.doFilter(request, response, chain);
> +        } else {
> +            chain.doFilter(request, response);
> +        }
> +    }
> +
> +}
>
> Modified: 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
> URL: 
> http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java?rev=966815&r1=966814&r2=966815&view=diff
> ==============================================================================
> --- 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
>  (original)
> +++ 
> camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
>  Thu Jul 22 19:24:24 2010
> @@ -55,7 +55,6 @@ import org.eclipse.jetty.server.ssl.SslS
>  import org.eclipse.jetty.servlet.FilterHolder;
>  import org.eclipse.jetty.servlet.ServletContextHandler;
>  import org.eclipse.jetty.servlet.ServletHolder;
> -import org.eclipse.jetty.servlets.MultiPartFilter;
>  import org.eclipse.jetty.util.component.LifeCycle;
>  import org.eclipse.jetty.util.thread.QueuedThreadPool;
>  import org.eclipse.jetty.util.thread.ThreadPool;
> @@ -599,7 +598,7 @@ public class JettyHttpComponent extends
>             }
>             context.setAttribute("javax.servlet.context.tempdir", file);
>         }
> -        filterHolder.setFilter(new MultiPartFilter());
> +        filterHolder.setFilter(new CamelMultipartFilter());
>         //add the default MultiPartFilter filter for it
>         context.addFilter(filterHolder, "/*", 0);
>         context.addServlet(holder, "/*");
>
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Reply via email to