Repository: kafka
Updated Branches:
  refs/heads/trunk 1f6494202 -> b411f57c1


KAFKA-6026; Fix for indefinite wait in KafkaFutureImpl

Author: bartdevylder <bartdevyl...@gmail.com>
Author: Bart De Vylder <bartdevyl...@gmail.com>

Reviewers: Colin P. Mccabe <cmcc...@confluent.io>, Ismael Juma 
<ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #4044 from bartdevylder/KAFKA-6026


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b411f57c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b411f57c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b411f57c

Branch: refs/heads/trunk
Commit: b411f57c1ce82d66ec2c837349a54357b322e803
Parents: 1f64942
Author: bartdevylder <bartdevyl...@gmail.com>
Authored: Mon Oct 9 17:49:52 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Oct 9 17:57:13 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/internals/KafkaFutureImpl.java   | 4 ++--
 .../test/java/org/apache/kafka/common/KafkaFutureTest.java   | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b411f57c/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
index cb97e87..9ca019b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java
@@ -96,7 +96,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
         R await(long timeout, TimeUnit unit)
                 throws InterruptedException, ExecutionException, 
TimeoutException {
             long startMs = System.currentTimeMillis();
-            long waitTimeMs = (unit.toMillis(timeout) > 0) ? 
unit.toMillis(timeout) : 1;
+            long waitTimeMs = unit.toMillis(timeout);
             long delta = 0;
             synchronized (this) {
                 while (true) {
@@ -104,7 +104,7 @@ public class KafkaFutureImpl<T> extends KafkaFuture<T> {
                         wrapAndThrow(exception);
                     if (done)
                         return value;
-                    if (delta > waitTimeMs) {
+                    if (delta >= waitTimeMs) {
                         throw new TimeoutException();
                     }
                     this.wait(waitTimeMs - delta);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b411f57c/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java 
b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
index 7d29bc5..71f3c3c 100644
--- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
@@ -173,4 +174,11 @@ public class KafkaFutureTest {
         assertFalse(allFuture.isCompletedExceptionally());
         allFuture.get();
     }
+
+    @Test(expected = TimeoutException.class)
+    public void testFutureTimeoutWithZeroWait() throws Exception {
+        final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
+        future.get(0, TimeUnit.MILLISECONDS);
+    }
+
 }

Reply via email to