[ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Joel Koshy updated KAFKA-916:
-----------------------------

    Attachment: KAFKA-916-v1.patch

Agreed - I think that should fix the issue.

> Deadlock between fetcher shutdown and handling partitions with error
> --------------------------------------------------------------------
>
>                 Key: KAFKA-916
>                 URL: https://issues.apache.org/jira/browse/KAFKA-916
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>             Fix For: 0.8
>
>         Attachments: KAFKA-916-v1.patch
>
>
> Here is another consumer deadlock that we encountered. All consumers are
> vulnerable to this during a rebalance if there happen to be partitions in
> error.
> On a rebalance, the fetcher manager closes all fetchers, and it holds the
> fetcher thread map's lock (mapLock in AbstractFetcherManager) while doing
> so [t1].
> While the fetcher manager is iterating over the fetchers to stop them, a
> fetcher that is yet to be stopped hits an error on a partition and
> proceeds to handle partitions with error [t2]. This handling involves
> looking up the fetcher for that partition and then removing the partition
> from that fetcher's set of partitions to consume - which requires grabbing
> the same map lock that [t1] already holds. Hence the deadlock. (A
> standalone sketch of this lock cycle follows the two traces below.)
> [t1]
> 2013-05-22_20:23:11.95767 "main" prio=10 tid=0x00007f1b24007800 nid=0x573b waiting on condition [0x00007f1b2bd38000]
> 2013-05-22_20:23:11.95767    java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95767     at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95767     - parking to wait for <0x00007f1a25780598> (a java.util.concurrent.CountDownLatch$Sync)
> 2013-05-22_20:23:11.95767     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95767     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-22_20:23:11.95768     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-22_20:23:11.95768     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-22_20:23:11.95768     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-22_20:23:11.95768     at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-22_20:23:11.95769     at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
> 2013-05-22_20:23:11.95769     at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
> 2013-05-22_20:23:11.95769     at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
> 2013-05-22_20:23:11.95769     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95769     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-22_20:23:11.95770     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-22_20:23:11.95770     at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-22_20:23:11.95770     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-22_20:23:11.95770     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-22_20:23:11.95771     at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-22_20:23:11.95771     at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
> ---> 2013-05-22_20:23:11.95771   - locked <0x00007f1a2ae92510> (a java.lang.Object)
> 2013-05-22_20:23:11.95771     at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
> 2013-05-22_20:23:11.95771     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
> 2013-05-22_20:23:11.95772     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
> 2013-05-22_20:23:11.95772     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
> 2013-05-22_20:23:11.95772     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
> 2013-05-22_20:23:11.95772     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-22_20:23:11.95773     at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> 2013-05-22_20:23:11.95773     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:369)
> 2013-05-22_20:23:11.95773     - locked <0x00007f1a2a29b450> (a java.lang.Object)
> 2013-05-22_20:23:11.95773     at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:680)
> 2013-05-22_20:23:11.95774     at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.handleTopicEvent(ZookeeperConsumerConnector.scala:754)
> 2013-05-22_20:23:11.95774     at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree1$1(ZookeeperTopicEventWatcher.scala:74)
> 2013-05-22_20:23:11.95774     at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:69)
> 2013-05-22_20:23:11.95774     - locked <0x00007f1a2a69b1d8> (a java.lang.Object)
> 2013-05-22_20:23:11.95774     at kafka.consumer.ZookeeperTopicEventWatcher.startWatchingTopicEvents(ZookeeperTopicEventWatcher.scala:46)
> 2013-05-22_20:23:11.95775     at kafka.consumer.ZookeeperTopicEventWatcher.<init>(ZookeeperTopicEventWatcher.scala:33)
> 2013-05-22_20:23:11.95775     at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:721)
> 2013-05-22_20:23:11.95775     at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
> 2013-05-22_20:23:11.95776     at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776     at kafka.tools.MirrorMaker$$anonfun$main$3.apply(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95776     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95776     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> 2013-05-22_20:23:11.95777     at scala.collection.immutable.List.foreach(List.scala:45)
> 2013-05-22_20:23:11.95777     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> 2013-05-22_20:23:11.95777     at scala.collection.immutable.List.map(List.scala:45)
> 2013-05-22_20:23:11.95777     at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
> 2013-05-22_20:23:11.95777     at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> [t2]
> 2013-05-22_20:23:11.87465 "ConsumerFetcherThread-xxxx-1369238724254-cff180ff-0-505" prio=10 tid=0x00007f196401a800 nid=0x717a waiting for monitor entry [0x00007f19bf0ef000]
> 2013-05-22_20:23:11.87466    java.lang.Thread.State: BLOCKED (on object monitor)
> 2013-05-22_20:23:11.87467     at kafka.server.AbstractFetcherManager.removeFetcher(AbstractFetcherManager.scala:57)
> ---> 2013-05-22_20:23:11.87467   - waiting to lock <0x00007f1a2ae92510> (a java.lang.Object)
> 2013-05-22_20:23:11.87468     at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95682     at kafka.consumer.ConsumerFetcherManager$$anonfun$addPartitionsWithError$2.apply(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95683     at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
> 2013-05-22_20:23:11.95684     at kafka.consumer.ConsumerFetcherManager.addPartitionsWithError(ConsumerFetcherManager.scala:170)
> 2013-05-22_20:23:11.95684     at kafka.consumer.ConsumerFetcherThread.handlePartitionsWithErrors(ConsumerFetcherThread.scala:69)
> 2013-05-22_20:23:11.95684     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:168)
> 2013-05-22_20:23:11.95684     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 2013-05-22_20:23:11.95684     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> 2013-05-22_20:23:11.95686
> 2013-05-22_20:23:11.95686 "main-EventThread" daemon prio=10 tid=0x00007f1b2471d000 nid=0x605a waiting on condition [0x00007f19bedec000]
> 2013-05-22_20:23:11.95686    java.lang.Thread.State: WAITING (parking)
> 2013-05-22_20:23:11.95686     at sun.misc.Unsafe.park(Native Method)
> 2013-05-22_20:23:11.95686     - parking to wait for <0x00007f1a2a4426f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2013-05-22_20:23:11.95687     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-22_20:23:11.95687     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 2013-05-22_20:23:11.95687     at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> 2013-05-22_20:23:11.95687     at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:503)
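
For anyone following along, here is a minimal, self-contained sketch of the cycle the two traces show. The names (mapLock, shutdownLatch, fetcher) are hypothetical stand-ins for AbstractFetcherManager's map lock and ShutdownableThread's latch, not the actual Kafka code: [t1] parks on a fetcher's shutdown latch while holding the map lock, and [t2] blocks trying to take that same lock before it can ever count the latch down.

{code:scala}
import java.util.concurrent.CountDownLatch

// Minimal deadlock reproduction; the names below are illustrative
// stand-ins, not the real Kafka classes.
object DeadlockSketch extends App {
  val mapLock = new Object                  // stands in for AbstractFetcherManager's mapLock
  val shutdownLatch = new CountDownLatch(1) // stands in for ShutdownableThread's latch

  // [t2] a fetcher thread: hits a partition error and calls into
  // removeFetcher, which needs mapLock, before it can count down its latch.
  val fetcher = new Thread(new Runnable {
    def run(): Unit = {
      Thread.sleep(100) // simulate fetching until a partition hits an error
      mapLock.synchronized {
        // handlePartitionsWithErrors -> addPartitionsWithError -> removeFetcher
      }
      shutdownLatch.countDown() // never reached: [t1] already holds mapLock
    }
  }, "consumer-fetcher")
  fetcher.start()

  // [t1] the rebalance path: closeAllFetchers takes mapLock and, while
  // still holding it, awaits each fetcher's shutdown latch.
  mapLock.synchronized {
    shutdownLatch.await() // deadlock: waits on [t2], which waits on mapLock
  }
}
{code}

Whatever the attached patch does exactly, any fix has to break this hold-and-wait: for example, signal all fetchers to shut down and only await their latches after releasing the map lock, or keep the partitions-with-error path from re-entering the manager's lock.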