Repository: kafka Updated Branches: refs/heads/trunk 1f6494202 -> b411f57c1
KAFKA-6026; Fix for indefinite wait in KafkaFutureImpl Author: bartdevylder <bartdevyl...@gmail.com> Author: Bart De Vylder <bartdevyl...@gmail.com> Reviewers: Colin P. Mccabe <cmcc...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #4044 from bartdevylder/KAFKA-6026 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b411f57c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b411f57c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b411f57c Branch: refs/heads/trunk Commit: b411f57c1ce82d66ec2c837349a54357b322e803 Parents: 1f64942 Author: bartdevylder <bartdevyl...@gmail.com> Authored: Mon Oct 9 17:49:52 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Oct 9 17:57:13 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/internals/KafkaFutureImpl.java | 4 ++-- .../test/java/org/apache/kafka/common/KafkaFutureTest.java | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b411f57c/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java index cb97e87..9ca019b 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java @@ -96,7 +96,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> { R await(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { long startMs = System.currentTimeMillis(); - long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1; + long waitTimeMs = unit.toMillis(timeout); long delta = 0; synchronized (this) { while (true) { @@ -104,7 +104,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> { wrapAndThrow(exception); if (done) return value; - if (delta > waitTimeMs) { + if (delta >= waitTimeMs) { throw new TimeoutException(); } this.wait(waitTimeMs - delta); http://git-wip-us.apache.org/repos/asf/kafka/blob/b411f57c/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java index 7d29bc5..71f3c3c 100644 --- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; @@ -173,4 +174,11 @@ public class KafkaFutureTest { assertFalse(allFuture.isCompletedExceptionally()); allFuture.get(); } + + @Test(expected = TimeoutException.class) + public void testFutureTimeoutWithZeroWait() throws Exception { + final KafkaFutureImpl<String> future = new KafkaFutureImpl<>(); + future.get(0, TimeUnit.MILLISECONDS); + } + }