Author: markt Date: Thu Aug 15 10:32:15 2013 New Revision: 1514228 URL: http://svn.apache.org/r1514228 Log: The container is responsible for the first call to each of onWritePossible() and onDataAvailable() once a listener has been set. Main component is the addition to the SocketWrapper of a list of dispatch types that need to be made. "Dispatch type" in this case meaning "process the socket using the specified SocketStatus". This is used to register trigger the first call to each of onWritePossible() and onDataAvailable() for which the container is responsible. Fix some additional issues identified in the test case.
Added: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java (with props) Modified: tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java tomcat/trunk/java/org/apache/coyote/ActionCode.java tomcat/trunk/java/org/apache/coyote/Response.java tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Modified: tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java (original) +++ tomcat/trunk/java/org/apache/catalina/connector/InputBuffer.java Thu Aug 15 10:32:15 2013 @@ -249,6 +249,18 @@ public class InputBuffer extends Reader public void setReadListener(ReadListener listener) { coyoteRequest.setReadListener(listener); + + // The container is responsible for the first call to + // listener.onDataAvailable(). If isReady() returns true, the container + // needs to call listener.onDataAvailable() from a new thread. If + // isReady() returns false, the socket will be registered for read and + // the container will call listener.onDataAvailable() once data arrives. + // Must call isFinished() first as a call to isReady() if the request + // has been finished will register the socket for read interest and that + // is not required. + if (isFinished() || isReady()) { + coyoteRequest.action(ActionCode.DISPATCH_READ, null); + } } Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProtocol.java Thu Aug 15 10:32:15 2013 @@ -38,6 +38,7 @@ import org.apache.tomcat.util.collection import org.apache.tomcat.util.modeler.Registry; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.AbstractEndpoint.Handler; +import org.apache.tomcat.util.net.DispatchType; import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapper; import org.apache.tomcat.util.res.StringManager; @@ -616,7 +617,11 @@ public abstract class AbstractProtocol i SocketState state = SocketState.CLOSED; do { - if (status == SocketStatus.DISCONNECT && + if (wrapper.hasNextDispatch()) { + DispatchType nextDispatch = wrapper.getNextDispatch(); + state = processor.asyncDispatch( + nextDispatch.getSocketStatus()); + } else if (status == SocketStatus.DISCONNECT && !processor.isComet()) { // Do nothing here, just wait for it to get recycled // Don't do this for Comet we need to generate an end @@ -663,7 +668,8 @@ public abstract class AbstractProtocol i "], State out: [" + state + "]"); } } while (state == SocketState.ASYNC_END || - state == SocketState.UPGRADING); + state == SocketState.UPGRADING || + wrapper.hasNextDispatch()); if (state == SocketState.LONG) { // In the middle of processing a request/response. Keep the Modified: tomcat/trunk/java/org/apache/coyote/ActionCode.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ActionCode.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ActionCode.java (original) +++ tomcat/trunk/java/org/apache/coyote/ActionCode.java Thu Aug 15 10:32:15 2013 @@ -215,5 +215,17 @@ public enum ActionCode { /** * Indicates if the request body has been fully read. */ - REQUEST_BODY_FULLY_READ + REQUEST_BODY_FULLY_READ, + + /** + * Indicates that the container needs to trigger a call to onDataAvailable() + * for the registered non-blocking read listener. + */ + DISPATCH_READ, + + /** + * Indicates that the container needs to trigger a call to onWritePossible() + * for the registered non-blocking write listener. + */ + DISPATCH_WRITE } Modified: tomcat/trunk/java/org/apache/coyote/Response.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/Response.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/Response.java (original) +++ tomcat/trunk/java/org/apache/coyote/Response.java Thu Aug 15 10:32:15 2013 @@ -593,6 +593,21 @@ public final class Response { } this.listener = listener; + + // The container is responsible for the first call to + // listener.onWritePossible(). If isReady() returns true, the container + // needs to call listener.onWritePossible() from a new thread. If + // isReady() returns false, the socket will be registered for write and + // the container will call listener.onWritePossible() once data can be + // written. + if (isReady()) { + action(ActionCode.DISPATCH_WRITE, null); + // Need to set the fireListener flag otherwise when the container + // tries to trigger onWritePossible, nothing will happen + synchronized (nonBlockingStateLock) { + fireListener = true; + } + } } public boolean isReady() { Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractHttp11Processor.java Thu Aug 15 10:32:15 2013 @@ -51,6 +51,7 @@ import org.apache.tomcat.util.http.MimeH import org.apache.tomcat.util.log.UserDataHelper; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; +import org.apache.tomcat.util.net.DispatchType; import org.apache.tomcat.util.net.SocketStatus; import org.apache.tomcat.util.net.SocketWrapper; import org.apache.tomcat.util.res.StringManager; @@ -828,6 +829,10 @@ public abstract class AbstractHttp11Proc } else if (actionCode == ActionCode.REQUEST_BODY_FULLY_READ) { AtomicBoolean result = (AtomicBoolean) param; result.set(getInputBuffer().isFinished()); + } else if (actionCode == ActionCode.DISPATCH_READ) { + socketWrapper.addDispatch(DispatchType.NON_BLOCKING_READ); + } else if (actionCode == ActionCode.DISPATCH_WRITE) { + socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); } else { actionInternal(actionCode, param); } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalInputBuffer.java Thu Aug 15 10:32:15 2013 @@ -554,7 +554,10 @@ public class InternalInputBuffer extends @Override protected int nbRead() throws IOException { - throw new IllegalStateException("This method is unused for BIO"); + // If this gets called for BIO need to make caller think there is data + // to read as BIO always reads whether there is data or not (and blocks + // until there is data to read). + return 1; } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Aug 15 10:32:15 2013 @@ -1718,16 +1718,30 @@ public class AprEndpoint extends Abstrac // application code. By signalling read/write is possible, a // read/write will be attempted, fail and that will trigger // an exception the application will see. - if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN || - (wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) { - // Must be doing a non-blocking read + // Check the return flags first, followed by what the socket + // was registered for + if ((desc[n*2] & Poll.APR_POLLIN) == Poll.APR_POLLIN) { + // Error probably occurred during a non-blocking read if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { // Close socket and clear pool destroySocket(desc[n*2+1]); } - } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT || - (wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { - // Must be doing an non-blocking write write + } else if ((desc[n*2] & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { + // Error probably occurred during a non-blocking write + if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { + // Close socket and clear pool + destroySocket(desc[n*2+1]); + } + } else if ((wrapper.pollerFlags & Poll.APR_POLLIN) == Poll.APR_POLLIN) { + // Can't tell what was happening when the error occurred but the + // socket is registered for non-blocking read so use that + if (!processSocket(desc[n*2+1], SocketStatus.OPEN_READ)) { + // Close socket and clear pool + destroySocket(desc[n*2+1]); + } + } else if ((wrapper.pollerFlags & Poll.APR_POLLOUT) == Poll.APR_POLLOUT) { + // Can't tell what was happening when the error occurred but the + // socket is registered for non-blocking write so use that if (!processSocket(desc[n*2+1], SocketStatus.OPEN_WRITE)) { // Close socket and clear pool destroySocket(desc[n*2+1]); Added: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java?rev=1514228&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java Thu Aug 15 10:32:15 2013 @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.net; + +/** + * This enumeration lists the different types of dispatches that request + * processing can trigger. In this instance, dispatch means re-process this + * request using the given socket status. + */ +public enum DispatchType { + + NON_BLOCKING_READ(SocketStatus.OPEN_READ), + NON_BLOCKING_WRITE(SocketStatus.OPEN_WRITE); + + private final SocketStatus status; + + private DispatchType(SocketStatus status) { + this.status = status; + } + + public SocketStatus getSocketStatus() { + return status; + } +} Propchange: tomcat/trunk/java/org/apache/tomcat/util/net/DispatchType.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapper.java Thu Aug 15 10:32:15 2013 @@ -16,6 +16,9 @@ */ package org.apache.tomcat.util.net; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -60,6 +63,8 @@ public class SocketWrapper<E> { private final Object writeThreadLock = new Object(); public Object getWriteThreadLock() { return writeThreadLock; } + private Set<DispatchType> dispatches = new LinkedHashSet<>(); + public SocketWrapper(E socket) { this.socket = socket; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -108,4 +113,19 @@ public class SocketWrapper<E> { public WriteLock getBlockingStatusWriteLock() { return blockingStatusWriteLock; } + public void addDispatch(DispatchType dispatchType) { + dispatches.add(dispatchType); + } + public boolean hasNextDispatch() { + return dispatches.size() > 0; + } + public DispatchType getNextDispatch() { + DispatchType result = null; + Iterator<DispatchType> iter = dispatches.iterator(); + if (iter.hasNext()) { + result = iter.next(); + iter.remove(); + } + return result; + } } Modified: tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java?rev=1514228&r1=1514227&r2=1514228&view=diff ============================================================================== --- tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java (original) +++ tomcat/trunk/test/org/apache/catalina/nonblocking/TestNonBlockingAPI.java Thu Aug 15 10:32:15 2013 @@ -251,14 +251,6 @@ public class TestNonBlockingAPI extends public void testNonBlockingWriteError() throws Exception { Tomcat tomcat = getTomcatInstance(); - // Not applicable to BIO. This test does not start a new thread for the - // write so with BIO all the writes happen in the service() method just - // like blocking IO. - if (tomcat.getConnector().getProtocolHandlerClassName().equals( - "org.apache.coyote.http11.Http11Protocol")) { - return; - } - // Must have a real docBase - just use temp StandardContext ctx = (StandardContext) tomcat.addContext( "", System.getProperty("java.io.tmpdir")); @@ -416,10 +408,8 @@ public class TestNonBlockingAPI extends }); // step 2 - notify on read ServletInputStream in = req.getInputStream(); - listener = new TestReadListener(actx); + listener = new TestReadListener(actx, false); in.setReadListener(listener); - - listener.onDataAvailable(); } } @@ -462,13 +452,12 @@ public class TestNonBlockingAPI extends }); // step 2 - notify on read ServletInputStream in = req.getInputStream(); - rlistener = new TestReadListener(actx); + rlistener = new TestReadListener(actx, true); in.setReadListener(rlistener); ServletOutputStream out = resp.getOutputStream(); resp.setBufferSize(200 * 1024); wlistener = new TestWriteListener(actx); out.setWriteListener(wlistener); - wlistener.onWritePossible(); } @@ -476,9 +465,12 @@ public class TestNonBlockingAPI extends private class TestReadListener implements ReadListener { private final AsyncContext ctx; private final StringBuilder body = new StringBuilder(); + private final boolean usingNonBlockingWrite; - public TestReadListener(AsyncContext ctx) { + public TestReadListener(AsyncContext ctx, + boolean usingNonBlockingWrite) { this.ctx = ctx; + this.usingNonBlockingWrite = usingNonBlockingWrite; } @Override @@ -501,18 +493,22 @@ public class TestNonBlockingAPI extends @Override public void onAllDataRead() { log.info("onAllDataRead"); - String msg; - if (body.toString().endsWith("FINISHED")) { - msg = "OK"; - } else { - msg = "FAILED"; - } - try { - ctx.getResponse().getOutputStream().print(msg); - } catch (IOException ioe) { - // Ignore + // If non-blocking writes are being used, don't write here as it + // will inject unexpected data into the write output. + if (!usingNonBlockingWrite) { + String msg; + if (body.toString().endsWith("FINISHED")) { + msg = "OK"; + } else { + msg = "FAILED"; + } + try { + ctx.getResponse().getOutputStream().print(msg); + } catch (IOException ioe) { + // Ignore + } + ctx.complete(); } - ctx.complete(); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org