[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15803804#comment-15803804 ]
huxi commented on KAFKA-4595: ----------------------------- Yes, seems that the process might look like this: 1. At some point of time, deleting topic operation began which triggered to resume the DeleteTopicsThread. This thread got the controller lock and ran successfully. Then it released the lock. But the request send thread had not begun to execute the callback. 2. Later, the network got something wrong, then controller triggered the zk listener thread to remove dead brokers. This thread got the controller lock and shut down the request send thread for that dead broker and wait. 3. Then, the request send thread began to execute the deleteTopicStopReplicaCallback which also tried to get the controller lock. So it waited forever and failed to be shut down. 4. Then deadlock happened. [~pengwei] Does it make sense? > Controller send thread can't stop when broker change listener event trigger > for dead brokers > --------------------------------------------------------------------------------------------- > > Key: KAFKA-4595 > URL: https://issues.apache.org/jira/browse/KAFKA-4595 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.9.0.0, 0.10.1.1 > Reporter: Pengwei > Fix For: 0.10.2.0 > > > In our test env, we found controller is not working after a delete topic > opertation and network issue, the stack is below: > "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" > #15 daemon prio=5 os_prio=0 tid=0x00007fb76416e000 nid=0x3019 waiting on > condition [0x00007fb76b7c8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000000c05497b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50) > at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128) > at > kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81) > - locked <0x00000000c0258760> (a java.lang.Object) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > Locked ownable synchronizers: > - <0x00000000c02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 > tid=0x00007fb778342000 nid=0x5a4c waiting on condition [0x00007fb761de0000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000000c02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260) > at > kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:378) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294) > at > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:206) > - locked <0x00000000c04af988> (a java.lang.Object) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > Locked ownable synchronizers: > - None -- This message was sent by Atlassian JIRA (v6.3.4#6332)