Repository: kafka
Updated Branches:
  refs/heads/0.10.2 c231f5280 -> ba4eafa78


KAFKA-4786; Wait for heartbeat thread to terminate in consumer close

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2586 from rajinisivaram/KAFKA-4786

(cherry picked from commit 5916ef0227d099e5fa05341db3f918f5ef035816)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba4eafa7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba4eafa7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba4eafa7

Branch: refs/heads/0.10.2
Commit: ba4eafa7874988374abcd9f48fbab96abb2032a4
Parents: c231f52
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Wed Feb 22 12:08:09 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Feb 22 12:35:13 2017 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 46 ++++++++++++++------
 1 file changed, 33 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ba4eafa7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6eea045..b72769e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements Closeable {
             heartbeatThread.disable();
     }
 
+    private void closeHeartbeatThread() {
+        if (heartbeatThread != null) {
+            heartbeatThread.close();
+
+            try {
+                heartbeatThread.join();
+            } catch (InterruptedException e) {
+                log.warn("Interrupted while waiting for consumer heartbeat thread to close");
+                throw new InterruptException(e);
+            }
+        }
+    }
+
     // visible for testing. Joins the group without starting the heartbeat thread.
     void joinGroupIfNeeded() {
         while (needRejoin() || rejoinIncomplete()) {
@@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements Closeable {
         close(0);
     }
 
-    protected synchronized void close(long timeoutMs) {
-        if (heartbeatThread != null)
-            heartbeatThread.close();
-        maybeLeaveGroup();
-
-        // At this point, there may be pending commits (async commits or sync commits that were
-        // interrupted using wakeup) and the leave group request which have been queued, but not
-        // yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
-        // If coordinator is not known, requests are aborted.
-        Node coordinator = coordinator();
-        if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
-            log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.",
-                    client.pendingRequestCount(coordinator), groupId);
+    protected void close(long timeoutMs) {
+        try {
+            closeHeartbeatThread();
+        } finally {
+
+            // Synchronize after closing the heartbeat thread since heartbeat thread
+            // needs this lock to complete and terminate after close flag is set.
+            synchronized (this) {
+                maybeLeaveGroup();
+
+                // At this point, there may be pending commits (async commits or sync commits that were
+                // interrupted using wakeup) and the leave group request which have been queued, but not
+                // yet sent to the broker. Wait up to close timeout for these pending requests to be processed.
+                // If coordinator is not known, requests are aborted.
+                Node coordinator = coordinator();
+                if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs))
+                    log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.",
+                            client.pendingRequestCount(coordinator), groupId);
+            }
+        }
     }
 
     /**

Reply via email to