This is an automated email from the ASF dual-hosted git repository. hossman pushed a commit to branch jira/solr-16129 in repository https://gitbox.apache.org/repos/asf/solr.git
commit ab2b231c6a0be87fb117c4a610a630c5170e3d43 Author: Chris Hostetter <[email protected]> AuthorDate: Mon Apr 4 17:26:36 2022 -0700 SOLR-16129: Use AutoLock in InputStreamResponseListener and resolve test nocommits --- .../apache/solr/client/solrj/util/AutoLock.java | 139 ++++++++++++++++ .../solrj/util/InputStreamResponseListener.java | 180 +++++++++++++++------ .../util/TestInputStreamResponseListener.java | 120 ++++++++------ 3 files changed, 335 insertions(+), 104 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java new file mode 100644 index 00000000000..2b556d8db3d --- /dev/null +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/AutoLock.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others. + +package org.apache.solr.client.solrj.util; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Reentrant lock that can be used in a try-with-resources statement. + * + * <p>Typical usage: + * + * <pre> + * try (AutoLock lock = this.lock.lock()) + * { + * // Something + * } + * </pre> + */ +public class AutoLock implements AutoCloseable, Serializable { + private static final long serialVersionUID = 3300696774541816341L; + + private final ReentrantLock _lock = new ReentrantLock(); + + /** + * Acquires the lock. + * + * @return this AutoLock for unlocking + */ + public AutoLock lock() { + _lock.lock(); + return this; + } + + /** + * @see ReentrantLock#isHeldByCurrentThread() + * @return whether this lock is held by the current thread + */ + public boolean isHeldByCurrentThread() { + return _lock.isHeldByCurrentThread(); + } + + /** + * @return a {@link Condition} associated with this lock + */ + public Condition newCondition() { + return _lock.newCondition(); + } + + // Package-private for testing only. + boolean isLocked() { + return _lock.isLocked(); + } + + @Override + public void close() { + _lock.unlock(); + } + + /** + * A reentrant lock with a condition that can be used in a try-with-resources statement. + * + * <p>Typical usage: + * + * <pre> + * // Waiting + * try (AutoLock lock = _lock.lock()) + * { + * lock.await(); + * } + * + * // Signaling + * try (AutoLock lock = _lock.lock()) + * { + * lock.signalAll(); + * } + * </pre> + */ + public static class WithCondition extends AutoLock { + private final Condition _condition = newCondition(); + + @Override + public AutoLock.WithCondition lock() { + return (WithCondition) super.lock(); + } + + /** + * @see Condition#signal() + */ + public void signal() { + _condition.signal(); + } + + /** + * @see Condition#signalAll() + */ + public void signalAll() { + _condition.signalAll(); + } + + /** + * @see Condition#await() + * @throws InterruptedException if the current thread is interrupted + */ + public void await() throws InterruptedException { + _condition.await(); + } + + /** + * @see Condition#await(long, TimeUnit) + * @param time the time to wait + * @param unit the time unit + * @return false if the waiting time elapsed + * @throws InterruptedException if the current thread is interrupted + */ + public boolean await(long time, TimeUnit unit) throws InterruptedException { + return _condition.await(time, unit); + } + } +} diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java index ef7a9212400..7edea3ec7ee 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/InputStreamResponseListener.java @@ -31,6 +31,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -48,10 +49,6 @@ import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -// nocommit: we need to switch from using 'Object lock' to using a ReentrantLock w/Condition we can await on -// nocommit: safest plan is to adopt all changes in https://github.com/eclipse/jetty.project/pull/7260 as is ... -// nocommit: ...(while also pulling in "AutoLock") and add requestTimeout logic to await() call - /** * Fork of jetty's <code>InputStreamResponseListener</code> that adds support for an (optional) * <code>requestTimeout</code> (which defaults to 1 hour from instantiation) as well as a @@ -64,7 +61,8 @@ import org.eclipse.jetty.util.log.Logger; * <p>Typical usage is: * * <pre> - * InputStreamResponseListener listener = new InputStreamResponseListener(); + * long maxWaitLimit = 5000; + * InputStreamResponseListener listener = new InputStreamResponseListener(maxWaitLimit); * client.newRequest(...).send(listener); * * // Wait for the response headers to arrive @@ -85,12 +83,14 @@ import org.eclipse.jetty.util.log.Logger; * <p>If the consumer is faster than the producer, then the consumer will block with the typical * {@link InputStream#read()} semantic. If the consumer is slower than the producer, then the * producer will block until the client consumes. + * + * @see <a href="https://github.com/eclipse/jetty.project/pull/7260">Jetty PR#7260</a> */ public class InputStreamResponseListener extends Listener.Adapter { private static final Logger log = Log.getLogger(InputStreamResponseListener.class); private static final DeferredContentProvider.Chunk EOF = new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP); - private final Object lock = this; + private final AutoLock.WithCondition lock = new AutoLock.WithCondition(); private final CountDownLatch responseLatch = new CountDownLatch(1); private final CountDownLatch resultLatch = new CountDownLatch(1); private final AtomicReference<InputStream> stream = new AtomicReference<>(); @@ -121,22 +121,29 @@ public class InputStreamResponseListener extends Listener.Adapter { * will throw an {@link IOException} wrapping a {@link TimeoutException}. Defaults to 1 HOUR from * when this Listener was constructed. * + * <p><b>NOTE:</b> This timeout is only checked when the caller is blocked waiting for more data + * to be recieved, it will not cause any failures in situations where the caller is slower to + * consume the content then the remote server is to provided it. + * * @param requestTimeout Instant past which all response chunks must be recieved, if null then * {@link Instant#MAX} is used + * @see #getInputStream */ public void setRequestTimeout(final Instant requestTimeout) { requestTimeoutRef.set(null == requestTimeout ? Instant.MAX : requestTimeout); } @Override + @SuppressWarnings("try") public void onHeaders(Response response) { - synchronized (lock) { + try (AutoLock l = lock.lock()) { this.response = response; responseLatch.countDown(); } } @Override + @SuppressWarnings("try") public void onContent(Response response, ByteBuffer content, Callback callback) { if (content.remaining() == 0) { if (log.isDebugEnabled()) { @@ -147,14 +154,14 @@ public class InputStreamResponseListener extends Listener.Adapter { } boolean closed; - synchronized (lock) { + try (AutoLock.WithCondition l = lock.lock()) { closed = this.closed; if (!closed) { if (log.isDebugEnabled()) { log.debug("Queueing content {}", content); } chunks.add(new DeferredContentProvider.Chunk(content, callback)); - lock.notifyAll(); + l.signalAll(); } } @@ -167,10 +174,11 @@ public class InputStreamResponseListener extends Listener.Adapter { } @Override + @SuppressWarnings("try") public void onSuccess(Response response) { - synchronized (lock) { + try (AutoLock.WithCondition l = lock.lock()) { if (!closed) chunks.add(EOF); - lock.notifyAll(); + l.signalAll(); } if (log.isDebugEnabled()) { @@ -179,27 +187,34 @@ public class InputStreamResponseListener extends Listener.Adapter { } @Override + @SuppressWarnings("try") public void onFailure(Response response, Throwable failure) { List<Callback> callbacks; - synchronized (lock) { + try (AutoLock.WithCondition l = lock.lock()) { if (this.failure != null) return; + if (failure == null) { + failure = new IOException("Generic failure"); + log.warn("Missing failure in onFailure() callback", failure); + } this.failure = failure; callbacks = drain(); - lock.notifyAll(); + l.signalAll(); } if (log.isDebugEnabled()) { log.debug("Content failure", failure); } - callbacks.forEach(callback -> callback.failed(failure)); + Throwable f = failure; + callbacks.forEach(callback -> callback.failed(f)); } @Override + @SuppressWarnings("try") public void onComplete(Result result) { Throwable failure = result.getFailure(); List<Callback> callbacks = Collections.emptyList(); - synchronized (lock) { + try (AutoLock.WithCondition l = lock.lock()) { this.result = result; if (result.isFailed() && this.failure == null) { this.failure = failure; @@ -208,12 +223,15 @@ public class InputStreamResponseListener extends Listener.Adapter { // Notify the response latch in case of request failures. responseLatch.countDown(); resultLatch.countDown(); - lock.notifyAll(); + l.signalAll(); } if (log.isDebugEnabled()) { - if (failure == null) log.debug("Result success"); - else log.debug("Result failure", failure); + if (failure == null) { + log.debug("Result success"); + } else { + log.debug("Result failure", failure); + } } callbacks.forEach(callback -> callback.failed(failure)); @@ -232,11 +250,12 @@ public class InputStreamResponseListener extends Listener.Adapter { * @throws TimeoutException if the timeout expires * @throws ExecutionException if a failure happened */ + @SuppressWarnings("try") public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { boolean expired = !responseLatch.await(timeout, unit); if (expired) throw new TimeoutException(); - synchronized (lock) { + try (AutoLock l = lock.lock()) { // If the request failed there is no response. if (response == null) throw new ExecutionException(failure); return response; @@ -256,10 +275,11 @@ public class InputStreamResponseListener extends Listener.Adapter { * @throws TimeoutException if the timeout expires * @see #get(long, TimeUnit) */ + @SuppressWarnings("try") public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { boolean expired = !resultLatch.await(timeout, unit); if (expired) throw new TimeoutException(); - synchronized (lock) { + try (AutoLock l = lock.lock()) { return result; } } @@ -270,6 +290,11 @@ public class InputStreamResponseListener extends Listener.Adapter { * <p>The method may be invoked only once; subsequent invocations will return a closed {@link * InputStream}. * + * <p>{@link InputStream#read} calls on this <code>InputStream</code> may block up to the + * configured <code>maxWaitLimit</code> or until the {@link #setRequestTimeout} (which ever is + * sooner) if no data is currently available, at which point an {@link IOException} wrapping a + * {@link TimeoutException} wll be thrown + * * @return an input stream providing the response content */ public InputStream getInputStream() { @@ -278,9 +303,10 @@ public class InputStreamResponseListener extends Listener.Adapter { return IO.getClosedStream(); } + @SuppressWarnings("try") private List<Callback> drain() { List<Callback> callbacks = new ArrayList<>(); - synchronized (lock) { + try (AutoLock l = lock.lock()) { while (true) { DeferredContentProvider.Chunk chunk = chunks.peek(); if (chunk == null || chunk == EOF) break; @@ -291,6 +317,23 @@ public class InputStreamResponseListener extends Listener.Adapter { return callbacks; } + @Override + @SuppressWarnings("try") + public String toString() { + try (AutoLock l = lock.lock()) { + return String.format( + Locale.ROOT, + "%s@%x[response=%s,result=%s,closed=%b,failure=%s,chunks=%s]", + getClass().getSimpleName(), + hashCode(), + response, + result, + closed, + failure, + chunks); + } + } + private class Input extends InputStream { @Override public int read() throws IOException { @@ -300,17 +343,49 @@ public class InputStreamResponseListener extends Listener.Adapter { return tmp[0] & 0xFF; } + /** + * awaits on the condition until either <code>maxWaitLimit</code> or <code>requestTimeout</code> + * is reached (whichever is sooner) + * + * @return an explantion as to why the condition wait expired, or null if the condition was met + * in a timely manner + */ + private final String awaitOrReturnError(final AutoLock.WithCondition condition) + throws InterruptedException { + final Instant effectiveNow = Instant.now(); + final Instant requestTimeout = requestTimeoutRef.get(); + assert null != requestTimeout; + + if (effectiveNow.isBefore(requestTimeout)) { + // NOTE: convert maxWaitLimit to Instant for comparison, rather then vice-versa, so we + // don't risk ArithemticException. (await in MILLIS instead of NANOS for same reason) + final long awaitAmountMillis = + effectiveNow.plusMillis(maxWaitLimit).isBefore(requestTimeout) + ? maxWaitLimit + : Math.min(1L, Duration.between(effectiveNow, requestTimeout).toMillis()); + + if (condition.await(awaitAmountMillis, TimeUnit.MILLISECONDS)) { + return null; + } else { + return (awaitAmountMillis < maxWaitLimit ? "requestTimeout" : "maxWaitLimit") + + " exceeded"; + } + } // else... + + // we've already reached (or exceeded) requestTimeout w/o any waiting + return "requestTimeout exceeded"; + } + @Override public int read(byte[] b, int offset, int length) throws IOException { try { - int result; + int result = 0; Callback callback = null; - synchronized (lock) { + List<Callback> callbacks = Collections.emptyList(); + Throwable timeoutFailure = null; + try (AutoLock.WithCondition l = lock.lock()) { DeferredContentProvider.Chunk chunk; while (true) { - final Instant requestTimeout = requestTimeoutRef.get(); - assert null != requestTimeout; - chunk = chunks.peek(); if (chunk == EOF) return -1; @@ -320,35 +395,38 @@ public class InputStreamResponseListener extends Listener.Adapter { if (closed) throw new AsynchronousCloseException(); - // nocommit: this check shouldn't be needed/used here - the await call should tell us to Timeout - if (requestTimeout.isBefore(Instant.now())) - throw new TimeoutException("requestTimeout exceeded"); - - // NOTE: convert maxWaitLimit to Instant for comparison, rather then vice-versa, so we - // don't risk ArithemticException - final Instant now = Instant.now(); - // nocommit: replace this with await, if result is false throw TimeoutException... - // nocommit: ...exception message should mention waitLimit, unless requestTime.isBefore(now()) - lock.wait( - now.plusMillis(maxWaitLimit).isBefore(requestTimeout) - ? maxWaitLimit - : Duration.between(Instant.now(), requestTimeout).toMillis()); + final String expirationReason = awaitOrReturnError(l); + if (null != expirationReason) { + if (log.isDebugEnabled()) { + log.debug( + "Read timed out: {}, {}", expirationReason, InputStreamResponseListener.this); + } + failure = timeoutFailure = new TimeoutException("Read timeout: " + expirationReason); + callbacks = drain(); + break; + } } - ByteBuffer buffer = chunk.buffer; - result = Math.min(buffer.remaining(), length); - buffer.get(b, offset, result); - if (!buffer.hasRemaining()) { - callback = chunk.callback; - chunks.poll(); + if (timeoutFailure == null) { + ByteBuffer buffer = chunk.buffer; + result = Math.min(buffer.remaining(), length); + buffer.get(b, offset, result); + if (!buffer.hasRemaining()) { + callback = chunk.callback; + chunks.poll(); + } } } - if (callback != null) callback.succeeded(); - return result; + if (timeoutFailure == null) { + if (callback != null) callback.succeeded(); + return result; + } else { + Throwable f = timeoutFailure; + callbacks.forEach(c -> c.failed(f)); + throw toIOException(f); + } } catch (InterruptedException x) { throw new InterruptedIOException(); - } catch (TimeoutException y) { - throw toIOException(y); } } @@ -360,11 +438,11 @@ public class InputStreamResponseListener extends Listener.Adapter { @Override public void close() throws IOException { List<Callback> callbacks; - synchronized (lock) { + try (AutoLock.WithCondition l = lock.lock()) { if (closed) return; closed = true; callbacks = drain(); - lock.notifyAll(); + l.signalAll(); } if (log.isDebugEnabled()) { diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java b/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java index 742fbce0a81..b3477295680 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/util/TestInputStreamResponseListener.java @@ -16,97 +16,111 @@ */ package org.apache.solr.client.solrj.util; -import java.io.InputStream; +import static org.hamcrest.core.StringContains.containsString; + import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; - import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; -import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.TimeoutException; import org.apache.solr.SolrTestCase; -import org.apache.solr.client.solrj.impl.HttpClientUtil; - import org.eclipse.jetty.client.HttpResponse; import org.eclipse.jetty.util.Callback; -import static org.hamcrest.core.StringContains.containsString; - public class TestInputStreamResponseListener extends SolrTestCase { public void testNoDataTriggersWaitLimit() throws Exception { final long waitLimit = 1000; // millis final InputStreamResponseListener listener = new InputStreamResponseListener(waitLimit); + listener.setRequestTimeout(null); - // nocommit: we should be able to use a null requestTimeout to pass this test.... - // nocommit: (the waitLimit should be enough to trigger failure) - listener.setRequestTimeout(Instant.now()); // nocommit - // nocommit: // listener.setRequestTimeout(null); - // emulate low level transport code providing headers, and then nothing else... - final HttpResponse dummyResponse = new HttpResponse(null /* bogus request */, Collections.emptyList()); + final HttpResponse dummyResponse = + new HttpResponse(null /* bogus request */, Collections.emptyList()); listener.onHeaders(dummyResponse); // client tries to consume, but there is never any content... assertEquals(dummyResponse, listener.get(0, TimeUnit.SECONDS)); - final ForkJoinTask<IOException> readTask = ForkJoinPool.commonPool().submit(() -> { - try (final InputStream stream = listener.getInputStream()) { - return expectThrows(IOException.class, () -> { - int trash = stream.read(); - }); - } - }); - final IOException expected = readTask.get(waitLimit * 2L, TimeUnit.MILLISECONDS); + final ForkJoinTask<IOException> readTask = + ForkJoinPool.commonPool() + .submit( + () -> { + // (Do this in a ForkJoin thread so we can easily fail test if read() doesn't + // throw IOException in a timely manner) + try (final InputStream stream = listener.getInputStream()) { + return expectThrows( + IOException.class, + () -> { + int trash = stream.read(); + }); + } + }); + final IOException expected = readTask.get(waitLimit * 3L, TimeUnit.MILLISECONDS); assertNotNull(expected.getCause()); assertEquals(TimeoutException.class, expected.getCause().getClass()); - - // nocommit: this should be something about waitLimit... - assertThat(expected.getCause().getMessage(), containsString("requestTimeout exceeded")); + assertThat(expected.getCause().getMessage(), containsString("maxWaitLimit exceeded")); } - - public void testReallySlowDataTriggersRequestTimeout() throws Exception { - final long writeDelayMillies = 500; - final InputStreamResponseListener listener = new InputStreamResponseListener(writeDelayMillies * 2); - - // emulate low level transport code providing headers, and then writes a (slow) never ending stream of bytes - final HttpResponse dummyResponse = new HttpResponse(null /* bogus request */, Collections.emptyList()); + final long writeDelayMillies = 100; + // crazy long maxWaitLimit relative to how often new data should be available + final InputStreamResponseListener listener = + new InputStreamResponseListener(5 * writeDelayMillies); + + // emulate low level transport code providing headers, and then writes a (slow) never ending + // stream of bytes + final HttpResponse dummyResponse = + new HttpResponse(null /* bogus request */, Collections.emptyList()); listener.onHeaders(dummyResponse); - final CountDownLatch writeTaskCloseLatch = new CountDownLatch(1); + final CountDownLatch closeLatch = new CountDownLatch(1); try { - final ForkJoinTask<Boolean> writeTask = ForkJoinPool.commonPool().submit(() -> { - final ByteBuffer dataToWriteForever = ByteBuffer.allocate(5); - while (0 < writeTaskCloseLatch.getCount()) { - dataToWriteForever.position(0); - listener.onContent(dummyResponse, dataToWriteForever, Callback.NOOP); - Thread.sleep(writeDelayMillies); - } - return true; - }); + final ForkJoinTask<Boolean> writeTask = + ForkJoinPool.commonPool() + .submit( + () -> { + final ByteBuffer dataToWriteForever = ByteBuffer.allocate(5); + while (0 < closeLatch.getCount()) { + dataToWriteForever.position(0); + listener.onContent(dummyResponse, dataToWriteForever, Callback.NOOP); + Thread.sleep(writeDelayMillies); + } + return true; + }); // client reads "forever" ... until read times out because requestTimeout exceeded assertEquals(dummyResponse, listener.get(0, TimeUnit.SECONDS)); - final IOException expected = expectThrows(IOException.class, () -> { - final Instant requestTimeout = Instant.now().plus(1, ChronoUnit.MINUTES); - listener.setRequestTimeout(requestTimeout); - final Instant forever = requestTimeout.plusSeconds(60); - try (final InputStream stream = listener.getInputStream()) { - while (Instant.now().isBefore(forever)) { - int trash = stream.read(); // this should eventually throw an exception - } - } - }); + final ForkJoinTask<IOException> readTask = + ForkJoinPool.commonPool() + .submit( + () -> { + // (Do this in a ForkJoin thread so we can easily fail test if read() doesn't + // throw IOException in a timely manner) + return expectThrows( + IOException.class, + () -> { + final Instant requestTimeout = Instant.now().plus(5, ChronoUnit.SECONDS); + listener.setRequestTimeout(requestTimeout); + final Instant forever = requestTimeout.plusSeconds(60); + try (final InputStream stream = listener.getInputStream()) { + while (0 < closeLatch.getCount()) { + int trash = + stream.read(); // this should eventually throw an exception + } + } + }); + }); + final IOException expected = readTask.get(10, TimeUnit.SECONDS); assertNotNull(expected.getCause()); assertEquals(TimeoutException.class, expected.getCause().getClass()); assertThat(expected.getCause().getMessage(), containsString("requestTimeout exceeded")); } finally { - writeTaskCloseLatch.countDown(); + closeLatch.countDown(); } } }
