Author: markt Date: Wed Feb 4 23:18:45 2015 New Revision: 1657442 URL: http://svn.apache.org/r1657442 Log: Re-work multiple-write registration fix.
Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java?rev=1657442&r1=1657441&r2=1657442&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeProcessor.java Wed Feb 4 23:18:45 2015 @@ -99,6 +99,7 @@ public class UpgradeProcessor implements public final SocketState upgradeDispatch(SocketStatus status) throws IOException { if (status == SocketStatus.OPEN_READ) { upgradeServletInputStream.onDataAvailable(); + upgradeServletOutputStream.checkWriteDispatch(); } else if (status == SocketStatus.OPEN_WRITE) { upgradeServletOutputStream.onWritePossible(); } else if (status == SocketStatus.STOP) { Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1657442&r1=1657441&r2=1657442&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Wed Feb 4 23:18:45 2015 @@ -34,9 +34,8 @@ public class UpgradeServletOutputStream protected final SocketWrapperBase<?> socketWrapper; // Used to ensure that isReady() and onWritePossible() have a consistent - // view of buffer and fireListener when determining if the listener should - // fire. - private final Object fireListenerLock = new Object(); + // view of buffer and registered. + private final Object registeredLock = new Object(); // Used to ensure that only one thread writes to the socket at a time and // that buffer is consistently updated with any unwritten data after the @@ -52,8 +51,13 @@ public class UpgradeServletOutputStream // Start in blocking-mode private volatile WriteListener listener = null; - // Guarded by fireListenerLock - private volatile boolean fireListener = false; + // Guarded by registeredLock + private volatile boolean registered = false; + + // Use to track if a dispatch needs to be arranged to trigger the first call + // to onWritePossible. If the socket gets registered for write while this is + // set then this will be ignored. + private volatile boolean writeDispatchRequired = false; private volatile ClassLoader applicationLoader = null; @@ -72,20 +76,19 @@ public class UpgradeServletOutputStream // Make sure isReady() and onWritePossible() have a consistent view of // fireListener when determining if the listener should fire - synchronized (fireListenerLock) { + synchronized (registeredLock) { if (flushing) { // Since flushing is true the socket must already be registered // for write and multiple registrations will cause problems. - fireListener = true; + registered = true; return false; - } else if (fireListener){ - // If the listener is configured to fire then the socket must - // already be registered for write and multiple registrations - // will cause problems. + } else if (registered){ + // The socket is already registered for write and multiple + // registrations will cause problems. return false; } else { boolean result = socketWrapper.isReadyForWrite(); - fireListener = !result; + registered = !result; return result; } } @@ -104,10 +107,8 @@ public class UpgradeServletOutputStream } // Container is responsible for first call to onWritePossible() but only // need to do this if setting the listener for the first time. - synchronized (fireListenerLock) { - fireListener = true; - } - socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); + writeDispatchRequired = true; + this.listener = listener; this.applicationLoader = Thread.currentThread().getContextClassLoader(); } @@ -199,6 +200,9 @@ public class UpgradeServletOutputStream return; } } else { + // This may fill the write buffer in which case the + // isReadyForWrite() call below will re-register the socket for + // write flushInternal(false, false); } @@ -206,10 +210,12 @@ public class UpgradeServletOutputStream // of buffer and fireListener when determining if the listener // should fire boolean fire = false; - synchronized (fireListenerLock) { - if (fireListener && socketWrapper.isReadyForWrite()) { - fireListener = false; + synchronized (registeredLock) { + if (socketWrapper.isReadyForWrite()) { + registered = false; fire = true; + } else { + registered = true; } } @@ -239,4 +245,16 @@ public class UpgradeServletOutputStream thread.setContextClassLoader(originalClassLoader); } } + + + void checkWriteDispatch() { + synchronized (registeredLock) { + if (writeDispatchRequired) { + writeDispatchRequired = false; + if (!registered) { + socketWrapper.addDispatch(DispatchType.NON_BLOCKING_WRITE); + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org