Joel Koshy created KAFKA-914:
--------------------------------

             Summary: Deadlock between initial rebalance and watcher-triggered rebalances
                 Key: KAFKA-914
                 URL: https://issues.apache.org/jira/browse/KAFKA-914
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.8
            Reporter: Joel Koshy
             Fix For: 0.8


The summary doesn't give the full picture, and the fetcher-manager/fetcher-thread code is complex enough that it is a bit hard to articulate the following clearly. I will try to describe the sequence that results in a deadlock when a large number of consumers start up at around the same time:

- When a consumer's createMessageStream method is called, it initiates an initial inline rebalance.
- However, before that initial rebalance actually begins, a ZK watch may fire (because other consumers are starting up) and initiate a rebalance. This rebalance completes successfully, so fetchers start and begin filling up the chunk queues.
- Another watch fires and initiates yet another rebalance. This rebalance attempt tries to close the fetchers. Before the fetchers are stopped, we shut down the leader-finder-thread to prevent new fetchers from being started.
- The shutdown is accomplished by interrupting the leader-finder-thread and then awaiting its shutdown latch.
- If the leader-finder-thread still has a partition without leader to process and tries to add a fetcher for it, it will get an exception (InterruptedException if acquiring the partitionMapLock, or ClosedByInterruptException if performing an offset request). If we get an InterruptedException, the thread's interrupted flag is cleared.
- However, the leader-finder-thread may be processing multiple partitions without leader. So if the interrupted flag has been cleared and the leader-finder-thread tries to add a fetcher for another partition, it does not receive an InterruptedException when it tries to acquire the partitionMapLock, and it can end up blocking indefinitely at that point (see the sketch after this list).
- The problem is that up to this point, createMessageStream's initial inline rebalance has not yet returned - it is blocked on the rebalance lock, waiting for the second watch-triggered rebalance to complete. That is, the consumer iterators have not been created yet, so the fetcher queues fill up. [td1]
- As a result, processPartitionData (which holds the partitionMapLock) in one or more fetchers will be blocked trying to enqueue into a full chunk queue. [td2]
- So the leader-finder-thread cannot finish adding fetchers for the remaining partitions without leader, and thus cannot shut down.
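
To make the interrupt-flag point concrete, here is a minimal Scala sketch (not Kafka code; the thread and lock names are only stand-ins for the roles described above): once the first InterruptedException is caught, the interrupt status is gone, so the next lockInterruptibly call on a contended lock simply blocks.

import java.util.concurrent.locks.ReentrantLock

// Minimal sketch (not Kafka code): the lock and threads stand in for the roles above.
object InterruptStatusDemo extends App {
  val partitionMapLock = new ReentrantLock() // stand-in for the fetcher thread's partitionMapLock

  // Holds the lock indefinitely, like processPartitionData blocked on a full chunk queue.
  val queueBlockedFetcher = new Thread(new Runnable {
    def run(): Unit = { partitionMapLock.lock(); Thread.sleep(Long.MaxValue) }
  })
  queueBlockedFetcher.setDaemon(true)
  queueBlockedFetcher.start()
  Thread.sleep(100) // let it acquire the lock

  // Plays the leader-finder-thread adding fetchers for two leaderless partitions.
  val leaderFinder = new Thread(new Runnable {
    def run(): Unit = {
      try partitionMapLock.lockInterruptibly() // "first partition": interrupted while waiting
      catch {
        case _: InterruptedException =>
          // Catching the exception leaves the interrupt status cleared.
          println("interrupted flag: " + Thread.currentThread().isInterrupted) // prints false
      }
      partitionMapLock.lockInterruptibly() // "second partition": no interrupt left, blocks forever
    }
  })
  leaderFinder.setDaemon(true)
  leaderFinder.start()
  Thread.sleep(100)
  leaderFinder.interrupt() // analogous to the shutdown interrupting the leader-finder-thread
  leaderFinder.join(500)
  println("leader-finder still blocked: " + leaderFinder.isAlive) // prints true
}

This mirrors the leader-finder-thread blocking on the partitionMapLock for its second leaderless partition while processPartitionData holds the lock.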

One way to fix this would be to let the exception from the leader-finder-thread propagate when the leader-finder-thread is currently shutting down, and avoid the subsequent (unnecessary) attempt to add a fetcher and acquire the partitionMapLock. There may be more elegant fixes (such as rewriting the whole consumer-manager logic), but obviously we want to avoid extensive changes at this point in 0.8. A rough sketch of that approach follows.
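
As an illustration only - Partition, addFetcher and processLeaderlessPartitions below are hypothetical stand-ins, not the actual ConsumerFetcherManager.LeaderFinderThread code - the idea is to re-throw the interrupt-related exception once the running flag has been cleared by shutdown, instead of swallowing it and moving on to the next leaderless partition:

import java.nio.channels.ClosedByInterruptException
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch, not the actual ConsumerFetcherManager.LeaderFinderThread code.
object LeaderFinderSketch {
  case class Partition(topic: String, id: Int)

  // Stand-in for the ShutdownableThread running flag; cleared before the interrupt is sent.
  val isRunning = new AtomicBoolean(true)

  // Stand-in for adding a fetcher: may block on partitionMapLock or throw if interrupted.
  def addFetcher(p: Partition): Unit = ()

  def processLeaderlessPartitions(partitions: Iterable[Partition]): Unit = {
    partitions.foreach { p =>
      try addFetcher(p)
      catch {
        // Shutting down: restore the interrupt status and let the exception propagate,
        // rather than silently moving on to the next partition (whose addFetcher would
        // block on partitionMapLock with the interrupt flag already cleared).
        case e: InterruptedException if !isRunning.get =>
          Thread.currentThread().interrupt()
          throw e
        case e: ClosedByInterruptException if !isRunning.get =>
          throw e
        case _: InterruptedException | _: ClosedByInterruptException =>
          () // not shutting down: keep the existing behavior and try the next partition
      }
    }
  }
}

With something along those lines, the first interrupt observed during shutdown ends the loop, so the leader-finder-thread should be able to count down its shutdown latch and stopConnections (td3) should no longer be stuck waiting on it.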

Relevant portions of the thread-dump are here:

[td1] createMessageStream's initial inline rebalance (blocked on the ongoing watch-triggered rebalance)

2013-05-20_17:50:13.04848 "main" prio=10 tid=0x00007f5960008000 nid=0x3772 waiting for monitor entry [0x00007f59666c3000]
2013-05-20_17:50:13.04848    java.lang.Thread.State: BLOCKED (on object monitor)
2013-05-20_17:50:13.04848       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04849       - waiting to lock <0x00007f58637dfe10> (a java.lang.Object)
2013-05-20_17:50:13.04849       at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-20_17:50:13.04850       at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-20_17:50:13.04850       at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-20_17:50:13.04850       at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850       at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-20_17:50:13.04850       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-20_17:50:13.04851       at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-20_17:50:13.04851       at scala.collection.immutable.List.foreach(List.scala:45)
2013-05-20_17:50:13.04851       at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-20_17:50:13.04852       at scala.collection.immutable.List.map(List.scala:45)
2013-05-20_17:50:13.04852       at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-20_17:50:13.04852       at kafka.tools.MirrorMaker.main(MirrorMaker.scala)

[td2] A consumer fetcher thread blocked on full queue.

2013-05-20_17:50:13.04703 "ConsumerFetcherThread-xxxx-1368836182178-2009023c-0-3248" prio=10 tid=0x00007f57a4010800 nid=0x3920 waiting on condition [0x00007f58316ae000]
2013-05-20_17:50:13.04703    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04703       at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04704       - parking to wait for  <0x00007f586381d6c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
2013-05-20_17:50:13.04704       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04704       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
2013-05-20_17:50:13.04704       at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
2013-05-20_17:50:13.04704       at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
2013-05-20_17:50:13.04705       at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:50)
2013-05-20_17:50:13.04706       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:131)
2013-05-20_17:50:13.04707       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04708       at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
2013-05-20_17:50:13.04709       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
2013-05-20_17:50:13.04709       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
2013-05-20_17:50:13.04709       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

[td3] Second watch-triggered rebalance

2013-05-20_17:50:13.04725 "xxxx-1368836182178-2009023c_watcher_executor" prio=10 tid=0x00007f5960777800 nid=0x37af waiting on condition [0x00007f58318b0000]
2013-05-20_17:50:13.04725    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04726       at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04726       - parking to wait for  <0x00007f5863728de8> (a java.util.concurrent.CountDownLatch$Sync)
2013-05-20_17:50:13.04726       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04727       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04727       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-20_17:50:13.04728       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-20_17:50:13.04728       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-20_17:50:13.04729       at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-20_17:50:13.04729       at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-20_17:50:13.04730       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-20_17:50:13.04730       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-20_17:50:13.04731       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-20_17:50:13.04731       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-20_17:50:13.04732       at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-20_17:50:13.04733       at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-20_17:50:13.04733       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-20_17:50:13.04733       - locked <0x00007f58637dfe10> (a java.lang.Object)
2013-05-20_17:50:13.04734       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:325)

[td4] leader-finder-thread still trying to process partitions without leader, blocked on the partitionMapLock held by processPartitionData in td2.

2013-05-20_17:50:13.04712 "xxxx-1368836182178-2009023c-leader-finder-thread" prio=10 tid=0x00007f57b0027800 nid=0x38d8 waiting on condition [0x00007f58317af000]
2013-05-20_17:50:13.04712    java.lang.Thread.State: WAITING (parking)
2013-05-20_17:50:13.04713       at sun.misc.Unsafe.park(Native Method)
2013-05-20_17:50:13.04713       - parking to wait for  <0x00007f586375e3d8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
2013-05-20_17:50:13.04713       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-20_17:50:13.04714       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-20_17:50:13.04714       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:867)
2013-05-20_17:50:13.04717       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1201)
2013-05-20_17:50:13.04718       at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
2013-05-20_17:50:13.04718       at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:173)
2013-05-20_17:50:13.04719       at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
2013-05-20_17:50:13.04719       - locked <0x00007f586374b040> (a java.lang.Object)
2013-05-20_17:50:13.04719       at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:83)
2013-05-20_17:50:13.04720       at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04721       at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721       at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-20_17:50:13.04721       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-20_17:50:13.04722       at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-20_17:50:13.04723       at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-20_17:50:13.04723       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-20_17:50:13.04723       at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-20_17:50:13.04724       at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
2013-05-20_17:50:13.04724       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

