Author: markt Date: Thu Jan 8 13:10:05 2015 New Revision: 1650271 URL: http://svn.apache.org/r1650271 Log: First (untested) pass at moving APR writes to SocketWrapper
Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1650271&r1=1650270&r2=1650271&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original) +++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu Jan 8 13:10:05 2015 @@ -19,14 +19,9 @@ package org.apache.coyote.http11; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.coyote.Response; import org.apache.tomcat.jni.Socket; -import org.apache.tomcat.jni.Status; -import org.apache.tomcat.util.buf.ByteBufferHolder; import org.apache.tomcat.util.net.AbstractEndpoint; import org.apache.tomcat.util.net.AprEndpoint; import org.apache.tomcat.util.net.SocketWrapperBase; @@ -76,6 +71,7 @@ public class InternalAprOutputBuffer ext this.endpoint = socketWrapper.getEndpoint(); Socket.setsbb(this.socket, socketWriteBuffer); + socketWrapper.socketWriteBuffer = socketWriteBuffer; } @@ -99,166 +95,25 @@ public class InternalAprOutputBuffer ext @Override public void sendAck() throws IOException { if (!committed) { - if (Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0) + addToBB(Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length); + if (flushBuffer(true)) { throw new IOException(sm.getString("iob.failedwrite.ack")); - } - } - - - // ------------------------------------------------------ Protected Methods - - @Override - protected synchronized void addToBB(byte[] buf, int offset, int length) - throws IOException { - - if (length == 0) return; - - // If bbuf is currently being used for writes, add this data to the - // write buffer - if (writeBufferFlipped) { - addToBuffers(buf, offset, length); - return; - } - - // Keep writing until all the data is written or a non-blocking write - // leaves data in the buffer - while (length > 0) { - int thisTime = length; - if (socketWriteBuffer.position() == socketWriteBuffer.capacity()) { - if (flushBuffer(isBlocking())) { - break; - } - } - if (thisTime > socketWriteBuffer.capacity() - socketWriteBuffer.position()) { - thisTime = socketWriteBuffer.capacity() - socketWriteBuffer.position(); } - socketWriteBuffer.put(buf, offset, thisTime); - length = length - thisTime; - offset = offset + thisTime; - } - - if (!isBlocking() && length>0) { - // Buffer the remaining data - addToBuffers(buf, offset, length); } } - private void addToBuffers(byte[] buf, int offset, int length) { - ByteBufferHolder holder = bufferedWrites.peekLast(); - if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) { - ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); - holder = new ByteBufferHolder(buffer,false); - bufferedWrites.add(holder); - } - holder.getBuf().put(buf,offset,length); - } - + // ------------------------------------------------------ Protected Methods @Override - protected synchronized boolean flushBuffer(boolean block) - throws IOException { - - if (hasMoreDataToFlush()) { - writeToSocket(block); - } - - if (bufferedWrites.size() > 0) { - Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); - while (!hasMoreDataToFlush() && bufIter.hasNext()) { - ByteBufferHolder buffer = bufIter.next(); - buffer.flip(); - while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { - transfer(buffer.getBuf(), socketWriteBuffer); - if (buffer.getBuf().remaining() == 0) { - bufIter.remove(); - } - writeToSocket(block); - //here we must break if we didn't finish the write - } - } - } - - return hasMoreDataToFlush(); + protected synchronized void addToBB(byte[] buf, int offset, int length) throws IOException { + socketWrapper.write(isBlocking(), buf, offset, length); } - private synchronized void writeToSocket(boolean block) throws IOException { - - Lock readLock = socketWrapper.getBlockingStatusReadLock(); - WriteLock writeLock = socketWrapper.getBlockingStatusWriteLock(); - - readLock.lock(); - try { - if (socketWrapper.getBlockingStatus() == block) { - writeToSocket(); - return; - } - } finally { - readLock.unlock(); - } - - writeLock.lock(); - try { - // Set the current settings for this socket - socketWrapper.setBlockingStatus(block); - if (block) { - Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000); - } else { - Socket.timeoutSet(socket, 0); - } - - // Downgrade the lock - readLock.lock(); - try { - writeLock.unlock(); - writeToSocket(); - } finally { - readLock.unlock(); - } - } finally { - // Should have been released above but may not have been on some - // exception paths - if (writeLock.isHeldByCurrentThread()) { - writeLock.unlock(); - } - } - } - - private synchronized void writeToSocket() throws IOException { - if (!writeBufferFlipped) { - writeBufferFlipped = true; - socketWriteBuffer.flip(); - } - - int written; - - do { - written = Socket.sendbb(socket, socketWriteBuffer.position(), socketWriteBuffer.remaining()); - if (Status.APR_STATUS_IS_EAGAIN(-written)) { - written = 0; - } else if (written < 0) { - throw new IOException("APR error: " + written); - } - socketWriteBuffer.position(socketWriteBuffer.position() + written); - } while (written > 0 && socketWriteBuffer.hasRemaining()); - - if (socketWriteBuffer.remaining() == 0) { - socketWriteBuffer.clear(); - writeBufferFlipped = false; - } - // If there is data left in the buffer the socket will be registered for - // write further up the stack. This is to ensure the socket is only - // registered for write once as both container and user code can trigger - // write registration. - } - - - //-------------------------------------------------- Non-blocking IO methods - @Override - protected synchronized boolean hasMoreDataToFlush() { - return super.hasMoreDataToFlush(); + protected boolean flushBuffer(boolean block) throws IOException { + return socketWrapper.flush(block); } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan 8 13:10:05 2015 @@ -2505,11 +2505,8 @@ public class AprEndpoint extends Abstrac @Override - public void write(boolean block, byte[] b, int off, int len) throws IOException { - doWrite(block, b, off, len); - } - - private void doWrite(boolean block, byte[] b, int off, int len) throws IOException { + protected int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip) + throws IOException { if (closed) { throw new IOException(sm.getString("apr.closed", getSocket())); } @@ -2520,8 +2517,7 @@ public class AprEndpoint extends Abstrac readLock.lock(); try { if (getBlockingStatus() == block) { - doWriteInternal(b, off, len); - return; + return doWriteInternal(bytebuffer, flip); } } finally { readLock.unlock(); @@ -2541,8 +2537,7 @@ public class AprEndpoint extends Abstrac readLock.lock(); try { writeLock.unlock(); - doWriteInternal(b, off, len); - return; + return doWriteInternal(bytebuffer, flip); } finally { readLock.unlock(); } @@ -2556,57 +2551,66 @@ public class AprEndpoint extends Abstrac } - private int doWriteInternal(byte[] b, int off, int len) throws IOException { + private int doWriteInternal(ByteBuffer bytebuffer, boolean flip) + throws IOException { + if (flip) { + bytebuffer.flip(); + writeBufferFlipped = true; + } - int start = off; - int left = len; - int written; + int written = 0; + int thisTime; do { + thisTime = 0; if (getEndpoint().isSSLEnabled()) { if (sslOutputBuffer.remaining() == 0) { // Buffer was fully written last time around sslOutputBuffer.clear(); - if (left < SSL_OUTPUT_BUFFER_SIZE) { - sslOutputBuffer.put(b, start, left); - } else { - sslOutputBuffer.put(b, start, SSL_OUTPUT_BUFFER_SIZE); - } + transfer(bytebuffer, sslOutputBuffer); sslOutputBuffer.flip(); + thisTime = sslOutputBuffer.remaining(); } else { // Buffer still has data from previous attempt to write // APR + SSL requires that exactly the same parameters are // passed when re-attempting the write } - written = Socket.sendb(getSocket().longValue(), sslOutputBuffer, + int sslWritten = Socket.sendb(getSocket().longValue(), sslOutputBuffer, sslOutputBuffer.position(), sslOutputBuffer.limit()); - if (written > 0) { + if (sslWritten > 0) { sslOutputBuffer.position( - sslOutputBuffer.position() + written); + sslOutputBuffer.position() + sslWritten); } } else { - written = Socket.send(getSocket().longValue(), b, start, left); + thisTime = Socket.sendb(getSocket().longValue(), bytebuffer, + bytebuffer.position(), bytebuffer.limit() - bytebuffer.position()); } - if (Status.APR_STATUS_IS_EAGAIN(-written)) { - written = 0; - } else if (-written == Status.APR_EOF) { - throw new EOFException(sm.getString("apr.clientAbort")); + if (Status.APR_STATUS_IS_EAGAIN(-thisTime)) { + thisTime = 0; + } else if (-thisTime == Status.APR_EOF) { + throw new EOFException(sm.getString("socket.apr.clientAbort")); } else if ((OS.IS_WIN32 || OS.IS_WIN64) && - (-written == Status.APR_OS_START_SYSERR + 10053)) { + (-thisTime == Status.APR_OS_START_SYSERR + 10053)) { // 10053 on Windows is connection aborted - throw new EOFException(sm.getString("apr.clientAbort")); - } else if (written < 0) { - throw new IOException(sm.getString("apr.write.error", - Integer.valueOf(-written), getSocket(), this)); - } - start += written; - left -= written; - } while (written > 0 && left > 0); - - if (left > 0) { - ((AprEndpoint) getEndpoint()).getPoller().add(getSocket().longValue(), -1, false, true); + throw new EOFException(sm.getString("socket.apr.clientAbort")); + } else if (thisTime < 0) { + throw new IOException(sm.getString("socket.apr.write.error", + Integer.valueOf(-thisTime), getSocket(), this)); + } + written += thisTime; + bytebuffer.position(bytebuffer.position() + thisTime); + } while (thisTime > 0 && bytebuffer.hasRemaining()); + + if (bytebuffer.remaining() == 0) { + bytebuffer.clear(); + writeBufferFlipped = false; } - return len - left; + // If there is data left in the buffer the socket will be registered for + // write further up the stack. This is to ensure the socket is only + // registered for write once as both container and user code can trigger + // write registration. + + return written; } @@ -2615,12 +2619,5 @@ public class AprEndpoint extends Abstrac ((AprEndpoint) getEndpoint()).getPoller().add( getSocket().longValue(), -1, read, write); } - - - @Override - public boolean flush(boolean block) throws IOException { - // TODO Auto-generated method stub - return false; - } } } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 8 13:10:05 2015 @@ -1025,6 +1025,13 @@ public class Nio2Endpoint extends Abstra } + @Override + protected int doWrite(ByteBuffer buffer, boolean block, boolean flip) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + private int writeInternal(boolean block, byte[] b, int off, int len) throws IOException { ByteBuffer writeBuffer = getSocket().getBufHandler().getWriteBuffer(); Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1650271&r1=1650270&r2=1650271&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan 8 13:10:05 2015 @@ -51,7 +51,6 @@ import org.apache.juli.logging.Log; import org.apache.juli.logging.LogFactory; import org.apache.tomcat.util.ExceptionUtils; import org.apache.tomcat.util.IntrospectionUtils; -import org.apache.tomcat.util.buf.ByteBufferHolder; import org.apache.tomcat.util.collections.SynchronizedQueue; import org.apache.tomcat.util.collections.SynchronizedStack; import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; @@ -1510,91 +1509,8 @@ public class NioEndpoint extends Abstrac @Override - public void write(boolean block, byte[] b, int off, int len) throws IOException { - // Always flush any data remaining in the buffers - boolean dataLeft = flush(block); - - if (len == 0 || b == null) { - return; - } - - ByteBuffer socketWriteBuffer = getSocket().getBufHandler().getWriteBuffer(); - - // Keep writing until all the data is written or a non-blocking write - // leaves data in the buffer - while (!dataLeft && len > 0) { - int thisTime = transfer(b, off, len, socketWriteBuffer); - len = len - thisTime; - off = off + thisTime; - int written = doWrite(socketWriteBuffer, block, true); - if (written == 0) { - dataLeft = true; - } else { - dataLeft = flush(block); - } - } - - // Prevent timeouts for just doing client writes - access(); - - if (!block && len > 0) { - // Remaining data must be buffered - addToBuffers(b, off, len); - } - } - - - @Override - public boolean flush(boolean block) throws IOException { - - //prevent timeout for async, - SelectionKey key = getSocket().getIOChannel().keyFor(getSocket().getPoller().getSelector()); - if (key != null) { - NioEndpoint.NioSocketWrapper attach = (NioEndpoint.NioSocketWrapper) key.attachment(); - attach.access(); - } - - boolean dataLeft = hasMoreDataToFlush(); - - //write to the socket, if there is anything to write - if (dataLeft) { - doWrite(socketWriteBuffer, block, !writeBufferFlipped); - } - - dataLeft = hasMoreDataToFlush(); - - if (!dataLeft && bufferedWrites.size() > 0) { - Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); - while (!hasMoreDataToFlush() && bufIter.hasNext()) { - ByteBufferHolder buffer = bufIter.next(); - buffer.flip(); - while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { - transfer(buffer.getBuf(), socketWriteBuffer); - if (buffer.getBuf().remaining() == 0) { - bufIter.remove(); - } - doWrite(socketWriteBuffer, block, true); - //here we must break if we didn't finish the write - } - } - } - - return hasMoreDataToFlush(); - } - - - private void addToBuffers(byte[] buf, int offset, int length) { - ByteBufferHolder holder = bufferedWrites.peekLast(); - if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) { - ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); - holder = new ByteBufferHolder(buffer,false); - bufferedWrites.add(holder); - } - holder.getBuf().put(buf,offset,length); - } - - - private synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException { + protected synchronized int doWrite(ByteBuffer bytebuffer, boolean block, boolean flip) + throws IOException { if (flip) { bytebuffer.flip(); writeBufferFlipped = true; @@ -1619,9 +1535,7 @@ public class NioEndpoint extends Abstrac pool.put(selector); } } - if (block || bytebuffer.remaining() == 0) { - // Blocking writes must empty the buffer - // and if remaining==0 then we did empty it + if (bytebuffer.remaining() == 0) { bytebuffer.clear(); writeBufferFlipped = false; } @@ -1629,6 +1543,7 @@ public class NioEndpoint extends Abstrac // write further up the stack. This is to ensure the socket is only // registered for write once as both container and user code can trigger // write registration. + return written; } Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650271&r1=1650270&r2=1650271&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 8 13:10:05 2015 @@ -70,7 +70,8 @@ public abstract class SocketWrapperBase< */ private final Object writeThreadLock = new Object(); - protected ByteBuffer socketWriteBuffer; + // TODO This being public is a temporary hack to simplify refactoring + public volatile ByteBuffer socketWriteBuffer; protected volatile boolean writeBufferFlipped; /** @@ -289,7 +290,38 @@ public abstract class SocketWrapperBase< * * @throws IOException If an IO error occurs during the write */ - public abstract void write(boolean block, byte[] b, int off, int len) throws IOException; + public void write(boolean block, byte[] b, int off, int len) throws IOException { + // Always flush any data remaining in the buffers + boolean dataLeft = flush(block); + + if (len == 0 || b == null) { + return; + } + + // Keep writing until all the data is written or a non-blocking write + // leaves data in the buffer + while (!dataLeft && len > 0) { + int thisTime = transfer(b, off, len, socketWriteBuffer); + len = len - thisTime; + off = off + thisTime; + int written = doWrite(socketWriteBuffer, block, true); + if (written == 0) { + dataLeft = true; + } else { + dataLeft = flush(block); + } + } + + // Prevent timeouts for just doing client writes + access(); + + if (!block && len > 0) { + // Remaining data must be buffered + addToBuffers(b, off, len); + } + } + + /** * Writes as much data as possible from any that remains in the buffers. @@ -303,7 +335,54 @@ public abstract class SocketWrapperBase< * * @throws IOException If an IO error occurs during the write */ - public abstract boolean flush(boolean block) throws IOException; + public boolean flush(boolean block) throws IOException { + + // Prevent timeout for async + access(); + + boolean dataLeft = hasMoreDataToFlush(); + + // Write to the socket, if there is anything to write + if (dataLeft) { + doWrite(socketWriteBuffer, block, !writeBufferFlipped); + } + + dataLeft = hasMoreDataToFlush(); + + if (!dataLeft && bufferedWrites.size() > 0) { + Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); + while (!hasMoreDataToFlush() && bufIter.hasNext()) { + ByteBufferHolder buffer = bufIter.next(); + buffer.flip(); + while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { + transfer(buffer.getBuf(), socketWriteBuffer); + if (buffer.getBuf().remaining() == 0) { + bufIter.remove(); + } + doWrite(socketWriteBuffer, block, true); + //here we must break if we didn't finish the write + } + } + } + + return hasMoreDataToFlush(); + } + + + protected abstract int doWrite(ByteBuffer buffer, boolean block, boolean flip) + throws IOException; + + + protected void addToBuffers(byte[] buf, int offset, int length) { + ByteBufferHolder holder = bufferedWrites.peekLast(); + if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) { + ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); + holder = new ByteBufferHolder(buffer,false); + bufferedWrites.add(holder); + } + holder.getBuf().put(buf,offset,length); + } + public abstract void regsiterForEvent(boolean read, boolean write); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org