[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823594#comment-15823594 ]
huxi commented on KAFKA-4595: ----------------------------- [~pengwei] I don't think it's doable since it would leave inconsistent states for the involved partitions, thus polluting the controller's cache. In the current design, topic deletion is totally asynchronous. Users nearly always see the topic is marked as deleted successfully although there are several steps the controller needs to finish in the background. If we time out the deleteTopicStopReplicaCallback, completeReplicaDeletion will not be invoked. Does it make sense? > Controller send thread can't stop when broker change listener event trigger > for dead brokers > --------------------------------------------------------------------------------------------- > > Key: KAFKA-4595 > URL: https://issues.apache.org/jira/browse/KAFKA-4595 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.9.0.0, 0.10.1.1 > Reporter: Pengwei > Priority: Critical > Labels: reliability > Fix For: 0.10.2.0 > > > In our test env, we found controller is not working after a delete topic > operation and network issue, the stack is below: > "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" > #15 daemon prio=5 os_prio=0 tid=0x00007fb76416e000 nid=0x3019 waiting on > condition [0x00007fb76b7c8000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000000c05497b8> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > 
kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50) > at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128) > at > kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81) > - locked <0x00000000c0258760> (a java.lang.Object) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:79) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357) > at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > Locked ownable synchronizers: > - <0x00000000c02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 > tid=0x00007fb778342000 nid=0x5a4c waiting on condition [0x00007fb761de0000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000000c02587f8> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260) > at > kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:378) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345) > at > kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345) > at > kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294) > at > 
kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294) > at > kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:206) > - locked <0x00000000c04af988> (a java.lang.Object) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > Locked ownable synchronizers: > - None -- This message was sent by Atlassian JIRA (v6.3.4#6332)