Claus Ibsen schrieb:
Hi

Great catch.

Btw there is a typo in the constant for the attribute :)

D'oh! Just committed a fix.


On Thu, Jul 22, 2010 at 9:24 PM,  <krass...@apache.org> wrote:
Author: krasserm
Date: Thu Jul 22 19:24:24 2010
New Revision: 966815

URL: http://svn.apache.org/viewvc?rev=966815&view=rev
Log:
Closes CAMEL-2986: IllegalStateException in CamelContinuationServlet under 
heavy load

Added:
   
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
Modified:
   
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
   
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java?rev=966815&r1=966814&r2=966815&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
 Thu Jul 22 19:24:24 2010
@@ -38,6 +38,8 @@ import org.eclipse.jetty.continuation.Co
 */
 public class CamelContinuationServlet extends CamelServlet {

+    static final String EXCHANGE_ATRRIBUTE_NAME = "CamelExchange";
+
    private static final long serialVersionUID = 1L;

    @Override
@@ -50,15 +52,15 @@ public class CamelContinuationServlet ex
                return;
            }

-            // are we suspended?
-            if (consumer.isSuspended()) {
+            final Continuation continuation = 
ContinuationSupport.getContinuation(request);
+
+            // are we suspended and a request is dispatched initially?
+            if (consumer.isSuspended() && continuation.isInitial()) {
                response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                return;
            }

-            final Continuation continuation = 
ContinuationSupport.getContinuation(request);
            if (continuation.isInitial()) {
-
                // a new request so create an exchange
                final Exchange exchange = new 
DefaultExchange(consumer.getEndpoint(), ExchangePattern.InOut);
                if (consumer.getEndpoint().isBridgeEndpoint()) {
@@ -69,44 +71,28 @@ public class CamelContinuationServlet ex
                }
                exchange.setIn(new HttpMessage(exchange, request, response));

+                if (log.isTraceEnabled()) {
+                    log.trace("Suspending continuation of exchangeId: " + 
exchange.getExchangeId());
+                }
+                continuation.suspend();
+
                // use the asynchronous API to process the exchange
-                boolean sync = consumer.getAsyncProcessor().process(exchange, 
new AsyncCallback() {
+                consumer.getAsyncProcessor().process(exchange, new 
AsyncCallback() {
                    public void done(boolean doneSync) {
-                        // we only have to handle async completion
-                        if (doneSync) {
-                            return;
-                        }
-
-                        // we should resume the continuation now that we are 
done asynchronously
                        if (log.isTraceEnabled()) {
                            log.trace("Resuming continuation of exchangeId: " + 
exchange.getExchangeId());
                        }
-                        continuation.setAttribute("CamelExchange", exchange);
+                        // resume processing after both, sync and async 
callbacks
+                        continuation.setAttribute(EXCHANGE_ATRRIBUTE_NAME, 
exchange);
                        continuation.resume();
                    }
                });
-
-                if (!sync) {
-                    // wait for the exchange to get processed.
-                    // this might block until it completes or it might return 
via an exception and
-                    // then this method is re-invoked once the the exchange 
has finished processing
-                    if (log.isTraceEnabled()) {
-                        log.trace("Suspending continuation of exchangeId: " + 
exchange.getExchangeId());
-                    }
-                    continuation.suspend(response);
-                    return;
-                }
-
-                // now lets output to the response
-                if (log.isTraceEnabled()) {
-                    log.trace("Writing response of exchangeId: " + 
exchange.getExchangeId());
-                }
-                consumer.getBinding().writeResponse(exchange, response);
                return;
            }

            if (continuation.isResumed()) {
-                Exchange exchange = (Exchange) 
continuation.getAttribute("CamelExchange");
+                // a re-dispatched request containing the processing result
+                Exchange exchange = (Exchange) 
continuation.getAttribute(EXCHANGE_ATRRIBUTE_NAME);
                if (log.isTraceEnabled()) {
                    log.trace("Resuming continuation of exchangeId: " + 
exchange.getExchangeId());
                }
@@ -116,8 +102,10 @@ public class CamelContinuationServlet ex
                    log.trace("Writing response of exchangeId: " + 
exchange.getExchangeId());
                }
                consumer.getBinding().writeResponse(exchange, response);
-                return;
            }
+        } catch (IOException e) {
+            log.error("Error processing request", e);
+            throw e;
        } catch (Exception e) {
            log.error("Error processing request", e);
            throw new ServletException(e);

Added: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java?rev=966815&view=auto
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
 (added)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/CamelMultipartFilter.java
 Thu Jul 22 19:24:24 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jetty;
+
+import java.io.IOException;
+
+import javax.servlet.FilterChain;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+
+import org.eclipse.jetty.servlets.MultiPartFilter;
+
+/**
+ * A multipart filter that processes only initially dispatched requests.
+ * Re-dispatched requests are ignored.
+ */
+class CamelMultipartFilter extends MultiPartFilter {
+
+    @Override
+    public void doFilter(ServletRequest request, ServletResponse response, 
FilterChain chain) throws IOException, ServletException {
+        if 
(request.getAttribute(CamelContinuationServlet.EXCHANGE_ATRRIBUTE_NAME) == 
null) {
+            super.doFilter(request, response, chain);
+        } else {
+            chain.doFilter(request, response);
+        }
+    }
+
+}

Modified: 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java?rev=966815&r1=966814&r2=966815&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
 (original)
+++ 
camel/trunk/components/camel-jetty/src/main/java/org/apache/camel/component/jetty/JettyHttpComponent.java
 Thu Jul 22 19:24:24 2010
@@ -55,7 +55,6 @@ import org.eclipse.jetty.server.ssl.SslS
 import org.eclipse.jetty.servlet.FilterHolder;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.servlets.MultiPartFilter;
 import org.eclipse.jetty.util.component.LifeCycle;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.eclipse.jetty.util.thread.ThreadPool;
@@ -599,7 +598,7 @@ public class JettyHttpComponent extends
            }
            context.setAttribute("javax.servlet.context.tempdir", file);
        }
-        filterHolder.setFilter(new MultiPartFilter());
+        filterHolder.setFilter(new CamelMultipartFilter());
        //add the default MultiPartFilter filter for it
        context.addFilter(filterHolder, "/*", 0);
        context.addServlet(holder, "/*");







Reply via email to