kafka git commit: KAFKA-4786; Wait for heartbeat thread to terminate in consumer close
Repository: kafka Updated Branches: refs/heads/0.10.2 c231f5280 -> ba4eafa78 KAFKA-4786; Wait for heartbeat thread to terminate in consumer close Author: Rajini Sivaram Reviewers: Apurva Mehta , Ismael Juma , Jason Gustafson Closes #2586 from rajinisivaram/KAFKA-4786 (cherry picked from commit 5916ef0227d099e5fa05341db3f918f5ef035816) Signed-off-by: Jason Gustafson Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ba4eafa7 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ba4eafa7 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ba4eafa7 Branch: refs/heads/0.10.2 Commit: ba4eafa7874988374abcd9f48fbab96abb2032a4 Parents: c231f52 Author: Rajini Sivaram Authored: Wed Feb 22 12:08:09 2017 -0800 Committer: Jason Gustafson Committed: Wed Feb 22 12:35:13 2017 -0800 -- .../consumer/internals/AbstractCoordinator.java | 46 ++-- 1 file changed, 33 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/ba4eafa7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 6eea045..b72769e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements Closeable { heartbeatThread.disable(); } +private void closeHeartbeatThread() { +if (heartbeatThread != null) { +heartbeatThread.close(); + +try { +heartbeatThread.join(); +} catch (InterruptedException e) { +log.warn("Interrupted while waiting for consumer heartbeat thread to close"); +throw new InterruptException(e); +} +} +} + // visible for testing. Joins the group without starting the heartbeat thread. void joinGroupIfNeeded() { while (needRejoin() || rejoinIncomplete()) { @@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements Closeable { close(0); } -protected synchronized void close(long timeoutMs) { -if (heartbeatThread != null) -heartbeatThread.close(); -maybeLeaveGroup(); - -// At this point, there may be pending commits (async commits or sync commits that were -// interrupted using wakeup) and the leave group request which have been queued, but not -// yet sent to the broker. Wait up to close timeout for these pending requests to be processed. -// If coordinator is not known, requests are aborted. -Node coordinator = coordinator(); -if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs)) -log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", -client.pendingRequestCount(coordinator), groupId); +protected void close(long timeoutMs) { +try { +closeHeartbeatThread(); +} finally { + +// Synchronize after closing the heartbeat thread since heartbeat thread +// needs this lock to complete and terminate after close flag is set. +synchronized (this) { +maybeLeaveGroup(); + +// At this point, there may be pending commits (async commits or sync commits that were +// interrupted using wakeup) and the leave group request which have been queued, but not +// yet sent to the broker. Wait up to close timeout for these pending requests to be processed. +// If coordinator is not known, requests are aborted. +Node coordinator = coordinator(); +if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs)) +log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", +client.pendingRequestCount(coordinator), groupId); +} +} } /**
kafka git commit: KAFKA-4786; Wait for heartbeat thread to terminate in consumer close
Repository: kafka Updated Branches: refs/heads/trunk 913c09e4a -> 5916ef022 KAFKA-4786; Wait for heartbeat thread to terminate in consumer close Author: Rajini Sivaram Reviewers: Apurva Mehta , Ismael Juma , Jason Gustafson Closes #2586 from rajinisivaram/KAFKA-4786 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5916ef02 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5916ef02 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5916ef02 Branch: refs/heads/trunk Commit: 5916ef0227d099e5fa05341db3f918f5ef035816 Parents: 913c09e Author: Rajini Sivaram Authored: Wed Feb 22 12:08:09 2017 -0800 Committer: Jason Gustafson Committed: Wed Feb 22 12:08:09 2017 -0800 -- .../consumer/internals/AbstractCoordinator.java | 46 ++-- 1 file changed, 33 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/5916ef02/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 1c2d607..d36aac9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -315,6 +315,19 @@ public abstract class AbstractCoordinator implements Closeable { heartbeatThread.disable(); } +private void closeHeartbeatThread() { +if (heartbeatThread != null) { +heartbeatThread.close(); + +try { +heartbeatThread.join(); +} catch (InterruptedException e) { +log.warn("Interrupted while waiting for consumer heartbeat thread to close"); +throw new InterruptException(e); +} +} +} + // visible for testing. Joins the group without starting the heartbeat thread. void joinGroupIfNeeded() { while (needRejoin() || rejoinIncomplete()) { @@ -652,19 +665,26 @@ public abstract class AbstractCoordinator implements Closeable { close(0); } -protected synchronized void close(long timeoutMs) { -if (heartbeatThread != null) -heartbeatThread.close(); -maybeLeaveGroup(); - -// At this point, there may be pending commits (async commits or sync commits that were -// interrupted using wakeup) and the leave group request which have been queued, but not -// yet sent to the broker. Wait up to close timeout for these pending requests to be processed. -// If coordinator is not known, requests are aborted. -Node coordinator = coordinator(); -if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs)) -log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", -client.pendingRequestCount(coordinator), groupId); +protected void close(long timeoutMs) { +try { +closeHeartbeatThread(); +} finally { + +// Synchronize after closing the heartbeat thread since heartbeat thread +// needs this lock to complete and terminate after close flag is set. +synchronized (this) { +maybeLeaveGroup(); + +// At this point, there may be pending commits (async commits or sync commits that were +// interrupted using wakeup) and the leave group request which have been queued, but not +// yet sent to the broker. Wait up to close timeout for these pending requests to be processed. +// If coordinator is not known, requests are aborted. +Node coordinator = coordinator(); +if (coordinator != null && !client.awaitPendingRequests(coordinator, timeoutMs)) +log.warn("Close timed out with {} pending requests to coordinator, terminating client connections for group {}.", +client.pendingRequestCount(coordinator), groupId); +} +} } /**