[ https://issues.apache.org/jira/browse/KAFKA-618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joel Koshy updated KAFKA-618:
-----------------------------

    Attachment: KAFKA-618-v1.patch

Ran 20 iterations of testcase 4011 and they all pass. One potential concern is partitionMap in the ConsumerFetcherThread being null/inconsistent wrt the partitionMap in ConsumerFetcherManager, but I looked at it closely and don't think it is possible.

> Deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-618
>                 URL: https://issues.apache.org/jira/browse/KAFKA-618
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>            Priority: Blocker
>             Fix For: 0.8
>
>         Attachments: KAFKA-618-v1.patch
>
>
> This causes the test failure reported in KAFKA-607. This affects high-level consumers - if they hit the deadlock then they would get wedged (or at least until the consumer timeout).
> Here is the threaddump output that shows the issue:
> Found one Java-level deadlock:
> =============================
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
>   waiting for ownable synchronizer 0x00007f2283ad0000, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread"
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
>   waiting to lock monitor 0x00007f2288297190 (object 0x00007f2283ab01d0, a java.lang.Object),
>   which is held by "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1"
> Java stack information for the threads listed above:
> ===================================================
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f2283ad0000> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>         at kafka.consumer.ConsumerFetcherManager.getPartitionTopicInfo(ConsumerFetcherManager.scala:131)
>         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:43)
>         at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
>         at kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
>         - locked <0x00007f2283ab01d0> (a java.lang.Object)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
>         at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:142)
>         - waiting to lock <0x00007f2283ab01d0> (a java.lang.Object)
>         at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:49)
>         - locked <0x00007f2283ab0338> (a java.lang.Object)
>         at kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:81)
>         at kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:76)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>         at kafka.consumer.ConsumerFetcherManager$$anon$1.doWork(ConsumerFetcherManager.scala:76)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
> Found 1 deadlock.
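
For readers unfamiliar with this kind of dump, the cycle above is a lock-order inversion: one thread takes a monitor and then wants a ReentrantLock, while the other holds the ReentrantLock and wants the monitor. The following is a minimal standalone sketch of that shape in Java, not Kafka code and not what the attached patch does. The class, lock, and thread names (`LockOrderSketch`, `managerLock`, `partitionMapLock`, `sketch-*`) are hypothetical stand-ins chosen for illustration, and the sketch assumes a JVM that supports monitoring of ownable synchronizers, as HotSpot does.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;

public class LockOrderSketch {

    /** Forces the inversion and reports whether the JVM sees both threads in a deadlock. */
    public static boolean reproduce() {
        // Hypothetical stand-ins for the two locks that appear in the dump.
        ReentrantLock managerLock = new ReentrantLock();
        Object partitionMapLock = new Object();
        // Ensures each thread holds its first lock before asking for the second.
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Order A: monitor first, then the ReentrantLock (the fetcher side of the dump).
        Thread fetcher = new Thread(() -> {
            synchronized (partitionMapLock) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                managerLock.lock();   // parks forever: the other thread holds it
                managerLock.unlock();
            }
        }, "sketch-fetcher-thread");

        // Order B: ReentrantLock first, then the monitor (the leader-finder side).
        Thread leaderFinder = new Thread(() -> {
            managerLock.lock();
            try {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (partitionMapLock) {
                    // never reached: the fetcher thread holds the monitor
                }
            } finally {
                managerLock.unlock();
            }
        }, "sketch-leader-finder-thread");

        // Daemon threads so the wedged pair does not keep the JVM alive.
        fetcher.setDaemon(true);
        leaderFinder.setDaemon(true);
        fetcher.start();
        leaderFinder.start();

        // Poll the same detector that produces "Found one Java-level deadlock".
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 500; attempt++) {
            long[] ids = threads.findDeadlockedThreads();
            if (ids != null && contains(ids, fetcher.getId()) && contains(ids, leaderFinder.getId())) {
                return true;
            }
            sleepQuietly(10);
        }
        return false;
    }

    private static boolean contains(long[] ids, long id) {
        for (long candidate : ids) {
            if (candidate == id) {
                return true;
            }
        }
        return false;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + reproduce());
    }
}
```

In general, a cycle like this goes away if both code paths acquire the two locks in the same order, or if neither path calls into the other component while still holding its own lock. Which approach, if either, KAFKA-618-v1.patch takes is not stated in this message.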