[
https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14165914#comment-14165914
]
Sriharsha Chintalapani commented on KAFKA-1305:
-----------------------------------------------
[~junrao] I ran the above test you suggested with
leader.imbalance.check.interval.seconds set to 30 and
controller.message.queue.size set 10000. With 5 brokers and 1500 topics with 3
partitons and 3 replication factor. I am able to run into a case where a broker
prints "Shutdown completed" but the process still hangs. Running the test by
setting controller.message.queue.size to higher number.
Here is thread dump
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode):
"Attach Listener" daemon prio=10 tid=0x00007f8860003000 nid=0x26cc waiting on
condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Controller-4-to-broker-3-send-thread" prio=10 tid=0x00007f884c049000
nid=0x26b2 waiting on condition [0x00007f83c99e0000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000d2be8008> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:121)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
"Thread-2" prio=10 tid=0x00007f8868005800 nid=0x26b1 waiting on condition
[0x00007f83c98df000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000d2a34508> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at
java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)
at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:88)
at
kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:353)
at
kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:348)
at
kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:348)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at
kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:348)
at kafka.controller.KafkaController.shutdown(KafkaController.scala:663)
at
kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:287)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
at kafka.utils.Logging$class.swallow(Logging.scala:94)
at kafka.utils.Utils$.swallow(Utils.scala:45)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:287)
at
kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:40)
at kafka.Kafka$$anon$1.run(Kafka.scala:42)
"SIGTERM handler" daemon prio=10 tid=0x00007f8860002000 nid=0x26ae in
Object.wait() [0x00007f83c9be2000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000d018bda8> (a kafka.Kafka$$anon$1)
at java.lang.Thread.join(Thread.java:1281)
- locked <0x00000000d018bda8> (a kafka.Kafka$$anon$1)
at java.lang.Thread.join(Thread.java:1355)
at
java.lang.ApplicationShutdownHooks.runHooks(ApplicationShutdownHooks.java:106)
at
java.lang.ApplicationShutdownHooks$1.run(ApplicationShutdownHooks.java:46)
at java.lang.Shutdown.runHooks(Shutdown.java:123)
at java.lang.Shutdown.sequence(Shutdown.java:167)
at java.lang.Shutdown.exit(Shutdown.java:212)
- locked <0x00000000d0080660> (a java.lang.Class for java.lang.Shutdown)
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:745)
"kafka-scheduler-0" daemon prio=10 tid=0x00007f884c045000 nid=0x26aa waiting on
condition [0x00007f83c9ee5000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000cfb1b800> (a
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197)
at
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290)
at kafka.utils.Utils$.inLock(Utils.scala:533)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1149)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1147)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1147)
at
kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1126)
at
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at
kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1126)
at
kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:326)
at
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:99)
at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
"Controller-4-to-broker-1-send-thread" prio=10 tid=0x00007f884c019800
nid=0x26a9 waiting on condition [0x00007f83c9fe6000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000d2828b58> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> Controller can hang on controlled shutdown with auto leader balance enabled
> ---------------------------------------------------------------------------
>
> Key: KAFKA-1305
> URL: https://issues.apache.org/jira/browse/KAFKA-1305
> Project: Kafka
> Issue Type: Bug
> Reporter: Joel Koshy
> Assignee: Sriharsha Chintalapani
> Priority: Blocker
> Fix For: 0.8.2, 0.9.0
>
>
> This is relatively easy to reproduce especially when doing a rolling bounce.
> What happened here is as follows:
> 1. The previous controller was bounced and broker 265 became the new
> controller.
> 2. I went on to do a controlled shutdown of broker 265 (the new controller).
> 3. In the mean time the automatically scheduled preferred replica leader
> election process started doing its thing and starts sending
> LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers).
> (t@113 below).
> 4. While that's happening, the controlled shutdown process on 265 succeeds
> and proceeds to deregister itself from ZooKeeper and shuts down the socket
> server.
> 5. (ReplicaStateMachine actually removes deregistered brokers from the
> controller channel manager's list of brokers to send requests to. However,
> that removal cannot take place (t@18 below) because preferred replica leader
> election task owns the controller lock.)
> 6. So the request thread to broker 265 gets into infinite retries.
> 7. The entire broker shutdown process is blocked on controller shutdown for
> the same reason (it needs to acquire the controller lock).
> Relevant portions from the thread-dump:
> "Controller-265-to-broker-265-send-thread" - Thread t@113
> java.lang.Thread.State: TIMED_WAITING
> at java.lang.Thread.sleep(Native Method)
> at
> kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143)
> at kafka.utils.Utils$.swallow(Utils.scala:167)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:46)
> at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> - locked java.lang.Object@6dbf14a7
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> Locked ownable synchronizers:
> - None
> ...
> "Thread-4" - Thread t@17
> java.lang.Thread.State: WAITING on
> java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by:
> kafka-scheduler-0
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> at
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> at kafka.utils.Utils$.inLock(Utils.scala:536)
> at kafka.controller.KafkaController.shutdown(KafkaController.scala:642)
> at
> kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242)
> at kafka.utils.Utils$.swallow(Utils.scala:167)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:46)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:46)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242)
> at
> kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46)
> at kafka.Kafka$$anon$1.run(Kafka.scala:42)
> ...
> "kafka-scheduler-0" - Thread t@117
> java.lang.Thread.State: WAITING on
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@1dc407fc
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at
> kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
> - locked java.lang.Object@578b748f
> at
> kafka.controller.KafkaController.sendRequest(KafkaController.scala:657)
> at
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
> at
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:282)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> at
> kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:282)
> at
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:126)
> at
> kafka.controller.KafkaController.onPreferredReplicaElection(KafkaController.scala:612)
> at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply$mcV$sp(KafkaController.scala:1119)
> at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
> at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17$$anonfun$apply$5.apply(KafkaController.scala:1114)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1111)
> at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4$$anonfun$apply$17.apply(KafkaController.scala:1109)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1109)
> at
> kafka.controller.KafkaController$$anonfun$kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance$4.apply(KafkaController.scala:1088)
> at
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
> at
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
> at
> kafka.controller.KafkaController.kafka$controller$KafkaController$$checkAndTriggerPartitionRebalance(KafkaController.scala:1088)
> at
> kafka.controller.KafkaController$$anonfun$onControllerFailover$1.apply$mcV$sp(KafkaController.scala:323)
> at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> at
> java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Locked ownable synchronizers:
> - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840
> - locked java.util.concurrent.locks.ReentrantLock$NonfairSync@4918530
> ...
> "ZkClient-EventThread-18-/kafka-shadow" - Thread t@18
> java.lang.Thread.State: WAITING on
> java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by:
> kafka-scheduler-0
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
> at
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
> at kafka.utils.Utils$.inLock(Utils.scala:536)
> at
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:328)
> at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)