Author: markt Date: Thu Jan 15 09:21:24 2015 New Revision: 1652004 URL: http://svn.apache.org/r1652004 Log: Fix first set of issues found with NIO2 and new SocketBufferHandler
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java?rev=1652004&r1=1652003&r2=1652004&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java Thu Jan 15 09:21:24 2015 @@ -118,8 +118,7 @@ public class InternalNio2InputBuffer ext * Read bytes into the specified chunk. */ @Override - public int doRead(ByteChunk chunk, Request req ) - throws IOException { + public int doRead(ByteChunk chunk, Request req ) throws IOException { if (pos >= lastValid) { if (!fill(true)) //read body, must be blocking, as the thread is inside the app Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java?rev=1652004&r1=1652003&r2=1652004&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Channel.java Thu Jan 15 09:21:24 2015 @@ -52,21 +52,13 @@ public class Nio2Channel implements Asyn throws IOException { this.sc = channel; this.socket = socket; - bufHandler.getReadBuffer().clear(); - bufHandler.getWriteBuffer().clear(); + bufHandler.reset(); } public SocketWrapperBase<Nio2Channel> getSocket() { return socket; } - public int getBufferSize() { - if ( bufHandler == null ) return 0; - int size = 0; - size += bufHandler.getReadBuffer()!=null?bufHandler.getReadBuffer().capacity():0; - size += bufHandler.getWriteBuffer()!=null?bufHandler.getWriteBuffer().capacity():0; - return size; - } /** * Closes this channel. Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1652004&r1=1652003&r2=1652004&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 15 09:21:24 2015 @@ -726,8 +726,7 @@ public class Nio2Endpoint extends Abstra private boolean upgradeInit = false; private final CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>> readCompletionHandler; - private boolean flipped = false; - private volatile boolean readPending = false; + private final Semaphore readPending = new Semaphore(1); private volatile boolean readInterest = true; private final CompletionHandler<Integer, ByteBuffer> writeCompletionHandler; @@ -751,7 +750,7 @@ public class Nio2Endpoint extends Abstra if (nBytes.intValue() < 0) { failed(new EOFException(), attachment); } else { - readPending = false; + readPending.release(); if (readInterest && !Nio2Endpoint.isInline()) { readInterest = false; notify = true; @@ -771,7 +770,7 @@ public class Nio2Endpoint extends Abstra ioe = new IOException(exc); } Nio2SocketWrapper.this.setError(ioe); - readPending = false; + readPending.release(); if (exc instanceof AsynchronousCloseException) { // If already closed, don't call onError and close again return; @@ -947,30 +946,21 @@ public class Nio2Endpoint extends Abstra @Override public boolean isReady() throws IOException { synchronized (readCompletionHandler) { - if (readPending) { + if (!readPending.tryAcquire()) { readInterest = true; return false; } - ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer(); - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - if (readBuffer.remaining() > 0) { + + socketBufferHandler.configureReadBufferForRead(); + if (!socketBufferHandler.isReadBufferEmpty()) { return true; } - readBuffer.clear(); - flipped = false; int nRead = fillReadBuffer(false); boolean isReady = nRead > 0; - if (isReady) { - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - } else { + + if (!isReady) { readInterest = true; } return isReady; @@ -988,27 +978,33 @@ public class Nio2Endpoint extends Abstra log.debug("Socket: [" + this + "], block: [" + block + "], length: [" + len + "]"); } - synchronized (readCompletionHandler) { - if (readPending) { + if (block) { + try { + readPending.acquire(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } else { + if (!readPending.tryAcquire()) { if (log.isDebugEnabled()) { log.debug("Socket: [" + this + "], Read: [0]"); } return 0; } + } - ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer(); + synchronized (readCompletionHandler) { + socketBufferHandler.configureReadBufferForRead(); + + int remaining = socketBufferHandler.getReadBuffer().remaining(); - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - int remaining = readBuffer.remaining(); // Is there enough data in the read buffer to satisfy this request? if (remaining >= len) { - readBuffer.get(b, off, len); + socketBufferHandler.getReadBuffer().get(b, off, len); if (log.isDebugEnabled()) { log.debug("Socket: [" + this + "], Read from buffer: [" + len + "]"); } + readPending.release(); return len; } @@ -1016,39 +1012,29 @@ public class Nio2Endpoint extends Abstra int leftToWrite = len; int newOffset = off; if (remaining > 0) { - readBuffer.get(b, off, remaining); + socketBufferHandler.getReadBuffer().get(b, off, remaining); leftToWrite -= remaining; newOffset += remaining; } - // Fill the read buffer as best we can - readBuffer.clear(); - flipped = false; - int nRead = fillReadBuffer(block); + // Fill the read buffer as best we can. Only do a blocking read if + // the current read is blocking AND there wasn't any data left over + // in the read buffer. + int nRead = fillReadBuffer(block && remaining == 0); - // Full as much of the remaining byte array as possible with the data - // that was just read + // Fill as much of the remaining byte array as possible with the + // data that was just read if (nRead > 0) { - if (!flipped) { - readBuffer.flip(); - flipped = true; - } + socketBufferHandler.configureReadBufferForRead(); if (nRead > leftToWrite) { - readBuffer.get(b, newOffset, leftToWrite); + socketBufferHandler.getReadBuffer().get(b, newOffset, leftToWrite); leftToWrite = 0; } else { - readBuffer.get(b, newOffset, nRead); + socketBufferHandler.getReadBuffer().get(b, newOffset, nRead); leftToWrite -= nRead; } - } else if (nRead == 0) { - if (block) { - if (!flipped) { - readBuffer.flip(); - flipped = true; - } - } else { - readInterest = true; - } + } else if (nRead == 0 && !block) { + readInterest = true; } else if (nRead == -1) { throw new EOFException(); } @@ -1056,7 +1042,6 @@ public class Nio2Endpoint extends Abstra if (log.isDebugEnabled()) { log.debug("Socket: [" + this + "], Read: [" + (len - leftToWrite) + "]"); } - return len - leftToWrite; } } @@ -1065,7 +1050,8 @@ public class Nio2Endpoint extends Abstra @Override public void unRead(ByteBuffer returnedInput) { if (returnedInput != null) { - getSocket().getBufHandler().getReadBuffer().put(returnedInput); + socketBufferHandler.configureReadBufferForWrite(); + socketBufferHandler.getReadBuffer().put(returnedInput); } } @@ -1079,17 +1065,17 @@ public class Nio2Endpoint extends Abstra } + /* Callers of this method must: + * - have acquired the readPending semaphore + * - have acquired a lock on readCompletionHandler + */ private int fillReadBuffer(boolean block) throws IOException { - ByteBuffer readBuffer = getSocket().getBufHandler().getReadBuffer(); + socketBufferHandler.configureReadBufferForWrite(); int nRead = 0; if (block) { - readPending = true; - readBuffer.clear(); - flipped = false; try { - nRead = getSocket().read(readBuffer) - .get(getTimeout(), TimeUnit.MILLISECONDS).intValue(); - readPending = false; + nRead = getSocket().read(socketBufferHandler.getReadBuffer()).get( + getTimeout(), TimeUnit.MILLISECONDS).intValue(); } catch (ExecutionException e) { if (e.getCause() instanceof IOException) { throw (IOException) e.getCause(); @@ -1103,15 +1089,12 @@ public class Nio2Endpoint extends Abstra throw ex; } } else { - readPending = true; - readBuffer.clear(); - flipped = false; Nio2Endpoint.startInline(); - getSocket().read(readBuffer, getTimeout(), TimeUnit.MILLISECONDS, + getSocket().read(socketBufferHandler.getReadBuffer(), getTimeout(), TimeUnit.MILLISECONDS, this, readCompletionHandler); Nio2Endpoint.endInline(); - if (!readPending) { - nRead = readBuffer.position(); + if (readPending.availablePermits() == 1) { + nRead = socketBufferHandler.getReadBuffer().position(); } } return nRead; @@ -1230,7 +1213,9 @@ public class Nio2Endpoint extends Abstra writeCompletionHandler); } else { // Nothing was written - writePending.release(); + if (!hasPermit) { + writePending.release(); + } } } return hasDataToWrite(); @@ -1250,7 +1235,7 @@ public class Nio2Endpoint extends Abstra @Override public boolean isReadPending() { synchronized (readCompletionHandler) { - return readPending; + return readPending.availablePermits() == 0; } } @@ -1258,11 +1243,13 @@ public class Nio2Endpoint extends Abstra @Override public void registerReadInterest() { synchronized (readCompletionHandler) { - if (readPending) { - readInterest = true; - } else { + if (readPending.tryAcquire()) { + readPending.release(); + // If no read is pending, notify getEndpoint().processSocket(this, SocketStatus.OPEN_READ, true); + } else { + readInterest = true; } } } @@ -1352,10 +1339,9 @@ public class Nio2Endpoint extends Abstra if (socket == null || socket.getSocket() == null) { return; } - ByteBuffer byteBuffer = socket.getSocket().getBufHandler().getReadBuffer(); - byteBuffer.clear(); - socket.getSocket().read(byteBuffer, socket.getTimeout(), - TimeUnit.MILLISECONDS, socket, awaitBytes); + socket.getSocket().getBufHandler().configureReadBufferForWrite(); + socket.getSocket().read(socket.getSocket().getBufHandler().getReadBuffer(), + socket.getTimeout(), TimeUnit.MILLISECONDS, socket, awaitBytes); } public enum SendfileState { @@ -1449,8 +1435,8 @@ public class Nio2Endpoint extends Abstra return SendfileState.ERROR; } } + socket.getSocket().getBufHandler().configureReadBufferForWrite(); ByteBuffer buffer = socket.getSocket().getBufHandler().getWriteBuffer(); - buffer.clear(); int nRead = -1; try { nRead = data.fchannel.read(buffer); @@ -1459,10 +1445,10 @@ public class Nio2Endpoint extends Abstra } if (nRead >= 0) { - buffer.flip(); data.socket = socket; data.buffer = buffer; data.length -= nRead; + socket.getSocket().getBufHandler().configureReadBufferForRead(); socket.getSocket().write(buffer, socket.getTimeout(), TimeUnit.MILLISECONDS, data, sendfile); if (data.doneInline) { Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=1652004&r1=1652003&r2=1652004&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioChannel.java Thu Jan 15 09:21:24 2015 @@ -64,13 +64,6 @@ public class NioChannel implements ByteC this.sendFile = false; } - public int getBufferSize() { - if ( bufHandler == null ) return 0; - int size = 0; - size += bufHandler.getReadBuffer().capacity(); - size += bufHandler.getWriteBuffer().capacity(); - return size; - } /** * Returns true if the network buffer has been flushed out and is empty. Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1652004&r1=1652003&r2=1652004&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan 15 09:21:24 2015 @@ -1438,8 +1438,10 @@ public class NioEndpoint extends Abstrac newOffset += remaining; } - // Fill the read buffer as best we can - int nRead = fillReadBuffer(block); + // Fill the read buffer as best we can. Only do a blocking read if + // the current read is blocking AND there wasn't any data left over + // in the read buffer. + int nRead = fillReadBuffer(block && remaining == 0); // Full as much of the remaining byte array as possible with the // data that was just read Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java?rev=1652004&r1=1652003&r2=1652004&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNio2Channel.java Thu Jan 15 09:21:24 2015 @@ -126,13 +126,6 @@ public class SecureNio2Channel extends N handshakeStatus = sslEngine.getHandshakeStatus(); } - @Override - public int getBufferSize() { - int size = super.getBufferSize(); - size += netInBuffer!=null?netInBuffer.capacity():0; - size += netOutBuffer!=null?netOutBuffer.capacity():0; - return size; - } private class FutureFlush implements Future<Boolean> { private Future<Integer> integer; @@ -308,17 +301,14 @@ public class SecureNio2Channel extends N //validate the network buffers are empty if (netInBuffer.position() > 0 && netInBuffer.position() < netInBuffer.limit()) throw new IOException(sm.getString("channel.nio.ssl.netInputNotEmpty")); if (netOutBuffer.position() > 0 && netOutBuffer.position() < netOutBuffer.limit()) throw new IOException(sm.getString("channel.nio.ssl.netOutputNotEmpty")); - ByteBuffer readBuffer = getBufHandler().getReadBuffer(); - ByteBuffer writeBuffer = getBufHandler().getWriteBuffer(); - if (readBuffer.position() > 0 && readBuffer.position() < readBuffer.limit()) throw new IOException(sm.getString("channel.nio.ssl.appInputNotEmpty")); - if (writeBuffer.position() > 0 && writeBuffer.position() < writeBuffer.limit()) throw new IOException(sm.getString("channel.nio.ssl.appOutputNotEmpty")); + if (!getBufHandler().isReadBufferEmpty()) throw new IOException(sm.getString("channel.nio.ssl.appInputNotEmpty")); + if (!getBufHandler().isWriteBufferEmpty()) throw new IOException(sm.getString("channel.nio.ssl.appOutputNotEmpty")); netOutBuffer.position(0); netOutBuffer.limit(0); netInBuffer.position(0); netInBuffer.limit(0); - readBuffer.clear(); - writeBuffer.clear(); + getBufHandler().reset(); handshakeComplete = false; //initiate handshake @@ -366,6 +356,7 @@ public class SecureNio2Channel extends N //so we can clear it here. netOutBuffer.clear(); //perform the wrap + bufHandler.configureWriteBufferForRead(); SSLEngineResult result = sslEngine.wrap(bufHandler.getWriteBuffer(), netOutBuffer); //prepare the results to be written netOutBuffer.flip(); @@ -392,6 +383,7 @@ public class SecureNio2Channel extends N //prepare the buffer with the incoming data netInBuffer.flip(); //call unwrap + bufHandler.configureReadBufferForWrite(); result = sslEngine.unwrap(netInBuffer, bufHandler.getReadBuffer()); //compact the buffer, this is an optional method, wonder what would happen if we didn't netInBuffer.compact(); Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=1652004&r1=1652003&r2=1652004&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java Thu Jan 15 09:21:24 2015 @@ -84,14 +84,6 @@ public class SecureNioChannel extends Ni handshakeStatus = sslEngine.getHandshakeStatus(); } - @Override - public int getBufferSize() { - int size = super.getBufferSize(); - size += netInBuffer!=null?netInBuffer.capacity():0; - size += netOutBuffer!=null?netOutBuffer.capacity():0; - return size; - } - //=========================================================================================== // NIO SSL METHODS --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org