[ 
https://issues.apache.org/jira/browse/KAFKA-1447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14112372#comment-14112372
 ] 

Rudolf Šíma commented on KAFKA-1447:
------------------------------------

The bug seems to be still present in 0.8.2. We ran into the issue when bouncing 
18 brokers at once with controlled shutdown enabled, which led to this kind of 
deadlock. As a workaround, we have increased controller.message.queue.size to 
10000 (10 is default). Are there any pitfalls of using large controller message 
queue sizes? With the default size of 10, the deadlock seems very likely when 
restarting larger numbers of nodes at once, since all threads capable of 
polling from the RequestChannel's requestQueue will be blocked on 
requestQueue.put(request) in sendRequest(Request).

> Controlled shutdown deadlock when trying to send state updates
> --------------------------------------------------------------
>
>                 Key: KAFKA-1447
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1447
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.8.0
>            Reporter: Sam Meder
>            Assignee: Neha Narkhede
>
> We're seeing controlled shutdown indefinitely stuck on trying to send out 
> state change messages to the other brokers:
> [2014-05-03 04:01:30,580] INFO [Socket Server on Broker 4], Shutdown 
> completed (kafka.network.SocketServer)
> [2014-05-03 04:01:30,581] INFO [Kafka Request Handler on Broker 4], shutting 
> down (kafka.server.KafkaRequestHandlerPool)
> and stuck on:
> "kafka-request-handler-12" daemon prio=10 tid=0x00007f1f04a66800 nid=0x6e79 
> waiting on condition [0x00007f1ad5767000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> parking to wait for <0x000000078e91dc20> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> at 
> kafka.controller.ControllerChannelManager.sendRequest(ControllerChannelManager.scala:57)
> locked <0x000000078e91dc38> (a java.lang.Object)
> at kafka.controller.KafkaController.sendRequest(KafkaController.scala:655)
> at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:298)
> at 
> kafkler.ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2.apply(ControllerChannelManager.scala:290)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> at 
> kafka.controller.ControllerBrokerRequestBatch.sendRequestsToBrokers(ControllerChannelManager.scala:290)
> at 
> kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:97)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:269)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1$$anonfun$apply$mcV$sp$3.apply(KafkaController.scala:253)
> at scala.Option.foreach(Option.scala:197)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply$mcV$sp(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3$$anonfun$apply$1.apply(KafkaController.scala:253)
> at kafka.utils.Utils$.inLock(Utils.scala:538)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:252)
> at 
> kafka.controller.KafkaController$$anonfun$shutdownBroker$3.apply(KafkaController.scala:249)
> at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:130)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:275)
> at kafka.controller.KafkaController.shutdownBroker(KafkaController.scala:249)
> locked <0x000000078b495af0> (a java.lang.Object)
> at kafka.server.KafkaApis.handleControlledShutdownRequest(KafkaApis.scala:264)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:192)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
> at java.lang.Thread.run(Thread.java:722)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to