Author: markt Date: Fri Sep 25 20:29:30 2015 New Revision: 1705349 URL: http://svn.apache.org/viewvc?rev=1705349&view=rev Log: More work on servlet 3.1 non-blocking for HTTP/2. NumberWriter works.
Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java tomcat/trunk/java/org/apache/coyote/ActionCode.java tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties tomcat/trunk/java/org/apache/coyote/http2/Stream.java tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Modified: tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java?rev=1705349&r1=1705348&r2=1705349&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProcessor.java Fri Sep 25 20:29:30 2015 @@ -78,7 +78,8 @@ public abstract class AbstractProcessor } - private AbstractProcessor(AbstractEndpoint<?> endpoint, Request coyoteRequest, Response coyoteResponse) { + private AbstractProcessor(AbstractEndpoint<?> endpoint, Request coyoteRequest, + Response coyoteResponse) { this.endpoint = endpoint; asyncStateMachine = new AsyncStateMachine(this); request = coyoteRequest; Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1705349&r1=1705348&r2=1705349&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original) +++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Fri Sep 25 20:29:30 2015 @@ -195,13 +195,15 @@ public enum ActionCode { /** * Indicator that Servlet is interested in being - * notified when data is available to be read + * notified when data is available to be read. */ NB_READ_INTEREST, /** - *Indicator that the Servlet is interested - *in being notified when it can write data + * Used with non-blocking writes to determine if a write is currently + * allowed (sets passed parameter to <code>true</code>) or not (sets passed + * parameter to <code>false</code>). If a write is not allowed then callback + * will be triggered at some future point when write becomes possible again. */ NB_WRITE_INTEREST, Modified: tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java?rev=1705349&r1=1705348&r2=1705349&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/AbstractStream.java Fri Sep 25 20:29:30 2015 @@ -147,4 +147,6 @@ abstract class AbstractStream { protected abstract String getConnectionId(); protected abstract int getWeight(); + + protected abstract void doNotifyAll(); } Modified: tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java?rev=1705349&r1=1705348&r2=1705349&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Http2UpgradeHandler.java Fri Sep 25 20:29:30 2015 @@ -638,7 +638,7 @@ public class Http2UpgradeHandler extends if (allocation > 0) { backLogSize -= allocation; synchronized (entry.getKey()) { - entry.getKey().notifyAll(); + entry.getKey().doNotifyAll(); } } } @@ -646,6 +646,13 @@ public class Http2UpgradeHandler extends } + + @Override + protected synchronized void doNotifyAll() { + this.notifyAll(); + } + + private int allocate(AbstractStream stream, int allocation) { if (log.isDebugEnabled()) { log.debug(sm.getString("upgradeHandler.allocate.debug", getConnectionId(), Modified: tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties?rev=1705349&r1=1705348&r2=1705349&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties (original) +++ tomcat/trunk/java/org/apache/coyote/http2/LocalStrings.properties Fri Sep 25 20:29:30 2015 @@ -72,7 +72,10 @@ stream.write=Connection [{0}], Stream [{ stream.outputBuffer.flush.debug=Connection [{0}], Stream [{1}], flushing output with buffer at position [{2}], writeInProgress [{3}] and closed [{4}] +streamProcessor.dispatch=Connection [{0}], Stream [{1}], status [{2}] streamProcessor.httpupgrade.notsupported=HTTP upgrade is not supported within HTTP/2 streams +streamProcessor.process.loopend=Connection [{0}], Stream [{1}], loop end, state [{2}], dispatches [{3}] +streamProcessor.process.loopstart=Connection [{0}], Stream [{1}], loop start, status [{2}], dispatches [{3}] streamProcessor.ssl.error=Unable to retrieve SSL request attributes streamStateMachine.debug.change=Connection [{0}], Stream [{1}], State changed from [{2}] to [{3}] Modified: tomcat/trunk/java/org/apache/coyote/http2/Stream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/Stream.java?rev=1705349&r1=1705348&r2=1705349&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/Stream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/Stream.java Fri Sep 25 20:29:30 2015 @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import org.apache.coyote.ActionCode; import org.apache.coyote.InputBuffer; import org.apache.coyote.OutputBuffer; import org.apache.coyote.Request; @@ -134,11 +135,15 @@ public class Stream extends AbstractStre } - private synchronized int reserveWindowSize(int reservation) throws IOException { + private synchronized int reserveWindowSize(int reservation, boolean block) throws IOException { long windowSize = getWindowSize(); while (windowSize < 1) { try { - wait(); + if (block) { + wait(); + } else { + return 0; + } } catch (InterruptedException e) { // Possible shutdown / rst or similar. Use an IOException to // signal to the client that further I/O isn't possible for this @@ -159,6 +164,20 @@ public class Stream extends AbstractStre @Override + protected synchronized void doNotifyAll() { + if (coyoteResponse.getWriteListener() == null) { + // Blocking IO so thread will be waiting. Release it. + // Use notifyAll() to be safe (should be unnecessary) + this.notifyAll(); + } else { + if (outputBuffer.isRegisteredForWrite()) { + coyoteResponse.action(ActionCode.DISPATCH_WRITE, null); + } + } + } + + + @Override public void emitHeader(String name, String value, boolean neverIndex) { if (log.isDebugEnabled()) { log.debug(sm.getString("stream.header.debug", getConnectionId(), getIdentifier(), @@ -226,7 +245,7 @@ public class Stream extends AbstractStre if (log.isDebugEnabled()) { log.debug(sm.getString("stream.write", getConnectionId(), getIdentifier())); } - outputBuffer.flush(); + outputBuffer.flush(true); } @@ -308,6 +327,7 @@ public class Stream extends AbstractStre private volatile long written = 0; private volatile boolean closed = false; private volatile boolean endOfStreamSent = false; + private volatile boolean writeInterest = false; /* The write methods are synchronized to ensure that only one thread at * a time is able to access the buffer. Without this protection, a @@ -330,22 +350,25 @@ public class Stream extends AbstractStre if (len > 0 && !buffer.hasRemaining()) { // Only flush if we have more data to write and the buffer // is full - flush(true); + if (flush(true, coyoteResponse.getWriteListener() == null)) { + break; + } } } written += offset; return offset; } - public synchronized void flush() throws IOException { - flush(false); + public synchronized boolean flush(boolean block) throws IOException { + return flush(false, block); } - private synchronized void flush(boolean writeInProgress) throws IOException { + private synchronized boolean flush(boolean writeInProgress, boolean block) + throws IOException { if (log.isDebugEnabled()) { - log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), getIdentifier(), - Integer.toString(buffer.position()), Boolean.toString(writeInProgress), - Boolean.toString(closed))); + log.debug(sm.getString("stream.outputBuffer.flush.debug", getConnectionId(), + getIdentifier(), Integer.toString(buffer.position()), + Boolean.toString(writeInProgress), Boolean.toString(closed))); } if (!coyoteResponse.isCommitted()) { coyoteResponse.sendHeaders(); @@ -357,12 +380,17 @@ public class Stream extends AbstractStre handler.writeBody(Stream.this, buffer, 0, true); } // Buffer is empty. Nothing to do. - return; + return false; } buffer.flip(); int left = buffer.remaining(); while (left > 0) { - int streamReservation = reserveWindowSize(left); + int streamReservation = reserveWindowSize(left, block); + if (streamReservation == 0) { + // Must be non-blocking + buffer.compact(); + return true; + } while (streamReservation > 0) { int connectionReservation = handler.reserveWindowSize(Stream.this, streamReservation); @@ -375,6 +403,25 @@ public class Stream extends AbstractStre } } buffer.clear(); + return false; + } + + synchronized boolean isReady() { + if (getWindowSize() > 0 && handler.getWindowSize() > 0) { + return true; + } else { + writeInterest = true; + return false; + } + } + + synchronized boolean isRegisteredForWrite() { + if (writeInterest) { + writeInterest = false; + return true; + } else { + return false; + } } @Override Modified: tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java?rev=1705349&r1=1705348&r2=1705349&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http2/StreamProcessor.java Fri Sep 25 20:29:30 2015 @@ -17,20 +17,28 @@ package org.apache.coyote.http2; import java.io.IOException; +import java.io.InterruptedIOException; import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; +import javax.servlet.RequestDispatcher; import javax.servlet.http.HttpUpgradeHandler; import org.apache.coyote.AbstractProcessor; import org.apache.coyote.ActionCode; import org.apache.coyote.Adapter; import org.apache.coyote.AsyncContextCallback; -import org.apache.coyote.AsyncStateMachine; import org.apache.coyote.ContainerThreadMarker; +import org.apache.coyote.ErrorState; +import org.apache.coyote.RequestInfo; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.DispatchType; import org.apache.tomcat.util.net.SSLSupport; import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapperBase; @@ -42,7 +50,7 @@ public class StreamProcessor extends Abs private static final StringManager sm = StringManager.getManager(StreamProcessor.class); private final Stream stream; - private final AsyncStateMachine asyncStateMachine; + private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>(); private volatile SSLSupport sslSupport; @@ -50,36 +58,61 @@ public class StreamProcessor extends Abs public StreamProcessor(Stream stream, Adapter adapter, SocketWrapperBase<?> socketWrapper) { super(stream.getCoyoteRequest(), stream.getCoyoteResponse()); this.stream = stream; - asyncStateMachine = new AsyncStateMachine(this); setAdapter(adapter); setSocketWrapper(socketWrapper); } @Override - public void run() { - // HTTP/2 equivalent of AbstractConnectionHandler#process() + public synchronized void run() { + process(SocketStatus.OPEN_READ); + } + + + private synchronized void process(SocketStatus status) { + // HTTP/2 equivalent of AbstractConnectionHandler#process() without the + // socket <-> processor mapping ContainerThreadMarker.set(); SocketState state = SocketState.CLOSED; try { + Iterator<DispatchType> dispatches = getIteratorAndClearDispatches(); do { - if (asyncStateMachine.isAsync()) { - adapter.asyncDispatch(request, response, SocketStatus.OPEN_READ); + if (log.isDebugEnabled()) { + log.debug(sm.getString("streamProcessor.process.loopstart", + stream.getConnectionId(), stream.getIdentifier(), status, dispatches)); + } + // TODO CLOSE_NOW ? + if (dispatches != null) { + DispatchType nextDispatch = dispatches.next(); + state = dispatch(nextDispatch.getSocketStatus()); + // TODO DISCONNECT ? + } else if (isAsync()) { + state = dispatch(status); } else if (state == SocketState.ASYNC_END) { - adapter.asyncDispatch(request, response, SocketStatus.OPEN_READ); - // Only ever one request per stream so always treat as - // closed at this point. - state = SocketState.CLOSED; + state = dispatch(status); } else { - adapter.service(request, response); + state = process((SocketWrapperBase<?>) null); } - if (asyncStateMachine.isAsync()) { + if (state != SocketState.CLOSED && isAsync()) { state = asyncStateMachine.asyncPostProcess(); - } else { - response.action(ActionCode.CLOSE, null); } - } while (state == SocketState.ASYNC_END); + + if (dispatches == null || !dispatches.hasNext()) { + // Only returns non-null iterator if there are + // dispatches to process. + dispatches = getIteratorAndClearDispatches(); + } + if (log.isDebugEnabled()) { + log.debug(sm.getString("streamProcessor.process.loopend", + stream.getConnectionId(), stream.getIdentifier(), state, dispatches)); + } + } while (state == SocketState.ASYNC_END || + dispatches != null && state != SocketState.CLOSED); + + if (state == SocketState.CLOSED) { + // TODO + } } catch (Exception e) { // TODO e.printStackTrace(); @@ -265,7 +298,24 @@ public class StreamProcessor extends Abs result.set(stream.isInputFinished()); break; } - + case NB_WRITE_INTEREST: { + // TODO: Thread safe? Do this in output buffer? + AtomicBoolean result = (AtomicBoolean) param; + result.set(stream.getOutputBuffer().isReady()); + break; + } + case DISPATCH_READ: { + dispatches.add(DispatchType.NON_BLOCKING_READ); + break; + } + case DISPATCH_WRITE: { + dispatches.add(DispatchType.NON_BLOCKING_WRITE); + break; + } + case DISPATCH_EXECUTE: { + socketWrapper.getEndpoint().getExecutor().execute(this); + break; + } // Unsupported / illegal under HTTP/2 case UPGRADE: @@ -277,12 +327,8 @@ public class StreamProcessor extends Abs case AVAILABLE: case CLOSE_NOW: case DISABLE_SWALLOW_INPUT: - case DISPATCH_EXECUTE: - case DISPATCH_READ: - case DISPATCH_WRITE: case END_REQUEST: case NB_READ_INTEREST: - case NB_WRITE_INTEREST: case REQ_SET_BODY_REPLAY: case RESET: log.info("TODO: Implement [" + actionCode + "] for HTTP/2"); @@ -328,15 +374,126 @@ public class StreamProcessor extends Abs @Override public SocketState process(SocketWrapperBase<?> socket) throws IOException { - // Should never happen - throw new IllegalStateException(sm.getString("streamProcessor.httpupgrade.notsupported")); + try { + adapter.service(request, response); + } catch (Exception e) { + setErrorState(ErrorState.CLOSE_NOW, e); + } + + if (getErrorState().isError()) { + action(ActionCode.CLOSE, null); + request.updateCounters(); + return SocketState.CLOSED; + } else if (isAsync()) { + return SocketState.LONG; + } else { + action(ActionCode.CLOSE, null); + request.updateCounters(); + return SocketState.CLOSED; + } } @Override public SocketState dispatch(SocketStatus status) { - // Should never happen - throw new IllegalStateException(sm.getString("streamProcessor.httpupgrade.notsupported")); + if (log.isDebugEnabled()) { + log.debug(sm.getString("streamProcessor.dispatch", stream.getConnectionId(), + stream.getIdentifier(), status)); + } + if (status == SocketStatus.OPEN_WRITE && response.getWriteListener() != null) { + try { + asyncStateMachine.asyncOperation(); + + if (stream.getOutputBuffer().flush(false)) { + // The buffer wasn't fully flushed so re-register the + // stream for write. Note this does not go via the + // Response since the write registration state at + // that level should remain unchanged. Once the buffer + // has been emptied then the code below will call + // dispatch() which will enable the + // Response to respond to this event. + if (stream.getOutputBuffer().isReady()) { + // Unexpected + throw new IllegalStateException(); + } + return SocketState.LONG; + } + } catch (IOException | IllegalStateException x) { + // IOE - Problem writing to socket + // ISE - Request/Response not in correct state for async write + if (log.isDebugEnabled()) { + log.debug("Unable to write async data.",x); + } + status = SocketStatus.ASYNC_WRITE_ERROR; + request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x); + } + } else if (status == SocketStatus.OPEN_READ && request.getReadListener() != null) { + try { + asyncStateMachine.asyncOperation(); + } catch (IllegalStateException x) { + // ISE - Request/Response not in correct state for async read + if (log.isDebugEnabled()) { + log.debug("Unable to read async data.",x); + } + status = SocketStatus.ASYNC_READ_ERROR; + request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, x); + } + } + + RequestInfo rp = request.getRequestProcessor(); + try { + rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); + if (!getAdapter().asyncDispatch(request, response, status)) { + setErrorState(ErrorState.CLOSE_NOW, null); + } + } catch (InterruptedIOException e) { + setErrorState(ErrorState.CLOSE_NOW, e); + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + setErrorState(ErrorState.CLOSE_NOW, t); + log.error(sm.getString("http11processor.request.process"), t); + } + + rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); + + if (getErrorState().isError()) { + request.updateCounters(); + return SocketState.CLOSED; + } else if (isAsync()) { + return SocketState.LONG; + } else { + request.updateCounters(); + return SocketState.CLOSED; + } + } + + + public void addDispatch(DispatchType dispatchType) { + synchronized (dispatches) { + dispatches.add(dispatchType); + } + } + public Iterator<DispatchType> getIteratorAndClearDispatches() { + // Note: Logic in AbstractProtocol depends on this method only returning + // a non-null value if the iterator is non-empty. i.e. it should never + // return an empty iterator. + Iterator<DispatchType> result; + synchronized (dispatches) { + // Synchronized as the generation of the iterator and the clearing + // of dispatches needs to be an atomic operation. + result = dispatches.iterator(); + if (result.hasNext()) { + dispatches.clear(); + } else { + result = null; + } + } + return result; + } + public void clearDispatches() { + synchronized (dispatches) { + dispatches.clear(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org