Repository: kafka
Updated Branches:
  refs/heads/trunk 08b775c32 -> 4a7064d1a


KAFKA-5512; Awake the heartbeat thread when timetoNextHeartbeat is equal to 0

Author: Stephane Roset <steph...@roset.me>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3442 from rosets/KAFKA-5512


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a7064d1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a7064d1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a7064d1

Branch: refs/heads/trunk
Commit: 4a7064d1ae323903a0d1353372aabadabebb1ed6
Parents: 08b775c
Author: Stephane Roset <steph...@roset.me>
Authored: Fri Jul 21 08:47:50 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jul 21 08:47:54 2017 -0700

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |  5 +-
 .../internals/AbstractCoordinatorTest.java      | 67 +++++++++++++++++---
 2 files changed, 61 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4a7064d1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index d36f711..29acc25 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -288,7 +288,10 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 heartbeatThread = null;
                 throw cause;
             }
-
+            // Awake the heartbeat thread if needed
+            if (heartbeat.shouldHeartbeat(now)) {
+                notify();
+            }
             heartbeat.poll(now);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4a7064d1/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 8a93439..afebd9d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -35,7 +35,6 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -43,6 +42,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
@@ -56,7 +57,8 @@ public class AbstractCoordinatorTest {
     private static final int REBALANCE_TIMEOUT_MS = 60000;
     private static final int SESSION_TIMEOUT_MS = 10000;
     private static final int HEARTBEAT_INTERVAL_MS = 3000;
-    private static final long RETRY_BACKOFF_MS = 100;
+    private static final long RETRY_BACKOFF_MS = 20;
+    private static final long LONG_RETRY_BACKOFF_MS = 10000;
     private static final long REQUEST_TIMEOUT_MS = 40000;
     private static final String GROUP_ID = "dummy-group";
     private static final String METRIC_GROUP_PREFIX = "consumer";
@@ -68,14 +70,13 @@ public class AbstractCoordinatorTest {
     private ConsumerNetworkClient consumerClient;
     private DummyCoordinator coordinator;
 
-    @Before
-    public void setupCoordinator() {
+    private void setupCoordinator(long retryBackoffMs) {
         this.mockTime = new MockTime();
         this.mockClient = new MockClient(mockTime);
 
         Metadata metadata = new Metadata(100L, 60 * 60 * 1000L, true);
         this.consumerClient = new ConsumerNetworkClient(mockClient, metadata, 
mockTime,
-                RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS);
+                retryBackoffMs, REQUEST_TIMEOUT_MS);
         Metrics metrics = new Metrics();
 
         Cluster cluster = TestUtils.singletonCluster("topic", 1);
@@ -89,12 +90,14 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testCoordinatorDiscoveryBackoff() {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
 
-        // blackout the coordinator for 50 milliseconds to simulate a 
disconnect.
+        // blackout the coordinator for 10 milliseconds to simulate a 
disconnect.
         // after backing off, we should be able to connect.
-        mockClient.blackout(coordinatorNode, 50L);
+        mockClient.blackout(coordinatorNode, 10L);
 
         long initialTime = mockTime.milliseconds();
         coordinator.ensureCoordinatorReady();
@@ -105,6 +108,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testUncaughtExceptionInHeartbeatThread() throws Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
         mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
@@ -124,9 +129,6 @@ public class AbstractCoordinatorTest {
         try {
             coordinator.ensureActiveGroup();
             mockTime.sleep(HEARTBEAT_INTERVAL_MS);
-            synchronized (coordinator) {
-                coordinator.notify();
-            }
             long startMs = System.currentTimeMillis();
             while (System.currentTimeMillis() - startMs < 1000) {
                 Thread.sleep(10);
@@ -139,7 +141,36 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testPollHeartbeatAwakesHeartbeatThread() throws Exception {
+        setupCoordinator(LONG_RETRY_BACKOFF_MS);
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
+        mockClient.prepareResponse(syncGroupResponse(Errors.NONE));
+
+        coordinator.ensureActiveGroup();
+
+        final CountDownLatch heartbeatDone = new CountDownLatch(1);
+        mockClient.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                heartbeatDone.countDown();
+                return body instanceof HeartbeatRequest;
+            }
+        }, heartbeatResponse(Errors.NONE));
+
+        mockTime.sleep(HEARTBEAT_INTERVAL_MS);
+        coordinator.pollHeartbeat(mockTime.milliseconds());
+
+        if (!heartbeatDone.await(1, TimeUnit.SECONDS)) {
+            fail("Should have received a heartbeat request after calling 
pollHeartbeat");
+        }
+    }
+
+    @Test
     public void testLookupCoordinator() throws Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.setNode(null);
         RequestFuture<Void> noBrokersAvailableFuture = 
coordinator.lookupCoordinator();
         assertTrue("Failed future expected", 
noBrokersAvailableFuture.failed());
@@ -156,6 +187,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterJoinGroupSent() throws Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             private int invocations = 0;
@@ -192,6 +225,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterJoinGroupSentExternalCompletion() throws 
Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             private int invocations = 0;
@@ -230,6 +265,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterJoinGroupReceived() throws Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             @Override
@@ -264,6 +301,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterJoinGroupReceivedExternalCompletion() throws 
Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
             @Override
@@ -300,6 +339,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterSyncGroupSent() throws Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -336,6 +377,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterSyncGroupSentExternalCompletion() throws 
Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -374,6 +417,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterSyncGroupReceived() throws Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {
@@ -408,6 +453,8 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws 
Exception {
+        setupCoordinator(RETRY_BACKOFF_MS);
+
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, "memberId", 
"leaderId", Errors.NONE));
         mockClient.prepareResponse(new MockClient.RequestMatcher() {

Reply via email to