Repository: kafka
Updated Branches:
  refs/heads/trunk 2c0055e62 -> 45d9fb3d5


KAFKA-4735; Fix deadlock issue during MM shutdown

In https://issues.apache.org/jira/browse/KAFKA-4521 we fixed a potential
message reordering bug in MM. However, that patch introduced another bug that
can cause a deadlock during MM shutdown. The deadlock happens if the zookeeper
listener thread calls `requestAndWaitForCommit()` after the MirrorMaker thread
has already exited its loop of consuming and producing messages: no further
commit notification is sent, so the listener thread waits forever.

This patch fixes the problem by setting `iter` to `null` in
`MirrorMakerOldConsumer.cleanup()`. If the zookeeper listener thread calls
`requestAndWaitForCommit()` after `cleanup()`, it will not block waiting
for the commit notification, since `iter == null`. If the zookeeper listener
thread calls `requestAndWaitForCommit()` before `cleanup()`, then `cleanup()`
will call `notifyAll()` to unblock the zookeeper listener thread.
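
The two orderings described above can be reproduced with a small standalone
model. This is only a sketch, not the MirrorMaker code: `ConsumerModel`,
`init()` and `ShutdownDemo` are hypothetical names, `iter` is a plain
placeholder object, the body of `notifyCommit()` is an assumption about what
the real method does, and the `while` loop around `wait()` is an addition that
guards against spurious wakeups.

```scala
// Simplified model of the commit handshake. Hypothetical names throughout;
// the real logic lives in MirrorMaker.scala.
class ConsumerModel {
  // Stand-in for the consumer iterator: non-null means
  // "initialized and not yet cleaned up".
  private var iter: AnyRef = null
  private var immediateCommitRequested = false

  def init(): Unit = this.synchronized { iter = new Object }

  // Called by the rebalance (zookeeper) listener thread.
  def requestAndWaitForCommit(): Unit = this.synchronized {
    if (iter != null) {
      immediateCommitRequested = true
      // Loop instead of a single wait() to tolerate spurious wakeups
      // (an addition in this sketch).
      while (immediateCommitRequested) this.wait()
    }
  }

  // Assumed behaviour: clear the request flag and wake up all waiters.
  def notifyCommit(): Unit = this.synchronized {
    immediateCommitRequested = false
    this.notifyAll()
  }

  // Called by the mirror maker thread on shutdown.
  def cleanup(): Unit = this.synchronized {
    iter = null
    if (immediateCommitRequested) notifyCommit()
  }
}

object ShutdownDemo {
  def main(args: Array[String]): Unit = {
    // Ordering 1: request arrives after cleanup(), so it must not block.
    val a = new ConsumerModel
    a.init()
    a.cleanup()
    a.requestAndWaitForCommit()

    // Ordering 2: request arrives before cleanup(); cleanup() unblocks it.
    val b = new ConsumerModel
    b.init()
    val listener = new Thread(new Runnable {
      def run(): Unit = b.requestAndWaitForCommit()
    })
    listener.start()
    Thread.sleep(100) // give the listener time to reach wait()
    b.cleanup()
    listener.join(2000)
    println("listener still alive after cleanup: " + listener.isAlive)
  }
}
```

If the `synchronized` block in `cleanup()` is removed from this model, the
second ordering leaves the listener thread blocked in `wait()`, which is the
shutdown deadlock this patch addresses.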

Author: Dong Lin <lindon...@gmail.com>

Reviewers: Jiangjie Qin <becket....@gmail.com>

Closes #2504 from lindong28/KAFKA-4735


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45d9fb3d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45d9fb3d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45d9fb3d

Branch: refs/heads/trunk
Commit: 45d9fb3d5f13bef0bf6809ea71b2cbd73996a1b6
Parents: 2c0055e
Author: Dong Lin <lindon...@gmail.com>
Authored: Mon Feb 6 16:01:59 2017 -0800
Committer: Jiangjie Qin <becket....@gmail.com>
Committed: Mon Feb 6 16:01:59 2017 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/MirrorMaker.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45d9fb3d/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 42456f7..a2866ad 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -527,7 +527,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
     override def requestAndWaitForCommit() {
       this.synchronized {
-        // skip wait() if mirrorMakerConsumer has not been initialized
+        // only wait() if mirrorMakerConsumer has been initialized and it has not been cleaned up.
         if (iter != null) {
           immediateCommitRequested = true
           this.wait()
@@ -566,6 +566,15 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     }
 
     override def cleanup() {
+      // We need to set the iterator to null and notify the rebalance listener thread.
+      // This is to handle the case that the consumer rebalance is triggered when the
+      // mirror maker thread is shutting down and the rebalance listener is waiting for the offset commit.
+      this.synchronized {
+        iter = null
+        if (immediateCommitRequested) {
+          notifyCommit()
+        }
+      }
       connector.shutdown()
     }
 
