Author: markt
Date: Thu Jan 8 13:11:24 2015
New Revision: 1650285

URL: http://svn.apache.org/r1650285
Log:
Rework notify to handle nested inline completion handlers correctly.
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650285&r1=1650284&r2=1650285&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original) +++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan 8 13:11:24 2015 @@ -714,6 +714,13 @@ public class Nio2Endpoint extends Abstra public static class Nio2SocketWrapper extends SocketWrapperBase<Nio2Channel> { + private static final ThreadLocal<Boolean> writeCompletionInProgress = new ThreadLocal<Boolean>() { + @Override + protected Boolean initialValue() { + return Boolean.FALSE; + } + }; + private SendfileData sendfileData = null; private boolean upgradeInit = false; @@ -726,6 +733,7 @@ public class Nio2Endpoint extends Abstra private final CompletionHandler<Long, ByteBuffer[]> gatheringWriteCompletionHandler; private final Semaphore writePending = new Semaphore(1); private volatile boolean writeInterest = true; + private boolean writeNotify = false; public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) { @@ -774,11 +782,12 @@ public class Nio2Endpoint extends Abstra this.writeCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer nBytes, ByteBuffer attachment) { - boolean notify = false; + writeNotify = false; synchronized (writeCompletionHandler) { if (nBytes.intValue() < 0) { failed(new EOFException(sm.getString("iob.failedwrite")), attachment); } else if (Nio2SocketWrapper.this.bufferedWrites.size() > 0) { + writeCompletionInProgress.set(Boolean.TRUE); // Continue writing data using a gathering write ArrayList<ByteBuffer> arrayList = new ArrayList<>(); if (attachment.hasRemaining()) { @@ -793,22 +802,25 @@ public class Nio2Endpoint 
extends Abstra Nio2SocketWrapper.this.getSocket().write(array, 0, array.length, Nio2SocketWrapper.this.getTimeout(), TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); + writeCompletionInProgress.set(Boolean.FALSE); } else if (attachment.hasRemaining()) { // Regular write + writeCompletionInProgress.set(Boolean.TRUE); Nio2SocketWrapper.this.getSocket().write(attachment, Nio2SocketWrapper.this.getTimeout(), TimeUnit.MILLISECONDS, attachment, writeCompletionHandler); + writeCompletionInProgress.set(Boolean.FALSE); } else { // All data has been written - if (writeInterest && !Nio2Endpoint.isInline()) { + if (writeInterest) { writeInterest = false; - notify = true; + writeNotify = true; } writePending.release(); socketWriteBuffer.clear(); writeBufferFlipped = false; } } - if (notify) { + if (writeNotify && !writeCompletionInProgress.get().booleanValue()) { endpoint.processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, false); } } @@ -830,12 +842,13 @@ public class Nio2Endpoint extends Abstra gatheringWriteCompletionHandler = new CompletionHandler<Long, ByteBuffer[]>() { @Override public void completed(Long nBytes, ByteBuffer[] attachment) { - boolean notify = false; + writeNotify = false; synchronized (writeCompletionHandler) { if (nBytes.longValue() < 0) { failed(new EOFException(sm.getString("iob.failedwrite")), attachment); } else if (Nio2SocketWrapper.this.bufferedWrites.size() > 0 || arrayHasData(attachment)) { // Continue writing data + writeCompletionInProgress.set(Boolean.TRUE); ArrayList<ByteBuffer> arrayList = new ArrayList<>(); for (ByteBuffer buffer : attachment) { if (buffer.hasRemaining()) { @@ -851,18 +864,19 @@ public class Nio2Endpoint extends Abstra Nio2SocketWrapper.this.getSocket().write(array, 0, array.length, Nio2SocketWrapper.this.getTimeout(), TimeUnit.MILLISECONDS, array, gatheringWriteCompletionHandler); + writeCompletionInProgress.set(Boolean.FALSE); } else { // All data has been written - if (writeInterest && 
!Nio2Endpoint.isInline()) { + if (writeInterest) { writeInterest = false; - notify = true; + writeNotify = true; } writePending.release(); socketWriteBuffer.clear(); writeBufferFlipped = false; } } - if (notify) { + if (writeNotify && !writeCompletionInProgress.get().booleanValue()) { endpoint.processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, false); } } @@ -1115,8 +1129,8 @@ public class Nio2Endpoint extends Abstra // Could be "smart" with coordination with the main CoyoteOutputStream to // indicate the end of a write // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS)) - if (writePending.tryAcquire()) { - synchronized (writeCompletionHandler) { + synchronized (writeCompletionHandler) { + if (writePending.tryAcquire()) { // No pending completion handler, so writing to the main buffer // is possible int thisTime = transfer(buf, off, len, socketWriteBuffer); @@ -1127,9 +1141,7 @@ public class Nio2Endpoint extends Abstra addToBuffers(buf, off, len); } flush(false, true); - } - } else { - synchronized (writeCompletionHandler) { + } else { addToBuffers(buf, off, len); } } @@ -1212,7 +1224,6 @@ public class Nio2Endpoint extends Abstra socketWriteBuffer.flip(); writeBufferFlipped = true; } - Nio2Endpoint.startInline(); if (bufferedWrites.size() > 0) { // Gathering write of the main buffer plus all leftovers ArrayList<ByteBuffer> arrayList = new ArrayList<>(); @@ -1235,7 +1246,6 @@ public class Nio2Endpoint extends Abstra // Nothing was written writePending.release(); } - Nio2Endpoint.endInline(); if (writePending.availablePermits() > 0) { if (socketWriteBuffer.remaining() == 0) { socketWriteBuffer.clear(); @@ -1475,13 +1485,8 @@ public class Nio2Endpoint extends Abstra data.socket = socket; data.buffer = buffer; data.length -= nRead; - startInline(); - try { - socket.getSocket().write(buffer, socket.getTimeout(), TimeUnit.MILLISECONDS, - data, sendfile); - } finally { - endInline(); - } + socket.getSocket().write(buffer, 
socket.getTimeout(), TimeUnit.MILLISECONDS, + data, sendfile); if (data.doneInline) { if (data.error) { return SendfileState.ERROR;
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org
For additional commands, e-mail: dev-help@tomcat.apache.org