On 27/05/2015 18:09, [email protected] wrote: > Author: markt > Date: Wed May 27 17:09:47 2015 > New Revision: 1682079
Forgot to add that the NIO2 tests pass consistently on my laptop. We'll see what the CI system thinks in a couple of hours. Mark > > URL: http://svn.apache.org/r1682079 > Log: > Fix race condition in NIO2. The issue is: > - Thread one (T1) triggers a non-blocking read > - The read returns no data so a read (R1) is pending > - T1 completes processing > - R1 completes and notifies/dispatches to thread 2 (T2) > - T1 calls awaitBytes which triggers a non-blocking read > - T1's read returns no data so a read (R2) is pending > - T2 starts processing > - T2 tries to read but the read fails because R2 is pending (even though > there is data in the read buffer from R1). > > It isn't safe to read the data from the read buffer while R2 is pending since > R2 could modify the read buffer at any point. > > This fix ensures that R1 remains pending until T2 starts processing. This in > turn means that T1's call to awaitBytes() becomes a NO-OP. When T2 tries to > read since no read is pending it is able to read (and process) the data from > the read buffer and continue. > > Modified: > tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java > tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java > > Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java > URL: > http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java?rev=1682079&r1=1682078&r2=1682079&view=diff > ============================================================================== > --- tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java > (original) > +++ tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Protocol.java Wed > May 27 17:09:47 2015 > @@ -94,8 +94,9 @@ public class Http11Nio2Protocol extends > Processor processor, boolean addToPoller) { > processor.recycle(); > recycledProcessors.push(processor); > - // No need to add to poller. read() will have already been called > - // with an appropriate completion handler. 
> + if (addToPoller) { > + socket.registerReadInterest(); > + } > } > > > @@ -108,8 +109,7 @@ public class Http11Nio2Protocol extends > // - this is an upgraded connection > // - the request line/headers have not been completely > // read > - // The completion handlers should be in place, > - // so nothing to do here > + socket.registerReadInterest(); > } > } > > > Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java > URL: > http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1682079&r1=1682078&r2=1682079&view=diff > ============================================================================== > --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) > +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Wed May 27 > 17:09:47 2015 > @@ -582,13 +582,11 @@ public class Nio2Endpoint extends Abstra > failed(new ClosedChannelException(), attachment); > return; > } > - readPending.release(); > getEndpoint().processSocket(attachment, > SocketStatus.OPEN_READ, Nio2Endpoint.isInline()); > } > > @Override > public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> > attachment) { > - readPending.release(); > getEndpoint().processSocket(attachment, > SocketStatus.DISCONNECT, true); > } > }; > @@ -682,10 +680,13 @@ public class Nio2Endpoint extends Abstra > if (nBytes.intValue() < 0) { > failed(new EOFException(), attachment); > } else { > - readPending.release(); > if (readInterest && !Nio2Endpoint.isInline()) { > readInterest = false; > notify = true; > + } else { > + // Release here since there will be no > + // notify/dispatch to do the release. 
> + readPending.release(); > } > } > } > @@ -702,8 +703,10 @@ public class Nio2Endpoint extends Abstra > ioe = new IOException(exc); > } > Nio2SocketWrapper.this.setError(ioe); > - readPending.release(); > if (exc instanceof AsynchronousCloseException) { > + // Release here since there will be no > + // notify/dispatch to do the release. > + readPending.release(); > // If already closed, don't call onError and close > again > return; > } > @@ -903,6 +906,7 @@ public class Nio2Endpoint extends Abstra > if (log.isDebugEnabled()) { > log.debug("Socket: [" + this + "], Read from buffer: [" > + len + "]"); > } > + // No read is going to take place so release here. > readPending.release(); > return len; > } > @@ -1142,6 +1146,8 @@ public class Nio2Endpoint extends Abstra > try { > nRead = > getSocket().read(socketBufferHandler.getReadBuffer()).get( > getNio2ReadTimeout(), > TimeUnit.MILLISECONDS).intValue(); > + // Blocking read so need to release here since there will > + // not be a callback to a completion handler. > readPending.release(); > } catch (ExecutionException e) { > if (e.getCause() instanceof IOException) { > @@ -1311,14 +1317,28 @@ public class Nio2Endpoint extends Abstra > } > > > + /* > + * This should only be called from a thread that currently holds a > lock > + * on the socket. This prevents a race condition between a pending > read > + * being completed and processed and a thread triggering a new read. 
> + */ > + void releaseReadPending() { > + synchronized (readCompletionHandler) { > + if (readPending.availablePermits() == 0) { > + readPending.release(); > + } > + } > + } > + > + > @Override > public void registerReadInterest() { > synchronized (readCompletionHandler) { > if (readPending.availablePermits() == 0) { > readInterest = true; > } else { > - // If no read is pending, notify > - getEndpoint().processSocket(this, > SocketStatus.OPEN_READ, true); > + // If no read is pending, start waiting for data > + awaitBytes(); > } > } > } > @@ -1341,6 +1361,7 @@ public class Nio2Endpoint extends Abstra > if (getSocket() == null) { > return; > } > + // NO-OP if there is already a read in progress. > if (readPending.tryAcquire()) { > getSocket().getBufHandler().configureReadBufferForWrite(); > Nio2Endpoint.startInline(); > @@ -1595,6 +1616,11 @@ public class Nio2Endpoint extends Abstra > @Override > public void run() { > synchronized (socket) { > + if (SocketStatus.OPEN_WRITE != status) { > + // Anything other than OPEN_WRITE is a genuine read or an > + // error condition so for all of those release the > semaphore > + ((Nio2SocketWrapper) socket).releaseReadPending(); > + } > boolean launch = false; > try { > int handshake = -1; > > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
