This is an automated email from the ASF dual-hosted git repository.

ggregory pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/commons-io.git


The following commit(s) were added to refs/heads/master by this push:
     new b212baa  Add QueueInputStream and QueueOutputStream as simpler alternatives to PipedInputStream and PipedOutputStream #171.
b212baa is described below

commit b212baa58601a975e19aa88d75b9259ef585fa70
Author: Gary Gregory <garydgreg...@gmail.com>
AuthorDate: Thu Dec 10 14:33:46 2020 -0500

    Add QueueInputStream and QueueOutputStream as simpler alternatives to
    PipedInputStream and PipedOutputStream #171.
    
    Sort new methods.
    Clean up Javadocs.
---
 src/changes/changes.xml                            |  3 ++
 .../apache/commons/io/input/QueueInputStream.java  | 27 ++++++----
 .../commons/io/output/QueueOutputStream.java       | 29 ++++++-----
 .../commons/io/output/QueueOutputStreamTest.java   | 58 +++++++++++-----------
 4 files changed, 65 insertions(+), 52 deletions(-)

diff --git a/src/changes/changes.xml b/src/changes/changes.xml
index 3cab9b9..36e245a 100644
--- a/src/changes/changes.xml
+++ b/src/changes/changes.xml
@@ -97,6 +97,9 @@ The <action> type attribute can be add,update,fix,remove.
       <action dev="ggregory" type="add" due-to="Rob Spoor, Gary Gregory">
         Add factory methods to CloseShieldInputStream, CloseShieldReader, CloseShieldOutputStream, CloseShieldWriter, #173.
       </action>
+      <action dev="ggregory" type="add" due-to="Rob Spoor, Gary Gregory">
+        Add QueueInputStream and QueueOutputStream as simpler alternatives to PipedInputStream and PipedOutputStream #171.
+      </action>
       <!-- UPDATES -->
       <action dev="ggregory" type="update" due-to="Dependabot">
         Update junit-jupiter from 5.6.2 to 5.7.0 #153.
diff --git a/src/main/java/org/apache/commons/io/input/QueueInputStream.java b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
index 4f3ff3a..34296ae 100644
--- a/src/main/java/org/apache/commons/io/input/QueueInputStream.java
+++ b/src/main/java/org/apache/commons/io/input/QueueInputStream.java
@@ -26,9 +26,13 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue output stream.
+ * Simple alternative to JDK {@link java.io.PipedInputStream}; queue input stream provides what's written in queue
+ * output stream.
  * 
+ * <p>
  * Example usage:
+ * </p>
+ * 
  * <pre>
  * QueueInputStream inputStream = new QueueInputStream();
  * QueueOutputStream outputStream = inputStream.newQueueOutputStream();
@@ -37,12 +41,14 @@ import java.util.concurrent.LinkedBlockingQueue;
  * inputStream.read();
  * </pre>
  * 
- * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be
- * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is
- * attached to initial or current thread. Instances can be used longer after initial threads exited.
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a
+ * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current
+ * thread. Instances can be used longer after initial threads exited.
  * 
- * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after
- * the stream has been closed without generating an {@code IOException}.
+ * <p>
+ * Closing a {@code QueueInputStream} has no effect. The methods in this class can be called after the stream has been
+ * closed without generating an {@code IOException}.
+ * </p>
  * 
  * @see QueueOutputStream
  * @since 2.9.0
@@ -52,14 +58,14 @@ public class QueueInputStream extends InputStream {
     private final BlockingQueue<Integer> queue;
 
     /**
-     * Constructs a QueueInputStream with no limit to internal buffer size
+     * Constructs a new instance with no limit to its internal buffer size.
      */
     public QueueInputStream() {
         this(new LinkedBlockingQueue<>());
     }
 
     /**
-     * Constructs a QueueInputStream with given buffer
+     * Constructs a new instance with given buffer
      * 
      * @param queue backing queue for the stream
      */
@@ -68,7 +74,8 @@ public class QueueInputStream extends InputStream {
     }
 
     /**
-     * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this input stream.
+     * Creates a new QueueOutputStream instance connected to this. Writes to the output stream will be visible to this
+     * input stream.
      * 
      * @return QueueOutputStream connected to this stream
      */
@@ -77,7 +84,7 @@ public class QueueInputStream extends InputStream {
     }
 
     /**
-     * Reads a single byte.
+     * Reads and returns a single byte.
      *
      * @return either the byte read or {@code -1} if the end of the stream has been reached
      */
diff --git a/src/main/java/org/apache/commons/io/output/QueueOutputStream.java b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java
index 28b3c7d..0304668 100644
--- a/src/main/java/org/apache/commons/io/output/QueueOutputStream.java
+++ b/src/main/java/org/apache/commons/io/output/QueueOutputStream.java
@@ -27,10 +27,12 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's
- * written in queue output stream.
- * 
+ * Simple alternative to JDK {@link java.io.PipedOutputStream}; queue input stream provides what's written in queue
+ * output stream.
+ * <p>
  * Example usage:
+ * </p>
+ * 
  * <pre>
  * QueueOutputStream outputStream = new QueueOutputStream();
  * QueueInputStream inputStream = outputStream.newPipeInputStream();
@@ -39,12 +41,13 @@ import java.util.concurrent.LinkedBlockingQueue;
  * inputStream.read();
  * </pre>
  * 
- * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be
- * used safely in a single thread or multiple threads. Also, unlike JDK classes, no special meaning is
- * attached to initial or current thread. Instances can be used longer after initial threads exited.
- * 
- * Closing a {@code QueueOutputStream} has no effect. The methods in this class can be called after
- * the stream has been closed without generating an {@code IOException}.
+ * Unlike JDK {@link PipedInputStream} and {@link PipedOutputStream}, queue input/output streams may be used safely in a
+ * single thread or multiple threads. Also, unlike JDK classes, no special meaning is attached to initial or current
+ * thread. Instances can be used longer after initial threads exited.
+ * <p>
+ * Closing a {@code QueueOutputStream} has no effect. The methods in this class can be called after the stream has been
+ * closed without generating an {@code IOException}.
+ * </p>
  * 
  * @see QueueInputStream
  * @since 2.9.0
@@ -54,14 +57,14 @@ public class QueueOutputStream extends OutputStream {
     private final BlockingQueue<Integer> queue;
 
     /**
-     * Constructs a QueueOutputStream with no limit to internal buffer size
+     * Constructs a new instance with no limit to internal buffer size.
      */
     public QueueOutputStream() {
         this(new LinkedBlockingQueue<>());
     }
 
     /**
-     * Constructs a QueueOutputStream with given buffer
+     * Constructs a new instance with given buffer.
      * 
      * @param queue backing queue for the stream
      */
@@ -70,7 +73,8 @@ public class QueueOutputStream extends OutputStream {
     }
 
     /**
-     * Creates a new QueueInputStream instance connected to this. Writes to this output stream will be visible to the input stream.
+     * Creates a new QueueInputStream instance connected to this. Writes to this output stream will be visible to the
+     * input stream.
      * 
      * @return QueueInputStream connected to this stream
      */
@@ -95,4 +99,3 @@ public class QueueOutputStream extends OutputStream {
         }
     }
 }
-
diff --git a/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java
index 28a726c..02c0179 100644
--- a/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java
+++ b/src/test/java/org/apache/commons/io/output/QueueOutputStreamTest.java
@@ -50,28 +50,19 @@ public class QueueOutputStreamTest {
         executorService.shutdown();
     }
 
-    @Test
-    public void writeString() throws Exception {
-        try (final QueueOutputStream outputStream = new QueueOutputStream();
-                final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
-            outputStream.write("ABC".getBytes(UTF_8));
-            final String value = IOUtils.toString(inputStream, UTF_8);
-            assertEquals("ABC", value);
-        }
+    private static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception {
+        final Exchanger<T> exchanger = new Exchanger<>();
+        executorService.submit(() -> {
+            final T value = callable.call();
+            exchanger.exchange(value);
+            return null;
+        });
+        return exchanger.exchange(null);
     }
 
     @Test
-    public void writeStringMultiThread() throws Exception {
-        try (final QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new);
-                final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) {
-            callInThrowAwayThread(() -> {
-                outputStream.write("ABC".getBytes(UTF_8));
-                return null;
-            });
-
-            final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8));
-            assertEquals("ABC", value);
-        }
+    public void testNullArgument() {
+        assertThrows(NullPointerException.class, () -> new QueueOutputStream(null), "queue is required");
     }
 
     @Test
@@ -106,17 +97,26 @@ public class QueueOutputStreamTest {
     }
 
     @Test
-    public void testNullArgument() {
-        assertThrows(NullPointerException.class, () -> new QueueOutputStream(null), "queue is required");
+    public void writeString() throws Exception {
+        try (final QueueOutputStream outputStream = new QueueOutputStream();
+                final QueueInputStream inputStream = outputStream.newQueueInputStream()) {
+            outputStream.write("ABC".getBytes(UTF_8));
+            final String value = IOUtils.toString(inputStream, UTF_8);
+            assertEquals("ABC", value);
+        }
     }
 
-    private static <T> T callInThrowAwayThread(final Callable<T> callable) throws Exception {
-        final Exchanger<T> exchanger = new Exchanger<>();
-        executorService.submit(() -> {
-            final T value = callable.call();
-            exchanger.exchange(value);
-            return null;
-        });
-        return exchanger.exchange(null);
+    @Test
+    public void writeStringMultiThread() throws Exception {
+        try (final QueueOutputStream outputStream = callInThrowAwayThread(QueueOutputStream::new);
+                final QueueInputStream inputStream = callInThrowAwayThread(outputStream::newQueueInputStream)) {
+            callInThrowAwayThread(() -> {
+                outputStream.write("ABC".getBytes(UTF_8));
+                return null;
+            });
+
+            final String value = callInThrowAwayThread(() -> IOUtils.toString(inputStream, UTF_8));
+            assertEquals("ABC", value);
+        }
     }
 }

Reply via email to