[ https://issues.apache.org/jira/browse/KAFKA-618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13498555#comment-13498555 ]
Jun Rao commented on KAFKA-618:
-------------------------------

This is a very good finding. Here is one way to break the deadlock. In ConsumerFetcherManager, stop exposing getPartitionTopicInfo(). Instead, pass partitionMap (which is immutable) to each newly created ConsumerFetcherThread. That way, ConsumerFetcherThread.processPartitionData() and ConsumerFetcherThread.handleOffsetOutOfRange() no longer depend on ConsumerFetcherManager, so a fetcher thread holding its own lock never has to wait for the manager's lock, and the cycle shown in the thread dump below cannot form.

With that change, ConsumerFetcherManager.stopAllConnections() can be simplified as well: noLeaderPartitionSet and partitionMap can both be cleared before calling closeAllFetchers(). Currently, partitionMap has to be cleared last because, until all fetchers are stopped, processing a fetch request still reads partitionMap and expects it to be non-null.

> Deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-618
>                 URL: https://issues.apache.org/jira/browse/KAFKA-618
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>            Priority: Blocker
>             Fix For: 0.8
>
>
> This causes the test failure reported in KAFKA-607. It affects high-level consumers: if they hit the deadlock, they get wedged (at least until the consumer timeout).
>
> Here is the thread dump output that shows the issue:
>
> Found one Java-level deadlock:
> =============================
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
>   waiting for ownable synchronizer 0x00007f2283ad0000, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread"
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
>   waiting to lock monitor 0x00007f2288297190 (object 0x00007f2283ab01d0, a java.lang.Object),
>   which is held by "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1"
>
> Java stack information for the threads listed above:
> ===================================================
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x00007f2283ad0000> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>         at kafka.consumer.ConsumerFetcherManager.getPartitionTopicInfo(ConsumerFetcherManager.scala:131)
>         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:43)
>         at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
>         at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
>         - locked <0x00007f2283ab01d0> (a java.lang.Object)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
>         at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:142)
>         - waiting to lock <0x00007f2283ab01d0> (a java.lang.Object)
>         at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:49)
>         - locked <0x00007f2283ab0338> (a java.lang.Object)
>         at kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:81)
>         at kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:76)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>         at kafka.consumer.ConsumerFetcherManager$$anon$1.doWork(ConsumerFetcherManager.scala:76)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
>
> Found 1 deadlock.
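
A minimal Java sketch of the fix proposed in the comment, assuming simplified, hypothetical stand-ins: FetcherManager, FetcherThread, PartitionInfo, startConnections(), addFetcher() and isCleared() are illustrative names, not Kafka's actual classes or signatures. It shows the lock-ordering consequence of handing each fetcher an immutable snapshot of partitionMap: the leader-finder path takes the manager's ReentrantLock and then the fetcher's monitor, while processPartitionData() takes only the fetcher's own monitor and never calls back into the manager. The wait-for cycle in the dump (fetcher monitor, then manager lock; manager lock, then fetcher monitor) therefore cannot form.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for PartitionTopicInfo.
final class PartitionInfo {
    final String topic;
    final int partition;
    PartitionInfo(String topic, int partition) { this.topic = topic; this.partition = partition; }
}

// Hypothetical stand-in for ConsumerFetcherThread.
final class FetcherThread {
    // Plays the role of the monitor AbstractFetcherThread holds in doWork/addPartition.
    private final Object partitionMapLock = new Object();
    // Immutable snapshot handed over by the manager when the fetcher is created.
    private final Map<String, PartitionInfo> partitionMap;
    private final Set<String> assigned = new HashSet<>();

    FetcherThread(Map<String, PartitionInfo> partitionMap) { this.partitionMap = partitionMap; }

    // Called on the leader-finder path while the manager's lock is already held.
    void addPartition(String key) {
        synchronized (partitionMapLock) {
            assigned.add(key);
        }
    }

    // Holds only the fetcher's own monitor and reads the local snapshot,
    // so it never waits on the manager's lock.
    PartitionInfo processPartitionData(String key) {
        synchronized (partitionMapLock) {
            return partitionMap.get(key);
        }
    }
}

// Hypothetical stand-in for ConsumerFetcherManager.
final class FetcherManager {
    private final ReentrantLock lock = new ReentrantLock();
    private final Set<String> noLeaderPartitionSet = new HashSet<>();
    private Map<String, PartitionInfo> partitionMap = null;

    void startConnections(Map<String, PartitionInfo> partitions) {
        lock.lock();
        try {
            // Built once and never mutated afterwards.
            partitionMap = Collections.unmodifiableMap(new HashMap<>(partitions));
            noLeaderPartitionSet.addAll(partitions.keySet());
        } finally { lock.unlock(); }
    }

    // Leader-finder path: lock order is always manager lock, then fetcher monitor.
    FetcherThread addFetcher(String key) {
        lock.lock();
        try {
            FetcherThread fetcher = new FetcherThread(partitionMap);
            fetcher.addPartition(key);
            noLeaderPartitionSet.remove(key);
            return fetcher;
        } finally { lock.unlock(); }
    }

    // Both collections are cleared up front; running fetchers keep their own snapshot.
    void stopAllConnections() {
        lock.lock();
        try {
            noLeaderPartitionSet.clear();
            partitionMap = null;
        } finally { lock.unlock(); }
        // closeAllFetchers() would run here; omitted in this sketch.
    }

    boolean isCleared() {
        lock.lock();
        try {
            return partitionMap == null && noLeaderPartitionSet.isEmpty();
        } finally { lock.unlock(); }
    }
}

final class DeadlockFixDemo {
    public static void main(String[] args) {
        FetcherManager manager = new FetcherManager();
        Map<String, PartitionInfo> parts = new HashMap<>();
        parts.put("test-0", new PartitionInfo("test", 0));
        manager.startConnections(parts);
        FetcherThread fetcher = manager.addFetcher("test-0");
        manager.stopAllConnections();
        // The fetcher still resolves the partition from its own snapshot.
        System.out.println(fetcher.processPartitionData("test-0").partition); // prints 0
    }
}
```

In this sketch, stopAllConnections() no longer has to keep partitionMap alive for in-flight fetch processing, which is why it can clear both collections before the fetchers are closed.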