This is an automated email from the ASF dual-hosted git repository. ggregory pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/commons-io.git
The following commit(s) were added to refs/heads/master by this push: new b212baa Add QueueInputStream and QueueOutputStream as simpler alternatives to PipedInputStream and PipedOutputStream #171. b212baa is described below commit b212baa58601a975e19aa88d75b9259ef585fa70 Author: Gary Gregory <garydgreg...@gmail.com> AuthorDate: Thu Dec 10 14:33:46 2020 -0500 Add QueueInputStream and QueueOutputStream as simpler alternatives to PipedInputStream and PipedOutputStream #171. Sort new methods. Clean up Javadocs. --- src/changes/changes.xml | 3 ++ .../apache/commons/io/input/QueueInputStream.java | 27 ++++++---- .../commons/io/output/QueueOutputStream.java | 29 ++++++----- .../commons/io/output/QueueOutputStreamTest.java | 58 +++++++++++----------- 4 files changed, 65 insertions(+), 52 deletions(-) diff --git a/src/changes/changes.xml b/src/changes/changes.xml index 3cab9b9..36e245a 100644 --- a/src/changes/changes.xml +++ b/src/changes/changes.xml @@ -97,6 +97,9 @@ The <action> type attribute can be add,update,fix,remove. <action dev="ggregory" type="add" due-to="Rob Spoor, Gary Gregory"> Add factory methods to CloseShieldInputStream, CloseShieldReader, CloseShieldOutputStream, CloseShieldWriter, #173. </action> + <action dev="ggregory" type="add" due-to="Rob Spoor, Gary Gregory"> + Add QueueInputStream and QueueOutputStream as simpler alternatives to PipedInputStream and PipedOutputStream #171. + </action> <!-- UPDATES --> <action dev="ggregory" type="update" due-to="Dependabot"> Update junit-jupiter from 5.6.2 to 5.7.0 #153. diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java index 4f3ff3a..34296ae 100644 --- a/src/main/java/org/apache/commons/io/input/QueueInputStream.java +++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java @@ -26,9 +26,13 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** - * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream. + * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue + * output stream. * + * <p> * Example usage: + * </p> + * * <pre> * QueueInputStream inputStream = new QueueInputStream(); * QueueOutputStream outputStream = inputStream.newQueueOutputStream(); @@ -37,12 +41,14 @@ import java.util.concurrent.LinkedBlockingQueue; * inputStream.read(); * </pre> * - * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be - * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is - * attached to initial or current thread. Instances can be used longer after initial threads exited. + * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a + * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current + * thread. Instances can be used longer after initial threads exited. * - * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after - * the stream has been closed without generating an {@code IOException}. + * <p> + * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after the stream has been + * closed without generating an {@code IOException}. + * </p> * * @see QueueOutputStream * @since 2.9.0 @@ -52,14 +58,14 @@ public class QueueInputStream extends InputStream { private final BlockingQueue<Integer> queue; /** - * Constructs a QueueInputStream with no limit to internal buffer size + * Constructs a new instance with no limit to its internal buffer size. */ public QueueInputStream() { this(new LinkedBlockingQueue<>()); } /** - * Constructs a QueueInputStream with given buffer + * Constructs a new instance with given buffer * * @param queue backing queue for the stream */ @@ -68,7 +74,8 @@ public class QueueInputStream extends InputStream { } /** - * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream. + * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this + * input stream. * * @return QueueOutputStream connected to this stream */ @@ -77,7 +84,7 @@ public class QueueInputStream extends InputStream { } /** - * Reads a single byte. + * Reads and returns a single byte. * * @return either the byte read or {@code -1} if the end of the stream has been reached */ diff --git a/src/main/java/org/apache/commons/io/output/QueueOutputStream.java b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java index 28b3c7d..0304668 100644 --- a/src/main/java/org/apache/commons/io/output/QueueOutputStream.java +++ b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java @@ -27,10 +27,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** - * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's - * written in queue output stream. - * + * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's written in queue + * output stream. + * <p> * Example usage: + * </p> + * * <pre> * QueueOutputStream outputStream = new QueueOutputStream(); * QueueInputStream inputStream = outputStream.newPipeInputStream(); @@ -39,12 +41,13 @@ import java.util.concurrent.LinkedBlockingQueue; * inputStream.read(); * </pre> * - * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be - * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is - * attached to initial or current thread. Instances can be used longer after initial threads exited. - * - * Closing a {@code QueueOutputStream} has no effect. The methods in this class can be called after - * the stream has been closed without generating an {@code IOException}. + * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a + * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current + * thread. Instances can be used longer after initial threads exited. + * <p> + * Closing a {@code QueueOutputStream} has no effect. The methods in this class can be called after the stream has been + * closed without generating an {@code IOException}. + * </p> * * @see QueueInputStream * @since 2.9.0 @@ -54,14 +57,14 @@ public class QueueOutputStream extends OutputStream { private final BlockingQueue<Integer> queue; /** - * Constructs a QueueOutputStream with no limit to internal buffer size + * Constructs a new instance with no limit to internal buffer size. */ public QueueOutputStream() { this(new LinkedBlockingQueue<>()); } /** - * Constructs a QueueOutputStream with given buffer + * Constructs a new instance with given buffer. * * @param queue backing queue for the stream */ @@ -70,7 +73,8 @@ public class QueueOutputStream extends OutputStream { } /** - * Creates a new QueueInputStream instance connected to this. Writes to this output stream will be visible to the input stream. + * Creates a new QueueInputStream instance connected to this. Writes to this output stream will be visible to the + * input stream. * * @return QueueInputStream connected to this stream */ @@ -95,4 +99,3 @@ public class QueueOutputStream extends OutputStream { } } } - diff --git a/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java index 28a726c..02c0179 100644 --- a/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java +++ b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java @@ -50,28 +50,19 @@ public class QueueOutputStreamTest { executorService.shutdown(); } - @Test - public void writeString() throws Exception { - try (final QueueOutputStream outputStream = new QueueOutputStream(); - final QueueInputStream inputStream = outputStream.newQueueInputStream()) { - outputStream.write("ABC".getBytes(UTF_8)); - final String value = IOUtils.toString(inputStream, UTF_8); - assertEquals("ABC", value); - } + private static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception { + final Exchanger<T> exchanger = new Exchanger<>(); + executorService.submit(() -> { + final T value = callable.call(); + exchanger.exchange(value); + return null; + }); + return exchanger.exchange(null); } @Test - public void writeStringMultiThread() throws Exception { - try (final QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new); - final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) { - callInThrowAwayThread(() -> { - outputStream.write("ABC".getBytes(UTF_8)); - return null; - }); - - final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8)); - assertEquals("ABC", value); - } + public void testNullArgument() { + assertThrows(NullPointerException.class, () -> new QueueOutputStream(null), "queue is required"); } @Test @@ -106,17 +97,26 @@ public class QueueOutputStreamTest { } @Test - public void testNullArgument() { - assertThrows(NullPointerException.class, () -> new QueueOutputStream(null), "queue is required"); + public void writeString() throws Exception { + try (final QueueOutputStream outputStream = new QueueOutputStream(); + final QueueInputStream inputStream = outputStream.newQueueInputStream()) { + outputStream.write("ABC".getBytes(UTF_8)); + final String value = IOUtils.toString(inputStream, UTF_8); + assertEquals("ABC", value); + } } - private static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception { - final Exchanger<T> exchanger = new Exchanger<>(); - executorService.submit(() -> { - final T value = callable.call(); - exchanger.exchange(value); - return null; - }); - return exchanger.exchange(null); + @Test + public void writeStringMultiThread() throws Exception { + try (final QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new); + final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) { + callInThrowAwayThread(() -> { + outputStream.write("ABC".getBytes(UTF_8)); + return null; + }); + + final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8)); + assertEquals("ABC", value); + } } }