This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tomcat.git


The following commit(s) were added to refs/heads/master by this push:
     new b0393cb  Add sync on operation
b0393cb is described below

commit b0393cbafc4324a85bb5759f15954e1b96f3bc8b
Author: remm <r...@apache.org>
AuthorDate: Mon Apr 15 18:17:26 2019 +0200

    Add sync on operation
    
    As an experiment, to mimic sync on the wrapper. Also add error handling
    when executing the operation.
---
 java/org/apache/tomcat/util/net/NioEndpoint.java | 81 +++++++++++++++---------
 1 file changed, 52 insertions(+), 29 deletions(-)

diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java 
b/java/org/apache/tomcat/util/net/NioEndpoint.java
index 8ac66ff..3fc5564 100644
--- a/java/org/apache/tomcat/util/net/NioEndpoint.java
+++ b/java/org/apache/tomcat/util/net/NioEndpoint.java
@@ -39,6 +39,7 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -811,14 +812,18 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
                             // Read goes before write
                             if (sk.isReadable()) {
                                 if (socketWrapper.readOperation != null) {
-                                    
getExecutor().execute(socketWrapper.readOperation);
+                                    if 
(!socketWrapper.readOperation.process()) {
+                                        closeSocket = true;
+                                    }
                                 } else if (!processSocket(socketWrapper, 
SocketEvent.OPEN_READ, true)) {
                                     closeSocket = true;
                                 }
                             }
                             if (!closeSocket && sk.isWritable()) {
                                 if (socketWrapper.writeOperation != null) {
-                                    
getExecutor().execute(socketWrapper.writeOperation);
+                                    if 
(!socketWrapper.writeOperation.process()) {
+                                        closeSocket = true;
+                                    }
                                 } else if (!processSocket(socketWrapper, 
SocketEvent.OPEN_WRITE, true)) {
                                     closeSocket = true;
                                 }
@@ -1436,38 +1441,56 @@ public class NioEndpoint extends 
AbstractJsseEndpoint<NioChannel,SocketChannel>
             private volatile long nBytes = 0;
             private volatile CompletionState state = CompletionState.PENDING;
 
+            public boolean process() {
+                try {
+                    getEndpoint().getExecutor().execute(this);
+                } catch (RejectedExecutionException ree) {
+                    log.warn(sm.getString("endpoint.executor.fail", 
NioSocketWrapper.this) , ree);
+                    return false;
+                } catch (Throwable t) {
+                    ExceptionUtils.handleThrowable(t);
+                    // This means we got an OOM or similar creating a thread, 
or that
+                    // the pool and its queue are full
+                    log.error(sm.getString("endpoint.process.fail"), t);
+                    return false;
+                }
+                return true;
+            }
+
             @Override
             public void run() {
-                // Perform the IO operation
-                // Called from the poller to continue the IO operation
-                long nBytes = 0;
-                if (getError() == null) {
-                    try {
-                        if (read) {
-                            nBytes = getSocket().read(buffers, offset, length);
-                        } else {
-                            nBytes = getSocket().write(buffers, offset, 
length);
+                synchronized (semaphore) {
+                    // Perform the IO operation
+                    // Called from the poller to continue the IO operation
+                    long nBytes = 0;
+                    if (getError() == null) {
+                        try {
+                            if (read) {
+                                nBytes = getSocket().read(buffers, offset, 
length);
+                            } else {
+                                nBytes = getSocket().write(buffers, offset, 
length);
+                            }
+                        } catch (IOException e) {
+                            setError(e);
                         }
-                    } catch (IOException e) {
-                        setError(e);
-                    }
-                }
-                if (nBytes > 0) {
-                    // The bytes read are only updated in the completion 
handler
-                    completion.completed(Long.valueOf(nBytes), this);
-                } else if (nBytes < 0 || getError() != null) {
-                    IOException error = getError();
-                    if (error == null) {
-                        error = new EOFException();
                     }
-                    completion.failed(error, this);
-                } else {
-                    // As soon as the operation uses the poller, it is no 
longer inline
-                    inline = false;
-                    if (read) {
-                        registerReadInterest();
+                    if (nBytes > 0) {
+                        // The bytes read are only updated in the completion 
handler
+                        completion.completed(Long.valueOf(nBytes), this);
+                    } else if (nBytes < 0 || getError() != null) {
+                        IOException error = getError();
+                        if (error == null) {
+                            error = new EOFException();
+                        }
+                        completion.failed(error, this);
                     } else {
-                        registerWriteInterest();
+                        // As soon as the operation uses the poller, it is no 
longer inline
+                        inline = false;
+                        if (read) {
+                            registerReadInterest();
+                        } else {
+                            registerWriteInterest();
+                        }
                     }
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to