[
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15803804#comment-15803804
]
huxi commented on KAFKA-4595:
-----------------------------
Yes, seems that the process might look like this:
1. At some point of time, deleting topic operation began which triggered to
resume the DeleteTopicsThread. This thread got the controller lock and ran
successfully. Then it released the lock. But the request send thread had not
begun to execute the callback.
2. Later, the network got something wrong, then controller triggered the zk
listener thread to remove dead brokers. This thread got the controller lock and
shut down the request send thread for that dead broker and wait.
3. Then, the request send thread began to execute the
deleteTopicStopReplicaCallback which also tried to get the controller lock. So
it waited forever and failed to be shut down.
4. Then deadlock happened.
[~pengwei] Does it make sense?
> Controller send thread can't stop when broker change listener event trigger
> for dead brokers
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.0, 0.10.1.1
> Reporter: Pengwei
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184"
> #15 daemon prio=5 os_prio=0 tid=0x00007fb76416e000 nid=0x3019 waiting on
> condition [0x00007fb76b7c8000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000c05497b8> (a
> java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
> at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
> at
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
> at
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
> - locked <0x00000000c0258760> (a java.lang.Object)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Locked ownable synchronizers:
> - <0x00000000c02587f8> (a
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0
> tid=0x00007fb778342000 nid=0x5a4c waiting on condition [0x00007fb761de0000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000c02587f8> (a
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
> at
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:378)
> at
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345)
> at
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345)
> at
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294)
> at
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:206)
> - locked <0x00000000c04af988> (a java.lang.Object)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Locked ownable synchronizers:
> - None
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)