[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15803804#comment-15803804
 ] 

huxi commented on KAFKA-4595:
-----------------------------

Yes, it seems the sequence of events might look like this:
1. At some point, a delete topic operation began, which resumed the 
DeleteTopicsThread. This thread acquired the controller lock, ran successfully, 
and then released the lock. At that point the request send thread had not yet 
begun to execute the callback.
2. Later, a network problem occurred, and the controller triggered the zk 
listener thread to remove dead brokers. This thread acquired the controller 
lock, shut down the request send thread for the dead broker, and then waited 
for that shutdown to complete while still holding the lock.
3. Then the request send thread began to execute 
deleteTopicStopReplicaCallback, which also tried to acquire the controller 
lock. So it waited forever and could never be shut down.
4. The result is a deadlock: the listener thread waits for the send thread to 
shut down, while the send thread waits for the lock the listener thread holds.
[~pengwei] Does that make sense?
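
To make the sequence above concrete, here is a minimal standalone sketch of the 
same lock-plus-latch pattern. It is not Kafka code: the class, method, and 
variable names are hypothetical, a plain ReentrantLock stands in for the 
controller lock, and a CountDownLatch stands in for the latch that 
ShutdownableThread.awaitShutdown waits on. Unlike the real threads, the sketch 
uses a timed wait and lockInterruptibly so that it can report the hang and then 
clean up instead of blocking forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical names throughout; this only mirrors the pattern in the thread dump.
class ControllerDeadlockSketch {

    /**
     * Plays the role of the zk listener thread shutting down a send thread.
     * Returns true if the send thread finished within timeoutMs.
     */
    static boolean shutdownSendThread(boolean holdLockWhileWaiting, long timeoutMs) {
        ReentrantLock controllerLock = new ReentrantLock();
        CountDownLatch shutdownLatch = new CountDownLatch(1);   // "send thread is done"
        CountDownLatch listenerHasLock = new CountDownLatch(1); // forces the step 2 -> 3 order

        Thread sendThread = new Thread(() -> {
            try {
                listenerHasLock.await();            // the callback starts only after step 2
                controllerLock.lockInterruptibly(); // step 3: the callback needs the lock
                try {
                    // the callback body would run here
                } finally {
                    controllerLock.unlock();
                }
                shutdownLatch.countDown();          // only now can the thread finish
            } catch (InterruptedException e) {
                // interrupted by the cleanup below; the real thread has no such escape
            }
        }, "send-thread");
        sendThread.setDaemon(true);
        sendThread.start();

        try {
            boolean finished;
            controllerLock.lock();                  // step 2: the listener takes the lock
            try {
                listenerHasLock.countDown();
                // Waiting here, with the lock still held, is the problematic case.
                finished = holdLockWhileWaiting
                        && shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            } finally {
                controllerLock.unlock();
            }
            if (!holdLockWhileWaiting) {
                // Waiting after the lock is released lets the callback proceed.
                finished = shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
            sendThread.interrupt();                 // cleanup for the sketch only
            sendThread.join();
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("sketch was interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("wait while holding lock: finished = "
                + shutdownSendThread(true, 200));
        System.out.println("wait after releasing lock: finished = "
                + shutdownSendThread(false, 2000));
    }
}
```

With the lock held during the wait, the send thread can never reach 
countDown(), so the wait can only time out. Releasing the lock before waiting 
is shown purely as a contrast; it is not necessarily how the issue should be 
fixed in the controller.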

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>             Fix For: 0.10.2.0
>
>
> In our test env, we found the controller stops working after a delete topic 
> operation and a network issue. The stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x00007fb76416e000 nid=0x3019 waiting on 
> condition [0x00007fb76b7c8000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>       at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>       at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>       at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>       at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>       - locked <0x00000000c0258760> (a java.lang.Object)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>       at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>       at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>       at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>    Locked ownable synchronizers:
>       - <0x00000000c02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x00007fb778342000 nid=0x5a4c waiting on condition [0x00007fb761de0000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>       at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>       at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
>       at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:378)
>       at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345)
>       at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345)
>       at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294)
>       at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294)
>       at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:206)
>       - locked <0x00000000c04af988> (a java.lang.Object)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>    Locked ownable synchronizers:
>       - None



