[ https://issues.apache.org/jira/browse/KAFKA-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Wang resolved KAFKA-6974.
-------------------------------
    Resolution: Won't Fix

> Changes the interaction between request handler threads and fetcher threads 
> into an ASYNC model
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6974
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6974
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Priority: Minor
>
> Problem Statement:
> At LinkedIn, our clients occasionally complain about receiving constant
> NotLeaderForPartition exceptions.
> Investigations:
> In one investigated case, the cluster was going through a rolling bounce,
> and we saw an ~8 minute delay between the old partition leader resigning
> and the new leader becoming active, based on the "Broker xxx handling
> LeaderAndIsr request" entries in the state change log.
> Our monitoring showed that the LeaderAndISR request local time during the
> incident went up to ~4 minutes.
> Explanations:
> One possible explanation for the ~8 minutes of delay is:
> During the controlled shutdown of a broker, the partitions whose leaders lie
> on the shutting-down broker need to go through leadership transitions, and
> the controller processes these partitions in batches, with each batch
> containing config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
> If the 1st LeaderAndISR request sent to a new leader broker takes too long,
> e.g. 4 minutes, then the subsequent LeaderAndISR requests can accumulate
> delays of 4 minutes, 8 minutes, or even 12 minutes... The reason is that the
> subsequent LeaderAndISR requests are blocked behind a muted channel, since
> only one LeaderAndISR request can be processed at a time with a
> maxInFlightRequestsPerConnection setting of 1. When that happens, no
> existing metric shows the total delay of 8 or 12 minutes for the muted
> requests.
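> To make that accumulation concrete, here is a rough back-of-the-envelope
> sketch (not Kafka code; the 4-minute-per-request figure is the one observed
> in the incident above):
>
>     object MutedChannelDelaySketch {
>       def main(args: Array[String]): Unit = {
>         // with maxInFlightRequestsPerConnection = 1 the channel stays muted
>         // while one LeaderAndIsr request is handled, so each queued request
>         // waits for all the earlier ones to finish first
>         val perRequestMinutes = 4  // observed local time per LeaderAndIsr request
>         (1 to 3).foreach { k =>
>           println(s"request $k finishes after ~${k * perRequestMinutes} minutes")
>         }
>         // prints ~4, ~8 and ~12 minutes; none of that wait shows up in any
>         // per-request metric while the requests sit in the muted channel
>       }
>     }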
> Now the question is why it took ~4 minutes for the 1st LeaderAndISR request
> to finish.
> Explanation for the ~4 minutes of local time for the LeaderAndISR request:
> While processing a LeaderAndISR request, the request handler thread needs to
> add partitions to, or remove partitions from, the partitionStates field of
> the ReplicaFetcherThread, and also to shut down idle fetcher threads by
> checking the size of the partitionStates field. On the other hand, the
> background fetcher threads need to iterate through all the partitions in
> partitionStates in order to build fetch requests, and to process fetch
> responses. The synchronization between the request handler thread and the
> fetcher threads is done through a partitionMapLock.
> Specifically, a fetcher thread may acquire the partitionMapLock and then
> call the following functions while processing the fetch response:
> (1) processPartitionData, which in turn calls
> (2) Replica.maybeIncrementLogStartOffset, which calls
> (3) Log.maybeIncrementLogStartOffset, which calls
> (4) LeaderEpochCache.clearAndFlushEarliest.
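> The locking pattern, in a simplified form (an illustrative sketch only, not
> the actual Kafka classes; FetcherSketch and the collapsed (2)/(3) step are
> assumptions made for brevity):
>
>     import java.util.concurrent.locks.ReentrantLock
>
>     class FetcherSketch {
>       private val partitionMapLock = new ReentrantLock()
>       private val partitionStates = scala.collection.mutable.Map[String, Long]()
>
>       // fetcher thread: holds partitionMapLock across the whole response loop
>       def processFetchResponse(response: Map[String, Long]): Unit = {
>         partitionMapLock.lock()
>         try {
>           response.foreach { case (partition, newLogStartOffset) =>
>             processPartitionData(partition, newLogStartOffset)      // (1)
>           }
>         } finally {
>           partitionMapLock.unlock()
>         }
>       }
>
>       // request handler thread: needs the same lock just to read the size
>       def partitionCount: Int = {
>         partitionMapLock.lock()
>         try partitionStates.size finally partitionMapLock.unlock()
>       }
>
>       def shutdown(): Unit = ()
>
>       private def processPartitionData(p: String, offset: Long): Unit =
>         maybeIncrementLogStartOffset(p, offset)                     // (2), (3)
>
>       private def maybeIncrementLogStartOffset(p: String, offset: Long): Unit = {
>         partitionStates.update(p, offset)
>         clearAndFlushEarliest()                                     // (4)
>       }
>
>       private def clearAndFlushEarliest(): Unit = {
>         // the real code ends in a sync() that forces the leader epoch
>         // checkpoint to disk; with n partitions per response that fsync
>         // latency is paid n times, all while partitionMapLock is held
>       }
>     }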
> Two factors contribute to the partitionMapLock being held for a long time:
> 1. function (4) above entails calling sync() to make sure the data is
> persisted to disk, which can have a long latency;
> 2. all 4 functions above can potentially be called for each partition in
> the fetch response, multiplying the sync() latency by a factor of n.
> The end result is that the request handler thread gets blocked for a long
> time trying to acquire the partitionMapLock of some fetcher inside
> AbstractFetcherManager.shutdownIdleFetcherThreads, since checking each
> fetcher's partitionCount requires acquiring that partitionMapLock.
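> Continuing the sketch above, the request handler side then looks roughly
> like this (again a simplification, not the real AbstractFetcherManager
> code): checking partitionCount needs the very lock the fetcher is holding
> while it fsyncs, so the handler thread parks here.
>
>     object FetcherManagerSketch {
>       def shutdownIdleFetcherThreads(fetchers: Iterable[FetcherSketch]): Unit =
>         fetchers.foreach { fetcher =>
>           if (fetcher.partitionCount == 0)  // blocks on the fetcher's partitionMapLock
>             fetcher.shutdown()
>         }
>     }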
> In our testing environment, we reproduced the problem and confirmed the
> explanation above: a request handler thread was blocked for 10 seconds
> trying to acquire the partitionMapLock of one particular fetcher thread,
> while many log entries showed "Incrementing log start offset of
> partition...".
> Proposed change:
> We propose to change the interaction between the request handler threads and
> the fetcher threads to an ASYNC model by using an event queue. All requests
> to add or remove partitions, or to shut down idle fetcher threads, are
> modeled as items in the event queue, and only the fetcher threads take items
> out of the event queue and actually process them.
> In the new ASYNC model, in order to be able to process an infinite sequence
> of FetchRequests, a fetcher thread initially has one FetchRequest, and after
> it is done processing a FetchRequest, it enqueues one more into its own
> event queue.
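> A minimal sketch of that event-queue interaction, assuming hypothetical
> event and class names (AsyncFetcherSketch, FetcherEvent, etc. are not the
> actual implementation):
>
>     import java.util.concurrent.LinkedBlockingQueue
>
>     sealed trait FetcherEvent
>     case class AddPartitions(partitions: Set[String])    extends FetcherEvent
>     case class RemovePartitions(partitions: Set[String]) extends FetcherEvent
>     case object ShutdownIfIdle                            extends FetcherEvent
>     case object Fetch                                     extends FetcherEvent
>
>     class AsyncFetcherSketch extends Thread {
>       private val events = new LinkedBlockingQueue[FetcherEvent]()
>       private val partitionStates = scala.collection.mutable.Set[String]()
>       @volatile private var running = true
>
>       // request handler threads only enqueue an event and return immediately
>       def enqueue(event: FetcherEvent): Unit = events.put(event)
>
>       override def run(): Unit = {
>         events.put(Fetch)  // seed the infinite sequence of FetchRequests
>         while (running) events.take() match {
>           case AddPartitions(ps)    => partitionStates ++= ps
>           case RemovePartitions(ps) => partitionStates --= ps
>           case ShutdownIfIdle       => if (partitionStates.isEmpty) running = false
>           case Fetch                => buildAndProcessFetch(); events.put(Fetch)
>         }
>       }
>
>       private def buildAndProcessFetch(): Unit = {
>         // build a fetch request from partitionStates and process the
>         // response; only this thread touches partitionStates, so no lock
>         // is needed and request handler threads never block on the fetcher
>       }
>     }
>
> Only the fetcher thread consumes its queue, so the request handler threads
> never contend for the fetcher's internal state, which is the point of the
> ASYNC model.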
> Also, since the current AbstractFetcherThread logic is inherited by both the
> replica-fetcher-threads and the consumer-fetcher-threads of the old
> consumer, and the latter has been deprecated, we plan to implement the ASYNC
> model with a clean-slate approach and only support the
> replica-fetcher-threads, in order to keep the code cleaner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
