Author: krasserm
Date: Thu Jul 22 19:24:24 2010
New Revision: 966815
URL: http://svn.apache.org/viewvc?rev=966815&view=rev
Log:
Closes CAMEL-2986: IllegalStateException in CamelContinuationServlet under
heavy load
Added:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=966815&r1=966814&r2=966815&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
(original)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
Thu Jul 22 19:24:24 2010
@@ -38,6 +38,8 @@ import org.eclipse.jetty.continuation.Co
*/
public class CamelContinuationServlet extends CamelServlet {
+ static final String EXCHANGE_ATRRIBUTE_NAME = "CamelExchange";
+
private static final long serialVersionUID = 1L;
@Override
@@ -50,15 +52,15 @@ public class CamelContinuationServlet ex
return;
}
- // are we suspended?
- if (consumer.isSuspended()) {
+ final Continuation continuation =
ContinuationSupport.getContinuation(request);
+
+ // are we suspended and a request is dispatched initially?
+ if (consumer.isSuspended() && continuation.isInitial()) {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
return;
}
- final Continuation continuation =
ContinuationSupport.getContinuation(request);
if (continuation.isInitial()) {
-
// a new request so create an exchange
final Exchange exchange = new
DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
if (consumer.getEndpoint().isBridgeEndpoint()) {
@@ -69,44 +71,28 @@ public class CamelContinuationServlet ex
}
exchange.setIn(new HttpMessage(exchange, request, response));
+ if (log.isTraceEnabled()) {
+ log.trace("Suspending continuation of exchangeId: " +
exchange.getExchangeId());
+ }
+ continuation.suspend();
+
// use the asynchronous API to process the exchange
- boolean sync = consumer.getAsyncProcessor().process(exchange,
new AsyncCallback() {
+ consumer.getAsyncProcessor().process(exchange, new
AsyncCallback() {
public void done(boolean doneSync) {
- // we only have to handle async completion
- if (doneSync) {
- return;
- }
-
- // we should resume the continuation now that we are
done asynchronously
if (log.isTraceEnabled()) {
log.trace("Resuming continuation of exchangeId: " +
exchange.getExchangeId());
}
- continuation.setAttribute("CamelExchange", exchange);
+ // resume processing after both, sync and async
callbacks
+ continuation.setAttribute(EXCHANGE_ATRRIBUTE_NAME,
exchange);
continuation.resume();
}
});
-
- if (!sync) {
- // wait for the exchange to get processed.
- // this might block until it completes or it might return
via an exception and
- // then this method is re-invoked once the the exchange
has finished processing
- if (log.isTraceEnabled()) {
- log.trace("Suspending continuation of exchangeId: " +
exchange.getExchangeId());
- }
- continuation.suspend(response);
- return;
- }
-
- // now lets output to the response
- if (log.isTraceEnabled()) {
- log.trace("Writing response of exchangeId: " +
exchange.getExchangeId());
- }
- consumer.getBinding().writeResponse(exchange, response);
return;
}
if (continuation.isResumed()) {
- Exchange exchange = (Exchange)
continuation.getAttribute("CamelExchange");
+ // a re-dispatched request containing the processing result
+ Exchange exchange = (Exchange)
continuation.getAttribute(EXCHANGE_ATRRIBUTE_NAME);
if (log.isTraceEnabled()) {
log.trace("Resuming continuation of exchangeId: " +
exchange.getExchangeId());
}
@@ -116,8 +102,10 @@ public class CamelContinuationServlet ex
log.trace("Writing response of exchangeId: " +
exchange.getExchangeId());
}
consumer.getBinding().writeResponse(exchange, response);
- return;
}
+ } catch (IOException e) {
+ log.error("Error processing request", e);
+ throw e;
} catch (Exception e) {
log.error("Error processing request", e);
throw new ServletException(e);
Added:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java?rev=966815&view=auto
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
(added)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
Thu Jul 22 19:24:24 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import java.io.IOException;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+
+import org.eclipse.jetty.servlets.MultiPartFilter;
+
+/**
+ * A multipart filter that processes only initially dispatched requests.
+ * Re-dispatched requests are ignored.
+ */
+class CamelMultipartFilter extends MultiPartFilter {
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response,
FilterChain chain) throws IOException, ServletException {
+ if
(request.getAttribute(CamelContinuationServlet.EXCHANGE_ATRRIBUTE_NAME) ==
null) {
+ super.doFilter(request, response, chain);
+ } else {
+ chain.doFilter(request, response);
+ }
+ }
+
+}
Modified:
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java?rev=966815&r1=966814&r2=966815&view=diff
==============================================================================
---
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
(original)
+++
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
Thu Jul 22 19:24:24 2010
@@ -55,7 +55,6 @@ import org.eclipse.jetty.server.ssl.SslS
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.servlets.MultiPartFilter;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
@@ -599,7 +598,7 @@ public class JettyHttpComponent extends
}
context.setAttribute("javax.servlet.context.tempdir", file);
}
- filterHolder.setFilter(new MultiPartFilter());
+ filterHolder.setFilter(new CamelMultipartFilter());
//add the default MultiPartFilter filter for it
context.addFilter(filterHolder, "/*", 0);
context.addServlet(holder, "/*");