Author: markt
Date: Thu Jan  8 13:10:41 2015
New Revision: 1650277

URL: http://svn.apache.org/r1650277
Log:
Partial NIO2 refactoring

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
    tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java

Modified: tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/AbstractOutputBuffer.java Thu Jan  8 13:10:41 2015
@@ -591,7 +591,7 @@ public abstract class AbstractOutputBuff
 
     //------------------------------------------------------ Non-blocking writes
 
-    protected abstract void registerWriteInterest() throws IOException;
+    protected abstract void registerWriteInterest();
 
 
     /**

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalAprOutputBuffer.java Thu Jan  8 13:10:41 2015
@@ -52,7 +52,7 @@ public class InternalAprOutputBuffer ext
 
 
     @Override
-    protected void registerWriteInterest() throws IOException {
+    protected void registerWriteInterest() {
         socketWrapper.registerWriteInterest();
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2InputBuffer.java Thu Jan  8 13:10:41 2015
@@ -172,12 +172,12 @@ public class InternalNio2InputBuffer ext
 
             @Override
             public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> 
attachment) {
-                attachment.setError(true);
                 if (exc instanceof IOException) {
                     e = (IOException) exc;
                 } else {
                     e = new IOException(exc);
                 }
+                attachment.setError(e);
                 request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
                 readPending = false;
                 endpoint.processSocket(attachment, SocketStatus.OPEN_READ, 
true);

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java Thu Jan  8 13:10:41 2015
@@ -21,22 +21,16 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import java.nio.channels.CompletionHandler;
 import java.util.ArrayList;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import javax.servlet.RequestDispatcher;
-
 import org.apache.coyote.Response;
 import org.apache.tomcat.util.buf.ByteBufferHolder;
-import org.apache.tomcat.util.net.AbstractEndpoint;
 import org.apache.tomcat.util.net.Nio2Channel;
 import org.apache.tomcat.util.net.Nio2Endpoint;
-import org.apache.tomcat.util.net.SocketStatus;
-import org.apache.tomcat.util.net.SocketWrapperBase;
+import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper;
 
 /**
  * Output buffer implementation for NIO2.
@@ -52,186 +46,9 @@ public class InternalNio2OutputBuffer ex
         super(response, headerBufferSize);
     }
 
-    private static final ByteBuffer[] EMPTY_BUF_ARRAY = new ByteBuffer[0];
-
-    /**
-     * Track write interest
-     */
-    protected volatile boolean interest = false;
-
-    /**
-     * The completion handler used for asynchronous write operations
-     */
-    protected CompletionHandler<Integer, ByteBuffer> completionHandler;
-
-    /**
-     * The completion handler used for asynchronous write operations
-     */
-    protected CompletionHandler<Long, ByteBuffer[]> gatherCompletionHandler;
-
-    /**
-     * Write pending flag.
-     */
-    protected Semaphore writePending = new Semaphore(1);
-
-    /**
-     * The associated endpoint.
-     */
-    protected AbstractEndpoint<Nio2Channel> endpoint = null;
-
-    /**
-     * Exception that occurred during writing.
-     */
-    protected IOException e = null;
-
-    // --------------------------------------------------------- Public Methods
-
-    @Override
-    public void init(SocketWrapperBase<Nio2Channel> socketWrapper) {
-        super.init(socketWrapper);
-        this.endpoint = socketWrapper.getEndpoint();
-
-        this.completionHandler = new CompletionHandler<Integer, ByteBuffer>() {
-            @Override
-            public void completed(Integer nBytes, ByteBuffer attachment) {
-                boolean notify = false;
-                synchronized (completionHandler) {
-                    if (nBytes.intValue() < 0) {
-                        failed(new 
EOFException(sm.getString("iob.failedwrite")), attachment);
-                    } else if (socketWrapper.bufferedWrites.size() > 0) {
-                        // Continue writing data using a gathering write
-                        ArrayList<ByteBuffer> arrayList = new ArrayList<>();
-                        if (attachment.hasRemaining()) {
-                            arrayList.add(attachment);
-                        }
-                        for (ByteBufferHolder buffer : 
socketWrapper.bufferedWrites) {
-                            buffer.flip();
-                            arrayList.add(buffer.getBuf());
-                        }
-                        socketWrapper.bufferedWrites.clear();
-                        ByteBuffer[] array = 
arrayList.toArray(EMPTY_BUF_ARRAY);
-                        socketWrapper.getSocket().write(array, 0, array.length,
-                                socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS,
-                                array, gatherCompletionHandler);
-                    } else if (attachment.hasRemaining()) {
-                        // Regular write
-                        socketWrapper.getSocket().write(attachment, 
socketWrapper.getTimeout(),
-                                TimeUnit.MILLISECONDS, attachment, 
completionHandler);
-                    } else {
-                        // All data has been written
-                        if (interest && !Nio2Endpoint.isInline()) {
-                            interest = false;
-                            notify = true;
-                        }
-                        writePending.release();
-                    }
-                }
-                if (notify) {
-                    endpoint.processSocket(socketWrapper, 
SocketStatus.OPEN_WRITE, false);
-                }
-            }
-
-            @Override
-            public void failed(Throwable exc, ByteBuffer attachment) {
-                socketWrapper.setError(true);
-                if (exc instanceof IOException) {
-                    e = (IOException) exc;
-                } else {
-                    e = new IOException(exc);
-                }
-                
response.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
-                writePending.release();
-                endpoint.processSocket(socketWrapper, SocketStatus.OPEN_WRITE, 
true);
-            }
-        };
-        this.gatherCompletionHandler = new CompletionHandler<Long, 
ByteBuffer[]>() {
-            @Override
-            public void completed(Long nBytes, ByteBuffer[] attachment) {
-                boolean notify = false;
-                synchronized (completionHandler) {
-                    if (nBytes.longValue() < 0) {
-                        failed(new 
EOFException(sm.getString("iob.failedwrite")), attachment);
-                    } else if (socketWrapper.bufferedWrites.size() > 0 || 
arrayHasData(attachment)) {
-                        // Continue writing data
-                        ArrayList<ByteBuffer> arrayList = new ArrayList<>();
-                        for (ByteBuffer buffer : attachment) {
-                            if (buffer.hasRemaining()) {
-                                arrayList.add(buffer);
-                            }
-                        }
-                        for (ByteBufferHolder buffer : 
socketWrapper.bufferedWrites) {
-                            buffer.flip();
-                            arrayList.add(buffer.getBuf());
-                        }
-                        socketWrapper.bufferedWrites.clear();
-                        ByteBuffer[] array = 
arrayList.toArray(EMPTY_BUF_ARRAY);
-                        socketWrapper.getSocket().write(array, 0, array.length,
-                                socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS,
-                                array, gatherCompletionHandler);
-                    } else {
-                        // All data has been written
-                        if (interest && !Nio2Endpoint.isInline()) {
-                            interest = false;
-                            notify = true;
-                        }
-                        writePending.release();
-                    }
-                }
-                if (notify) {
-                    endpoint.processSocket(socketWrapper, 
SocketStatus.OPEN_WRITE, false);
-                }
-            }
-
-            @Override
-            public void failed(Throwable exc, ByteBuffer[] attachment) {
-                socketWrapper.setError(true);
-                if (exc instanceof IOException) {
-                    e = (IOException) exc;
-                } else {
-                    e = new IOException(exc);
-                }
-                
response.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION, e);
-                writePending.release();
-                endpoint.processSocket(socketWrapper, SocketStatus.OPEN_WRITE, 
true);
-           }
-        };
-    }
-
-
-    /**
-     * Recycle the output buffer. This should be called when closing the
-     * connection.
-     */
-    @Override
-    public void recycle() {
-        super.recycle();
-        e = null;
-        interest = false;
-        if (writePending.availablePermits() != 1) {
-            writePending.drainPermits();
-            writePending.release();
-        }
-    }
-
-
-    @Override
-    public void nextRequest() {
-        super.nextRequest();
-        interest = false;
-    }
-
 
     // ------------------------------------------------------ Protected Methods
 
-    private static boolean arrayHasData(ByteBuffer[] byteBuffers) {
-        for (ByteBuffer byteBuffer : byteBuffers) {
-            if (byteBuffer.hasRemaining()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
     @Override
     protected void addToBB(byte[] buf, int offset, int length)
             throws IOException {
@@ -259,8 +76,8 @@ public class InternalNio2OutputBuffer ex
             // Could be "smart" with coordination with the main 
CoyoteOutputStream to
             // indicate the end of a write
             // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS))
-            if (writePending.tryAcquire()) {
-                synchronized (completionHandler) {
+            if (((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire()) {
+                synchronized 
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
                     // No pending completion handler, so writing to the main 
buffer
                     // is possible
                     int thisTime = transfer(buf, offset, length, 
socketWrapper.socketWriteBuffer);
@@ -273,7 +90,7 @@ public class InternalNio2OutputBuffer ex
                     flushBufferInternal(false, true);
                 }
             } else {
-                synchronized (completionHandler) {
+                synchronized 
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
                     addToBuffers(buf, offset, length);
                 }
             }
@@ -293,8 +110,8 @@ public class InternalNio2OutputBuffer ex
      */
     @Override
     protected boolean flushBuffer(boolean block) throws IOException {
-        if (e != null) {
-            throw e;
+        if (socketWrapper.getError() != null) {
+            throw socketWrapper.getError();
         }
         return flushBufferInternal(block, false);
     }
@@ -308,8 +125,8 @@ public class InternalNio2OutputBuffer ex
                 // The final flush is blocking, but the processing was using
                 // non blocking so wait until an async write is done
                 try {
-                    if (writePending.tryAcquire(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS)) {
-                        writePending.release();
+                    if 
(((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire(socketWrapper.getTimeout(),
 TimeUnit.MILLISECONDS)) {
+                        
((Nio2SocketWrapper)socketWrapper).writePending.release();
                     }
                 } catch (InterruptedException e) {
                     // Ignore timeout
@@ -352,8 +169,8 @@ public class InternalNio2OutputBuffer ex
             socketWrapper.writeBufferFlipped = false;
             return false;
         } else {
-            synchronized (completionHandler) {
-                if (hasPermit || writePending.tryAcquire()) {
+            synchronized 
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
+                if (hasPermit || 
((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire()) {
                     if (!socketWrapper.writeBufferFlipped) {
                         socketWrapper.socketWriteBuffer.flip();
                         socketWrapper.writeBufferFlipped = true;
@@ -370,26 +187,26 @@ public class InternalNio2OutputBuffer ex
                             arrayList.add(buffer.getBuf());
                         }
                         socketWrapper.bufferedWrites.clear();
-                        ByteBuffer[] array = 
arrayList.toArray(EMPTY_BUF_ARRAY);
+                        ByteBuffer[] array = arrayList.toArray(new 
ByteBuffer[arrayList.size()]);
                         socketWrapper.getSocket().write(array, 0, 
array.length, socketWrapper.getTimeout(),
-                                TimeUnit.MILLISECONDS, array, 
gatherCompletionHandler);
+                                TimeUnit.MILLISECONDS, array, 
((Nio2SocketWrapper)socketWrapper).gatheringWriteCompletionHandler);
                     } else if (socketWrapper.socketWriteBuffer.hasRemaining()) 
{
                         // Regular write
                         
socketWrapper.getSocket().write(socketWrapper.socketWriteBuffer, 
socketWrapper.getTimeout(),
-                                TimeUnit.MILLISECONDS, 
socketWrapper.socketWriteBuffer, completionHandler);
+                                TimeUnit.MILLISECONDS, 
socketWrapper.socketWriteBuffer, 
((Nio2SocketWrapper)socketWrapper).writeCompletionHandler);
                     } else {
                         // Nothing was written
-                        writePending.release();
+                        
((Nio2SocketWrapper)socketWrapper).writePending.release();
                     }
                     Nio2Endpoint.endInline();
-                    if (writePending.availablePermits() > 0) {
+                    if 
(((Nio2SocketWrapper)socketWrapper).writePending.availablePermits() > 0) {
                         if (socketWrapper.socketWriteBuffer.remaining() == 0) {
                             socketWrapper.socketWriteBuffer.clear();
                             socketWrapper.writeBufferFlipped = false;
                         }
                     }
                 }
-                return socketWrapper.hasMoreDataToFlush() || hasBufferedData() 
|| e != null;
+                return socketWrapper.hasMoreDataToFlush() || hasBufferedData() 
|| socketWrapper.getError() != null;
             }
         }
     }
@@ -397,8 +214,8 @@ public class InternalNio2OutputBuffer ex
 
     @Override
     public boolean hasDataToWrite() {
-        synchronized (completionHandler) {
-            return socketWrapper.hasMoreDataToFlush() || hasBufferedData() || 
e != null;
+        synchronized 
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
+            return socketWrapper.hasMoreDataToFlush() || hasBufferedData() || 
socketWrapper.getError() != null;
         }
     }
 
@@ -408,13 +225,6 @@ public class InternalNio2OutputBuffer ex
 
     @Override
     protected void registerWriteInterest() {
-        synchronized (completionHandler) {
-            if (writePending.availablePermits() == 0) {
-                interest = true;
-            } else {
-                // If no write is pending, notify
-                endpoint.processSocket(socketWrapper, SocketStatus.OPEN_WRITE, 
true);
-            }
-        }
+        socketWrapper.registerWriteInterest();
     }
 }

Modified: tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java (original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java Thu Jan  8 13:10:41 2015
@@ -53,7 +53,7 @@ public class InternalNioOutputBuffer ext
 
 
     @Override
-    protected void registerWriteInterest() throws IOException {
+    protected void registerWriteInterest() {
         socketWrapper.registerWriteInterest();
     }
 }

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/AprEndpoint.java Thu Jan  8 13:10:41 2015
@@ -2618,7 +2618,7 @@ public class AprEndpoint extends Abstrac
 
 
         @Override
-        public void registerWriteInterest() throws IOException {
+        public void registerWriteInterest() {
             ((AprEndpoint) getEndpoint()).getPoller().add(getSocket().longValue(), -1, false, true);
         }
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan  8 13:10:41 2015
@@ -32,6 +32,7 @@ import java.nio.channels.ClosedChannelEx
 import java.nio.channels.CompletionHandler;
 import java.nio.channels.FileChannel;
 import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -49,6 +50,7 @@ import javax.net.ssl.X509KeyManager;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
 import org.apache.tomcat.util.ExceptionUtils;
+import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.collections.SynchronizedStack;
 import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
 import org.apache.tomcat.util.net.SecureNio2Channel.ApplicationBufferHandler;
@@ -721,8 +723,10 @@ public class Nio2Endpoint extends Abstra
         private volatile boolean interest = true;
 
         private final int maxWrite;
-        private final CompletionHandler<Integer, ByteBuffer> 
writeCompletionHandler;
-        private final Semaphore writePending = new Semaphore(1);
+        // TODO These are public for now to aid refactoring
+        public final CompletionHandler<Integer, ByteBuffer> 
writeCompletionHandler;
+        public final CompletionHandler<Long, ByteBuffer[]> 
gatheringWriteCompletionHandler;
+        public final Semaphore writePending = new Semaphore(1);
 
 
         public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) {
@@ -753,7 +757,13 @@ public class Nio2Endpoint extends Abstra
                 }
                 @Override
                 public void failed(Throwable exc, 
SocketWrapperBase<Nio2Channel> attachment) {
-                    attachment.setError(true);
+                    IOException ioe;
+                    if (exc instanceof IOException) {
+                        ioe = (IOException) exc;
+                    } else {
+                        ioe = new IOException(exc);
+                    }
+                    Nio2SocketWrapper.this.setError(ioe);
                     readPending = false;
                     if (exc instanceof AsynchronousCloseException) {
                         // If already closed, don't call onError and close 
again
@@ -766,32 +776,120 @@ public class Nio2Endpoint extends Abstra
             this.writeCompletionHandler = new CompletionHandler<Integer, 
ByteBuffer>() {
                 @Override
                 public void completed(Integer nBytes, ByteBuffer attachment) {
-                    if (nBytes.intValue() < 0) {
-                        failed(new EOFException(), attachment);
-                    } else if (attachment.hasRemaining()) {
-                        getSocket().write(attachment, getTimeout(),
-                                TimeUnit.MILLISECONDS, attachment, 
writeCompletionHandler);
-                    } else {
-                        writePending.release();
-                        if (!Nio2Endpoint.isInline()) {
-                            
getEndpoint().processSocket(Nio2SocketWrapper.this, SocketStatus.OPEN_WRITE, 
false);
+                    boolean notify = false;
+                    synchronized (writeCompletionHandler) {
+                        if (nBytes.intValue() < 0) {
+                            failed(new 
EOFException(sm.getString("iob.failedwrite")), attachment);
+                        } else if 
(Nio2SocketWrapper.this.bufferedWrites.size() > 0) {
+                            // Continue writing data using a gathering write
+                            ArrayList<ByteBuffer> arrayList = new 
ArrayList<>();
+                            if (attachment.hasRemaining()) {
+                                arrayList.add(attachment);
+                            }
+                            for (ByteBufferHolder buffer : 
Nio2SocketWrapper.this.bufferedWrites) {
+                                buffer.flip();
+                                arrayList.add(buffer.getBuf());
+                            }
+                            Nio2SocketWrapper.this.bufferedWrites.clear();
+                            ByteBuffer[] array = arrayList.toArray(new 
ByteBuffer[arrayList.size()]);
+                            Nio2SocketWrapper.this.getSocket().write(array, 0, 
array.length,
+                                    Nio2SocketWrapper.this.getTimeout(), 
TimeUnit.MILLISECONDS,
+                                    array, gatheringWriteCompletionHandler);
+                        } else if (attachment.hasRemaining()) {
+                            // Regular write
+                            
Nio2SocketWrapper.this.getSocket().write(attachment, 
Nio2SocketWrapper.this.getTimeout(),
+                                    TimeUnit.MILLISECONDS, attachment, 
writeCompletionHandler);
+                        } else {
+                            // All data has been written
+                            if (interest && !Nio2Endpoint.isInline()) {
+                                interest = false;
+                                notify = true;
+                            }
+                            writePending.release();
                         }
                     }
+                    if (notify) {
+                        endpoint.processSocket(Nio2SocketWrapper.this, 
SocketStatus.OPEN_WRITE, false);
+                    }
                 }
+
                 @Override
                 public void failed(Throwable exc, ByteBuffer attachment) {
-                    setError(true);
+                    IOException ioe;
+                    if (exc instanceof IOException) {
+                        ioe = (IOException) exc;
+                    } else {
+                        ioe = new IOException(exc);
+                    }
+                    Nio2SocketWrapper.this.setError(ioe);
                     writePending.release();
-                    if (exc instanceof AsynchronousCloseException) {
-                        // If already closed, don't call onError and close 
again
-                        return;
+                    endpoint.processSocket(Nio2SocketWrapper.this, 
SocketStatus.OPEN_WRITE, true);
+                }
+            };
+
+            gatheringWriteCompletionHandler = new CompletionHandler<Long, 
ByteBuffer[]>() {
+                @Override
+                public void completed(Long nBytes, ByteBuffer[] attachment) {
+                    boolean notify = false;
+                    synchronized (writeCompletionHandler) {
+                        if (nBytes.longValue() < 0) {
+                            failed(new 
EOFException(sm.getString("iob.failedwrite")), attachment);
+                        } else if 
(Nio2SocketWrapper.this.bufferedWrites.size() > 0 || arrayHasData(attachment)) {
+                            // Continue writing data
+                            ArrayList<ByteBuffer> arrayList = new 
ArrayList<>();
+                            for (ByteBuffer buffer : attachment) {
+                                if (buffer.hasRemaining()) {
+                                    arrayList.add(buffer);
+                                }
+                            }
+                            for (ByteBufferHolder buffer : 
Nio2SocketWrapper.this.bufferedWrites) {
+                                buffer.flip();
+                                arrayList.add(buffer.getBuf());
+                            }
+                            Nio2SocketWrapper.this.bufferedWrites.clear();
+                            ByteBuffer[] array = arrayList.toArray(new 
ByteBuffer[arrayList.size()]);
+                            Nio2SocketWrapper.this.getSocket().write(array, 0, 
array.length,
+                                    Nio2SocketWrapper.this.getTimeout(), 
TimeUnit.MILLISECONDS,
+                                    array, gatheringWriteCompletionHandler);
+                        } else {
+                            // All data has been written
+                            if (interest && !Nio2Endpoint.isInline()) {
+                                interest = false;
+                                notify = true;
+                            }
+                            writePending.release();
+                        }
+                    }
+                    if (notify) {
+                        endpoint.processSocket(Nio2SocketWrapper.this, 
SocketStatus.OPEN_WRITE, false);
                     }
-                    getEndpoint().processSocket(Nio2SocketWrapper.this, 
SocketStatus.ERROR, true);
                 }
+
+                @Override
+                public void failed(Throwable exc, ByteBuffer[] attachment) {
+                    IOException ioe;
+                    if (exc instanceof IOException) {
+                        ioe = (IOException) exc;
+                    } else {
+                        ioe = new IOException(exc);
+                    }
+                    Nio2SocketWrapper.this.setError(ioe);
+                    writePending.release();
+                    endpoint.processSocket(Nio2SocketWrapper.this, 
SocketStatus.OPEN_WRITE, true);
+               }
             };
 
         }
 
+        private static boolean arrayHasData(ByteBuffer[] byteBuffers) {
+            for (ByteBuffer byteBuffer : byteBuffers) {
+                if (byteBuffer.hasRemaining()) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
         @Override
         public void reset(Nio2Channel channel, long soTimeout) {
             super.reset(channel, soTimeout);
@@ -1073,8 +1171,15 @@ public class Nio2Endpoint extends Abstra
 
 
         @Override
-        public void registerWriteInterest() throws IOException {
-            // TODO Auto-generated method stub
+        public void registerWriteInterest() {
+            synchronized (writeCompletionHandler) {
+                if (writePending.availablePermits() == 0) {
+                    interest = true;
+                } else {
+                    // If no write is pending, notify
+                    getEndpoint().processSocket(this, SocketStatus.OPEN_WRITE, 
true);
+                }
+            }
         }
 
 
@@ -1082,13 +1187,6 @@ public class Nio2Endpoint extends Abstra
         public void regsiterForEvent(boolean read, boolean write) {
             // NO-OP. Appropriate handlers will already have been registered.
         }
-
-
-        @Override
-        public boolean flush(boolean block) throws IOException {
-            // TODO Auto-generated method stub
-            return false;
-        }
     }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Thu Jan  8 13:10:41 2015
@@ -1241,7 +1241,7 @@ public class NioEndpoint extends Abstrac
                     NioSocketWrapper ka = (NioSocketWrapper) key.attachment();
                     if ( ka == null ) {
                         cancelledKey(key); //we don't support any keys without attachments
-                    } else if ( ka.getError() ) {
+                    } else if ( ka.getError() != null) {
                         cancelledKey(key);//TODO this is not yet being used
                     } else if (ka.getCallBackNotify() ) {
                         ka.setCallBackNotify(false);
@@ -1549,7 +1549,7 @@ public class NioEndpoint extends Abstrac
 
 
         @Override
-        public void registerWriteInterest() throws IOException {
+        public void registerWriteInterest() {
             getPoller().add(getSocket(), SelectionKey.OP_WRITE);
         }
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650277&r1=1650276&r2=1650277&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan  8 13:10:41 2015
@@ -37,7 +37,7 @@ public abstract class SocketWrapperBase<
     private volatile long lastAsyncStart = 0;
     private volatile long asyncTimeout = -1;
     private long timeout = -1;
-    private boolean error = false;
+    private IOException error = null;
     private volatile int keepAliveLeft = 100;
     private volatile boolean async = false;
     private boolean keptAlive = false;
@@ -152,8 +152,8 @@ public abstract class SocketWrapperBase<
     void access(long access) { lastAccess = access; }
     public void setTimeout(long timeout) {this.timeout = timeout;}
     public long getTimeout() {return this.timeout;}
-    public boolean getError() { return error; }
-    public void setError(boolean error) { this.error = error; }
+    public IOException getError() { return error; }
+    public void setError(IOException error) { this.error = error; }
     public void setKeepAliveLeft(int keepAliveLeft) { this.keepAliveLeft = keepAliveLeft;}
     public int decrementKeepAlive() { return (--keepAliveLeft);}
     public boolean isKeptAlive() {return keptAlive;}
@@ -234,7 +234,7 @@ public abstract class SocketWrapperBase<
         async = false;
         blockingStatus = true;
         dispatches.clear();
-        error = false;
+        error = null;
         keepAliveLeft = 100;
         lastAccess = System.currentTimeMillis();
         lastAsyncStart = 0;
@@ -407,7 +407,7 @@ public abstract class SocketWrapperBase<
         holder.getBuf().put(buf,offset,length);
     }
 
-    public abstract void registerWriteInterest() throws IOException;
+    public abstract void registerWriteInterest();
 
     public abstract void regsiterForEvent(boolean read, boolean write);
 



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to