kafka git commit: KAFKA-4786; Wait for heartbeat thread to terminate in consumer close

2017-02-22 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 c231f5280 -> ba4eafa78


KAFKA-4786; Wait for heartbeat thread to terminate in consumer close

Author: Rajini Sivaram 

Reviewers: Apurva Mehta , Ismael Juma , 
Jason Gustafson 

Closes #2586 from rajinisivaram/KAFKA-4786

(cherry picked from commit 5916ef0227d099e5fa05341db3f918f5ef035816)
Signed-off-by: Jason Gustafson 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba4eafa7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba4eafa7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba4eafa7

Branch: refs/heads/0.10.2
Commit: ba4eafa7874988374abcd9f48fbab96abb2032a4
Parents: c231f52
Author: Rajini Sivaram 
Authored: Wed Feb 22 12:08:09 2017 -0800
Committer: Jason Gustafson 
Committed: Wed Feb 22 12:35:13 2017 -0800

--
 .../consumer/internals/AbstractCoordinator.java | 46 ++--
 1 file changed, 33 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/ba4eafa7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6eea045..b72769e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements 
Closeable {
 heartbeatThread.disable();
 }
 
+private void closeHeartbeatThread() {
+if (heartbeatThread != null) {
+heartbeatThread.close();
+
+try {
+heartbeatThread.join();
+} catch (InterruptedException e) {
+log.warn("Interrupted while waiting for consumer heartbeat 
thread to close");
+throw new InterruptException(e);
+}
+}
+}
+
 // visible for testing. Joins the group without starting the heartbeat 
thread.
 void joinGroupIfNeeded() {
 while (needRejoin() || rejoinIncomplete()) {
@@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements 
Closeable {
 close(0);
 }
 
-protected synchronized void close(long timeoutMs) {
-if (heartbeatThread != null)
-heartbeatThread.close();
-maybeLeaveGroup();
-
-// At this point, there may be pending commits (async commits or sync 
commits that were
-// interrupted using wakeup) and the leave group request which have 
been queued, but not
-// yet sent to the broker. Wait up to close timeout for these pending 
requests to be processed.
-// If coordinator is not known, requests are aborted.
-Node coordinator = coordinator();
-if (coordinator != null && !client.awaitPendingRequests(coordinator, 
timeoutMs))
-log.warn("Close timed out with {} pending requests to coordinator, 
terminating client connections for group {}.",
-client.pendingRequestCount(coordinator), groupId);
+protected void close(long timeoutMs) {
+try {
+closeHeartbeatThread();
+} finally {
+
+// Synchronize after closing the heartbeat thread since heartbeat 
thread
+// needs this lock to complete and terminate after close flag is 
set.
+synchronized (this) {
+maybeLeaveGroup();
+
+// At this point, there may be pending commits (async commits 
or sync commits that were
+// interrupted using wakeup) and the leave group request which 
have been queued, but not
+// yet sent to the broker. Wait up to close timeout for these 
pending requests to be processed.
+// If coordinator is not known, requests are aborted.
+Node coordinator = coordinator();
+if (coordinator != null && 
!client.awaitPendingRequests(coordinator, timeoutMs))
+log.warn("Close timed out with {} pending requests to 
coordinator, terminating client connections for group {}.",
+client.pendingRequestCount(coordinator), groupId);
+}
+}
 }
 
 /**



kafka git commit: KAFKA-4786; Wait for heartbeat thread to terminate in consumer close

2017-02-22 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk 913c09e4a -> 5916ef022


KAFKA-4786; Wait for heartbeat thread to terminate in consumer close

Author: Rajini Sivaram 

Reviewers: Apurva Mehta , Ismael Juma , 
Jason Gustafson 

Closes #2586 from rajinisivaram/KAFKA-4786


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5916ef02
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5916ef02
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5916ef02

Branch: refs/heads/trunk
Commit: 5916ef0227d099e5fa05341db3f918f5ef035816
Parents: 913c09e
Author: Rajini Sivaram 
Authored: Wed Feb 22 12:08:09 2017 -0800
Committer: Jason Gustafson 
Committed: Wed Feb 22 12:08:09 2017 -0800

--
 .../consumer/internals/AbstractCoordinator.java | 46 ++--
 1 file changed, 33 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/5916ef02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 1c2d607..d36aac9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements 
Closeable {
 heartbeatThread.disable();
 }
 
+private void closeHeartbeatThread() {
+if (heartbeatThread != null) {
+heartbeatThread.close();
+
+try {
+heartbeatThread.join();
+} catch (InterruptedException e) {
+log.warn("Interrupted while waiting for consumer heartbeat 
thread to close");
+throw new InterruptException(e);
+}
+}
+}
+
 // visible for testing. Joins the group without starting the heartbeat 
thread.
 void joinGroupIfNeeded() {
 while (needRejoin() || rejoinIncomplete()) {
@@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements 
Closeable {
 close(0);
 }
 
-protected synchronized void close(long timeoutMs) {
-if (heartbeatThread != null)
-heartbeatThread.close();
-maybeLeaveGroup();
-
-// At this point, there may be pending commits (async commits or sync 
commits that were
-// interrupted using wakeup) and the leave group request which have 
been queued, but not
-// yet sent to the broker. Wait up to close timeout for these pending 
requests to be processed.
-// If coordinator is not known, requests are aborted.
-Node coordinator = coordinator();
-if (coordinator != null && !client.awaitPendingRequests(coordinator, 
timeoutMs))
-log.warn("Close timed out with {} pending requests to coordinator, 
terminating client connections for group {}.",
-client.pendingRequestCount(coordinator), groupId);
+protected void close(long timeoutMs) {
+try {
+closeHeartbeatThread();
+} finally {
+
+// Synchronize after closing the heartbeat thread since heartbeat 
thread
+// needs this lock to complete and terminate after close flag is 
set.
+synchronized (this) {
+maybeLeaveGroup();
+
+// At this point, there may be pending commits (async commits 
or sync commits that were
+// interrupted using wakeup) and the leave group request which 
have been queued, but not
+// yet sent to the broker. Wait up to close timeout for these 
pending requests to be processed.
+// If coordinator is not known, requests are aborted.
+Node coordinator = coordinator();
+if (coordinator != null && 
!client.awaitPendingRequests(coordinator, timeoutMs))
+log.warn("Close timed out with {} pending requests to 
coordinator, terminating client connections for group {}.",
+client.pendingRequestCount(coordinator), groupId);
+}
+}
 }
 
 /**