[ https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-914:
-----------------------------

    Attachment: KAFKA-914-v1.patch

Patch with the mentioned fix.

1 - I added comments with some detail since the manager/fetcher/connector
interaction is very tricky.
2 - Throwables are now passed through while shutting down. The isRunning check
is probably unnecessary, but it is safer to keep.
3 - Made the following changes to the mirror-maker (I can put these in a
separate jira as well):
  a - Currently, if no streams are created, the mirror-maker doesn't quit.
Setting streams to empty/nil fixes that issue.
  b - If a consumer-side exception (e.g., iterator timeout) gets thrown, the
mirror-maker does not exit. Addressed this by awaiting on the consumer threads
at the end of the main method.
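
As a rough illustration of item 3b (this is not the patch itself), the sketch
below shows one way a main method could await its consumer threads, so that the
process can exit once every thread has ended, including when one ends because
of an exception. All names here (AwaitConsumersSketch, ConsumerThread,
awaitShutdown, runAll) are hypothetical and are not taken from MirrorMaker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class AwaitConsumersSketch {
    /** Hypothetical consumer thread that signals a latch however its body ends. */
    static class ConsumerThread extends Thread {
        private final Runnable body;
        private final CountDownLatch done = new CountDownLatch(1);

        ConsumerThread(Runnable body) {
            this.body = body;
        }

        @Override
        public void run() {
            try {
                body.run(); // e.g. iterate a stream; may throw on an iterator timeout
            } catch (RuntimeException e) {
                System.err.println("consumer thread exiting: " + e);
            } finally {
                done.countDown(); // always signal, even on failure
            }
        }

        void awaitShutdown() throws InterruptedException {
            done.await();
        }
    }

    /** Starts one thread per body, then blocks until all of them have finished. */
    static int runAll(List<Runnable> bodies) {
        List<ConsumerThread> threads = new ArrayList<>();
        for (Runnable body : bodies) {
            threads.add(new ConsumerThread(body));
        }
        for (ConsumerThread t : threads) {
            t.start();
        }
        int finished = 0;
        try {
            for (ConsumerThread t : threads) {
                t.awaitShutdown();
                finished++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return finished;
    }

    public static void main(String[] args) {
        List<Runnable> bodies = new ArrayList<>();
        bodies.add(() -> { throw new IllegalStateException("simulated iterator timeout"); });
        bodies.add(() -> { });
        System.out.println("threads finished: " + runAll(bodies));
    }
}
```

With an empty list there is nothing to await and runAll returns immediately,
which is in the spirit of item 3a.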


                
> Deadlock between initial rebalance and watcher-triggered rebalances
> -------------------------------------------------------------------
>
>                 Key: KAFKA-914
>                 URL: https://issues.apache.org/jira/browse/KAFKA-914
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Joel Koshy
>             Fix For: 0.8
>
>         Attachments: KAFKA-914-v1.patch
>
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
> code is very complex so it's a bit hard to articulate the following very
> clearly. I will try and describe the sequence that results in a deadlock
> when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an
>   initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch
>   may trigger (due to some other consumers starting up) and initiate a
>   rebalance. This happens successfully so fetchers start and start filling
>   up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance
>   attempt tries to close the fetchers. Before the fetchers are stopped, we
>   shut down the leader-finder-thread to prevent new fetchers from being
>   started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and
>   then awaiting its shutdown latch.
> - If the leader-finder-thread still has a partition without leader to
>   process and tries to add a fetcher for it, it will get an exception
>   (InterruptedException if acquiring the partitionMapLock or
>   ClosedByInterruptException if performing an offset request). If we get an
>   InterruptedException the thread's interrupted flag is cleared.
> - However, the leader-finder-thread may have multiple partitions without
>   leader that it is currently processing. So if the interrupted flag is
>   cleared and the leader-finder-thread tries to add a fetcher for another
>   partition, it does not receive an InterruptedException when it tries to
>   acquire the partitionMapLock. It can end up blocking indefinitely at that
>   point.
> - The problem is that until now, the createMessageStream's initial inline
>   rebalance has not yet returned - it is blocked on the rebalance lock
>   waiting on the second watch-triggered rebalance to complete; i.e., the
>   consumer iterators have not been created yet and thus the fetcher queues
>   get filled up. [td1]
> - As a result, processPartitionData (which holds on to the partitionMapLock)
>   in one or more fetchers will be blocked trying to enqueue into a full
>   chunk queue. [td2]
> - So the leader-finder-thread cannot finish adding fetchers for the
>   remaining partitions without leader and thus cannot shut down.
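
The interrupted-flag behaviour in the sequence above can be reproduced in
isolation. The following is a minimal sketch, not Kafka code: a plain
ReentrantLock stands in for the partitionMapLock, the calling thread plays a
fetcher that never releases it, and the class name InterruptFlagDemo is
hypothetical. The second acquisition uses a timed tryLock only so that the demo
terminates; per the description, the real code would block with no timeout.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class InterruptFlagDemo {
    /** Runs the scenario once and returns what the worker thread observed. */
    static String run() {
        ReentrantLock lock = new ReentrantLock(); // stand-in for partitionMapLock
        StringBuilder trace = new StringBuilder();

        lock.lock(); // this thread plays the fetcher that keeps holding the lock
        try {
            Thread worker = new Thread(() -> {
                try {
                    lock.lockInterruptibly(); // "first partition"
                    lock.unlock();            // not reached in this scenario
                } catch (InterruptedException e) {
                    // Throwing InterruptedException clears the interrupt status.
                    trace.append("interrupted;flag=")
                         .append(Thread.currentThread().isInterrupted());
                }
                try {
                    // "Second partition": no interrupt is pending any more, so
                    // this simply waits for the lock until the timeout expires.
                    boolean acquired = lock.tryLock(200, TimeUnit.MILLISECONDS);
                    trace.append(";second=").append(acquired ? "acquired" : "timed-out");
                    if (acquired) {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    trace.append(";second=interrupted");
                }
            });
            worker.start();
            worker.interrupt(); // the single shutdown interrupt
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "interrupted;flag=false;second=timed-out"
    }
}
```

The one interrupt is consumed by the first failed acquisition, so the second
attempt has nothing to wake it up.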
>
> One way to fix would be to let the exception from the leader-finder-thread
> propagate outside if the leader-finder-thread is currently shutting down -
> and avoid the subsequent (unnecessary) attempt to add a fetcher and lock
> partitionMapLock. There may be more elegant fixes (such as rewriting the
> whole consumer manager logic) but obviously we want to avoid extensive
> changes at this point in 0.8.
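
The idea of the fix can be sketched as follows. This is only an illustration
under stated assumptions, not the attached patch: LeaderFinderSketch,
FetcherAdder, addFetchers and the attempted list are hypothetical names, and
the sketch catches Exception where the real code may deal with a wider set of
throwables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class LeaderFinderSketch {
    /** Hypothetical stand-in for the call that adds a fetcher for a partition. */
    interface FetcherAdder {
        void add(String partition) throws Exception;
    }

    final AtomicBoolean isRunning = new AtomicBoolean(true);
    final List<String> attempted = new ArrayList<>();

    /** One pass over the partitions that currently have no leader. */
    void addFetchers(List<String> partitions, FetcherAdder adder) throws Exception {
        for (String partition : partitions) {
            attempted.add(partition);
            try {
                adder.add(partition);
            } catch (Exception e) {
                if (!isRunning.get()) {
                    // Shutting down: propagate, so that no further partition is
                    // tried and no further lock acquisition is attempted.
                    throw e;
                }
                // Still running: swallow and continue; a later pass can retry.
            }
        }
    }
}
```

With isRunning set to false, the first failure ends the pass, so the thread
never reaches a second lock acquisition that could block.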
>
> Relevant portions of the thread-dump are here:
>
> [td1] createMessageStream's initial inline rebalance (blocked on the ongoing
> watch-triggered rebalance)
>
> 2013-05-20_17:50:13.04848 "main" prio=10 tid=0x00007f5960008000 nid=0x3772 waiting for monitor entry [0x00007f59666c3000]
> 2013-05-20_17:50:13.04848    java.lang.Thread.State: BLOCKED (on object monitor)
> 2013-05-20_17:50:13.04848       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
> 2013-05-20_17:50:13.04849       - waiting to lock <0x00007f58637dfe10> (a java.lang.Object)
> 2013-05-20_17:50:13.04849       at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
> 2013-05-20_17:50:13.04850       at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
> 2013-05-20_17:50:13.04850       at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
> 2013-05-20_17:50:13.04850       at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
> 2013-05-20_17:50:13.04850       at kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
> 2013-05-20_17:50:13.04850       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-20_17:50:13.04851       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> 2013-05-20_17:50:13.04851       at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> 2013-05-20_17:50:13.04851       at scala.collection.immutable.List.foreach(List.scala:45)
> 2013-05-20_17:50:13.04851       at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> 2013-05-20_17:50:13.04852       at scala.collection.immutable.List.map(List.scala:45)
> 2013-05-20_17:50:13.04852       at kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
> 2013-05-20_17:50:13.04852       at kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
> [td2] A consumer fetcher thread blocked on full queue.
>
> 2013-05-20_17:50:13.04703 "ConsumerFetcherThread-xxxx-1368836182178-2009023c-0-3248" prio=10 tid=0x00007f57a4010800 nid=0x3920 waiting on condition [0x00007f58316ae000]
> 2013-05-20_17:50:13.04703    java.lang.Thread.State: WAITING (parking)
> 2013-05-20_17:50:13.04703       at sun.misc.Unsafe.park(Native Method)
> 2013-05-20_17:50:13.04704       - parking to wait for  <0x00007f586381d6c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2013-05-20_17:50:13.04704       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-20_17:50:13.04704       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 2013-05-20_17:50:13.04704       at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> 2013-05-20_17:50:13.04704       at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> 2013-05-20_17:50:13.04705       at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:50)
> 2013-05-20_17:50:13.04706       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:131)
> 2013-05-20_17:50:13.04707       at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
> 2013-05-20_17:50:13.04708       at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> 2013-05-20_17:50:13.04709       at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
> 2013-05-20_17:50:13.04709       at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> 2013-05-20_17:50:13.04709       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> [td3] Second watch-triggered rebalance
>
> 2013-05-20_17:50:13.04725 "xxxx-1368836182178-2009023c_watcher_executor" prio=10 tid=0x00007f5960777800 nid=0x37af waiting on condition [0x00007f58318b0000]
> 2013-05-20_17:50:13.04725    java.lang.Thread.State: WAITING (parking)
> 2013-05-20_17:50:13.04726       at sun.misc.Unsafe.park(Native Method)
> 2013-05-20_17:50:13.04726       - parking to wait for  <0x00007f5863728de8> (a java.util.concurrent.CountDownLatch$Sync)
> 2013-05-20_17:50:13.04726       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-20_17:50:13.04727       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-20_17:50:13.04727       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 2013-05-20_17:50:13.04728       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 2013-05-20_17:50:13.04728       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 2013-05-20_17:50:13.04729       at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> 2013-05-20_17:50:13.04729       at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
> 2013-05-20_17:50:13.04730       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
> 2013-05-20_17:50:13.04730       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
> 2013-05-20_17:50:13.04731       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
> 2013-05-20_17:50:13.04731       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
> 2013-05-20_17:50:13.04732       at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> 2013-05-20_17:50:13.04733       at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> 2013-05-20_17:50:13.04733       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
> 2013-05-20_17:50:13.04733       - locked <0x00007f58637dfe10> (a java.lang.Object)
> 2013-05-20_17:50:13.04734       at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:325)
>
> [td4] leader-finder-thread still trying to process partitions without leader,
> blocked on the partitionMapLock held by processPartitionData in td2.
>
> 2013-05-20_17:50:13.04712 "xxxx-1368836182178-2009023c-leader-finder-thread" prio=10 tid=0x00007f57b0027800 nid=0x38d8 waiting on condition [0x00007f58317af000]
> 2013-05-20_17:50:13.04712    java.lang.Thread.State: WAITING (parking)
> 2013-05-20_17:50:13.04713       at sun.misc.Unsafe.park(Native Method)
> 2013-05-20_17:50:13.04713       - parking to wait for  <0x00007f586375e3d8> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> 2013-05-20_17:50:13.04713       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
> 2013-05-20_17:50:13.04714       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 2013-05-20_17:50:13.04714       at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:867)
> 2013-05-20_17:50:13.04717       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1201)
> 2013-05-20_17:50:13.04718       at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
> 2013-05-20_17:50:13.04718       at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:173)
> 2013-05-20_17:50:13.04719       at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
> 2013-05-20_17:50:13.04719       - locked <0x00007f586374b040> (a java.lang.Object)
> 2013-05-20_17:50:13.04719       at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:83)
> 2013-05-20_17:50:13.04720       at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
> 2013-05-20_17:50:13.04721       at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-20_17:50:13.04721       at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> 2013-05-20_17:50:13.04721       at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> 2013-05-20_17:50:13.04722       at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> 2013-05-20_17:50:13.04723       at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> 2013-05-20_17:50:13.04723       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 2013-05-20_17:50:13.04723       at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> 2013-05-20_17:50:13.04724       at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
> 2013-05-20_17:50:13.04724       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
