This is an automated email from the ASF dual-hosted git repository.

remm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push: new b0393cb Add sync on operation b0393cb is described below commit b0393cbafc4324a85bb5759f15954e1b96f3bc8b Author: remm <r...@apache.org> AuthorDate: Mon Apr 15 18:17:26 2019 +0200 Add sync on operation As an experiment, to mimic sync on the wrapper. Also add error handling when executing the operation. --- java/org/apache/tomcat/util/net/NioEndpoint.java | 81 +++++++++++++++--------- 1 file changed, 52 insertions(+), 29 deletions(-) diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index 8ac66ff..3fc5564 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -39,6 +39,7 @@ import java.nio.channels.WritableByteChannel; import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -811,14 +812,18 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> // Read goes before write if (sk.isReadable()) { if (socketWrapper.readOperation != null) { - getExecutor().execute(socketWrapper.readOperation); + if (!socketWrapper.readOperation.process()) { + closeSocket = true; + } } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (socketWrapper.writeOperation != null) { - getExecutor().execute(socketWrapper.writeOperation); + if (!socketWrapper.writeOperation.process()) { + closeSocket = true; + } } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } @@ -1436,38 +1441,56 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> private volatile long nBytes = 0; 
private volatile CompletionState state = CompletionState.PENDING; + public boolean process() { + try { + getEndpoint().getExecutor().execute(this); + } catch (RejectedExecutionException ree) { + log.warn(sm.getString("endpoint.executor.fail", NioSocketWrapper.this) , ree); + return false; + } catch (Throwable t) { + ExceptionUtils.handleThrowable(t); + // This means we got an OOM or similar creating a thread, or that + // the pool and its queue are full + log.error(sm.getString("endpoint.process.fail"), t); + return false; + } + return true; + } + @Override public void run() { - // Perform the IO operation - // Called from the poller to continue the IO operation - long nBytes = 0; - if (getError() == null) { - try { - if (read) { - nBytes = getSocket().read(buffers, offset, length); - } else { - nBytes = getSocket().write(buffers, offset, length); + synchronized (semaphore) { + // Perform the IO operation + // Called from the poller to continue the IO operation + long nBytes = 0; + if (getError() == null) { + try { + if (read) { + nBytes = getSocket().read(buffers, offset, length); + } else { + nBytes = getSocket().write(buffers, offset, length); + } + } catch (IOException e) { + setError(e); } - } catch (IOException e) { - setError(e); - } - } - if (nBytes > 0) { - // The bytes read are only updated in the completion handler - completion.completed(Long.valueOf(nBytes), this); - } else if (nBytes < 0 || getError() != null) { - IOException error = getError(); - if (error == null) { - error = new EOFException(); } - completion.failed(error, this); - } else { - // As soon as the operation uses the poller, it is no longer inline - inline = false; - if (read) { - registerReadInterest(); + if (nBytes > 0) { + // The bytes read are only updated in the completion handler + completion.completed(Long.valueOf(nBytes), this); + } else if (nBytes < 0 || getError() != null) { + IOException error = getError(); + if (error == null) { + error = new EOFException(); + } + 
completion.failed(error, this); } else { - registerWriteInterest(); + // As soon as the operation uses the poller, it is no longer inline + inline = false; + if (read) { + registerReadInterest(); + } else { + registerWriteInterest(); + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org