Author: markt
Date: Thu Aug 15 10:32:15 2013
New Revision: 1514228

URL: http://svn.apache.org/r1514228
Log:
The container is responsible for the first call to each of onWritePossible() 
and onDataAvailable() once a listener has been set.
Main component is the addition to the SocketWrapper of a list of dispatch types 
that need to be made. "Dispatch type" in this case meaning "process the socket 
using the specified SocketStatus". This is used to register trigger the first 
call to each of onWritePossible() and onDataAvailable() for which the container 
is responsible.
Fix some additional issues identified in the test case.

Added:
    tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java   (with 
props)
Modified:
    tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java
    tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
    tomcat/trunk/java/org/apache/coyote/ActionCode.java
    tomcat/trunk/java/org/apache/coyote/Response.java
    tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
    tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java

Modified: tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java Thu Aug 15 
10:32:15 2013
@@ -249,6 +249,18 @@ public class InputBuffer extends Reader
 
     public void setReadListener(ReadListener listener) {
         coyoteRequest.setReadListener(listener);
+
+        // The container is responsible for the first call to
+        // listener.onDataAvailable(). If isReady() returns true, the container
+        // needs to call listener.onDataAvailable() from a new thread. If
+        // isReady() returns false, the socket will be registered for read and
+        // the container will call listener.onDataAvailable() once data 
arrives.
+        // Must call isFinished() first as a call to isReady() if the request
+        // has been finished will register the socket for read interest and 
that
+        // is not required.
+        if (isFinished() || isReady()) {
+            coyoteRequest.action(ActionCode.DISPATCH_READ, null);
+        }
     }
 
 

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Thu Aug 15 
10:32:15 2013
@@ -38,6 +38,7 @@ import org.apache.tomcat.util.collection
 import org.apache.tomcat.util.modeler.Registry;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
+import org.apache.tomcat.util.net.DispatchType;
 import org.apache.tomcat.util.net.SocketStatus;
 import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
@@ -616,7 +617,11 @@ public abstract class AbstractProtocol i
 
                 SocketState state = SocketState.CLOSED;
                 do {
-                    if (status == SocketStatus.DISCONNECT &&
+                    if (wrapper.hasNextDispatch()) {
+                        DispatchType nextDispatch = wrapper.getNextDispatch();
+                        state = processor.asyncDispatch(
+                                nextDispatch.getSocketStatus());
+                    } else if (status == SocketStatus.DISCONNECT &&
                             !processor.isComet()) {
                         // Do nothing here, just wait for it to get recycled
                         // Don't do this for Comet we need to generate an end
@@ -663,7 +668,8 @@ public abstract class AbstractProtocol i
                                 "], State out: [" + state + "]");
                     }
                 } while (state == SocketState.ASYNC_END ||
-                        state == SocketState.UPGRADING);
+                        state == SocketState.UPGRADING ||
+                        wrapper.hasNextDispatch());
 
                 if (state == SocketState.LONG) {
                     // In the middle of processing a request/response. Keep the

Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Thu Aug 15 10:32:15 2013
@@ -215,5 +215,17 @@ public enum ActionCode {
     /**
      * Indicates if the request body has been fully read.
      */
-    REQUEST_BODY_FULLY_READ
+    REQUEST_BODY_FULLY_READ,
+
+    /**
+     * Indicates that the container needs to trigger a call to 
onDataAvailable()
+     * for the registered non-blocking read listener.
+     */
+    DISPATCH_READ,
+
+    /**
+     * Indicates that the container needs to trigger a call to 
onWritePossible()
+     * for the registered non-blocking write listener.
+     */
+    DISPATCH_WRITE
 }

Modified: tomcat/trunk/java/org/apache/coyote/Response.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/Response.java (original)
+++ tomcat/trunk/java/org/apache/coyote/Response.java Thu Aug 15 10:32:15 2013
@@ -593,6 +593,21 @@ public final class Response {
         }
 
         this.listener = listener;
+
+        // The container is responsible for the first call to
+        // listener.onWritePossible(). If isReady() returns true, the container
+        // needs to call listener.onWritePossible() from a new thread. If
+        // isReady() returns false, the socket will be registered for write and
+        // the container will call listener.onWritePossible() once data can be
+        // written.
+        if (isReady()) {
+            action(ActionCode.DISPATCH_WRITE, null);
+            // Need to set the fireListener flag otherwise when the container
+            // tries to trigger onWritePossible, nothing will happen
+            synchronized (nonBlockingStateLock) {
+                fireListener = true;
+            }
+        }
     }
 
     public boolean isReady() {

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Thu 
Aug 15 10:32:15 2013
@@ -51,6 +51,7 @@ import org.apache.tomcat.util.http.MimeH
 import org.apache.tomcat.util.log.UserDataHelper;
 import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.DispatchType;
 import org.apache.tomcat.util.net.SocketStatus;
 import org.apache.tomcat.util.net.SocketWrapper;
 import org.apache.tomcat.util.res.StringManager;
@@ -828,6 +829,10 @@ public abstract class AbstractHttp11Proc
         } else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) {
             AtomicBoolean result = (AtomicBoolean) param;
             result.set(getInputBuffer().isFinished());
+        } else if (actionCode == ActionCode.DISPATCH_READ) {
+            socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ);
+        } else if (actionCode == ActionCode.DISPATCH_WRITE) {
+            socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE);
         } else {
             actionInternal(actionCode, param);
         }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java Thu Aug 
15 10:32:15 2013
@@ -554,7 +554,10 @@ public class InternalInputBuffer extends
 
     @Override
     protected int nbRead() throws IOException {
-        throw new IllegalStateException("This method is unused for BIO");
+        // If this gets called for BIO need to make caller think there is data
+        // to read as BIO always reads whether there is data or not (and blocks
+        // until there is data to read).
+        return 1;
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Aug 15 
10:32:15 2013
@@ -1718,16 +1718,30 @@ public class AprEndpoint extends Abstrac
                                         // application code. By signalling 
read/write is possible, a
                                         // read/write will be attempted, fail 
and that will trigger
                                         // an exception the application will 
see.
-                                        if ((desc[n*2] & Poll.APR_POLLIN) == 
Poll.APR_POLLIN ||
-                                                (wrapper.pollerFlags & 
Poll.APR_POLLIN) == Poll.APR_POLLIN) {
-                                            // Must be doing a non-blocking 
read
+                                        // Check the return flags first, 
followed by what the socket
+                                        // was registered for
+                                        if ((desc[n*2] & Poll.APR_POLLIN) == 
Poll.APR_POLLIN) {
+                                            // Error probably occurred during 
a non-blocking read
                                             if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_READ)) {
                                                 // Close socket and clear pool
                                                 destroySocket(desc[n*2+1]);
                                             }
-                                        } else if ((desc[n*2] & 
Poll.APR_POLLOUT) == Poll.APR_POLLOUT ||
-                                                (wrapper.pollerFlags & 
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
-                                            // Must be doing an non-blocking 
write write
+                                        } else if ((desc[n*2] & 
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+                                            // Error probably occurred during 
a non-blocking write
+                                            if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_WRITE)) {
+                                                // Close socket and clear pool
+                                                destroySocket(desc[n*2+1]);
+                                            }
+                                        } else if ((wrapper.pollerFlags & 
Poll.APR_POLLIN) == Poll.APR_POLLIN) {
+                                            // Can't tell what was happening 
when the error occurred but the
+                                            // socket is registered for 
non-blocking read so use that
+                                            if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_READ)) {
+                                                // Close socket and clear pool
+                                                destroySocket(desc[n*2+1]);
+                                            }
+                                        } else if ((wrapper.pollerFlags & 
Poll.APR_POLLOUT) == Poll.APR_POLLOUT) {
+                                            // Can't tell what was happening 
when the error occurred but the
+                                            // socket is registered for 
non-blocking write so use that
                                             if (!processSocket(desc[n*2+1], 
SocketStatus.OPEN_WRITE)) {
                                                 // Close socket and clear pool
                                                 destroySocket(desc[n*2+1]);

Added: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java?rev=1514228&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java Thu Aug 15 
10:32:15 2013
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+/**
+ * This enumeration lists the different types of dispatches that request
+ * processing can trigger. In this instance, dispatch means re-process this
+ * request using the given socket status.
+ */
+public enum DispatchType {
+
+    NON_BLOCKING_READ(SocketStatus.OPEN_READ),
+    NON_BLOCKING_WRITE(SocketStatus.OPEN_WRITE);
+
+    private final SocketStatus status;
+
+    private DispatchType(SocketStatus status) {
+        this.status = status;
+    }
+
+    public SocketStatus getSocketStatus() {
+        return status;
+    }
+}

Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Thu Aug 15 
10:32:15 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.tomcat.util.net;
 
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -60,6 +63,8 @@ public class SocketWrapper<E> {
     private final Object writeThreadLock = new Object();
     public Object getWriteThreadLock() { return writeThreadLock; }
 
+    private Set<DispatchType> dispatches = new LinkedHashSet<>();
+
     public SocketWrapper(E socket) {
         this.socket = socket;
         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -108,4 +113,19 @@ public class SocketWrapper<E> {
     public WriteLock getBlockingStatusWriteLock() {
         return blockingStatusWriteLock;
     }
+    public void addDispatch(DispatchType dispatchType) {
+        dispatches.add(dispatchType);
+    }
+    public boolean hasNextDispatch() {
+        return dispatches.size() > 0;
+    }
+    public DispatchType getNextDispatch() {
+        DispatchType result = null;
+        Iterator<DispatchType> iter = dispatches.iterator();
+        if (iter.hasNext()) {
+            result = iter.next();
+            iter.remove();
+        }
+        return result;
+    }
 }

Modified: 
tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1514228&r1=1514227&r2=1514228&view=diff
==============================================================================
--- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java 
(original)
+++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java 
Thu Aug 15 10:32:15 2013
@@ -251,14 +251,6 @@ public class TestNonBlockingAPI extends 
     public void testNonBlockingWriteError() throws Exception {
         Tomcat tomcat = getTomcatInstance();
 
-        // Not applicable to BIO. This test does not start a new thread for the
-        // write so with BIO all the writes happen in the service() method just
-        // like blocking IO.
-        if (tomcat.getConnector().getProtocolHandlerClassName().equals(
-                "org.apache.coyote.http11.Http11Protocol")) {
-            return;
-        }
-
         // Must have a real docBase - just use temp
         StandardContext ctx = (StandardContext) tomcat.addContext(
                 "", System.getProperty("java.io.tmpdir"));
@@ -416,10 +408,8 @@ public class TestNonBlockingAPI extends 
             });
             // step 2 - notify on read
             ServletInputStream in = req.getInputStream();
-            listener = new TestReadListener(actx);
+            listener = new TestReadListener(actx, false);
             in.setReadListener(listener);
-
-            listener.onDataAvailable();
         }
     }
 
@@ -462,13 +452,12 @@ public class TestNonBlockingAPI extends 
             });
             // step 2 - notify on read
             ServletInputStream in = req.getInputStream();
-            rlistener = new TestReadListener(actx);
+            rlistener = new TestReadListener(actx, true);
             in.setReadListener(rlistener);
             ServletOutputStream out = resp.getOutputStream();
             resp.setBufferSize(200 * 1024);
             wlistener = new TestWriteListener(actx);
             out.setWriteListener(wlistener);
-            wlistener.onWritePossible();
         }
 
 
@@ -476,9 +465,12 @@ public class TestNonBlockingAPI extends 
     private class TestReadListener implements ReadListener {
         private final AsyncContext ctx;
         private final StringBuilder body = new StringBuilder();
+        private final boolean usingNonBlockingWrite;
 
-        public TestReadListener(AsyncContext ctx) {
+        public TestReadListener(AsyncContext ctx,
+                boolean usingNonBlockingWrite) {
             this.ctx = ctx;
+            this.usingNonBlockingWrite = usingNonBlockingWrite;
         }
 
         @Override
@@ -501,18 +493,22 @@ public class TestNonBlockingAPI extends 
         @Override
         public void onAllDataRead() {
             log.info("onAllDataRead");
-            String msg;
-            if (body.toString().endsWith("FINISHED")) {
-                msg = "OK";
-            } else {
-                msg = "FAILED";
-            }
-            try {
-                ctx.getResponse().getOutputStream().print(msg);
-            } catch (IOException ioe) {
-                // Ignore
+            // If non-blocking writes are being used, don't write here as it
+            // will inject unexpected data into the write output.
+            if (!usingNonBlockingWrite) {
+                String msg;
+                if (body.toString().endsWith("FINISHED")) {
+                    msg = "OK";
+                } else {
+                    msg = "FAILED";
+                }
+                try {
+                    ctx.getResponse().getOutputStream().print(msg);
+                } catch (IOException ioe) {
+                    // Ignore
+                }
+                ctx.complete();
             }
-            ctx.complete();
         }
 
         @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to