This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch jira/solr-16129
in repository https://gitbox.apache.org/repos/asf/solr.git

commit ab2b231c6a0be87fb117c4a610a630c5170e3d43
Author: Chris Hostetter <[email protected]>
AuthorDate: Mon Apr 4 17:26:36 2022 -0700

    SOLR-16129: Use AutoLock in InputStreamResponseListener and resolve test 
nocommits
---
 .../apache/solr/client/solrj/util/AutoLock.java    | 139 ++++++++++++++++
 .../solrj/util/InputStreamResponseListener.java    | 180 +++++++++++++++------
 .../util/TestInputStreamResponseListener.java      | 120 ++++++++------
 3 files changed, 335 insertions(+), 104 deletions(-)

diff --git 
a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java 
b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java
new file mode 100644
index 00000000000..2b556d8db3d
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//  Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
+
+package org.apache.solr.client.solrj.util;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Reentrant lock that can be used in a try-with-resources statement.
+ *
+ * <p>Typical usage:
+ *
+ * <pre>
+ * try (AutoLock lock = this.lock.lock())
+ * {
+ *     // Something
+ * }
+ * </pre>
+ */
+public class AutoLock implements AutoCloseable, Serializable {
+  private static final long serialVersionUID = 3300696774541816341L;
+
+  private final ReentrantLock _lock = new ReentrantLock();
+
+  /**
+   * Acquires the lock.
+   *
+   * @return this AutoLock for unlocking
+   */
+  public AutoLock lock() {
+    _lock.lock();
+    return this;
+  }
+
+  /**
+   * @see ReentrantLock#isHeldByCurrentThread()
+   * @return whether this lock is held by the current thread
+   */
+  public boolean isHeldByCurrentThread() {
+    return _lock.isHeldByCurrentThread();
+  }
+
+  /**
+   * @return a {@link Condition} associated with this lock
+   */
+  public Condition newCondition() {
+    return _lock.newCondition();
+  }
+
+  // Package-private for testing only.
+  boolean isLocked() {
+    return _lock.isLocked();
+  }
+
+  @Override
+  public void close() {
+    _lock.unlock();
+  }
+
+  /**
+   * A reentrant lock with a condition that can be used in a 
try-with-resources statement.
+   *
+   * <p>Typical usage:
+   *
+   * <pre>
+   * // Waiting
+   * try (AutoLock lock = _lock.lock())
+   * {
+   *     lock.await();
+   * }
+   *
+   * // Signaling
+   * try (AutoLock lock = _lock.lock())
+   * {
+   *     lock.signalAll();
+   * }
+   * </pre>
+   */
+  public static class WithCondition extends AutoLock {
+    private final Condition _condition = newCondition();
+
+    @Override
+    public AutoLock.WithCondition lock() {
+      return (WithCondition) super.lock();
+    }
+
+    /**
+     * @see Condition#signal()
+     */
+    public void signal() {
+      _condition.signal();
+    }
+
+    /**
+     * @see Condition#signalAll()
+     */
+    public void signalAll() {
+      _condition.signalAll();
+    }
+
+    /**
+     * @see Condition#await()
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public void await() throws InterruptedException {
+      _condition.await();
+    }
+
+    /**
+     * @see Condition#await(long, TimeUnit)
+     * @param time the time to wait
+     * @param unit the time unit
+     * @return false if the waiting time elapsed
+     * @throws InterruptedException if the current thread is interrupted
+     */
+    public boolean await(long time, TimeUnit unit) throws InterruptedException 
{
+      return _condition.await(time, unit);
+    }
+  }
+}
diff --git 
a/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java
 
b/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java
index ef7a9212400..7edea3ec7ee 100644
--- 
a/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java
+++ 
b/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java
@@ -31,6 +31,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -48,10 +49,6 @@ import org.eclipse.jetty.util.IO;
 import org.eclipse.jetty.util.log.Log;
 import org.eclipse.jetty.util.log.Logger;
 
-// nocommit: we need to switch from using 'Object lock' to using a 
ReentrantLock w/Condition we can await on
-// nocommit: safest plan is to adopt all changes in 
https://github.com/eclipse/jetty.project/pull/7260 as is ...
-// nocommit: ...(while also pulling in "AutoLock") and add requestTimeout 
logic to await() call
-
 /**
  * Fork of jetty's <code>InputStreamResponseListener</code> that adds support 
for an (optional)
  * <code>requestTimeout</code> (which defaults to 1 hour from instantiation) 
as well as a
@@ -64,7 +61,8 @@ import org.eclipse.jetty.util.log.Logger;
  * <p>Typical usage is:
  *
  * <pre>
- * InputStreamResponseListener listener = new InputStreamResponseListener();
+ * long maxWaitLimit = 5000;
+ * InputStreamResponseListener listener = new 
InputStreamResponseListener(maxWaitLimit);
  * client.newRequest(...).send(listener);
  *
  * // Wait for the response headers to arrive
@@ -85,12 +83,14 @@ import org.eclipse.jetty.util.log.Logger;
  * <p>If the consumer is faster than the producer, then the consumer will 
block with the typical
  * {@link InputStream#read()} semantic. If the consumer is slower than the 
producer, then the
  * producer will block until the client consumes.
+ *
+ * @see <a href="https://github.com/eclipse/jetty.project/pull/7260";>Jetty 
PR#7260</a>
  */
 public class InputStreamResponseListener extends Listener.Adapter {
   private static final Logger log = 
Log.getLogger(InputStreamResponseListener.class);
   private static final DeferredContentProvider.Chunk EOF =
       new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, 
Callback.NOOP);
-  private final Object lock = this; 
+  private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
   private final CountDownLatch responseLatch = new CountDownLatch(1);
   private final CountDownLatch resultLatch = new CountDownLatch(1);
   private final AtomicReference<InputStream> stream = new AtomicReference<>();
@@ -121,22 +121,29 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
    * will throw an {@link IOException} wrapping a {@link TimeoutException}. 
Defaults to 1 HOUR from
    * when this Listener was constructed.
    *
+   * <p><b>NOTE:</b> This timeout is only checked when the caller is blocked 
waiting for more data
+   * to be recieved, it will not cause any failures in situations where the 
caller is slower to
+   * consume the content then the remote server is to provided it.
+   *
    * @param requestTimeout Instant past which all response chunks must be 
recieved, if null then
    *     {@link Instant#MAX} is used
+   * @see #getInputStream
    */
   public void setRequestTimeout(final Instant requestTimeout) {
     requestTimeoutRef.set(null == requestTimeout ? Instant.MAX : 
requestTimeout);
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onHeaders(Response response) {
-    synchronized (lock) {
+    try (AutoLock l = lock.lock()) {
       this.response = response;
       responseLatch.countDown();
     }
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onContent(Response response, ByteBuffer content, Callback 
callback) {
     if (content.remaining() == 0) {
       if (log.isDebugEnabled()) {
@@ -147,14 +154,14 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
     }
 
     boolean closed;
-    synchronized (lock) {
+    try (AutoLock.WithCondition l = lock.lock()) {
       closed = this.closed;
       if (!closed) {
         if (log.isDebugEnabled()) {
           log.debug("Queueing content {}", content);
         }
         chunks.add(new DeferredContentProvider.Chunk(content, callback));
-        lock.notifyAll();
+        l.signalAll();
       }
     }
 
@@ -167,10 +174,11 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onSuccess(Response response) {
-    synchronized (lock) {
+    try (AutoLock.WithCondition l = lock.lock()) {
       if (!closed) chunks.add(EOF);
-      lock.notifyAll();
+      l.signalAll();
     }
 
     if (log.isDebugEnabled()) {
@@ -179,27 +187,34 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onFailure(Response response, Throwable failure) {
     List<Callback> callbacks;
-    synchronized (lock) {
+    try (AutoLock.WithCondition l = lock.lock()) {
       if (this.failure != null) return;
+      if (failure == null) {
+        failure = new IOException("Generic failure");
+        log.warn("Missing failure in onFailure() callback", failure);
+      }
       this.failure = failure;
       callbacks = drain();
-      lock.notifyAll();
+      l.signalAll();
     }
 
     if (log.isDebugEnabled()) {
       log.debug("Content failure", failure);
     }
 
-    callbacks.forEach(callback -> callback.failed(failure));
+    Throwable f = failure;
+    callbacks.forEach(callback -> callback.failed(f));
   }
 
   @Override
+  @SuppressWarnings("try")
   public void onComplete(Result result) {
     Throwable failure = result.getFailure();
     List<Callback> callbacks = Collections.emptyList();
-    synchronized (lock) {
+    try (AutoLock.WithCondition l = lock.lock()) {
       this.result = result;
       if (result.isFailed() && this.failure == null) {
         this.failure = failure;
@@ -208,12 +223,15 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
       // Notify the response latch in case of request failures.
       responseLatch.countDown();
       resultLatch.countDown();
-      lock.notifyAll();
+      l.signalAll();
     }
 
     if (log.isDebugEnabled()) {
-      if (failure == null) log.debug("Result success");
-      else log.debug("Result failure", failure);
+      if (failure == null) {
+        log.debug("Result success");
+      } else {
+        log.debug("Result failure", failure);
+      }
     }
 
     callbacks.forEach(callback -> callback.failed(failure));
@@ -232,11 +250,12 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
    * @throws TimeoutException if the timeout expires
    * @throws ExecutionException if a failure happened
    */
+  @SuppressWarnings("try")
   public Response get(long timeout, TimeUnit unit)
       throws InterruptedException, TimeoutException, ExecutionException {
     boolean expired = !responseLatch.await(timeout, unit);
     if (expired) throw new TimeoutException();
-    synchronized (lock) {
+    try (AutoLock l = lock.lock()) {
       // If the request failed there is no response.
       if (response == null) throw new ExecutionException(failure);
       return response;
@@ -256,10 +275,11 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
    * @throws TimeoutException if the timeout expires
    * @see #get(long, TimeUnit)
    */
+  @SuppressWarnings("try")
   public Result await(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException {
     boolean expired = !resultLatch.await(timeout, unit);
     if (expired) throw new TimeoutException();
-    synchronized (lock) {
+    try (AutoLock l = lock.lock()) {
       return result;
     }
   }
@@ -270,6 +290,11 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
    * <p>The method may be invoked only once; subsequent invocations will 
return a closed {@link
    * InputStream}.
    *
+   * <p>{@link InputStream#read} calls on this <code>InputStream</code> may 
block up to the
+   * configured <code>maxWaitLimit</code> or until the {@link 
#setRequestTimeout} (which ever is
+   * sooner) if no data is currently available, at which point an {@link 
IOException} wrapping a
+   * {@link TimeoutException} wll be thrown
+   *
    * @return an input stream providing the response content
    */
   public InputStream getInputStream() {
@@ -278,9 +303,10 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
     return IO.getClosedStream();
   }
 
+  @SuppressWarnings("try")
   private List<Callback> drain() {
     List<Callback> callbacks = new ArrayList<>();
-    synchronized (lock) {
+    try (AutoLock l = lock.lock()) {
       while (true) {
         DeferredContentProvider.Chunk chunk = chunks.peek();
         if (chunk == null || chunk == EOF) break;
@@ -291,6 +317,23 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
     return callbacks;
   }
 
+  @Override
+  @SuppressWarnings("try")
+  public String toString() {
+    try (AutoLock l = lock.lock()) {
+      return String.format(
+          Locale.ROOT,
+          "%s@%x[response=%s,result=%s,closed=%b,failure=%s,chunks=%s]",
+          getClass().getSimpleName(),
+          hashCode(),
+          response,
+          result,
+          closed,
+          failure,
+          chunks);
+    }
+  }
+
   private class Input extends InputStream {
     @Override
     public int read() throws IOException {
@@ -300,17 +343,49 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
       return tmp[0] & 0xFF;
     }
 
+    /**
+     * awaits on the condition until either <code>maxWaitLimit</code> or 
<code>requestTimeout</code>
+     * is reached (whichever is sooner)
+     *
+     * @return an explantion as to why the condition wait expired, or null if 
the condition was met
+     *     in a timely manner
+     */
+    private final String awaitOrReturnError(final AutoLock.WithCondition 
condition)
+        throws InterruptedException {
+      final Instant effectiveNow = Instant.now();
+      final Instant requestTimeout = requestTimeoutRef.get();
+      assert null != requestTimeout;
+
+      if (effectiveNow.isBefore(requestTimeout)) {
+        // NOTE: convert maxWaitLimit to Instant for comparison, rather then 
vice-versa, so we
+        // don't risk ArithemticException.  (await in MILLIS instead of NANOS 
for same reason)
+        final long awaitAmountMillis =
+            effectiveNow.plusMillis(maxWaitLimit).isBefore(requestTimeout)
+                ? maxWaitLimit
+                : Math.min(1L, Duration.between(effectiveNow, 
requestTimeout).toMillis());
+
+        if (condition.await(awaitAmountMillis, TimeUnit.MILLISECONDS)) {
+          return null;
+        } else {
+          return (awaitAmountMillis < maxWaitLimit ? "requestTimeout" : 
"maxWaitLimit")
+              + " exceeded";
+        }
+      } // else...
+
+      // we've already reached (or exceeded) requestTimeout w/o any waiting
+      return "requestTimeout exceeded";
+    }
+
     @Override
     public int read(byte[] b, int offset, int length) throws IOException {
       try {
-        int result;
+        int result = 0;
         Callback callback = null;
-        synchronized (lock) {
+        List<Callback> callbacks = Collections.emptyList();
+        Throwable timeoutFailure = null;
+        try (AutoLock.WithCondition l = lock.lock()) {
           DeferredContentProvider.Chunk chunk;
           while (true) {
-            final Instant requestTimeout = requestTimeoutRef.get();
-            assert null != requestTimeout;
-
             chunk = chunks.peek();
             if (chunk == EOF) return -1;
 
@@ -320,35 +395,38 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
 
             if (closed) throw new AsynchronousCloseException();
 
-            // nocommit: this check shouldn't be needed/used here - the await 
call should tell us to Timeout
-            if (requestTimeout.isBefore(Instant.now()))
-              throw new TimeoutException("requestTimeout exceeded");
-
-            // NOTE: convert maxWaitLimit to Instant for comparison, rather 
then vice-versa, so we
-            // don't risk ArithemticException
-            final Instant now = Instant.now();
-            // nocommit: replace this with await, if result is false throw 
TimeoutException...
-            // nocommit: ...exception message should mention waitLimit, unless 
requestTime.isBefore(now())
-            lock.wait(
-                now.plusMillis(maxWaitLimit).isBefore(requestTimeout)
-                    ? maxWaitLimit
-                    : Duration.between(Instant.now(), 
requestTimeout).toMillis());
+            final String expirationReason = awaitOrReturnError(l);
+            if (null != expirationReason) {
+              if (log.isDebugEnabled()) {
+                log.debug(
+                    "Read timed out: {}, {}", expirationReason, 
InputStreamResponseListener.this);
+              }
+              failure = timeoutFailure = new TimeoutException("Read timeout: " 
+ expirationReason);
+              callbacks = drain();
+              break;
+            }
           }
 
-          ByteBuffer buffer = chunk.buffer;
-          result = Math.min(buffer.remaining(), length);
-          buffer.get(b, offset, result);
-          if (!buffer.hasRemaining()) {
-            callback = chunk.callback;
-            chunks.poll();
+          if (timeoutFailure == null) {
+            ByteBuffer buffer = chunk.buffer;
+            result = Math.min(buffer.remaining(), length);
+            buffer.get(b, offset, result);
+            if (!buffer.hasRemaining()) {
+              callback = chunk.callback;
+              chunks.poll();
+            }
           }
         }
-        if (callback != null) callback.succeeded();
-        return result;
+        if (timeoutFailure == null) {
+          if (callback != null) callback.succeeded();
+          return result;
+        } else {
+          Throwable f = timeoutFailure;
+          callbacks.forEach(c -> c.failed(f));
+          throw toIOException(f);
+        }
       } catch (InterruptedException x) {
         throw new InterruptedIOException();
-      } catch (TimeoutException y) {
-        throw toIOException(y);
       }
     }
 
@@ -360,11 +438,11 @@ public class InputStreamResponseListener extends 
Listener.Adapter {
     @Override
     public void close() throws IOException {
       List<Callback> callbacks;
-      synchronized (lock) {
+      try (AutoLock.WithCondition l = lock.lock()) {
         if (closed) return;
         closed = true;
         callbacks = drain();
-        lock.notifyAll();
+        l.signalAll();
       }
 
       if (log.isDebugEnabled()) {
diff --git 
a/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java
 
b/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java
index 742fbce0a81..b3477295680 100644
--- 
a/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java
+++ 
b/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java
@@ -16,97 +16,111 @@
  */
 package org.apache.solr.client.solrj.util;
 
-import java.io.InputStream;
+import static org.hamcrest.core.StringContains.containsString;
+
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
-
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.TimeoutException;
 import org.apache.solr.SolrTestCase;
-import org.apache.solr.client.solrj.impl.HttpClientUtil;
-
 import org.eclipse.jetty.client.HttpResponse;
 import org.eclipse.jetty.util.Callback;
 
-import static org.hamcrest.core.StringContains.containsString;
-
 public class TestInputStreamResponseListener extends SolrTestCase {
 
   public void testNoDataTriggersWaitLimit() throws Exception {
     final long waitLimit = 1000; // millis
     final InputStreamResponseListener listener = new 
InputStreamResponseListener(waitLimit);
+    listener.setRequestTimeout(null);
 
-    // nocommit: we should be able to use a null requestTimeout to pass this 
test....
-    // nocommit: (the waitLimit should be enough to trigger failure)
-    listener.setRequestTimeout(Instant.now()); // nocommit
-    // nocommit: // listener.setRequestTimeout(null);
-    
     // emulate low level transport code providing headers, and then nothing 
else...
-    final HttpResponse dummyResponse = new HttpResponse(null /* bogus request 
*/, Collections.emptyList());
+    final HttpResponse dummyResponse =
+        new HttpResponse(null /* bogus request */, Collections.emptyList());
     listener.onHeaders(dummyResponse);
 
     // client tries to consume, but there is never any content...
     assertEquals(dummyResponse, listener.get(0, TimeUnit.SECONDS));
-    final ForkJoinTask<IOException> readTask = 
ForkJoinPool.commonPool().submit(() -> {
-        try (final InputStream stream = listener.getInputStream()) {
-          return expectThrows(IOException.class, () -> {
-              int trash = stream.read();
-            });
-        }
-      });
-    final IOException expected = readTask.get(waitLimit * 2L, 
TimeUnit.MILLISECONDS);
+    final ForkJoinTask<IOException> readTask =
+        ForkJoinPool.commonPool()
+            .submit(
+                () -> {
+                  // (Do this in a ForkJoin thread so we can easily fail test 
if read() doesn't
+                  // throw IOException in a timely manner)
+                  try (final InputStream stream = listener.getInputStream()) {
+                    return expectThrows(
+                        IOException.class,
+                        () -> {
+                          int trash = stream.read();
+                        });
+                  }
+                });
+    final IOException expected = readTask.get(waitLimit * 3L, 
TimeUnit.MILLISECONDS);
     assertNotNull(expected.getCause());
     assertEquals(TimeoutException.class, expected.getCause().getClass());
-
-    // nocommit: this should be something about waitLimit...
-    assertThat(expected.getCause().getMessage(), 
containsString("requestTimeout exceeded"));
+    assertThat(expected.getCause().getMessage(), containsString("maxWaitLimit 
exceeded"));
   }
 
-
-      
   public void testReallySlowDataTriggersRequestTimeout() throws Exception {
-    final long writeDelayMillies = 500;
-    final InputStreamResponseListener listener = new 
InputStreamResponseListener(writeDelayMillies * 2);
-    
-    // emulate low level transport code providing headers, and then writes a 
(slow) never ending stream of bytes
-    final HttpResponse dummyResponse = new HttpResponse(null /* bogus request 
*/, Collections.emptyList());
+    final long writeDelayMillies = 100;
+    // crazy long maxWaitLimit relative to how often new data should be 
available
+    final InputStreamResponseListener listener =
+        new InputStreamResponseListener(5 * writeDelayMillies);
+
+    // emulate low level transport code providing headers, and then writes a 
(slow) never ending
+    // stream of bytes
+    final HttpResponse dummyResponse =
+        new HttpResponse(null /* bogus request */, Collections.emptyList());
     listener.onHeaders(dummyResponse);
-    final CountDownLatch writeTaskCloseLatch = new CountDownLatch(1);
+    final CountDownLatch closeLatch = new CountDownLatch(1);
     try {
-      final ForkJoinTask<Boolean> writeTask = 
ForkJoinPool.commonPool().submit(() -> {
-          final ByteBuffer dataToWriteForever = ByteBuffer.allocate(5);
-          while (0 < writeTaskCloseLatch.getCount()) {
-            dataToWriteForever.position(0);
-            listener.onContent(dummyResponse, dataToWriteForever, 
Callback.NOOP);
-            Thread.sleep(writeDelayMillies);
-          }
-          return true;
-        });
+      final ForkJoinTask<Boolean> writeTask =
+          ForkJoinPool.commonPool()
+              .submit(
+                  () -> {
+                    final ByteBuffer dataToWriteForever = 
ByteBuffer.allocate(5);
+                    while (0 < closeLatch.getCount()) {
+                      dataToWriteForever.position(0);
+                      listener.onContent(dummyResponse, dataToWriteForever, 
Callback.NOOP);
+                      Thread.sleep(writeDelayMillies);
+                    }
+                    return true;
+                  });
 
       // client reads "forever" ... until read times out because 
requestTimeout exceeded
       assertEquals(dummyResponse, listener.get(0, TimeUnit.SECONDS));
-      final IOException expected = expectThrows(IOException.class, () -> {
-          final Instant requestTimeout = Instant.now().plus(1, 
ChronoUnit.MINUTES);
-          listener.setRequestTimeout(requestTimeout);
-          final Instant forever = requestTimeout.plusSeconds(60);
-          try (final InputStream stream = listener.getInputStream()) {
-            while (Instant.now().isBefore(forever)) {
-              int trash = stream.read(); // this should eventually throw an 
exception
-            }
-          }
-        });
+      final ForkJoinTask<IOException> readTask =
+          ForkJoinPool.commonPool()
+              .submit(
+                  () -> {
+                    // (Do this in a ForkJoin thread so we can easily fail 
test if read() doesn't
+                    // throw IOException in a timely manner)
+                    return expectThrows(
+                        IOException.class,
+                        () -> {
+                          final Instant requestTimeout = Instant.now().plus(5, 
ChronoUnit.SECONDS);
+                          listener.setRequestTimeout(requestTimeout);
+                          final Instant forever = 
requestTimeout.plusSeconds(60);
+                          try (final InputStream stream = 
listener.getInputStream()) {
+                            while (0 < closeLatch.getCount()) {
+                              int trash =
+                                  stream.read(); // this should eventually 
throw an exception
+                            }
+                          }
+                        });
+                  });
+      final IOException expected = readTask.get(10, TimeUnit.SECONDS);
       assertNotNull(expected.getCause());
       assertEquals(TimeoutException.class, expected.getCause().getClass());
       assertThat(expected.getCause().getMessage(), 
containsString("requestTimeout exceeded"));
     } finally {
-      writeTaskCloseLatch.countDown();
+      closeLatch.countDown();
     }
   }
 }

Reply via email to