This is an automated email from the ASF dual-hosted git repository. remm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tomcat.git
The following commit(s) were added to refs/heads/master by this push: new 2abb1b9 Add classic NIO2 style read and write 2abb1b9 is described below commit 2abb1b9d4d5b1d2e3aa9b0a5907c4c81b61ac367 Author: remm <r...@apache.org> AuthorDate: Wed May 1 15:13:20 2019 +0200 Add classic NIO2 style read and write Possible use with CompletableFuture which would need exceptions or completion handler failed call to proceed. --- java/org/apache/tomcat/util/net/Nio2Endpoint.java | 20 ++++++-- java/org/apache/tomcat/util/net/NioEndpoint.java | 20 ++++++-- .../apache/tomcat/util/net/SocketWrapperBase.java | 54 +++++++++++++++++++++- webapps/docs/changelog.xml | 5 ++ 4 files changed, 90 insertions(+), 9 deletions(-) diff --git a/java/org/apache/tomcat/util/net/Nio2Endpoint.java b/java/org/apache/tomcat/util/net/Nio2Endpoint.java index 6d3e5d0..815713a 100644 --- a/java/org/apache/tomcat/util/net/Nio2Endpoint.java +++ b/java/org/apache/tomcat/util/net/Nio2Endpoint.java @@ -31,6 +31,8 @@ import java.nio.channels.CompletionHandler; import java.nio.channels.FileChannel; import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.NetworkChannel; +import java.nio.channels.ReadPendingException; +import java.nio.channels.WritePendingException; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.concurrent.ExecutionException; @@ -1007,7 +1009,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS } // Disable any regular read notifications caused by registerReadInterest readNotify = true; - if (block != BlockingMode.NON_BLOCK) { + if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { if (!readPending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); @@ -1019,7 +1021,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS } } else { if (!readPending.tryAcquire()) { - return CompletionState.NOT_DONE; + if (block == BlockingMode.NON_BLOCK) { + return CompletionState.NOT_DONE; + } else { + handler.failed(new ReadPendingException(), attachment); + return CompletionState.ERROR; + } } } OperationState<A> state = new OperationState<>(true, dsts, offset, length, block, @@ -1076,7 +1083,7 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS } // Disable any regular write notifications caused by registerWriteInterest writeNotify = true; - if (block != BlockingMode.NON_BLOCK) { + if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { if (!writePending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); @@ -1088,7 +1095,12 @@ public class Nio2Endpoint extends AbstractJsseEndpoint<Nio2Channel,AsynchronousS } } else { if (!writePending.tryAcquire()) { - return CompletionState.NOT_DONE; + if (block == BlockingMode.NON_BLOCK) { + return CompletionState.NOT_DONE; + } else { + handler.failed(new WritePendingException(), attachment); + return CompletionState.ERROR; + } } } if (!socketBufferHandler.isWriteBufferEmpty()) { diff --git a/java/org/apache/tomcat/util/net/NioEndpoint.java b/java/org/apache/tomcat/util/net/NioEndpoint.java index eb9b87a..259dc64 100644 --- a/java/org/apache/tomcat/util/net/NioEndpoint.java +++ b/java/org/apache/tomcat/util/net/NioEndpoint.java @@ -31,11 +31,13 @@ import java.nio.channels.CompletionHandler; import java.nio.channels.FileChannel; import java.nio.channels.InterruptedByTimeoutException; import java.nio.channels.NetworkChannel; +import java.nio.channels.ReadPendingException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.channels.WritableByteChannel; +import java.nio.channels.WritePendingException; import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.concurrent.CountDownLatch; @@ -1568,7 +1570,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } else if (unit.toMillis(timeout) != getReadTimeout()) { setReadTimeout(unit.toMillis(timeout)); } - if (block != BlockingMode.NON_BLOCK) { + if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { if (!readPending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); @@ -1580,7 +1582,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } } else { if (!readPending.tryAcquire()) { - return CompletionState.NOT_DONE; + if (block == BlockingMode.NON_BLOCK) { + return CompletionState.NOT_DONE; + } else { + handler.failed(new ReadPendingException(), attachment); + return CompletionState.ERROR; + } } } VectoredIOCompletionHandler<A> completion = new VectoredIOCompletionHandler<>(); @@ -1634,7 +1641,7 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } else if (unit.toMillis(timeout) != getWriteTimeout()) { setWriteTimeout(unit.toMillis(timeout)); } - if (block != BlockingMode.NON_BLOCK) { + if (block == BlockingMode.BLOCK || block == BlockingMode.SEMI_BLOCK) { try { if (!writePending.tryAcquire(timeout, unit)) { handler.failed(new SocketTimeoutException(), attachment); @@ -1646,7 +1653,12 @@ public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> } } else { if (!writePending.tryAcquire()) { - return CompletionState.NOT_DONE; + if (block == BlockingMode.NON_BLOCK) { + return CompletionState.NOT_DONE; + } else { + handler.failed(new WritePendingException(), attachment); + return CompletionState.ERROR; + } } } if (!socketBufferHandler.isWriteBufferEmpty()) { diff --git a/java/org/apache/tomcat/util/net/SocketWrapperBase.java b/java/org/apache/tomcat/util/net/SocketWrapperBase.java index cd2e619..b09284a 100644 --- a/java/org/apache/tomcat/util/net/SocketWrapperBase.java +++ b/java/org/apache/tomcat/util/net/SocketWrapperBase.java @@ -808,7 +808,12 @@ public abstract class SocketWrapperBase<E> { public enum BlockingMode { /** - * The operation will now block. If there are pending operations, + * The operation will not block. If there are pending operations, + * the operation will throw a pending exception. + */ + CLASSIC, + /** + * The operation will not block. If there are pending operations, * the operation will return CompletionState.NOT_DONE. */ NON_BLOCK, @@ -1006,6 +1011,29 @@ public abstract class SocketWrapperBase<E> { /** * Scatter read. The completion handler will be called once some + * data has been read or an error occurred. The default NIO2 + * behavior is used: the completion handler will be called as soon + * as some data has been read, even if the read has completed inline. + * + * @param timeout timeout duration for the read + * @param unit units for the timeout duration + * @param attachment an object to attach to the I/O operation that will be + * used when calling the completion handler + * @param handler to call when the IO is complete + * @param dsts buffers + * @param <A> The attachment type + * @return the completion state (done, done inline, or still pending) + */ + public final <A> CompletionState read(long timeout, TimeUnit unit, A attachment, + CompletionHandler<Long, ? super A> handler, ByteBuffer... dsts) { + if (dsts == null) { + throw new IllegalArgumentException(); + } + return read(dsts, 0, dsts.length, BlockingMode.CLASSIC, timeout, unit, attachment, null, handler); + } + + /** + * Scatter read. The completion handler will be called once some * data has been read or an error occurred. If a CompletionCheck * object has been provided, the completion handler will only be * called if the callHandler method returned true. If no @@ -1063,6 +1091,30 @@ public abstract class SocketWrapperBase<E> { /** * Gather write. The completion handler will be called once some + * data has been written or an error occurred. The default NIO2 + * behavior is used: the completion handler will be called, even + * if the write is incomplete and data remains in the buffers, or + * if the write completed inline. + * + * @param timeout timeout duration for the write + * @param unit units for the timeout duration + * @param attachment an object to attach to the I/O operation that will be + * used when calling the completion handler + * @param handler to call when the IO is complete + * @param srcs buffers + * @param <A> The attachment type + * @return the completion state (done, done inline, or still pending) + */ + public final <A> CompletionState write(long timeout, TimeUnit unit, A attachment, + CompletionHandler<Long, ? super A> handler, ByteBuffer... srcs) { + if (srcs == null) { + throw new IllegalArgumentException(); + } + return write(srcs, 0, srcs.length, BlockingMode.CLASSIC, timeout, unit, attachment, null, handler); + } + + /** + * Gather write. The completion handler will be called once some * data has been written or an error occurred. If a CompletionCheck * object has been provided, the completion handler will only be * called if the callHandler method returned true. If no diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml index c65b93b..21d2af2 100644 --- a/webapps/docs/changelog.xml +++ b/webapps/docs/changelog.xml @@ -166,6 +166,11 @@ Refactor Hostname validation to improve performance. Patch provided by Uwe Hees. (markt) </scode> + <update> + Add additional NIO2 style read and write methods closer to core NIO2, + for possible use with an asynchronous workflow like CompletableFuture. + (remm) + </update> </changelog> </subsection> <subsection name="Other"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org