[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823594#comment-15823594 ]
huxi commented on KAFKA-4595:
-----------------------------
[~pengwei] I don't think that's doable, since it would leave the involved
partitions in an inconsistent state and thus pollute the controller's cache.
In the current design, topic deletion is entirely asynchronous: users almost
always see the topic successfully marked for deletion, even though the
controller still has several steps to finish in the background. If we time out
deleteTopicStopReplicaCallback, completeReplicaDeletion will never be invoked.
Does that make sense?
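To make the argument concrete, here is a minimal Java sketch of why abandoning the callback is unsafe. It is a simplified model under stated assumptions, not the controller's code: the controller's cache is modeled as a map from replica to a deletion state, and the names DeletionState, DeletionCacheSketch, onStopReplicaResponse and the replica labels are hypothetical. Only the idea that completeReplicaDeletion is reached through the StopReplica response callback comes from the comment above. If the callback for one replica is dropped, as a timeout would do, that replica stays STARTED and the topic's deletion never finishes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified per-replica deletion state (not Kafka's actual enum).
enum DeletionState { STARTED, SUCCESSFUL }

class DeletionCacheSketch {
    // Stands in for the controller's in-memory cache: replica -> deletion state.
    final Map<String, DeletionState> cache = new HashMap<>();

    void startDeletion(String replica) {
        cache.put(replica, DeletionState.STARTED);
    }

    // Plays the role of the StopReplica response callback. In this model it is
    // the ONLY path out of STARTED (analogous to completeReplicaDeletion).
    void onStopReplicaResponse(String replica) {
        cache.put(replica, DeletionState.SUCCESSFUL);
    }

    // The topic's deletion can only finish once every replica is SUCCESSFUL.
    boolean topicDeletionComplete() {
        return !cache.isEmpty()
            && cache.values().stream().allMatch(s -> s == DeletionState.SUCCESSFUL);
    }

    // Two replicas of one partition; if callbackTimedOut is true, the callback
    // for the second replica is abandoned and never invoked.
    static DeletionCacheSketch scenario(boolean callbackTimedOut) {
        DeletionCacheSketch c = new DeletionCacheSketch();
        c.startDeletion("t-0@broker1");
        c.startDeletion("t-0@broker2");
        c.onStopReplicaResponse("t-0@broker1");
        if (!callbackTimedOut) {
            c.onStopReplicaResponse("t-0@broker2");
        }
        return c;
    }

    public static void main(String[] args) {
        System.out.println(scenario(false).topicDeletionComplete());  // true
        System.out.println(scenario(true).topicDeletionComplete());   // false
        System.out.println(scenario(true).cache.get("t-0@broker2"));  // STARTED
    }
}
```

The stuck STARTED entry is the "polluted cache" in miniature: nothing else in the model ever moves it forward.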
> Controller send thread can't stop when broker change listener event trigger for dead brokers
> ---------------------------------------------------------------------------------------------
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.0, 0.10.1.1
> Reporter: Pengwei
> Priority: Critical
> Labels: reliability
> Fix For: 0.10.2.0
>
>
> In our test env, we found that the controller stopped working after a delete
> topic operation and a network issue. The stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" #15 daemon prio=5 os_prio=0 tid=0x00007fb76416e000 nid=0x3019 waiting on condition [0x00007fb76b7c8000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000c05497b8> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
> at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
> at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
> at kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
> - locked <0x00000000c0258760> (a java.lang.Object)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Locked ownable synchronizers:
> - <0x00000000c02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 tid=0x00007fb778342000 nid=0x5a4c waiting on condition [0x00007fb761de0000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000000c02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
> at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:378)
> at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345)
> at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345)
> at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294)
> at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294)
> at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:206)
> - locked <0x00000000c04af988> (a java.lang.Object)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> Locked ownable synchronizers:
> - None
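The two dumps form a cycle. The ZkClient event thread holds the ReentrantLock <0x00000000c02587f8> (listed under its locked ownable synchronizers) and waits in ShutdownableThread.awaitShutdown on a CountDownLatch for the send thread to exit. The send thread, inside deleteTopicStopReplicaCallback, is parked trying to acquire that same lock, so doWork never returns and the latch is never counted down. Below is a minimal Java sketch of that lock ordering, assuming a plain ReentrantLock and CountDownLatch stand in for the controller lock and the shutdown latch; ControllerDeadlockSketch and sendThreadStops are hypothetical names. Unlike the real awaitShutdown, it waits with a timeout so the demo terminates. It also interrupts the send thread to show that an interrupt, even if one were sent, would not help: ReentrantLock.lock() is not interruptible, consistent with the acquireQueued frame in the second dump.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical reproduction of the lock ordering seen in the two thread dumps.
class ControllerDeadlockSketch {

    // Returns true if the "send thread" exits within timeoutMs.
    // holdLock = true models the event thread holding the controller lock
    // while it shuts the send thread down.
    static boolean sendThreadStops(boolean holdLock, long timeoutMs) {
        ReentrantLock controllerLock = new ReentrantLock();
        // Stands in for the shutdown latch: counted down only when the send thread exits.
        CountDownLatch sendThreadExited = new CountDownLatch(1);

        Thread sendThread = new Thread(() -> {
            try {
                // Models the response callback taking the controller lock.
                // lock() ignores interrupts and keeps waiting for the lock.
                controllerLock.lock();
                try {
                    // the callback's state updates would happen here
                } finally {
                    controllerLock.unlock();
                }
            } finally {
                sendThreadExited.countDown();
            }
        }, "send-thread");
        sendThread.setDaemon(true);

        if (holdLock) {
            controllerLock.lock(); // the event thread already owns the controller lock
        }
        try {
            sendThread.start();
            sendThread.interrupt(); // does not free a thread waiting in lock()
            // Bounded stand-in for awaitShutdown(), which has no timeout.
            return sendThreadExited.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            if (holdLock) {
                controllerLock.unlock(); // lets the send thread finish after the demo
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(sendThreadStops(false, 1000)); // true
        System.out.println(sendThreadStops(true, 200));   // false
    }
}
```

With holdLock = true the send thread cannot exit until the lock is released, so the bounded wait always reports false; the unbounded wait in the first dump therefore never returns.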
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)