Repository: kafka
Updated Branches:
  refs/heads/trunk 6496271a1 -> f6fee34a2


KAFKA-4627; Fix timing issue in consumer close tests

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Apurva Mehta <apurva.1...@gmail.com>, Jason Gustafson 
<ja...@confluent.io>

Closes #2367 from rajinisivaram/KAFKA-4627


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6fee34a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6fee34a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6fee34a

Branch: refs/heads/trunk
Commit: f6fee34a2d6315cd25855b9805e2f4129e24e5c3
Parents: 6496271
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Fri Jan 13 11:09:26 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jan 13 11:09:26 2017 -0800

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/KafkaConsumer.java  |  3 ++-
 .../java/org/apache/kafka/clients/MockClient.java     | 14 +++++++++++++-
 .../kafka/clients/consumer/KafkaConsumerTest.java     |  6 +++++-
 .../consumer/internals/ConsumerCoordinatorTest.java   |  9 +++++++--
 4 files changed, 27 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f6fee34a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e0fbb74..d3bdf6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1547,7 +1547,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         this.closed = true;
         try {
-            coordinator.close(Math.min(timeoutMs, requestTimeoutMs));
+            if (coordinator != null)
+                coordinator.close(Math.min(timeoutMs, requestTimeoutMs));
         } catch (Throwable t) {
             firstException.compareAndSet(null, t);
             log.error("Failed to close coordinator", t);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6fee34a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index f216b85..bfde9f9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -65,7 +67,8 @@ public class MockClient implements KafkaClient {
     private Node node = null;
     private final Set<String> ready = new HashSet<>();
     private final Map<Node, Long> blackedOut = new HashMap<>();
-    private final Queue<ClientRequest> requests = new ArrayDeque<>();
+    // Use concurrent queue for requests so that requests may be queried from 
a different thread
+    private final Queue<ClientRequest> requests = new 
ConcurrentLinkedDeque<>();
     // Use concurrent queue for responses so that responses may be updated 
during poll() from a different thread.
     private final Queue<ClientResponse> responses = new 
ConcurrentLinkedDeque<>();
     private final Queue<FutureResponse> futureResponses = new ArrayDeque<>();
@@ -254,6 +257,15 @@ public class MockClient implements KafkaClient {
         futureResponses.add(new FutureResponse(response, disconnected, 
matcher, node));
     }
 
+    public void waitForRequests(final int minRequests, long maxWaitMs) throws 
InterruptedException {
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return requests.size() >= minRequests;
+            }
+        }, maxWaitMs, "Expected requests have not been sent");
+    }
+
     public void reset() {
         ready.clear();
         blackedOut.clear();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6fee34a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 28febc1..b426c9a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1207,7 +1207,7 @@ public class KafkaConsumerTest {
             });
 
             // Close task should not complete until commit succeeds or close 
times out
-            // if close timeout is not zero,
+            // if close timeout is not zero.
             try {
                 future.get(100, TimeUnit.MILLISECONDS);
                 if (closeTimeoutMs != 0)
@@ -1216,9 +1216,13 @@ public class KafkaConsumerTest {
                 // Expected exception
             }
 
+            // Ensure close has started and queued at least one more request 
after commitAsync
+            client.waitForRequests(2, 1000);
+
             // In graceful mode, commit response results in close() completing 
immediately without a timeout
             // In non-graceful mode, close() times out without an exception 
even though commit response is pending
             for (int i = 0; i < responses.size(); i++) {
+                client.waitForRequests(1, 1000);
                 client.respondFrom(responses.get(i), coordinator);
                 if (i != responses.size() - 1) {
                     try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f6fee34a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 0637ea4..b533efd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1357,7 +1357,12 @@ public class ConsumerCoordinatorTest {
                     coordinator.close(Math.min(closeTimeoutMs, 
requestTimeoutMs));
                 }
             });
-            Thread.sleep(100);
+            // Wait for close to start. If coordinator is known, wait for 
close to queue
+            // at least one request. Otherwise, sleep for a short time.
+            if (!coordinator.coordinatorUnknown())
+                client.waitForRequests(1, 1000);
+            else
+                Thread.sleep(200);
             if (expectedMinTimeMs > 0) {
                 time.sleep(expectedMinTimeMs - 1);
                 try {
@@ -1395,7 +1400,7 @@ public class ConsumerCoordinatorTest {
             }
         }, new LeaveGroupResponse(Errors.NONE.code()));
 
-        closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
+        coordinator.close();
         assertTrue("Commit not requested", commitRequested.get());
         if (dynamicAssignment)
             assertTrue("Leave group not requested", leaveGroupRequested.get());

Reply via email to