Author: markt Date: Thu Jan 8 13:09:54 2015 New Revision: 1650269 URL: http://svn.apache.org/r1650269 Log: Move writes and associated buffers to SocketWrapper for NIO. NIO2/APR likely broken at this point.
Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Modified: tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java (original) +++ tomcat/trunk/java/org/apache/coyote/ajp/AjpProcessor.java Thu Jan 8 13:09:54 2015 @@ -24,8 +24,6 @@ import java.nio.ByteBuffer; import java.security.NoSuchProviderException; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; -import java.util.Iterator; -import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.RequestDispatcher; @@ -44,7 +42,6 @@ import org.apache.coyote.Response; import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.ExceptionUtils; -import org.apache.tomcat.util.buf.ByteBufferHolder; import org.apache.tomcat.util.buf.ByteChunk; import org.apache.tomcat.util.buf.HexUtils; import org.apache.tomcat.util.buf.MessageBytes; @@ -183,22 +180,6 @@ public class AjpProcessor<S> extends Abs /** - * The max size of the buffered write buffer - */ - private int bufferedWriteSize = 64*1024; //64k default write buffer - - - /** - * For "non-blocking" writes use an external set of buffers. Although the - * API only allows one non-blocking write at a time, due to buffering and - * the possible need to write HTTP headers, there may be more than one write - * to the OutputBuffer. - */ - private final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites = - new LinkedBlockingDeque<>(); - - - /** * Host name (used to avoid useless B2C conversion on the host name). */ protected char[] hostNameC = new char[0]; @@ -605,7 +586,7 @@ public class AjpProcessor<S> extends Abs } case NB_WRITE_INTEREST: { AtomicBoolean isReady = (AtomicBoolean)param; - boolean result = bufferedWrites.size() == 0 && responseMsgPos == -1; + boolean result = !socketWrapper.hasDataToWrite() && responseMsgPos == -1; isReady.set(result); if (!result) { registerForEvent(false, true); @@ -647,7 +628,8 @@ public class AjpProcessor<S> extends Abs asyncStateMachine.asyncOperation(); try { if (hasDataToWrite()) { - flushBufferedData(); + boolean blocking = (response.getWriteListener() == null); + socketWrapper.flush(blocking); if (hasDataToWrite()) { // There is data to write but go via Response to // maintain a consistent view of non-blocking state @@ -755,7 +737,7 @@ public class AjpProcessor<S> extends Abs } cping = true; try { - output(pongMessageArray, 0, pongMessageArray.length, true); + socketWrapper.write(true, pongMessageArray, 0, pongMessageArray.length); } catch (IOException e) { setErrorState(ErrorState.CLOSE_NOW, e); } @@ -1053,7 +1035,7 @@ public class AjpProcessor<S> extends Abs // Request more data immediately if (!waitingForBodyMessage) { - output(getBodyMessageArray, 0, getBodyMessageArray.length, true); + socketWrapper.write(true, getBodyMessageArray, 0, getBodyMessageArray.length); waitingForBodyMessage = true; } @@ -1460,7 +1442,7 @@ public class AjpProcessor<S> extends Abs // Write to buffer responseMessage.end(); - output(responseMessage.getBuffer(), 0, responseMessage.getLen(), true); + socketWrapper.write(true, responseMessage.getBuffer(), 0, responseMessage.getLen()); } @@ -1473,7 +1455,7 @@ public class AjpProcessor<S> extends Abs // TODO Validate the assertion above if (explicit && !finished) { // Send the flush message - output(flushMessageArray, 0, flushMessageArray.length, true); + socketWrapper.write(true, flushMessageArray, 0, flushMessageArray.length); } } @@ -1505,22 +1487,13 @@ public class AjpProcessor<S> extends Abs // Add the end message if (getErrorState().isError()) { - output(endAndCloseMessageArray, 0, endAndCloseMessageArray.length, true); + socketWrapper.write(true, endAndCloseMessageArray, 0, endAndCloseMessageArray.length); } else { - output(endMessageArray, 0, endMessageArray.length, true); + socketWrapper.write(true, endMessageArray, 0, endMessageArray.length); } } - private int output(byte[] src, int offset, int length, - boolean block) throws IOException { - if (socketWrapper == null || socketWrapper.getSocket() == null) - return -1; - - return socketWrapper.write(block, src, offset, length); - } - - private boolean available() { if (endOfStream) { return false; @@ -1569,15 +1542,12 @@ public class AjpProcessor<S> extends Abs socketWrapper.access(); boolean blocking = (response.getWriteListener() == null); - if (!blocking) { - flushBufferedData(); - } int len = chunk.getLength(); int off = 0; // Write this chunk - while (responseMsgPos == -1 && len > 0) { + while (len > 0) { int thisTime = len; if (thisTime > outputMaxChunkSize) { thisTime = outputMaxChunkSize; @@ -1586,96 +1556,18 @@ public class AjpProcessor<S> extends Abs responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK); responseMessage.appendBytes(chunk.getBytes(), chunk.getOffset() + off, thisTime); responseMessage.end(); - writeResponseMessage(blocking); + socketWrapper.write(blocking, responseMessage.getBuffer(), 0, responseMessage.getLen()); len -= thisTime; off += thisTime; } bytesWritten += off; - - if (len > 0) { - // Add this chunk to the buffer - addToBuffers(chunk.getBuffer(), off, len); - } - } - - - private void addToBuffers(byte[] buf, int offset, int length) { - ByteBufferHolder holder = bufferedWrites.peekLast(); - if (holder == null || holder.isFlipped() || holder.getBuf().remaining() < length) { - ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); - holder = new ByteBufferHolder(buffer, false); - bufferedWrites.add(holder); - } - holder.getBuf().put(buf, offset, length); } private boolean hasDataToWrite() { - return responseMsgPos != -1 || bufferedWrites.size() > 0; - } - - - private void flushBufferedData() throws IOException { - - if (responseMsgPos > -1) { - // Must be using non-blocking IO - // Partially written response message. Try and complete it. - writeResponseMessage(false); - } - - while (responseMsgPos == -1 && bufferedWrites.size() > 0) { - // Try and write any remaining buffer data - Iterator<ByteBufferHolder> holders = bufferedWrites.iterator(); - ByteBufferHolder holder = holders.next(); - holder.flip(); - ByteBuffer buffer = holder.getBuf(); - int initialBufferSize = buffer.remaining(); - while (responseMsgPos == -1 && buffer.remaining() > 0) { - transferToResponseMsg(buffer); - writeResponseMessage(false); - } - bytesWritten += (initialBufferSize - buffer.remaining()); - if (buffer.remaining() == 0) { - holders.remove(); - } - } - } - - - private void transferToResponseMsg(ByteBuffer buffer) { - - int thisTime = buffer.remaining(); - if (thisTime > outputMaxChunkSize) { - thisTime = outputMaxChunkSize; - } - - responseMessage.reset(); - responseMessage.appendByte(Constants.JK_AJP13_SEND_BODY_CHUNK); - buffer.get(responseMessage.getBuffer(), responseMessage.pos, thisTime); - responseMessage.end(); - } - - - private void writeResponseMessage(boolean block) throws IOException { - int len = responseMessage.getLen(); - int written = 1; - if (responseMsgPos == -1) { - // New message. Advance the write position to the beginning - responseMsgPos = 0; - } - - while (written > 0 && responseMsgPos < len) { - written = output( - responseMessage.getBuffer(), responseMsgPos, len - responseMsgPos, block); - responseMsgPos += written; - } - - // Message fully written, reset the position for a new message. - if (responseMsgPos == len) { - responseMsgPos = -1; - } + return responseMsgPos != -1 || socketWrapper.hasDataToWrite(); } Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Thu Jan 8 13:09:54 2015 @@ -20,7 +20,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; -import java.util.Iterator; import java.util.concurrent.LinkedBlockingDeque; import org.apache.coyote.ActionCode; @@ -653,19 +652,7 @@ public abstract class AbstractOutputBuff public boolean hasDataToWrite() { - return hasMoreDataToFlush() || hasBufferedData(); - } - - - protected boolean hasBufferedData() { - boolean result = false; - if (bufferedWrites!=null) { - Iterator<ByteBufferHolder> iter = bufferedWrites.iterator(); - while (!result && iter.hasNext()) { - result = iter.next().hasData(); - } - } - return result; + return socketWrapper.hasDataToWrite(); } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java Thu Jan 8 13:09:54 2015 @@ -417,7 +417,6 @@ public class InternalNio2OutputBuffer ex } } - @Override protected boolean hasBufferedData() { return bufferedWrites.size() > 0; } Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu Jan 8 13:09:54 2015 @@ -18,16 +18,11 @@ package org.apache.coyote.http11; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.util.Iterator; import org.apache.coyote.Response; -import org.apache.tomcat.util.buf.ByteBufferHolder; import org.apache.tomcat.util.net.NioChannel; -import org.apache.tomcat.util.net.NioEndpoint; -import org.apache.tomcat.util.net.NioSelectorPool; +import org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper; import org.apache.tomcat.util.net.SocketWrapperBase; /** @@ -47,25 +42,12 @@ public class InternalNioOutputBuffer ext } - /** - * Underlying socket. - */ - private NioChannel socket; - - /** - * Selector pool, for blocking reads and blocking writes - */ - private NioSelectorPool pool; - - // --------------------------------------------------------- Public Methods @Override public void init(SocketWrapperBase<NioChannel> socketWrapper) { super.init(socketWrapper); - socket = socketWrapper.getSocket(); - pool = ((NioEndpoint)socketWrapper.getEndpoint()).getSelectorPool(); - socketWriteBuffer = socket.getBufHandler().getWriteBuffer(); + socketWriteBuffer = socketWrapper.getSocket().getBufHandler().getWriteBuffer(); } @@ -77,7 +59,7 @@ public class InternalNioOutputBuffer ext public void recycle() { super.recycle(); socketWriteBuffer.clear(); - socket = null; + socketWrapper = null; } @@ -89,103 +71,19 @@ public class InternalNioOutputBuffer ext @Override public void sendAck() throws IOException { if (!committed) { - socketWriteBuffer.put(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length); - int result = writeToSocket(socketWriteBuffer, true, true); - if (result < 0) { + addToBB(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length); + if (flushBuffer(true)) { throw new IOException(sm.getString("iob.failedwrite.ack")); } } } - /** - * - * @param bytebuffer ByteBuffer - * @param flip boolean - * @return int - * @throws IOException - */ - private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException { - if ( flip ) { - bytebuffer.flip(); - writeBufferFlipped = true; - } - - int written = 0; - NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper)socket.getAttachment(); - if ( att == null ) throw new IOException("Key must be cancelled"); - long writeTimeout = att.getWriteTimeout(); - Selector selector = null; - try { - selector = pool.get(); - } catch ( IOException x ) { - //ignore - } - try { - written = pool.write(bytebuffer, socket, selector, writeTimeout, block); - //make sure we are flushed - do { - if (socket.flush(true,selector,writeTimeout)) break; - }while ( true ); - } finally { - if ( selector != null ) pool.put(selector); - } - if ( block || bytebuffer.remaining()==0) { - //blocking writes must empty the buffer - //and if remaining==0 then we did empty it - bytebuffer.clear(); - writeBufferFlipped = false; - } - // If there is data left in the buffer the socket will be registered for - // write further up the stack. This is to ensure the socket is only - // registered for write once as both container and user code can trigger - // write registration. - return written; - } - // ------------------------------------------------------ Protected Methods @Override - protected synchronized void addToBB(byte[] buf, int offset, int length) - throws IOException { - - if (length == 0) return; - - // Try to flush any data in the socket's write buffer first - boolean dataLeft = flushBuffer(isBlocking()); - - // Keep writing until all the data is written or a non-blocking write - // leaves data in the buffer - while (!dataLeft && length > 0) { - int thisTime = transfer(buf,offset,length,socketWriteBuffer); - length = length - thisTime; - offset = offset + thisTime; - int written = writeToSocket(socketWriteBuffer, isBlocking(), true); - if (written == 0) { - dataLeft = true; - } else { - dataLeft = flushBuffer(isBlocking()); - } - } - - // Prevent timeouts for just doing client writes - socketWrapper.access(); - - if (!isBlocking() && length > 0) { - // Remaining data must be buffered - addToBuffers(buf, offset, length); - } - } - - - private void addToBuffers(byte[] buf, int offset, int length) { - ByteBufferHolder holder = bufferedWrites.peekLast(); - if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) { - ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); - holder = new ByteBufferHolder(buffer,false); - bufferedWrites.add(holder); - } - holder.getBuf().put(buf,offset,length); + protected synchronized void addToBB(byte[] buf, int offset, int length) throws IOException { + socketWrapper.write(isBlocking(), buf, offset, length); } @@ -194,49 +92,12 @@ public class InternalNioOutputBuffer ext */ @Override protected boolean flushBuffer(boolean block) throws IOException { - - //prevent timeout for async, - SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); - if (key != null) { - NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment(); - attach.access(); - } - - boolean dataLeft = hasMoreDataToFlush(); - - //write to the socket, if there is anything to write - if (dataLeft) { - writeToSocket(socketWriteBuffer, block, !writeBufferFlipped); - } - - dataLeft = hasMoreDataToFlush(); - - if (!dataLeft && bufferedWrites.size() > 0) { - Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); - while (!hasMoreDataToFlush() && bufIter.hasNext()) { - ByteBufferHolder buffer = bufIter.next(); - buffer.flip(); - while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { - transfer(buffer.getBuf(), socketWriteBuffer); - if (buffer.getBuf().remaining() == 0) { - bufIter.remove(); - } - writeToSocket(socketWriteBuffer, block, true); - //here we must break if we didn't finish the write - } - } - } - - return hasMoreDataToFlush(); + return socketWrapper.flush(block); } @Override protected void registerWriteInterest() throws IOException { - NioEndpoint.NioSocketWrapper att = (NioEndpoint.NioSocketWrapper)socket.getAttachment(); - if (att == null) { - throw new IOException("Key must be cancelled"); - } - att.getPoller().add(socket, SelectionKey.OP_WRITE); + ((NioSocketWrapper) socketWrapper).getPoller().add(socketWrapper.getSocket(), SelectionKey.OP_WRITE); } } Modified: tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/upgrade/UpgradeServletOutputStream.java Thu Jan 8 13:09:54 2015 @@ -55,18 +55,10 @@ public class UpgradeServletOutputStream private volatile ClassLoader applicationLoader = null; - // Writes guarded by writeLock - private volatile byte[] buffer; - private volatile int bufferPos; - private volatile int bufferLimit; - private final int asyncWriteBufferSize; - public UpgradeServletOutputStream(SocketWrapperBase<?> socketWrapper, int asyncWriteBufferSize) { this.socketWrapper = socketWrapper; - this.asyncWriteBufferSize = asyncWriteBufferSize; - buffer = new byte[asyncWriteBufferSize]; } @@ -80,7 +72,7 @@ public class UpgradeServletOutputStream // Make sure isReady() and onWritePossible() have a consistent view of // buffer and fireListener when determining if the listener should fire synchronized (fireListenerLock) { - boolean result = (bufferLimit == 0); + boolean result = !socketWrapper.hasDataToWrite(); fireListener = !result; return result; } @@ -139,7 +131,7 @@ public class UpgradeServletOutputStream private void preWriteChecks() { - if (bufferLimit != 0) { + if (socketWrapper.hasDataToWrite()) { throw new IllegalStateException(sm.getString("upgrade.sis.write.ise")); } } @@ -153,34 +145,7 @@ public class UpgradeServletOutputStream // Simple case - blocking IO socketWrapper.write(true, b, off, len); } else { - // Non-blocking IO - // If the non-blocking read does not complete, doWrite() will add - // the socket back into the poller. The poller may trigger a new - // write event before this method has finished updating buffer. The - // writeLock sync makes sure that buffer is updated before the next - // write executes. - int written = socketWrapper.write(false, b, off, len); - if (written < len) { - if (b == buffer) { - // This is a partial write of the existing buffer. Just - // increment the current position - bufferPos += written; - } else { - // This is a new partial write - int bytesLeft = len - written; - if (bytesLeft > buffer.length) { - buffer = new byte[bytesLeft]; - } else if (bytesLeft < asyncWriteBufferSize && - buffer.length > asyncWriteBufferSize) { - buffer = new byte[asyncWriteBufferSize]; - } - bufferPos = 0; - bufferLimit = bytesLeft; - System.arraycopy(b, off + written, buffer, bufferPos, bufferLimit); - } - } else { - bufferLimit = 0; - } + socketWrapper.write(false, b, off, len); } } @@ -188,9 +153,7 @@ public class UpgradeServletOutputStream protected final void onWritePossible() throws IOException { try { synchronized (writeLock) { - if (bufferLimit > 0) { - writeInternal(buffer, bufferPos, bufferLimit - bufferPos); - } + socketWrapper.flush(false); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); @@ -207,7 +170,7 @@ public class UpgradeServletOutputStream // should fire boolean fire = false; synchronized (fireListenerLock) { - if (bufferLimit == 0 && fireListener) { + if (!socketWrapper.hasDataToWrite() && fireListener) { fireListener = false; fire = true; } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan 8 13:09:54 2015 @@ -2505,7 +2505,7 @@ public class AprEndpoint extends Abstrac @Override - public int write(boolean block, byte[] b, int off, int len) throws IOException { + public void write(boolean block, byte[] b, int off, int len) throws IOException { if (closed) { throw new IOException(sm.getString("apr.closed", getSocket())); @@ -2517,7 +2517,8 @@ public class AprEndpoint extends Abstrac readLock.lock(); try { if (getBlockingStatus() == block) { - return doWriteInternal(b, off, len); + doWriteInternal(b, off, len); + return; } } finally { readLock.unlock(); @@ -2537,7 +2538,8 @@ public class AprEndpoint extends Abstrac readLock.lock(); try { writeLock.unlock(); - return doWriteInternal(b, off, len); + doWriteInternal(b, off, len); + return; } finally { readLock.unlock(); } @@ -2610,5 +2612,12 @@ public class AprEndpoint extends Abstrac ((AprEndpoint) getEndpoint()).getPoller().add( getSocket().longValue(), -1, read, write); } + + + @Override + public boolean flush(boolean block) throws IOException { + // TODO Auto-generated method stub + return false; + } } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 8 13:09:54 2015 @@ -992,9 +992,8 @@ public class Nio2Endpoint extends Abstra @Override - public int write(boolean block, byte[] b, int off, int len) throws IOException { + public void write(boolean block, byte[] b, int off, int len) throws IOException { int leftToWrite = len; - int count = 0; int offset = off; while (leftToWrite > 0) { @@ -1011,11 +1010,10 @@ public class Nio2Endpoint extends Abstra if (writtenThisLoop < 0) { throw new EOFException(); } - count += writtenThisLoop; if (!block && writePending.availablePermits() == 0) { // Prevent concurrent writes in non blocking mode, // leftover data has to be buffered - return count; + return; } offset += writtenThisLoop; leftToWrite -= writtenThisLoop; @@ -1024,8 +1022,6 @@ public class Nio2Endpoint extends Abstra break; } } - - return count; } @@ -1070,6 +1066,12 @@ public class Nio2Endpoint extends Abstra public void regsiterForEvent(boolean read, boolean write) { // NO-OP. Appropriate handlers will already have been registered. } + + @Override + public boolean flush(boolean block) throws IOException { + // TODO Auto-generated method stub + return false; + } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan 8 13:09:54 2015 @@ -51,6 +51,7 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.IntrospectionUtils; +import org.apache.tomcat.util.buf.ByteBufferHolder; import org.apache.tomcat.util.collections.SynchronizedQueue; import org.apache.tomcat.util.collections.SynchronizedStack; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; @@ -1297,7 +1298,6 @@ public class NioEndpoint extends Abstrac // ---------------------------------------------------- Key Attachment Class public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> { - private final int maxWrite; private final NioSelectorPool pool; private Poller poller = null; @@ -1310,7 +1310,6 @@ public class NioEndpoint extends Abstrac public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) { super(channel, endpoint); - maxWrite = channel.getBufHandler().getWriteBuffer().capacity(); pool = endpoint.getSelectorPool(); } @@ -1341,6 +1340,8 @@ public class NioEndpoint extends Abstrac } writeLatch = null; setWriteTimeout(soTimeout); + + socketWriteBuffer = channel.getBufHandler().getWriteBuffer(); } public void reset() { @@ -1509,72 +1510,128 @@ public class NioEndpoint extends Abstrac @Override - public int write(boolean block, byte[] b, int off, int len) throws IOException { - int leftToWrite = len; - int count = 0; - int offset = off; + public void write(boolean block, byte[] b, int off, int len) throws IOException { + // Always flush any data remaining in the buffers + boolean dataLeft = flush(block); - while (leftToWrite > 0) { - int writeThisLoop; - int writtenThisLoop; + if (len == 0 || b == null) { + return; + } - if (leftToWrite > maxWrite) { - writeThisLoop = maxWrite; + ByteBuffer socketWriteBuffer = getSocket().getBufHandler().getWriteBuffer(); + + // Keep writing until all the data is written or a non-blocking write + // leaves data in the buffer + while (!dataLeft && len > 0) { + int thisTime = transfer(b, off, len, socketWriteBuffer); + len = len - thisTime; + off = off + thisTime; + int written = writeToSocket(socketWriteBuffer, block, true); + if (written == 0) { + dataLeft = true; } else { - writeThisLoop = leftToWrite; + dataLeft = flush(block); } + } - writtenThisLoop = writeInternal(block, b, offset, writeThisLoop); - count += writtenThisLoop; - offset += writtenThisLoop; - leftToWrite -= writtenThisLoop; + // Prevent timeouts for just doing client writes + access(); - if (writtenThisLoop < writeThisLoop) { - break; + if (!block && len > 0) { + // Remaining data must be buffered + addToBuffers(b, off, len); + } + } + + + @Override + public boolean flush(boolean block) throws IOException { + + //prevent timeout for async, + SelectionKey key = getSocket().getIOChannel().keyFor(getSocket().getPoller().getSelector()); + if (key != null) { + NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment(); + attach.access(); + } + + boolean dataLeft = hasMoreDataToFlush(); + + //write to the socket, if there is anything to write + if (dataLeft) { + writeToSocket(socketWriteBuffer, block, !writeBufferFlipped); + } + + dataLeft = hasMoreDataToFlush(); + + if (!dataLeft && bufferedWrites.size() > 0) { + Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); + while (!hasMoreDataToFlush() && bufIter.hasNext()) { + ByteBufferHolder buffer = bufIter.next(); + buffer.flip(); + while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { + transfer(buffer.getBuf(), socketWriteBuffer); + if (buffer.getBuf().remaining() == 0) { + bufIter.remove(); + } + writeToSocket(socketWriteBuffer, block, true); + //here we must break if we didn't finish the write + } } } - return count; + return hasMoreDataToFlush(); } - private int writeInternal (boolean block, byte[] b, int off, int len) - throws IOException { - - NioEndpoint.NioSocketWrapper att = - (NioEndpoint.NioSocketWrapper) getSocket().getAttachment(); - if (att == null) { - throw new IOException("Key must be cancelled"); + private void addToBuffers(byte[] buf, int offset, int length) { + ByteBufferHolder holder = bufferedWrites.peekLast(); + if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) { + ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); + holder = new ByteBufferHolder(buffer,false); + bufferedWrites.add(holder); } + holder.getBuf().put(buf,offset,length); + } - ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer(); - writeBuffer.clear(); - writeBuffer.put(b, off, len); - writeBuffer.flip(); + + private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException { + if (flip) { + bytebuffer.flip(); + writeBufferFlipped = true; + } int written = 0; - long writeTimeout = att.getWriteTimeout(); + long writeTimeout = getWriteTimeout(); Selector selector = null; try { selector = pool.get(); - } catch ( IOException x ) { - //ignore + } catch (IOException x) { + // Ignore } try { - written = pool.write(writeBuffer, getSocket(), selector, - writeTimeout, block); + written = pool.write(bytebuffer, getSocket(), selector, writeTimeout, block); + // Make sure we are flushed + do { + if (getSocket().flush(true, selector, writeTimeout)) break; + } while (true); } finally { if (selector != null) { pool.put(selector); } } - if (written < len) { - getSocket().getPoller().add(getSocket(), SelectionKey.OP_WRITE); - } + if (block || bytebuffer.remaining() == 0) { + // Blocking writes must empty the buffer + // and if remaining==0 then we did empty it + bytebuffer.clear(); + writeBufferFlipped = false; + } + // If there is data left in the buffer the socket will be registered for + // write further up the stack. This is to ensure the socket is only + // registered for write once as both container and user code can trigger + // write registration. return written; } - @Override public void regsiterForEvent(boolean read, boolean write) { SelectionKey key = getSocket().getIOChannel().keyFor( Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650269&r1=1650268&r2=1650269&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 8 13:09:54 2015 @@ -21,10 +21,13 @@ import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.tomcat.util.buf.ByteBufferHolder; + public abstract class SocketWrapperBase<E> { private volatile E socket; @@ -67,6 +70,23 @@ public abstract class SocketWrapperBase< */ private final Object writeThreadLock = new Object(); + protected ByteBuffer socketWriteBuffer; + protected volatile boolean writeBufferFlipped; + + /** + * For "non-blocking" writes use an external set of buffers. Although the + * API only allows one non-blocking write at a time, due to buffering and + * the possible need to write HTTP headers, there may be more than one write + * to the OutputBuffer. + */ + protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites = + new LinkedBlockingDeque<>(); + + /** + * The max size of the buffered write buffer + */ + protected int bufferedWriteSize = 64*1024; //64k default write buffer + private Set<DispatchType> dispatches = new CopyOnWriteArraySet<>(); public SocketWrapperBase(E socket, AbstractEndpoint<E> endpoint) { @@ -157,6 +177,28 @@ public abstract class SocketWrapperBase< return blockingStatusWriteLock; } public Object getWriteThreadLock() { return writeThreadLock; } + + protected boolean hasMoreDataToFlush() { + return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) || + (!writeBufferFlipped && socketWriteBuffer.position() > 0); + } + + protected boolean hasBufferedData() { + boolean result = false; + if (bufferedWrites!=null) { + Iterator<ByteBufferHolder> iter = bufferedWrites.iterator(); + while (!result && iter.hasNext()) { + result = iter.next().hasData(); + } + } + return result; + } + + public boolean hasDataToWrite() { + return hasMoreDataToFlush() || hasBufferedData(); + } + + public void addDispatch(DispatchType dispatchType) { synchronized (dispatches) { dispatches.add(dispatchType); @@ -233,7 +275,52 @@ public abstract class SocketWrapperBase< public abstract void unRead(ByteBuffer input); public abstract void close() throws IOException; - public abstract int write(boolean block, byte[] b, int off, int len) throws IOException; + /** + * Writes the provided data to the socket, buffering any remaining data if + * used in non-blocking mode. If any data remains in the buffers from a + * previous write then that data will be written before this data. It is + * therefore unnecessary to call flush() before calling this method. + * + * @param block <code>true<code> if a blocking write should be used, + * otherwise a non-blocking write will be used + * @param b The byte array containing the data to be written + * @param off The offset within the byte array of the data to be written + * @param len The length of the data to be written + * + * @throws IOException If an IO error occurs during the write + */ + public abstract void write(boolean block, byte[] b, int off, int len) throws IOException; + + /** + * Writes as much data as possible from any that remains in the buffers. + * + * @param block <code>true<code> if a blocking write should be used, + * otherwise a non-blocking write will be used + * + * @return <code>true</code> if data remains to be flushed after this method + * completes, otherwise <code>false</code>. In blocking mode + * therefore, the return value should always be <code>false</code> + * + * @throws IOException If an IO error occurs during the write + */ + public abstract boolean flush(boolean block) throws IOException; public abstract void regsiterForEvent(boolean read, boolean write); + + + // --------------------------------------------------------- Utility methods + + protected static int transfer(byte[] from, int offset, int length, ByteBuffer to) { + int max = Math.min(length, to.remaining()); + to.put(from, offset, max); + return max; + } + + protected static void transfer(ByteBuffer from, ByteBuffer to) { + int max = Math.min(from.remaining(), to.remaining()); + int fromLimit = from.limit(); + from.limit(from.position() + max); + to.put(from); + from.limit(fromLimit); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org