[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-06-02 Thread Onur Karaman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16035465#comment-16035465
 ] 

Onur Karaman commented on KAFKA-4595:
-

[~ijuma] Yeah I think KAFKA-5028 solved this issue.

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> 

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-06-02 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16034264#comment-16034264
 ] 

Ismael Juma commented on KAFKA-4595:


Bumping the fix version until we get confirmation if this was fixed or not.

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> 

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-05-31 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16030705#comment-16030705
 ] 

Ismael Juma commented on KAFKA-4595:


[~junrao], is this fixed given that deletion no longer happens in a separate 
thread?

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> 

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-16 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15823594#comment-15823594
 ] 

huxi commented on KAFKA-4595:
-

[~pengwei] I don't think it's doable since it would leave inconsistent states 
for the involved partitions, thus polluting controller's cache. As in the 
current design, the topic deleting is totally asynchronous. Users nearly always 
see the topic is marked as deleted successfully although there are several 
steps the controller needs to finish in the background. If we time out the 
deleteTopicStopReplicaCallback, completeReplicaDeletion will not be invoked.

Does it make sense?

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-09 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15811007#comment-15811007
 ] 

Pengwei commented on KAFKA-4595:


One method is to add a lock timeout method instead of blocking locking in the 
deleteTopicStopReplicaCallback。 If timeout, this call is failed and the 
RequestSendThread can be stop after this round. And it seems like to be the 
broker have receive the stop replicas command, but does not response the result 
to the controller,  if the  broker recover, it can receive the stop replicas 
command again and response to the controller.

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
> 

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-05 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803816#comment-15803816
 ] 

Pengwei commented on KAFKA-4595:


[~huxi_2b]

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>   at 
> 

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-05 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803812#comment-15803812
 ] 

Pengwei commented on KAFKA-4595:


Yes, you are right.

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-05 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15803804#comment-15803804
 ] 

huxi commented on KAFKA-4595:
-

Yes, seems that the process might look like this:
1. At some point of time, deleting topic operation began which triggered to 
resume the DeleteTopicsThread. This thread got the controller lock and ran 
successfully. Then it released the lock. But the request send thread had not 
begun to execute the callback.
2. Later, the network got something wrong, then controller triggered the zk 
listener thread to remove dead brokers. This thread got the controller lock and 
shut down the request send thread for that dead broker and wait.
3. Then, the request send thread began to execute the 
deleteTopicStopReplicaCallback which also tried to get the controller lock. So 
it waited forever and failed to be shut down.
4. Then deadlock happened.
[~pengwei]  Does it make sense?

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - 

[jira] [Commented] (KAFKA-4595) Controller send thread can't stop when broker change listener event trigger for dead brokers

2017-01-05 Thread Pengwei (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15801343#comment-15801343
 ] 

Pengwei commented on KAFKA-4595:


The broker change event is trigger to remove dead brokers 1003,  it lock the 
controller lock and  need to stop the 
Controller-1001-to-broker-1003-send-thread, but this send thread is calling the 
deleteTopicStopReplicaCallback, it also try to get the controller lock, so dead 
lock between these two operation.

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> -
>
> Key: KAFKA-4595
> URL: https://issues.apache.org/jira/browse/KAFKA-4595
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.1.1
>Reporter: Pengwei
> Fix For: 0.10.2.0
>
>
> In our test env, we found controller is not working after a delete topic 
> opertation and network issue, the stack is below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x7fb76416e000 nid=0x3019 waiting on 
> condition [0x7fb76b7c8000]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>   at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>   at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>   at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>   - locked <0xc0258760> (a java.lang.Object)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>Locked ownable synchronizers:
>   - <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x7fb778342000 nid=0x5a4c waiting on condition [0x7fb761de]
>java.lang.Thread.State: WAITING (parking)
>   at sun.misc.Unsafe.park(Native Method)
>   - parking to wait for  <0xc02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>   at