[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035465#comment-16035465 ]

Onur Karaman commented on KAFKA-4595:
-------------------------------------

[~ijuma] Yeah I think KAFKA-5028 solved this issue.

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.1
>
>
> In our test env, we found the controller stopped working after a delete-topic operation and a network issue; the stack is below:
>
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on condition [0x7fb76b7c8000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0xc05497b8> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>         at kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>         at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>         at kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>         - locked <0xc0258760> (a java.lang.Object)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>         at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>    Locked ownable synchronizers:
>         - <0xc02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0xc02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16034264#comment-16034264 ]

Ismael Juma commented on KAFKA-4595:
------------------------------------

Bumping the fix version until we get confirmation of whether this was fixed.

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.1
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030705#comment-16030705 ]

Ismael Juma commented on KAFKA-4595:
------------------------------------

[~junrao], is this fixed given that deletion no longer happens in a separate thread?

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.11.0.0
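
The question above turns on threading: if topic deletion, broker-change handling, and response callbacks all run as events on one controller thread, a callback can no longer wait for a lock held by the very thread that is waiting for it to finish. A minimal sketch of that idea, assuming a single-threaded event model; the class and event names are hypothetical and this is not Kafka's controller code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names exist in Kafka.
class SingleThreadEventSketch {

    /** Queues three controller-style events and returns the order in which they ran. */
    static List<String> processInOrder() throws InterruptedException {
        List<String> handled = Collections.synchronizedList(new ArrayList<>());
        // One thread drains the queue, so handlers never run concurrently
        // and no controller-wide lock is needed to protect shared state.
        ExecutorService eventThread = Executors.newSingleThreadExecutor();
        eventThread.execute(() -> handled.add("topic-deletion"));
        eventThread.execute(() -> handled.add("broker-change"));
        eventThread.execute(() -> handled.add("stop-replica-response"));
        eventThread.shutdown();                               // accept no new events
        eventThread.awaitTermination(5, TimeUnit.SECONDS);    // wait for queued events to finish
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processInOrder());
    }
}
```

Because a single thread drains the queue, the events run strictly in submission order, and a response handler is just another queued event rather than a lock acquisition on a separate thread.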
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823594#comment-15823594 ]

huxi commented on KAFKA-4595:
-----------------------------

[~pengwei] I don't think that is doable, since it would leave the involved partitions in inconsistent states and thus pollute the controller's cache. In the current design, topic deletion is fully asynchronous: users nearly always see the topic marked as deleted successfully, although the controller still has several steps to finish in the background. If we time out deleteTopicStopReplicaCallback, completeReplicaDeletion will not be invoked. Does that make sense?

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.10.2.0
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811007#comment-15811007 ]

Pengwei commented on KAFKA-4595:
--------------------------------

One option is to acquire the lock with a timeout in deleteTopicStopReplicaCallback instead of blocking indefinitely. If the attempt times out, the call fails and the RequestSendThread can be stopped after this round.
It also looks as though the broker received the stop-replica command but did not return the result to the controller. If the broker recovers, it can receive the stop-replica command again and respond to the controller.

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.10.2.0
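
The lock-timeout idea proposed above can be sketched with `ReentrantLock.tryLock(timeout, unit)`. This is only an illustration under stated assumptions: the class and method names are hypothetical, the callback body is a placeholder, and a later reply in this thread objects that skipping the callback would leave controller state inconsistent.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; none of these names exist in Kafka.
class TimedCallbackSketch {

    /** Runs the callback under the lock if it can be acquired in time; returns whether it ran. */
    static boolean runWithLockTimeout(ReentrantLock lock, long timeoutMs, Runnable callback)
            throws InterruptedException {
        if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false; // give up: the caller treats this round as failed
        }
        try {
            callback.run();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Another thread holds the lock for the whole attempt; reports whether the callback ran. */
    static boolean callbackRanWhileLockHeldElsewhere(long timeoutMs) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean ran = new AtomicBoolean(false);
        lock.lock(); // the calling thread plays the role of the lock holder
        try {
            Thread sender = new Thread(() -> {
                try {
                    ran.set(runWithLockTimeout(lock, timeoutMs, () -> { }));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            sender.start();
            sender.join(); // returns because the timed attempt gives up
        } finally {
            lock.unlock();
        }
        return ran.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("ran while lock held elsewhere: " + callbackRanWhileLockHeldElsewhere(100));
        System.out.println("ran when lock was free: "
                + runWithLockTimeout(new ReentrantLock(), 100, () -> { }));
    }
}
```

The point of the sketch is that the sender thread terminates even though the lock is never released to it, so a thread waiting for that sender to shut down would not wait forever. The cost is that the callback is skipped in that round.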
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803816#comment-15803816 ]

Pengwei commented on KAFKA-4595:
--------------------------------

[~huxi_2b]

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>             Fix For: 0.10.2.0
>
>
> In our test env, we found the controller stopped working after a delete-topic operation and a network issue; the stack is below:
>
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on condition [0x7fb76b7c8000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0xc05497b8> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>         at kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>         at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>         at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>         at kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>         - locked <0xc0258760> (a java.lang.Object)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>         at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>    Locked ownable synchronizers:
>         - <0xc02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0xc02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>         at
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
[ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803812#comment-15803812 ]

Pengwei commented on KAFKA-4595:
--------------------------------

Yes, you are right.

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>             Fix For: 0.10.2.0
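
The sequence huxi describes in this thread (a listener thread holds the controller lock while waiting for the send thread to shut down, and the send thread's callback needs that same lock) can be reproduced in miniature. The sketch below is an assumption-laden toy, not Kafka code: all names are hypothetical, and the wait is bounded and the worker is interrupted at the end only so that the demo terminates, whereas the reported wait has no timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical miniature of the reported hang; none of these names exist in Kafka.
class ControllerDeadlockSketch {

    /** Returns true if the send thread finished within the timeout while the lock was held. */
    static boolean listenerCanStopSendThread(long timeoutMs) throws InterruptedException {
        ReentrantLock controllerLock = new ReentrantLock();       // stands in for the controller lock
        CountDownLatch listenerHoldsLock = new CountDownLatch(1);
        CountDownLatch shutdownLatch = new CountDownLatch(1);     // stands in for the shutdown latch

        Thread sendThread = new Thread(() -> {
            try {
                listenerHoldsLock.await();            // the callback starts after the listener has the lock
                controllerLock.lockInterruptibly();   // the callback needs the controller lock
                try {
                    // the callback body would run here
                } finally {
                    controllerLock.unlock();
                }
            } catch (InterruptedException e) {
                // interrupted only so that this demo can end
            } finally {
                shutdownLatch.countDown();            // the thread counts as stopped only here
            }
        }, "send-thread-sketch");
        sendThread.start();

        controllerLock.lock();                        // the listener takes the controller lock
        try {
            listenerHoldsLock.countDown();
            // Wait for the send thread while still holding the lock it needs.
            return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            controllerLock.unlock();
            sendThread.interrupt();
            sendThread.join();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("send thread stopped in time: " + listenerCanStopSendThread(200));
    }
}
```

The method returns false: the send thread cannot pass `lockInterruptibly()` until the listener releases the lock, and the listener does not release it until its wait on the latch ends. With an unbounded wait, as in the thread dump, neither side can make progress.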
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
    [ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803804#comment-15803804 ]

huxi commented on KAFKA-4595:
-----------------------------

Yes, it seems the sequence might look like this:
1. At some point, a delete-topic operation began, which resumed the DeleteTopicsThread. This thread acquired the controller lock, ran successfully, and then released the lock. At that point the request send thread had not yet begun to execute the callback.
2. Later, a network problem occurred, and the controller triggered the zk listener thread to remove dead brokers. This thread acquired the controller lock, shut down the request send thread for the dead broker, and waited for that shutdown to complete.
3. Then the request send thread began to execute deleteTopicStopReplicaCallback, which also tried to acquire the controller lock. It therefore waited forever and could not be shut down.
4. The result was a deadlock.

[~pengwei] Does this make sense?

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>             Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic operation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on condition [0x7fb76b7c8000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0xc05497b8> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>     at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>     at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>     at kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>     - locked <0xc0258760> (a java.lang.Object)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>     at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>    Locked ownable synchronizers:
>     -
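The four steps in the comment above can be reproduced in miniature. The sketch below is not Kafka code: `ControllerDeadlockSketch`, `shutdownCompletes`, and the latch names are hypothetical, and the wait is given a timeout (the awaitShutdown call in the stack trace has none) so that the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class ControllerDeadlockSketch {
    // Returns true if the send thread stopped while the listener still held the lock.
    static boolean shutdownCompletes(long waitMillis) {
        ReentrantLock controllerLock = new ReentrantLock();
        CountDownLatch listenerHoldsLock = new CountDownLatch(1);
        CountDownLatch sendThreadStopped = new CountDownLatch(1);

        Thread sendThread = new Thread(() -> {
            try {
                // Force the ordering from the comment: step 2 happens before step 3.
                listenerHoldsLock.await();
                // Step 3: the callback needs the controller lock and parks here.
                controllerLock.lock();
                try {
                    // The callback body would run here.
                } finally {
                    controllerLock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                sendThreadStopped.countDown();
            }
        }, "send-thread");
        sendThread.start();

        // Step 2: the listener takes the controller lock, then waits for the send thread.
        controllerLock.lock();
        try {
            listenerHoldsLock.countDown();
            // Assumption for the demo only: a bounded wait instead of an unbounded one.
            return sendThreadStopped.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            // Releasing the lock lets the send thread finish so the demo can exit.
            controllerLock.unlock();
            try {
                sendThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown completed while lock held: " + shutdownCompletes(200));
    }
}
```

The wait times out because the only thread that can count down the latch is parked on the lock held by the waiter. With an unbounded wait, as in the trace, neither thread would ever make progress.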
[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers
    [ https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15801343#comment-15801343 ]

Pengwei commented on KAFKA-4595:

The broker change event is triggered to remove dead broker 1003. It holds the controller lock and needs to stop the Controller-1001-to-broker-1003-send-thread, but this send thread is calling deleteTopicStopReplicaCallback, which also tries to acquire the controller lock, so the two operations deadlock.

> Controller send thread can't stop when broker change listener event trigger for dead brokers
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>             Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic operation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on condition [0x7fb76b7c8000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0xc05497b8> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>     at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>     at kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>     at kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>     - locked <0xc0258760> (a java.lang.Object)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>     at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>    Locked ownable synchronizers:
>     - <0xc02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0xc02587f8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>     at