Author: markt
Date: Fri Sep 25 20:29:30 2015
New Revision: 1705349

URL: http://svn.apache.org/viewvc?rev=1705349&view=rev
Log:
More work on servlet 3.1 non-blocking for HTTP/2. NumberWriter works.

Modified:
    tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
    tomcat/trunk/java/org/apache/coyote/ActionCode.java
    tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
    tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
    tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties
    tomcat/trunk/java/org/apache/coyote/http2/Stream.java
    tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java

Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Fri Sep 25 20:29:30 2015
@@ -78,7 +78,8 @@ public abstract class AbstractProcessor
     }
 
 
-    private AbstractProcessor(AbstractEndpoint<?> endpoint, Request 
coyoteRequest, Response coyoteResponse) {
+    private AbstractProcessor(AbstractEndpoint<?> endpoint, Request 
coyoteRequest,
+            Response coyoteResponse) {
         this.endpoint = endpoint;
         asyncStateMachine = new AsyncStateMachine(this);
         request = coyoteRequest;

Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original)
+++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Fri Sep 25 20:29:30 2015
@@ -195,13 +195,15 @@ public enum ActionCode {
 
     /**
      * Indicator that Servlet is interested in being
-     * notified when data is available to be read
+     * notified when data is available to be read.
      */
     NB_READ_INTEREST,
 
     /**
-     *Indicator that the Servlet is interested
-     *in being notified when it can write data
+     * Used with non-blocking writes to determine if a write is currently
+     * allowed (sets passed parameter to <code>true</code>) or not (sets passed
+     * parameter to <code>false</code>). If a write is not allowed then callback
+     * will be triggered at some future point when write becomes possible again.
      */
     NB_WRITE_INTEREST,
 

Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Fri Sep 25 20:29:30 2015
@@ -147,4 +147,6 @@ abstract class AbstractStream {
     protected abstract String getConnectionId();
 
     protected abstract int getWeight();
+
+    protected abstract void doNotifyAll();
 }

Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Fri Sep 25 20:29:30 2015
@@ -638,7 +638,7 @@ public class Http2UpgradeHandler extends
                 if (allocation > 0) {
                     backLogSize -= allocation;
                     synchronized (entry.getKey()) {
-                        entry.getKey().notifyAll();
+                        entry.getKey().doNotifyAll();
                     }
                 }
             }
@@ -646,6 +646,13 @@ public class Http2UpgradeHandler extends
     }
 
 
+
+    @Override
+    protected synchronized void doNotifyAll() {
+        this.notifyAll();
+    }
+
+
     private int allocate(AbstractStream stream, int allocation) {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("upgradeHandler.allocate.debug", 
getConnectionId(),

Modified: tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties Fri Sep 25 20:29:30 2015
@@ -72,7 +72,10 @@ stream.write=Connection [{0}], Stream [{
 
 stream.outputBuffer.flush.debug=Connection [{0}], Stream [{1}], flushing output with buffer at position [{2}], writeInProgress [{3}] and closed [{4}]
 
+streamProcessor.dispatch=Connection [{0}], Stream [{1}], status [{2}]
 streamProcessor.httpupgrade.notsupported=HTTP upgrade is not supported within HTTP/2 streams
+streamProcessor.process.loopend=Connection [{0}], Stream [{1}], loop end, state [{2}], dispatches [{3}]
+streamProcessor.process.loopstart=Connection [{0}], Stream [{1}], loop start, status [{2}], dispatches [{3}]
 streamProcessor.ssl.error=Unable to retrieve SSL request attributes
 
 streamStateMachine.debug.change=Connection [{0}], Stream [{1}], State changed from [{2}] to [{3}]

Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Fri Sep 25 20:29:30 2015
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
+import org.apache.coyote.ActionCode;
 import org.apache.coyote.InputBuffer;
 import org.apache.coyote.OutputBuffer;
 import org.apache.coyote.Request;
@@ -134,11 +135,15 @@ public class Stream extends AbstractStre
     }
 
 
-    private synchronized int reserveWindowSize(int reservation) throws 
IOException {
+    private synchronized int reserveWindowSize(int reservation, boolean block) 
throws IOException {
         long windowSize = getWindowSize();
         while (windowSize < 1) {
             try {
-                wait();
+                if (block) {
+                    wait();
+                } else {
+                    return 0;
+                }
             } catch (InterruptedException e) {
                 // Possible shutdown / rst or similar. Use an IOException to
                 // signal to the client that further I/O isn't possible for this
@@ -159,6 +164,20 @@ public class Stream extends AbstractStre
 
 
     @Override
+    protected synchronized void doNotifyAll() {
+        if (coyoteResponse.getWriteListener() == null) {
+            // Blocking IO so thread will be waiting. Release it.
+            // Use notifyAll() to be safe (should be unnecessary)
+            this.notifyAll();
+        } else {
+            if (outputBuffer.isRegisteredForWrite()) {
+                coyoteResponse.action(ActionCode.DISPATCH_WRITE, null);
+            }
+        }
+    }
+
+
+    @Override
     public void emitHeader(String name, String value, boolean neverIndex) {
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("stream.header.debug", getConnectionId(), 
getIdentifier(),
@@ -226,7 +245,7 @@ public class Stream extends AbstractStre
         if (log.isDebugEnabled()) {
             log.debug(sm.getString("stream.write", getConnectionId(), 
getIdentifier()));
         }
-        outputBuffer.flush();
+        outputBuffer.flush(true);
     }
 
 
@@ -308,6 +327,7 @@ public class Stream extends AbstractStre
         private volatile long written = 0;
         private volatile boolean closed = false;
         private volatile boolean endOfStreamSent = false;
+        private volatile boolean writeInterest = false;
 
         /* The write methods are synchronized to ensure that only one thread at
          * a time is able to access the buffer. Without this protection, a
@@ -330,22 +350,25 @@ public class Stream extends AbstractStre
                 if (len > 0 && !buffer.hasRemaining()) {
                     // Only flush if we have more data to write and the buffer
                     // is full
-                    flush(true);
+                    if (flush(true, coyoteResponse.getWriteListener() == 
null)) {
+                        break;
+                    }
                 }
             }
             written += offset;
             return offset;
         }
 
-        public synchronized void flush() throws IOException {
-            flush(false);
+        public synchronized boolean flush(boolean block) throws IOException {
+            return flush(false, block);
         }
 
-        private synchronized void flush(boolean writeInProgress) throws 
IOException {
+        private synchronized boolean flush(boolean writeInProgress, boolean 
block)
+                throws IOException {
             if (log.isDebugEnabled()) {
-                log.debug(sm.getString("stream.outputBuffer.flush.debug", 
getConnectionId(), getIdentifier(),
-                        Integer.toString(buffer.position()), 
Boolean.toString(writeInProgress),
-                        Boolean.toString(closed)));
+                log.debug(sm.getString("stream.outputBuffer.flush.debug", 
getConnectionId(),
+                        getIdentifier(), Integer.toString(buffer.position()),
+                        Boolean.toString(writeInProgress), 
Boolean.toString(closed)));
             }
             if (!coyoteResponse.isCommitted()) {
                 coyoteResponse.sendHeaders();
@@ -357,12 +380,17 @@ public class Stream extends AbstractStre
                     handler.writeBody(Stream.this, buffer, 0, true);
                 }
                 // Buffer is empty. Nothing to do.
-                return;
+                return false;
             }
             buffer.flip();
             int left = buffer.remaining();
             while (left > 0) {
-                int streamReservation  = reserveWindowSize(left);
+                int streamReservation  = reserveWindowSize(left, block);
+                if (streamReservation == 0) {
+                    // Must be non-blocking
+                    buffer.compact();
+                    return true;
+                }
                 while (streamReservation > 0) {
                     int connectionReservation =
                                 handler.reserveWindowSize(Stream.this, 
streamReservation);
@@ -375,6 +403,25 @@ public class Stream extends AbstractStre
                 }
             }
             buffer.clear();
+            return false;
+        }
+
+        synchronized boolean isReady() {
+            if (getWindowSize() > 0 && handler.getWindowSize() > 0) {
+                return true;
+            } else {
+                writeInterest = true;
+                return false;
+            }
+        }
+
+        synchronized boolean isRegisteredForWrite() {
+            if (writeInterest) {
+                writeInterest = false;
+                return true;
+            } else {
+                return false;
+            }
         }
 
         @Override

Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1705349&r1=1705348&r2=1705349&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Fri Sep 25 20:29:30 2015
@@ -17,20 +17,28 @@
 package org.apache.coyote.http2;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.servlet.RequestDispatcher;
 import javax.servlet.http.HttpUpgradeHandler;
 
 import org.apache.coyote.AbstractProcessor;
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.Adapter;
 import org.apache.coyote.AsyncContextCallback;
-import org.apache.coyote.AsyncStateMachine;
 import org.apache.coyote.ContainerThreadMarker;
+import org.apache.coyote.ErrorState;
+import org.apache.coyote.RequestInfo;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.ExceptionUtils;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
+import org.apache.tomcat.util.net.DispatchType;
 import org.apache.tomcat.util.net.SSLSupport;
 import org.apache.tomcat.util.net.SocketStatus;
 import org.apache.tomcat.util.net.SocketWrapperBase;
@@ -42,7 +50,7 @@ public class StreamProcessor extends Abs
     private static final StringManager sm = 
StringManager.getManager(StreamProcessor.class);
 
     private final Stream stream;
-    private final AsyncStateMachine asyncStateMachine;
+    private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>();
 
     private volatile SSLSupport sslSupport;
 
@@ -50,36 +58,61 @@ public class StreamProcessor extends Abs
     public StreamProcessor(Stream stream, Adapter adapter, 
SocketWrapperBase<?> socketWrapper) {
         super(stream.getCoyoteRequest(), stream.getCoyoteResponse());
         this.stream = stream;
-        asyncStateMachine = new AsyncStateMachine(this);
         setAdapter(adapter);
         setSocketWrapper(socketWrapper);
     }
 
 
     @Override
-    public void run() {
-        // HTTP/2 equivalent of AbstractConnectionHandler#process()
+    public synchronized void run() {
+        process(SocketStatus.OPEN_READ);
+    }
+
+
+    private synchronized void process(SocketStatus status) {
+        // HTTP/2 equivalent of AbstractConnectionHandler#process() without the
+        // socket <-> processor mapping
         ContainerThreadMarker.set();
         SocketState state = SocketState.CLOSED;
         try {
+            Iterator<DispatchType> dispatches = 
getIteratorAndClearDispatches();
             do {
-                if (asyncStateMachine.isAsync()) {
-                    adapter.asyncDispatch(request, response, 
SocketStatus.OPEN_READ);
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("streamProcessor.process.loopstart",
+                            stream.getConnectionId(), stream.getIdentifier(), 
status, dispatches));
+                }
+                // TODO CLOSE_NOW ?
+                if (dispatches != null) {
+                    DispatchType nextDispatch = dispatches.next();
+                    state = dispatch(nextDispatch.getSocketStatus());
+                // TODO DISCONNECT ?
+                } else if (isAsync()) {
+                    state = dispatch(status);
                 } else if (state == SocketState.ASYNC_END) {
-                    adapter.asyncDispatch(request, response, 
SocketStatus.OPEN_READ);
-                    // Only ever one request per stream so always treat as
-                    // closed at this point.
-                    state = SocketState.CLOSED;
+                    state = dispatch(status);
                 } else {
-                    adapter.service(request, response);
+                    state = process((SocketWrapperBase<?>) null);
                 }
 
-                if (asyncStateMachine.isAsync()) {
+                if (state != SocketState.CLOSED && isAsync()) {
                     state = asyncStateMachine.asyncPostProcess();
-                } else {
-                    response.action(ActionCode.CLOSE, null);
                 }
-            } while (state == SocketState.ASYNC_END);
+
+                if (dispatches == null || !dispatches.hasNext()) {
+                    // Only returns non-null iterator if there are
+                    // dispatches to process.
+                    dispatches = getIteratorAndClearDispatches();
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug(sm.getString("streamProcessor.process.loopend",
+                            stream.getConnectionId(), stream.getIdentifier(), 
state, dispatches));
+                }
+            } while (state == SocketState.ASYNC_END ||
+                    dispatches != null && state != SocketState.CLOSED);
+
+            if (state == SocketState.CLOSED) {
+                // TODO
+            }
         } catch (Exception e) {
             // TODO
             e.printStackTrace();
@@ -265,7 +298,24 @@ public class StreamProcessor extends Abs
             result.set(stream.isInputFinished());
             break;
         }
-
+        case NB_WRITE_INTEREST: {
+            // TODO: Thread safe? Do this in output buffer?
+            AtomicBoolean result = (AtomicBoolean) param;
+            result.set(stream.getOutputBuffer().isReady());
+            break;
+        }
+        case DISPATCH_READ: {
+            dispatches.add(DispatchType.NON_BLOCKING_READ);
+            break;
+        }
+        case DISPATCH_WRITE: {
+            dispatches.add(DispatchType.NON_BLOCKING_WRITE);
+            break;
+        }
+        case DISPATCH_EXECUTE: {
+            socketWrapper.getEndpoint().getExecutor().execute(this);
+            break;
+        }
 
         // Unsupported / illegal under HTTP/2
         case UPGRADE:
@@ -277,12 +327,8 @@ public class StreamProcessor extends Abs
         case AVAILABLE:
         case CLOSE_NOW:
         case DISABLE_SWALLOW_INPUT:
-        case DISPATCH_EXECUTE:
-        case DISPATCH_READ:
-        case DISPATCH_WRITE:
         case END_REQUEST:
         case NB_READ_INTEREST:
-        case NB_WRITE_INTEREST:
         case REQ_SET_BODY_REPLAY:
         case RESET:
             log.info("TODO: Implement [" + actionCode + "] for HTTP/2");
@@ -328,15 +374,126 @@ public class StreamProcessor extends Abs
 
     @Override
     public SocketState process(SocketWrapperBase<?> socket) throws IOException 
{
-        // Should never happen
-        throw new 
IllegalStateException(sm.getString("streamProcessor.httpupgrade.notsupported"));
+        try {
+            adapter.service(request, response);
+        } catch (Exception e) {
+            setErrorState(ErrorState.CLOSE_NOW, e);
+        }
+
+        if (getErrorState().isError()) {
+            action(ActionCode.CLOSE, null);
+            request.updateCounters();
+            return SocketState.CLOSED;
+        } else if (isAsync()) {
+            return SocketState.LONG;
+        } else {
+            action(ActionCode.CLOSE, null);
+            request.updateCounters();
+            return SocketState.CLOSED;
+        }
     }
 
 
     @Override
     public SocketState dispatch(SocketStatus status) {
-        // Should never happen
-        throw new 
IllegalStateException(sm.getString("streamProcessor.httpupgrade.notsupported"));
+        if (log.isDebugEnabled()) {
+            log.debug(sm.getString("streamProcessor.dispatch", 
stream.getConnectionId(),
+                    stream.getIdentifier(), status));
+        }
+        if (status == SocketStatus.OPEN_WRITE && response.getWriteListener() 
!= null) {
+            try {
+                asyncStateMachine.asyncOperation();
+
+                if (stream.getOutputBuffer().flush(false)) {
+                    // The buffer wasn't fully flushed so re-register the
+                    // stream for write. Note this does not go via the
+                    // Response since the write registration state at
+                    // that level should remain unchanged. Once the buffer
+                    // has been emptied then the code below will call
+                    // dispatch() which will enable the
+                    // Response to respond to this event.
+                    if (stream.getOutputBuffer().isReady()) {
+                        // Unexpected
+                        throw new IllegalStateException();
+                    }
+                    return SocketState.LONG;
+                }
+            } catch (IOException | IllegalStateException x) {
+                // IOE - Problem writing to socket
+                // ISE - Request/Response not in correct state for async write
+                if (log.isDebugEnabled()) {
+                    log.debug("Unable to write async data.",x);
+                }
+                status = SocketStatus.ASYNC_WRITE_ERROR;
+                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x);
+            }
+        } else if (status == SocketStatus.OPEN_READ && 
request.getReadListener() != null) {
+            try {
+                asyncStateMachine.asyncOperation();
+            } catch (IllegalStateException x) {
+                // ISE - Request/Response not in correct state for async read
+                if (log.isDebugEnabled()) {
+                    log.debug("Unable to read async data.",x);
+                }
+                status = SocketStatus.ASYNC_READ_ERROR;
+                request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x);
+            }
+        }
+
+        RequestInfo rp = request.getRequestProcessor();
+        try {
+            rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
+            if (!getAdapter().asyncDispatch(request, response, status)) {
+                setErrorState(ErrorState.CLOSE_NOW, null);
+            }
+        } catch (InterruptedIOException e) {
+            setErrorState(ErrorState.CLOSE_NOW, e);
+        } catch (Throwable t) {
+            ExceptionUtils.handleThrowable(t);
+            setErrorState(ErrorState.CLOSE_NOW, t);
+            log.error(sm.getString("http11processor.request.process"), t);
+        }
+
+        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
+
+        if (getErrorState().isError()) {
+            request.updateCounters();
+            return SocketState.CLOSED;
+        } else if (isAsync()) {
+            return SocketState.LONG;
+        } else {
+            request.updateCounters();
+            return SocketState.CLOSED;
+        }
+    }
+
+
+    public void addDispatch(DispatchType dispatchType) {
+        synchronized (dispatches) {
+            dispatches.add(dispatchType);
+        }
+    }
+    public Iterator<DispatchType> getIteratorAndClearDispatches() {
+        // Note: Logic in AbstractProtocol depends on this method only returning
+        // a non-null value if the iterator is non-empty. i.e. it should never
+        // return an empty iterator.
+        Iterator<DispatchType> result;
+        synchronized (dispatches) {
+            // Synchronized as the generation of the iterator and the clearing
+            // of dispatches needs to be an atomic operation.
+            result = dispatches.iterator();
+            if (result.hasNext()) {
+                dispatches.clear();
+            } else {
+                result = null;
+            }
+        }
+        return result;
+    }
+    public void clearDispatches() {
+        synchronized (dispatches) {
+            dispatches.clear();
+        }
     }
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to