Author: markt
Date: Thu Jan 15 09:21:08 2015
New Revision: 1652002
URL: http://svn.apache.org/r1652002
Log:
NIO reafctoring
- Use read from socketWrapper rather than HttpNio2InputBuffer
- Various API tweaks to support the above
Modified:
tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java
tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
Modified: tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/Http11Nio2Processor.java Thu Jan
15 09:21:08 2015
@@ -85,7 +85,7 @@ public class Http11Nio2Processor extends
@Override
public SocketState asyncDispatch(SocketStatus status) {
SocketState state = super.asyncDispatch(status);
- if (state == SocketState.OPEN && ((InternalNio2InputBuffer)
getInputBuffer()).isPending()) {
+ if (state == SocketState.OPEN && socketWrapper.isReadPending()) {
// Following async processing, a read is still pending, so
// keep the processor associated
return SocketState.LONG;
@@ -97,7 +97,7 @@ public class Http11Nio2Processor extends
@Override
protected void registerForEvent(boolean read, boolean write) {
if (read) {
- ((InternalNio2InputBuffer)
getInputBuffer()).registerReadInterest();
+ socketWrapper.registerReadInterest();
}
if (write) {
socketWrapper.registerWriteInterest();
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprInputBuffer.java Thu
Jan 15 09:21:08 2015
@@ -37,12 +37,9 @@ public class InternalAprInputBuffer exte
private static final Log log =
LogFactory.getLog(InternalAprInputBuffer.class);
- // ----------------------------------------------------------- Constructors
+ // ----------------------------------------------------------- Constructors
- /**
- * Alternate constructor.
- */
public InternalAprInputBuffer(Request request, int headerBufferSize) {
super(request, headerBufferSize);
inputStreamInputBuffer = new SocketInputBuffer();
@@ -81,7 +78,7 @@ public class InternalAprInputBuffer exte
wrapper = socketWrapper;
- int bufLength = Math.max(headerBufferSize, 8192);
+ int bufLength = Math.max(headerBufferSize * 2, 8192);
if (buf == null || buf.length < bufLength) {
buf = new byte[bufLength];
}
@@ -116,9 +113,7 @@ public class InternalAprInputBuffer exte
* This class is an input buffer which will read its data from an input
* stream.
*/
- protected class SocketInputBuffer
- implements InputBuffer {
-
+ protected class SocketInputBuffer implements InputBuffer {
/**
* Read bytes into the specified chunk.
@@ -136,7 +131,7 @@ public class InternalAprInputBuffer exte
chunk.setBytes(buf, pos, length);
pos = lastValid;
- return (length);
+ return length;
}
}
}
Modified:
tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java Thu
Jan 15 09:21:08 2015
@@ -18,14 +18,6 @@ package org.apache.coyote.http11;
import java.io.EOFException;
import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import javax.servlet.RequestDispatcher;
import org.apache.coyote.InputBuffer;
import org.apache.coyote.Request;
@@ -34,8 +26,6 @@ import org.apache.juli.logging.LogFactor
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.Nio2Channel;
-import org.apache.tomcat.util.net.Nio2Endpoint;
-import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapperBase;
/**
@@ -46,87 +36,39 @@ public class InternalNio2InputBuffer ext
private static final Log log =
LogFactory.getLog(InternalNio2InputBuffer.class);
- // ----------------------------------------------------------- Constructors
+ // ----------------------------------------------------------- Constructors
public InternalNio2InputBuffer(Request request, int headerBufferSize) {
super(request, headerBufferSize);
inputStreamInputBuffer = new SocketInputBuffer();
}
- /**
- * Underlying socket.
- */
- private SocketWrapperBase<Nio2Channel> socket;
-
- /**
- * Track write interest
- */
- protected volatile boolean interest = false;
-
- /**
- * The completion handler used for asynchronous read operations
- */
- private CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>
completionHandler;
-
- /**
- * The associated endpoint.
- */
- protected AbstractEndpoint<Nio2Channel> endpoint = null;
-
- /**
- * Read pending flag.
- */
- protected volatile boolean readPending = false;
+ // ----------------------------------------------------- Instance Variables
- /**
- * Exception that occurred during writing.
- */
- protected IOException e = null;
+ private SocketWrapperBase<Nio2Channel> wrapper;
- /**
- * Track if the byte buffer is flipped
- */
- protected volatile boolean flipped = false;
// --------------------------------------------------------- Public Methods
- @Override
- protected final Log getLog() {
- return log;
- }
-
-
/**
* Recycle the input buffer. This should be called when closing the
* connection.
*/
@Override
public void recycle() {
+ wrapper = null;
super.recycle();
- socket = null;
- readPending = false;
- flipped = false;
- interest = false;
- e = null;
}
- /**
- * End processing of current HTTP request.
- * Note: All bytes of the current request should have been already
- * consumed. This method only resets all the pointers so that we are ready
- * to parse the next HTTP request.
- */
+ // ------------------------------------------------------ Protected Methods
+
@Override
- public void nextRequest() {
- super.nextRequest();
- interest = false;
+ protected final Log getLog() {
+ return log;
}
- public boolean isPending() {
- return readPending;
- }
// ------------------------------------------------------ Protected Methods
@@ -134,62 +76,17 @@ public class InternalNio2InputBuffer ext
protected void init(SocketWrapperBase<Nio2Channel> socketWrapper,
AbstractEndpoint<Nio2Channel> associatedEndpoint) throws
IOException {
- endpoint = associatedEndpoint;
- socket = socketWrapper;
- if (socket == null) {
- // Socket has been closed in another thread
- throw new IOException(sm.getString("iib.socketClosed"));
- }
- socketReadBufferSize =
- socket.getSocket().getBufHandler().getReadBuffer().capacity();
+ wrapper = socketWrapper;
- int bufLength = headerBufferSize + socketReadBufferSize;
+ int bufLength = headerBufferSize +
wrapper.getSocket().getBufHandler().getReadBuffer().capacity();
if (buf == null || buf.length < bufLength) {
buf = new byte[bufLength];
}
-
- // Initialize the completion handler
- this.completionHandler = new CompletionHandler<Integer,
SocketWrapperBase<Nio2Channel>>() {
-
- @Override
- public void completed(Integer nBytes,
SocketWrapperBase<Nio2Channel> attachment) {
- boolean notify = false;
- synchronized (completionHandler) {
- if (nBytes.intValue() < 0) {
- failed(new
EOFException(sm.getString("iib.eof.error")), attachment);
- } else {
- readPending = false;
- if ((request.getReadListener() == null || interest) &&
!Nio2Endpoint.isInline()) {
- interest = false;
- notify = true;
- }
- }
- }
- if (notify) {
- endpoint.processSocket(attachment, SocketStatus.OPEN_READ,
false);
- }
- }
-
- @Override
- public void failed(Throwable exc, SocketWrapperBase<Nio2Channel>
attachment) {
- if (exc instanceof IOException) {
- e = (IOException) exc;
- } else {
- e = new IOException(exc);
- }
- attachment.setError(e);
- request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
- readPending = false;
- endpoint.processSocket(attachment, SocketStatus.OPEN_READ,
true);
- }
- };
}
@Override
protected boolean fill(boolean block) throws IOException, EOFException {
- if (e != null) {
- throw e;
- }
+
if (parsingHeader) {
if (lastValid > headerBufferSize) {
throw new IllegalArgumentException
@@ -198,127 +95,24 @@ public class InternalNio2InputBuffer ext
} else {
lastValid = pos = end;
}
- // Now fill the internal buffer
- int nRead = 0;
- ByteBuffer byteBuffer =
socket.getSocket().getBufHandler().getReadBuffer();
- if (block) {
- if (!flipped) {
- byteBuffer.flip();
- flipped = true;
- }
- int nBytes = byteBuffer.remaining();
- // This case can happen when a blocking read follows a non blocking
- // fill that completed asynchronously
- if (nBytes > 0) {
- expand(nBytes + pos);
- byteBuffer.get(buf, pos, nBytes);
- lastValid = pos + nBytes;
- byteBuffer.clear();
- flipped = false;
- return true;
- } else {
- byteBuffer.clear();
- flipped = false;
- try {
- nRead = socket.getSocket().read(byteBuffer)
- .get(socket.getTimeout(),
TimeUnit.MILLISECONDS).intValue();
- } catch (ExecutionException e) {
- if (e.getCause() instanceof IOException) {
- throw (IOException) e.getCause();
- } else {
- throw new IOException(e);
- }
- } catch (InterruptedException e) {
- throw new IOException(e);
- } catch (TimeoutException e) {
- throw new SocketTimeoutException();
- }
- if (nRead > 0) {
- if (!flipped) {
- byteBuffer.flip();
- flipped = true;
- }
- expand(nRead + pos);
- byteBuffer.get(buf, pos, nRead);
- lastValid = pos + nRead;
- return true;
- } else if (nRead == -1) {
- //return false;
- throw new EOFException(sm.getString("iib.eof.error"));
- } else {
- return false;
- }
- }
- } else {
- synchronized (completionHandler) {
- if (!readPending) {
- if (!flipped) {
- byteBuffer.flip();
- flipped = true;
- }
- int nBytes = byteBuffer.remaining();
- if (nBytes > 0) {
- expand(nBytes + pos);
- byteBuffer.get(buf, pos, nBytes);
- lastValid = pos + nBytes;
- byteBuffer.clear();
- flipped = false;
- } else {
- byteBuffer.clear();
- flipped = false;
- readPending = true;
- Nio2Endpoint.startInline();
- socket.getSocket().read(byteBuffer,
socket.getTimeout(),
- TimeUnit.MILLISECONDS, socket,
completionHandler);
- Nio2Endpoint.endInline();
- // Return the number of bytes that have been placed
into the buffer
- if (!readPending) {
- // If the completion handler completed immediately
- if (!flipped) {
- byteBuffer.flip();
- flipped = true;
- }
- nBytes = byteBuffer.remaining();
- if (nBytes > 0) {
- expand(nBytes + pos);
- byteBuffer.get(buf, pos, nBytes);
- lastValid = pos + nBytes;
- }
- byteBuffer.clear();
- flipped = false;
- }
- }
- return (lastValid - pos) > 0;
- } else {
- return false;
- }
- }
- }
- }
-
- public void registerReadInterest() {
- synchronized (completionHandler) {
- if (readPending) {
- interest = true;
- } else {
- // If no read is pending, notify
- endpoint.processSocket(socket, SocketStatus.OPEN_READ, true);
- }
+ int nRead = wrapper.read(block, buf, pos, buf.length - pos);
+ if (nRead > 0) {
+ lastValid = pos + nRead;
+ return true;
}
+
+ return false;
}
// ------------------------------------- InputStreamInputBuffer Inner Class
-
/**
* This class is an input buffer which will read its data from an input
* stream.
*/
- protected class SocketInputBuffer
- implements InputBuffer {
-
+ protected class SocketInputBuffer implements InputBuffer {
/**
* Read bytes into the specified chunk.
@@ -331,19 +125,12 @@ public class InternalNio2InputBuffer ext
if (!fill(true)) //read body, must be blocking, as the thread
is inside the app
return -1;
}
- if (isBlocking()) {
- int length = lastValid - pos;
- chunk.setBytes(buf, pos, length);
- pos = lastValid;
- return (length);
- } else {
- synchronized (completionHandler) {
- int length = lastValid - pos;
- chunk.setBytes(buf, pos, length);
- pos = lastValid;
- return (length);
- }
- }
+
+ int length = lastValid - pos;
+ chunk.setBytes(buf, pos, length);
+ pos = lastValid;
+
+ return length;
}
}
}
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java Thu
Jan 15 09:21:08 2015
@@ -80,7 +80,7 @@ public class InternalNioInputBuffer exte
wrapper = socketWrapper;
- int bufLength = Math.max(headerBufferSize, 8192);
+ int bufLength = headerBufferSize +
wrapper.getSocket().getBufHandler().getReadBuffer().capacity();
if (buf == null || buf.length < bufLength) {
buf = new byte[bufLength];
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan 15
09:21:08 2015
@@ -2629,8 +2629,20 @@ public class AprEndpoint extends Abstrac
@Override
+ public boolean isReadPending() {
+ return false;
+ }
+
+
+ @Override
+ public void registerReadInterest() {
+ regsiterForEvent(true, false);
+ }
+
+
+ @Override
public void registerWriteInterest() {
- ((AprEndpoint)
getEndpoint()).getPoller().add(getSocket().longValue(), -1, false, true);
+ regsiterForEvent(false, true);
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 15
09:21:08 2015
@@ -977,6 +977,9 @@ public class Nio2Endpoint extends Abstra
@Override
public int read(boolean block, byte[] b, int off, int len) throws
IOException {
+ if (getError() != null) {
+ throw getError();
+ }
if (log.isDebugEnabled()) {
log.debug("Socket: [" + this + "], block: [" + block + "],
length: [" + len + "]");
@@ -1242,6 +1245,27 @@ public class Nio2Endpoint extends Abstra
}
}
+
+ @Override
+ public boolean isReadPending() {
+ synchronized (readCompletionHandler) {
+ return readPending;
+ }
+ }
+
+
+ @Override
+ public void registerReadInterest() {
+ synchronized (readCompletionHandler) {
+ if (readPending) {
+ readInterest = true;
+ } else {
+ // If no read is pending, notify
+ getEndpoint().processSocket(this, SocketStatus.OPEN_READ,
true);
+ }
+ }
+ }
+
@Override
public void registerWriteInterest() {
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan 15
09:21:08 2015
@@ -1548,6 +1548,18 @@ public class NioEndpoint extends Abstrac
@Override
+ public boolean isReadPending() {
+ return false;
+ }
+
+
+ @Override
+ public void registerReadInterest() {
+ getPoller().add(getSocket(), SelectionKey.OP_READ);
+ }
+
+
+ @Override
public void registerWriteInterest() {
getPoller().add(getSocket(), SelectionKey.OP_WRITE);
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1652002&r1=1652001&r2=1652002&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan
15 09:21:08 2015
@@ -178,6 +178,8 @@ public abstract class SocketWrapperBase<
}
public Object getWriteThreadLock() { return writeThreadLock; }
+ public abstract boolean isReadPending();
+
protected boolean hasMoreDataToFlush() {
return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) ||
(!writeBufferFlipped && socketWriteBuffer.position() > 0);
@@ -498,6 +500,8 @@ public abstract class SocketWrapperBase<
holder.getBuf().put(buf,offset,length);
}
+ public abstract void registerReadInterest();
+
public abstract void registerWriteInterest();
public abstract void regsiterForEvent(boolean read, boolean write);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]