[ 
https://issues.apache.org/jira/browse/KAFKA-618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13498555#comment-13498555
 ] 

Jun Rao commented on KAFKA-618:
-------------------------------

This is a very good finding. The following is one way of breaking the deadlock.

In ConsumerFetcherManager, don't expose getPartitionTopicInfo(). Instead, pass 
partitionMap (which is immutable) to each newly created ConsumerFetcherThread. 
This way, ConsumerFetcherThread.processPartitionData() and 
ConsumerFetcherThread.handleOffsetOutOfRange() won't depend on 
ConsumerFetcherManager any more. If we do that, we can improve 
ConsumerFetcherManager.stopAllConnections() a bit too: noLeaderPartitionSet and 
partitionMap can be cleared together before calling closeAllFetchers(). 
Currently, we have to clear partitionMap last because, until all fetchers are 
stopped, the processing of the fetch request still needs to read partitionMap 
and expects it to be non-null. 
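
A rough sketch of the idea, not the actual patch. The class and method names 
follow the comment above, but the bodies, the fields of PartitionTopicInfo, the 
startConnections/addFetcher signatures and the Demo object are simplified 
assumptions made for illustration only. The point is that the fetcher thread 
holds its own reference to the immutable map and never takes the manager's 
lock.

```scala
import java.util.concurrent.locks.ReentrantLock

// Hypothetical stand-ins for the Kafka types; fields are illustrative only.
case class TopicAndPartition(topic: String, partition: Int)
class PartitionTopicInfo(val topic: String, val partition: Int) {
  @volatile var fetchedOffset: Long = 0L
}

// The fetcher gets the immutable map at construction time, so
// processPartitionData() never calls back into ConsumerFetcherManager.
class ConsumerFetcherThread(val name: String,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo]) {
  def processPartitionData(tp: TopicAndPartition, newOffset: Long): Unit = {
    val info = partitionMap.getOrElse(tp,
      throw new IllegalStateException("Unknown partition " + tp))
    info.fetchedOffset = newOffset
  }
}

class ConsumerFetcherManager {
  private val lock = new ReentrantLock
  private var partitionMap: Map[TopicAndPartition, PartitionTopicInfo] = null
  private var noLeaderPartitionSet = Set.empty[TopicAndPartition]
  private var fetchers = List.empty[ConsumerFetcherThread]

  def startConnections(infos: Seq[PartitionTopicInfo]): Unit = {
    lock.lock()
    try {
      partitionMap = infos.map(i => TopicAndPartition(i.topic, i.partition) -> i).toMap
      noLeaderPartitionSet = partitionMap.keySet
    } finally lock.unlock()
  }

  // No getPartitionTopicInfo() is exposed; the map is handed over here instead.
  def addFetcher(name: String): ConsumerFetcherThread = {
    lock.lock()
    try {
      val fetcher = new ConsumerFetcherThread(name, partitionMap)
      fetchers = fetcher :: fetchers
      fetcher
    } finally lock.unlock()
  }

  // Both fields can be cleared together, before the fetchers are closed,
  // because running fetchers no longer read the manager's partitionMap.
  def stopAllConnections(): Unit = {
    lock.lock()
    try {
      partitionMap = null
      noLeaderPartitionSet = Set.empty
      fetchers = Nil // stands in for closeAllFetchers()
    } finally lock.unlock()
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val info = new PartitionTopicInfo("test-topic", 0)
    val manager = new ConsumerFetcherManager
    manager.startConnections(Seq(info))
    val fetcher = manager.addFetcher("fetcher-0-1")
    manager.stopAllConnections()
    // An in-flight fetch can still be processed after the manager cleared its state.
    fetcher.processPartitionData(TopicAndPartition("test-topic", 0), 42L)
    println(info.fetchedOffset)
  }
}
```

In this sketch the fetcher only ever takes its own lock (omitted here), so the 
cycle between the manager's ReentrantLock and the fetcher's monitor shown in 
the thread dump cannot form.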
                
> Deadlock between leader-finder-thread and consumer-fetcher-thread during 
> broker failure
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-618
>                 URL: https://issues.apache.org/jira/browse/KAFKA-618
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>            Priority: Blocker
>             Fix For: 0.8
>
>
> This causes the test failure reported in KAFKA-607. This affects high-level 
> consumers - if they hit the deadlock then they would get wedged (or at least 
> until the consumer timeout).
> Here is the threaddump output that shows the issue:
> Found one Java-level deadlock:
> =============================
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
>   waiting for ownable synchronizer 0x00007f2283ad0000, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by 
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread"
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
>   waiting to lock monitor 0x00007f2288297190 (object 0x00007f2283ab01d0, a 
> java.lang.Object),
>   which is held by 
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1"
> Java stack information for the threads listed above:
> ===================================================
> "ConsumerFetcherThread-console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-0-1":
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00007f2283ad0000> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842)
>         at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178)
>         at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
>         at 
> java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
>         at 
> kafka.consumer.ConsumerFetcherManager.getPartitionTopicInfo(ConsumerFetcherManager.scala:131)
>         at 
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:43)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:116)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$doWork$5.apply(AbstractFetcherThread.scala:99)
>         at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>         at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:99)
>         - locked <0x00007f2283ab01d0> (a java.lang.Object)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
> "console-consumer-41755_jkoshy-ld-1353026496639-b0e24a70-leader-finder-thread":
>         at 
> kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:142)
>         - waiting to lock <0x00007f2283ab01d0> (a java.lang.Object)
>         at 
> kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:49)
>         - locked <0x00007f2283ab0338> (a java.lang.Object)
>         at 
> kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:81)
>         at 
> kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$5.apply(ConsumerFetcherManager.scala:76)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>         at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>         at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>         at 
> kafka.consumer.ConsumerFetcherManager$$anon$1.doWork(ConsumerFetcherManager.scala:76)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:50)
> Found 1 deadlock.
