Joel Koshy created KAFKA-914:
--------------------------------

             Summary: Deadlock between initial rebalance and watcher-triggered rebalances
                 Key: KAFKA-914
                 URL: https://issues.apache.org/jira/browse/KAFKA-914
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8
            Reporter: Joel Koshy
             Fix For: 0.8

The summary doesn't give the full picture, and the fetcher-manager/fetcher-thread code is complex enough that the following is hard to articulate clearly. I will try to describe the sequence that results in a deadlock when a large number of consumers start up at around the same time:

- When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
- However, before that initial rebalance actually begins, a ZK watch may trigger (because other consumers are starting up) and initiate a rebalance. This rebalance completes successfully, so fetchers start and begin filling up the chunk queues.
- Another watch triggers and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shut down the leader-finder-thread to prevent new fetchers from being started.
- The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without leader to process and tries to add a fetcher for it, it will get an exception (InterruptedException if acquiring the partitionMapLock, or ClosedByInterruptException if performing an offset request). If we get an InterruptedException, the thread's interrupted flag is cleared.
- However, the leader-finder-thread may have multiple partitions without leader that it is currently processing. So if the interrupted flag has been cleared and the leader-finder-thread tries to add a fetcher for another partition, it does not receive an InterruptedException when it tries to acquire the partitionMapLock. It can end up blocking indefinitely at that point.
- The problem is that up to this point, createMessageStream's initial inline rebalance has not yet returned: it is blocked on the rebalance lock, waiting for the second watch-triggered rebalance to complete. That is, the consumer iterators have not been created yet and thus the fetcher queues fill up.
  [td1]
- As a result, processPartitionData (which holds on to the partitionMapLock) in one or more fetchers will be blocked trying to enqueue into a full chunk queue. [td2]
- So the leader-finder-thread cannot finish adding fetchers for the remaining partitions without leader and thus cannot shut down.

One way to fix this would be to let the exception from the leader-finder-thread propagate out if the leader-finder-thread is currently shutting down, and so avoid the subsequent (unnecessary) attempt to add a fetcher and lock partitionMapLock. There may be more elegant fixes (such as rewriting the whole consumer manager logic), but obviously we want to avoid extensive changes at this point in 0.8.

Relevant portions of the thread-dump are here:

[td1] createMessageStream's initial inline rebalance (blocked on the ongoing watch-triggered rebalance)

2013-05-20_17:50:13.04848 "main" prio=10 tid=0x00007f5960008000 nid=0x3772 waiting for monitor entry [0x00007f59666c3000]
2013-05-20_17:50:13.04848    java.lang.Thread.State: BLOCKED (on object monitor)
2013-05-20_17:50:13.04848     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04849     - waiting to lock <0x00007f58637dfe10> (a java.lang.Object)
2013-05-20_17:50:13.04849     at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-20_17:50:13.04850     at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-20_17:50:13.04850     at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-20_17:50:13.04850     at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850     at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851     at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-20_17:50:13.04851     at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-20_17:50:13.04851     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-20_17:50:13.04852     at scala.collection.immutable.List.map(List.scala:45)
2013-05-20_17:50:13.04852     at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-20_17:50:13.04852     at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

[td2] A consumer fetcher thread blocked on a full queue.

2013-05-20_17:50:13.04703 "ConsumerFetcherThread-xxxx-1368836182178-2009023c-0-3248" prio=10 tid=0x00007f57a4010800 nid=0x3920 waiting on condition [0x00007f58316ae000]
2013-05-20_17:50:13.04703    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04703     at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04704     - parking to wait for <0x00007f586381d6c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
2013-05-20_17:50:13.04704     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04704     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
2013-05-20_17:50:13.04704     at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
2013-05-20_17:50:13.04704     at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
2013-05-20_17:50:13.04705     at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:50)
2013-05-20_17:50:13.04706     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:131)
2013-05-20_17:50:13.04707     at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04708     at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
2013-05-20_17:50:13.04709     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04709     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
2013-05-20_17:50:13.04709     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

[td3] Second watch-triggered rebalance

2013-05-20_17:50:13.04725 "xxxx-1368836182178-2009023c_watcher_executor" prio=10 tid=0x00007f5960777800 nid=0x37af waiting on condition [0x00007f58318b0000]
2013-05-20_17:50:13.04725    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04726     at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04726     - parking to wait for <0x00007f5863728de8> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-20_17:50:13.04726     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04727     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04727     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-20_17:50:13.04728     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-20_17:50:13.04728     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-20_17:50:13.04729     at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-20_17:50:13.04729     at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-20_17:50:13.04730     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-20_17:50:13.04730     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-20_17:50:13.04731     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-20_17:50:13.04731     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-20_17:50:13.04732     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-20_17:50:13.04733     at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-20_17:50:13.04733     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04733     - locked <0x00007f58637dfe10> (a java.lang.Object)
2013-05-20_17:50:13.04734     at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:325)

[td4] leader-finder-thread still trying to process partitions without leader, blocked on the partitionMapLock held by processPartitionData in td2.

2013-05-20_17:50:13.04712 "xxxx-1368836182178-2009023c-leader-finder-thread" prio=10 tid=0x00007f57b0027800 nid=0x38d8 waiting on condition [0x00007f58317af000]
2013-05-20_17:50:13.04712    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04713     at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04713     - parking to wait for <0x00007f586375e3d8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
2013-05-20_17:50:13.04713     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04714     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04714     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:867)
2013-05-20_17:50:13.04717     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1201)
2013-05-20_17:50:13.04718     at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
2013-05-20_17:50:13.04718     at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:173)
2013-05-20_17:50:13.04719     at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
2013-05-20_17:50:13.04719     - locked <0x00007f586374b040> (a java.lang.Object)
2013-05-20_17:50:13.04719     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:83)
2013-05-20_17:50:13.04720     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04721     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-20_17:50:13.04722     at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-20_17:50:13.04723     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-20_17:50:13.04723     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-20_17:50:13.04723     at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-20_17:50:13.04724     at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04724     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
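
The interrupted-flag behaviour described in the sequence above (the first lock attempt fails with InterruptedException, the next one waits) can be reproduced outside Kafka. The following is a minimal Java sketch and not Kafka code: the class InterruptFlagDemo and its run method are made up for illustration, a plain ReentrantLock stands in for partitionMapLock, and the second acquisition uses tryLock with a timeout only so that the demo terminates (in td4 the second attempt is lockInterruptibly, which waits without a bound).

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Standalone illustration; none of these names come from Kafka.
class InterruptFlagDemo {

    // Returns {firstAttemptInterrupted, flagAfterCatch,
    //          secondAttemptInterrupted, secondAttemptAcquired}.
    static boolean[] run() throws InterruptedException {
        final ReentrantLock lock = new ReentrantLock(); // stand-in for partitionMapLock
        final boolean[] result = new boolean[4];

        // The caller holds the lock for the whole demo, much like a
        // fetcher that is stuck enqueueing into a full chunk queue.
        lock.lock();
        try {
            Thread worker = new Thread(() -> {
                // An interrupt is already pending when the first attempt starts.
                Thread.currentThread().interrupt();
                try {
                    lock.lockInterruptibly();
                    lock.unlock();
                } catch (InterruptedException e) {
                    result[0] = true; // first attempt fails fast
                }

                // Throwing InterruptedException cleared the interrupted status.
                result[1] = Thread.currentThread().isInterrupted();

                // Second attempt: no interrupt is pending any more, so this
                // waits for the lock. The timeout exists only to end the demo.
                try {
                    result[3] = lock.tryLock(200, TimeUnit.MILLISECONDS);
                    if (result[3]) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    result[2] = true;
                }
            });
            worker.start();
            worker.join(); // join() makes the worker's writes visible here
        } finally {
            lock.unlock();
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("first attempt interrupted:  " + r[0]);
        System.out.println("flag set after catch:       " + r[1]);
        System.out.println("second attempt interrupted: " + r[2]);
        System.out.println("second attempt acquired:    " + r[3]);
    }
}
```

Under these assumptions the first attempt is interrupted, the flag is clear afterwards, and the second attempt simply waits until its timeout. A loop that swallows the InterruptedException and moves on to the next partition would therefore wait on the lock again, which is the situation the proposed fix avoids by letting the exception propagate while the thread is shutting down.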