[ 
https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-916:
-----------------------------

    Attachment: KAFKA-916-v1.patch

Agreed - I think that should fix the issue.
                
> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
>                 Key: KAFKA-916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-916
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>             Fix For: 0.8
>
>         Attachments: KAFKA-916-v1.patch
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if there happen to be partitions in
> error.
> On a rebalance, the fetcher manager closes all fetchers while holding the
> fetcher thread map's lock (mapLock in AbstractFetcherManager). [t1]
> While the fetcher manager is iterating over fetchers to stop them, a fetcher
> that is yet to be stopped hits an error on a partition and proceeds to
> handle partitions with error [t2]. This handling involves looking up the
> fetcher for that partition and then removing it from the fetcher's set of
> partitions to consume. This requires grabbing the same map lock held by [t1],
> which cannot release it until this fetcher thread has shut down, hence the
> deadlock.
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b 
> waiting on condition [0x00007f1b2bd38000]
> 2013-05-22_20:23:11.95767    java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767     at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767     - parking to wait for  <0x00007f1a25780598> (a 
> java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767     at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768     at 
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768     at 
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769     at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769     at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769     at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770     at 
> scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770     at 
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770     at 
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771     at 
> scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771     at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771        - locked <0x00007f1a2ae92510> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95771     at 
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772     at 
> scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-22_20:23:11.95773     at 
> scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> 2013-05-22_20:23:11.95773     at 
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
> 2013-05-22_20:23:11.95773     - locked <0x00007f1a2a29b450> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95773     at 
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
> 2013-05-22_20:23:11.95774     at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
> 2013-05-22_20:23:11.95774     at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
> 2013-05-22_20:23:11.95774     at 
> kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
> 2013-05-22_20:23:11.95774     - locked <0x00007f1a2a69b1d8> (a 
> java.lang.Object)
> 2013-05-22_20:23:11.95774     at 
> kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
> 2013-05-22_20:23:11.95775     at 
> kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
> 2013-05-22_20:23:11.95775     at 
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
> 2013-05-22_20:23:11.95775     at 
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
> 2013-05-22_20:23:11.95776     at 
> kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776     at 
> kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776     at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> 2013-05-22_20:23:11.95777     at 
> scala.collection.immutable.List.foreach(List.scala:45)
> 2013-05-22_20:23:11.95777     at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95777     at 
> scala.collection.immutable.List.map(List.scala:45)
> 2013-05-22_20:23:11.95777     at 
> kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95777     at 
> kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> [t2]
> 2013-05-22_20:23:11.87465 
> "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 
> tid=0x00007f196401a800 nid=0x717a waiting for monitor entry 
> [0x00007f19bf0ef000]
> 2013-05-22_20:23:11.87466    java.lang.Thread.State: BLOCKED (on object 
> monitor)
> 2013-05-22_20:23:11.87467     at 
> kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
> ---> 2013-05-22_20:23:11.87467        - waiting to lock <0x00007f1a2ae92510> 
> (a java.lang.Object)
> 2013-05-22_20:23:11.87468     at 
> kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95682     at 
> kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95683     at 
> scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
> 2013-05-22_20:23:11.95684     at 
> kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95684     at 
> kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
> 2013-05-22_20:23:11.95684     at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
> 2013-05-22_20:23:11.95684     at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 2013-05-22_20:23:11.95684     at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 2013-05-22_20:23:11.95686 
> 2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 
> tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
> 2013-05-22_20:23:11.95686    java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95686     at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95686     - parking to wait for  <0x00007f1a2a4426f8> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2013-05-22_20:23:11.95687     at 
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95687     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 2013-05-22_20:23:11.95687     at 
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> 2013-05-22_20:23:11.95687     at 
> org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to