Repository: kafka
Updated Branches:
  refs/heads/trunk 2c0055e62 -> 45d9fb3d5
KAFKA-4735; Fix deadlock issue during MM shutdown

In https://issues.apache.org/jira/browse/KAFKA-4521 we fixed a potential message reorder bug in MM. However, the patch introduced another bug that can cause deadlock during MM shutdown. The deadlock will happen if the zookeeper listener thread calls requestAndWaitForCommit() after the MirrorMaker thread has already exited the loop of consuming and producing messages.

This patch fixes the problem by setting `iter` to `null` in `MirrorMakerOldConsumer.cleanup()`. If the zookeeper listener thread calls `requestAndWaitForCommit()` after `cleanup()`, then it will not block waiting for a commit notification since `iter == null`. If the zookeeper listener thread calls `requestAndWaitForCommit()` before `cleanup()`, then `cleanup()` will call `notifyAll()` to unblock the zookeeper listener thread.

Author: Dong Lin <lindon...@gmail.com>

Reviewers: Jiangjie Qin <becket....@gmail.com>

Closes #2504 from lindong28/KAFKA-4735


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45d9fb3d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45d9fb3d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45d9fb3d

Branch: refs/heads/trunk
Commit: 45d9fb3d5f13bef0bf6809ea71b2cbd73996a1b6
Parents: 2c0055e
Author: Dong Lin <lindon...@gmail.com>
Authored: Mon Feb 6 16:01:59 2017 -0800
Committer: Jiangjie Qin <becket....@gmail.com>
Committed: Mon Feb 6 16:01:59 2017 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45d9fb3d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 42456f7..a2866ad 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -527,7 +527,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     override def requestAndWaitForCommit() {
       this.synchronized {
-        // skip wait() if mirrorMakerConsumer has not been initialized
+        // only wait() if mirrorMakerConsumer has been initialized and it has not been cleaned up.
         if (iter != null) {
           immediateCommitRequested = true
           this.wait()
@@ -566,6 +566,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     override def cleanup() {
+      // We need to set the iterator to null and notify the rebalance listener thread.
+      // This is to handle the case that the consumer rebalance is triggered when the
+      // mirror maker thread is shutting down and the rebalance listener is waiting for the offset commit.
+      this.synchronized {
+        iter = null
+        if (immediateCommitRequested) {
+          notifyCommit()
+        }
+      }
       connector.shutdown()
     }
 
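The fix is a monitor handshake: the zookeeper listener thread parks in `wait()` until a commit is signalled, and `cleanup()` now both disables future waits (by clearing `iter`) and releases a waiter that is already parked. The following is a minimal, simplified sketch of both orderings described in the commit message. It is a hypothetical model, not the MirrorMaker code: the class `CommitHandshake`, the `commitRequested` accessor, the two helper functions, and the use of `Option[String]` in place of the real consumer iterator are all assumptions made for illustration.

```scala
// Hypothetical model of the commit handshake; NOT the actual MirrorMaker classes.
class CommitHandshake {
  // Assumed stand-in for MirrorMakerOldConsumer's `iter`: Some(...) once initialized.
  private var iter: Option[String] = None
  private var immediateCommitRequested = false

  def init(): Unit = this.synchronized { iter = Some("stream-iterator") }

  // Test hook (assumption): lets another thread observe the pending request.
  def commitRequested: Boolean = this.synchronized { immediateCommitRequested }

  // Like the patched requestAndWaitForCommit(): only wait if initialized and not cleaned up.
  def requestAndWaitForCommit(): Unit = this.synchronized {
    if (iter.isDefined) {
      immediateCommitRequested = true
      // Re-check the flag in a loop to tolerate spurious wakeups.
      while (immediateCommitRequested) this.wait()
    }
  }

  // Clears the pending request and wakes any thread parked in wait().
  def notifyCommit(): Unit = this.synchronized {
    if (immediateCommitRequested) {
      immediateCommitRequested = false
      this.notifyAll()
    }
  }

  // Like the patched cleanup(): drop the iterator, then release a blocked waiter.
  def cleanup(): Unit = this.synchronized {
    iter = None
    if (immediateCommitRequested) notifyCommit()
  }
}

// Ordering 1: the listener requests a commit BEFORE cleanup(); cleanup() must release it.
def waiterReleasedByCleanup(): Boolean = {
  val h = new CommitHandshake
  h.init()
  val waiter = new Thread(() => h.requestAndWaitForCommit())
  waiter.setDaemon(true) // so a hung waiter cannot keep the JVM alive
  waiter.start()
  // commitRequested can only return true once the waiter has released the monitor in wait().
  while (!h.commitRequested) Thread.sleep(1)
  h.cleanup()
  waiter.join(5000)
  !waiter.isAlive
}

// Ordering 2: the listener requests a commit AFTER cleanup(); it must not block at all.
def returnsImmediatelyAfterCleanup(): Boolean = {
  val h = new CommitHandshake
  h.init()
  h.cleanup()
  h.requestAndWaitForCommit() // iter is None, so wait() is skipped
  !h.commitRequested
}
```

One deliberate difference from the patch: the sketch re-checks `immediateCommitRequested` in a `while` loop around `wait()`, whereas the patched `requestAndWaitForCommit()` calls `this.wait()` once. Deleting the `notifyCommit()` call from the sketch's `cleanup()` makes `waiterReleasedByCleanup()` return `false` after its 5-second join timeout, which reproduces the shutdown hang in miniature.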