Repository: kafka Updated Branches: refs/heads/trunk 6496271a1 -> f6fee34a2
KAFKA-4627; Fix timing issue in consumer close tests Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Apurva Mehta <apurva.1...@gmail.com>, Jason Gustafson <ja...@confluent.io> Closes #2367 from rajinisivaram/KAFKA-4627 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f6fee34a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f6fee34a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f6fee34a Branch: refs/heads/trunk Commit: f6fee34a2d6315cd25855b9805e2f4129e24e5c3 Parents: 6496271 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Fri Jan 13 11:09:26 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri Jan 13 11:09:26 2017 -0800 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/KafkaConsumer.java | 3 ++- .../java/org/apache/kafka/clients/MockClient.java | 14 +++++++++++++- .../kafka/clients/consumer/KafkaConsumerTest.java | 6 +++++- .../consumer/internals/ConsumerCoordinatorTest.java | 9 +++++++-- 4 files changed, 27 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f6fee34a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index e0fbb74..d3bdf6e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1547,7 +1547,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { AtomicReference<Throwable> firstException = new AtomicReference<>(); this.closed = true; try { - coordinator.close(Math.min(timeoutMs, requestTimeoutMs)); + if (coordinator != null) + coordinator.close(Math.min(timeoutMs, requestTimeoutMs)); } catch (Throwable t) { firstException.compareAndSet(null, t); log.error("Failed to close coordinator", t); http://git-wip-us.apache.org/repos/asf/kafka/blob/f6fee34a/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index f216b85..bfde9f9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; import java.util.ArrayDeque; import java.util.ArrayList; @@ -65,7 +67,8 @@ public class MockClient implements KafkaClient { private Node node = null; private final Set<String> ready = new HashSet<>(); private final Map<Node, Long> blackedOut = new HashMap<>(); - private final Queue<ClientRequest> requests = new ArrayDeque<>(); + // Use concurrent queue for requests so that requests may be queried from a different thread + private final Queue<ClientRequest> requests = new ConcurrentLinkedDeque<>(); // Use concurrent queue for responses so that responses may be updated during poll() from a different thread. private final Queue<ClientResponse> responses = new ConcurrentLinkedDeque<>(); private final Queue<FutureResponse> futureResponses = new ArrayDeque<>(); @@ -254,6 +257,15 @@ public class MockClient implements KafkaClient { futureResponses.add(new FutureResponse(response, disconnected, matcher, node)); } + public void waitForRequests(final int minRequests, long maxWaitMs) throws InterruptedException { + TestUtils.waitForCondition(new TestCondition() { + @Override + public boolean conditionMet() { + return requests.size() >= minRequests; + } + }, maxWaitMs, "Expected requests have not been sent"); + } + public void reset() { ready.clear(); blackedOut.clear(); http://git-wip-us.apache.org/repos/asf/kafka/blob/f6fee34a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 28febc1..b426c9a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1207,7 +1207,7 @@ public class KafkaConsumerTest { }); // Close task should not complete until commit succeeds or close times out - // if close timeout is not zero, + // if close timeout is not zero. try { future.get(100, TimeUnit.MILLISECONDS); if (closeTimeoutMs != 0) @@ -1216,9 +1216,13 @@ public class KafkaConsumerTest { // Expected exception } + // Ensure close has started and queued at least one more request after commitAsync + client.waitForRequests(2, 1000); + // In graceful mode, commit response results in close() completing immediately without a timeout // In non-graceful mode, close() times out without an exception even though commit response is pending for (int i = 0; i < responses.size(); i++) { + client.waitForRequests(1, 1000); client.respondFrom(responses.get(i), coordinator); if (i != responses.size() - 1) { try { http://git-wip-us.apache.org/repos/asf/kafka/blob/f6fee34a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 0637ea4..b533efd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1357,7 +1357,12 @@ public class ConsumerCoordinatorTest { coordinator.close(Math.min(closeTimeoutMs, requestTimeoutMs)); } }); - Thread.sleep(100); + // Wait for close to start. If coordinator is known, wait for close to queue + // at least one request. Otherwise, sleep for a short time. + if (!coordinator.coordinatorUnknown()) + client.waitForRequests(1, 1000); + else + Thread.sleep(200); if (expectedMinTimeMs > 0) { time.sleep(expectedMinTimeMs - 1); try { @@ -1395,7 +1400,7 @@ public class ConsumerCoordinatorTest { } }, new LeaveGroupResponse(Errors.NONE.code())); - closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); + coordinator.close(); assertTrue("Commit not requested", commitRequested.get()); if (dynamicAssignment) assertTrue("Leave group not requested", leaveGroupRequested.get());