Hi, great catch.
Btw there is a typo in the constant for the attribute :) On Thu, Jul 22, 2010 at 9:24 PM, <[email protected]> wrote: > Author: krasserm > Date: Thu Jul 22 19:24:24 2010 > New Revision: 966815 > > URL: http://svn.apache.org/viewvc?rev=966815&view=rev > Log: > Closes CAMEL-2986: IllegalStateException in CamelContinuationServlet under > heavy load > > Added: > > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java > Modified: > > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java > > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java > > Modified: > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=966815&r1=966814&r2=966815&view=diff > ============================================================================== > --- > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java > (original) > +++ > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java > Thu Jul 22 19:24:24 2010 > @@ -38,6 +38,8 @@ import org.eclipse.jetty.continuation.Co > */ > public class CamelContinuationServlet extends CamelServlet { > > + static final String EXCHANGE_ATRRIBUTE_NAME = "CamelExchange"; > + > private static final long serialVersionUID = 1L; > > @Override > @@ -50,15 +52,15 @@ public class CamelContinuationServlet ex > return; > } > > - // are we suspended? > - if (consumer.isSuspended()) { > + final Continuation continuation = > ContinuationSupport.getContinuation(request); > + > + // are we suspended and a request is dispatched initially? 
> + if (consumer.isSuspended() && continuation.isInitial()) { > > response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE); > return; > } > > - final Continuation continuation = > ContinuationSupport.getContinuation(request); > if (continuation.isInitial()) { > - > // a new request so create an exchange > final Exchange exchange = new > DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut); > if (consumer.getEndpoint().isBridgeEndpoint()) { > @@ -69,44 +71,28 @@ public class CamelContinuationServlet ex > } > exchange.setIn(new HttpMessage(exchange, request, response)); > > + if (log.isTraceEnabled()) { > + log.trace("Suspending continuation of exchangeId: " + > exchange.getExchangeId()); > + } > + continuation.suspend(); > + > // use the asynchronous API to process the exchange > - boolean sync = > consumer.getAsyncProcessor().process(exchange, new AsyncCallback() { > + consumer.getAsyncProcessor().process(exchange, new > AsyncCallback() { > public void done(boolean doneSync) { > - // we only have to handle async completion > - if (doneSync) { > - return; > - } > - > - // we should resume the continuation now that we are > done asynchronously > if (log.isTraceEnabled()) { > log.trace("Resuming continuation of exchangeId: " > + exchange.getExchangeId()); > } > - continuation.setAttribute("CamelExchange", exchange); > + // resume processing after both, sync and async > callbacks > + continuation.setAttribute(EXCHANGE_ATRRIBUTE_NAME, > exchange); > continuation.resume(); > } > }); > - > - if (!sync) { > - // wait for the exchange to get processed. 
> - // this might block until it completes or it might > return via an exception and > - // then this method is re-invoked once the the exchange > has finished processing > - if (log.isTraceEnabled()) { > - log.trace("Suspending continuation of exchangeId: " > + exchange.getExchangeId()); > - } > - continuation.suspend(response); > - return; > - } > - > - // now lets output to the response > - if (log.isTraceEnabled()) { > - log.trace("Writing response of exchangeId: " + > exchange.getExchangeId()); > - } > - consumer.getBinding().writeResponse(exchange, response); > return; > } > > if (continuation.isResumed()) { > - Exchange exchange = (Exchange) > continuation.getAttribute("CamelExchange"); > + // a re-dispatched request containing the processing result > + Exchange exchange = (Exchange) > continuation.getAttribute(EXCHANGE_ATRRIBUTE_NAME); > if (log.isTraceEnabled()) { > log.trace("Resuming continuation of exchangeId: " + > exchange.getExchangeId()); > } > @@ -116,8 +102,10 @@ public class CamelContinuationServlet ex > log.trace("Writing response of exchangeId: " + > exchange.getExchangeId()); > } > consumer.getBinding().writeResponse(exchange, response); > - return; > } > + } catch (IOException e) { > + log.error("Error processing request", e); > + throw e; > } catch (Exception e) { > log.error("Error processing request", e); > throw new ServletException(e); > > Added: > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java?rev=966815&view=auto > ============================================================================== > --- > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java > (added) > +++ > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java > 
Thu Jul 22 19:24:24 2010 > @@ -0,0 +1,43 @@ > +/** > + * Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > +package org.apache.camel.component.jetty; > + > +import java.io.IOException; > + > +import javax.servlet.FilterChain; > +import javax.servlet.ServletException; > +import javax.servlet.ServletRequest; > +import javax.servlet.ServletResponse; > + > +import org.eclipse.jetty.servlets.MultiPartFilter; > + > +/** > + * A multipart filter that processes only initially dispatched requests. > + * Re-dispatched requests are ignored. 
> + */ > +class CamelMultipartFilter extends MultiPartFilter { > + > + @Override > + public void doFilter(ServletRequest request, ServletResponse response, > FilterChain chain) throws IOException, ServletException { > + if > (request.getAttribute(CamelContinuationServlet.EXCHANGE_ATRRIBUTE_NAME) == > null) { > + super.doFilter(request, response, chain); > + } else { > + chain.doFilter(request, response); > + } > + } > + > +} > > Modified: > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java > URL: > http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java?rev=966815&r1=966814&r2=966815&view=diff > ============================================================================== > --- > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java > (original) > +++ > camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java > Thu Jul 22 19:24:24 2010 > @@ -55,7 +55,6 @@ import org.eclipse.jetty.server.ssl.SslS > import org.eclipse.jetty.servlet.FilterHolder; > import org.eclipse.jetty.servlet.ServletContextHandler; > import org.eclipse.jetty.servlet.ServletHolder; > -import org.eclipse.jetty.servlets.MultiPartFilter; > import org.eclipse.jetty.util.component.LifeCycle; > import org.eclipse.jetty.util.thread.QueuedThreadPool; > import org.eclipse.jetty.util.thread.ThreadPool; > @@ -599,7 +598,7 @@ public class JettyHttpComponent extends > } > context.setAttribute("javax.servlet.context.tempdir", file); > } > - filterHolder.setFilter(new MultiPartFilter()); > + filterHolder.setFilter(new CamelMultipartFilter()); > //add the default MultiPartFilter filter for it > context.addFilter(filterHolder, "/*", 0); > context.addServlet(holder, "/*"); > > > -- Claus Ibsen Apache Camel Committer Author of Camel in Action: http://www.manning.com/ibsen/ Open 
Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus
