[
https://issues.apache.org/jira/browse/KAFKA-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lucas Wang resolved KAFKA-6974.
-------------------------------
Resolution: Won't Fix
> Changes the interaction between request handler threads and fetcher threads
> into an ASYNC model
> -----------------------------------------------------------------------------------------------
>
> Key: KAFKA-6974
> URL: https://issues.apache.org/jira/browse/KAFKA-6974
> Project: Kafka
> Issue Type: Improvement
> Reporter: Lucas Wang
> Priority: Minor
>
> Problem Statement:
> At LinkedIn, our clients occasionally complain about receiving constant
> NotLeaderForPartition exceptions.
> Investigations:
> For one investigated case, the cluster was going through a rolling bounce,
> and we saw an ~8 minute delay between an old partition leader resigning and
> the new leader becoming active, based on the "Broker xxx handling
> LeaderAndIsr request" entries in the state change log.
> Our monitoring showed that the LeaderAndISR request local time went up to ~4
> minutes during the incident.
> Explanations:
> One possible explanation of the ~8 minute delay is:
> During controlled shutdown of a broker, the partitions whose leaders are on
> the shutting-down broker need to go through leadership transitions, and the
> controller processes these partitions in batches, with each batch containing
> config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
> If the 1st LeaderAndISR request sent to a new leader broker takes too long,
> e.g. 4 minutes, then the subsequent LeaderAndISR requests can accumulate
> delays of 4 minutes, 8 minutes, or even 12 minutes... The reason is that the
> subsequent LeaderAndISR requests are blocked in a muted channel, given that
> only one LeaderAndISR request can be processed at a time with a
> maxInFlightRequestsPerConnection setting of 1. When that happens, no existing
> metric shows the total delay of 8 or 12 minutes for the muted requests.
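> As a rough illustration (not Kafka code; the names below are made up for the
> example), head-of-line blocking with only one in-flight request per
> connection makes the k-th batch wait roughly k times the per-request local
> time:
> {code:scala}
> // Toy model of the accumulated delay when maxInFlightRequestsPerConnection = 1:
> // batch k cannot start until batches 1..k-1 have finished.
> object LeaderAndIsrDelayModel extends App {
>   val perRequestLocalTimeMin = 4   // observed ~4 minutes of local time per LeaderAndISR request
>   val batchCount = 3               // e.g. 3 batches of controlledShutdownPartitionBatchSize partitions
>   (1 to batchCount).foreach { k =>
>     println(s"batch $k total delay: ~${k * perRequestLocalTimeMin} minutes")
>   }
> }
> {code}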
> Now the question is why it took ~4 minutes for the 1st LeaderAndISR request
> to finish.
> Explanation for the ~4 minutes of local time for the LeaderAndISR request:
> During processing of a LeaderAndISR request, the request handler thread needs
> to add partitions to, or remove partitions from, the partitionStates field of
> the ReplicaFetcherThread, and also to shut down idle fetcher threads by
> checking the size of the partitionStates field. On the other hand, the
> background fetcher threads need to iterate through all the partitions in
> partitionStates in order to build fetch requests, and to process fetch
> responses. The synchronization between the request handler threads and the
> fetcher threads is done through a partitionMapLock.
> Specifically, a fetcher thread may acquire the partitionMapLock and then call
> the following functions to process the fetch response:
> (1) processPartitionData, which in turn calls
> (2) Replica.maybeIncrementLogStartOffset, which calls
> (3) Log.maybeIncrementLogStartOffset, which calls
> (4) LeaderEpochCache.clearAndFlushEarliest.
> Now two factors contribute to the long holding of the partitionMapLock:
> 1. Function (4) above entails calling sync() to make sure data is persisted
> to disk, which may have a long latency.
> 2. All 4 functions above can potentially be called for each partition in the
> fetch response, multiplying the sync() latency by a factor of n.
> The end result is that the request handler thread gets blocked for a long
> time trying to acquire the partitionMapLock of some fetcher inside
> AbstractFetcherManager.shutdownIdleFetcherThreads, since checking each
> fetcher's partitionCount requires acquiring that fetcher's partitionMapLock.
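> The sketch below (simplified, not the actual Kafka classes) shows the shape
> of this contention: a single lock guards partitionStates, the fetcher holds
> it across per-partition processing that may fsync, and the request handler
> must take the same lock just to read the partition count.
> {code:scala}
> import java.util.concurrent.locks.ReentrantLock
> import scala.collection.mutable
>
> class FetcherSketch {
>   private val partitionMapLock = new ReentrantLock()
>   private val partitionStates = mutable.Map[String, Long]()  // partition -> fetch offset (simplified)
>
>   // Fetcher thread: processes a fetch response while holding partitionMapLock.
>   def processFetchResponse(partitions: Seq[String]): Unit = {
>     partitionMapLock.lock()
>     try {
>       partitions.foreach { tp =>
>         // steps (1)-(3) elided; step (4) ends in a sync()/fsync, so the lock is
>         // held for roughly (number of partitions) x (fsync latency)
>         flushLeaderEpochCache(tp)
>       }
>     } finally partitionMapLock.unlock()
>   }
>
>   // Request handler thread: even reading partitionCount needs the same lock,
>   // so shutdownIdleFetcherThreads blocks behind the loop above.
>   def partitionCount: Int = {
>     partitionMapLock.lock()
>     try partitionStates.size
>     finally partitionMapLock.unlock()
>   }
>
>   private def flushLeaderEpochCache(tp: String): Unit =
>     Thread.sleep(50)  // stand-in for a potentially slow fsync
> }
> {code}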
> In our testing environment, we reproduced the problem and confirmed the
> explanation above: a request handler thread got blocked for 10 seconds trying
> to acquire the partitionMapLock of one particular fetcher thread, while there
> were many log entries showing "Incrementing log start offset of partition..."
> Proposed change:
> We propose to change the interaction between the request handler threads and
> the fetcher threads to an ASYNC model by using an event queue. All requests
> to add or remove partitions, or to shut down idle fetcher threads, are
> modeled as items in the event queue, and only the fetcher threads can take
> items out of the event queue and actually process them.
> In the new ASYNC model, in order to be able to process an infinite sequence
> of FetchRequests, a fetcher thread initially has one FetchRequest, and after
> it's done processing one FetchRequest, it enqueues one more into its own
> event queue.
> Also, since the current AbstractFetcherThread logic is inherited by both the
> replica-fetcher-threads and the consumer-fetcher-threads of the old consumer,
> and the latter have been deprecated, we plan to implement the ASYNC model
> with a clean-slate approach that only supports the replica-fetcher-threads,
> in order to make the code cleaner.
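> A minimal sketch of the proposed ASYNC model is shown below (illustrative
> only; the event and method names are placeholders, not the eventual
> implementation). Request handler threads only enqueue events and never touch
> fetcher state directly, and the fetcher keeps itself fetching by re-enqueuing
> a fetch event after each one it processes.
> {code:scala}
> import java.util.concurrent.LinkedBlockingQueue
> import scala.collection.mutable
>
> sealed trait FetcherEvent
> case class AddPartitions(partitions: Seq[String]) extends FetcherEvent
> case class RemovePartitions(partitions: Seq[String]) extends FetcherEvent
> case object Fetch extends FetcherEvent  // re-enqueued by the fetcher to keep fetching
>
> class AsyncFetcherSketch extends Thread {
>   private val eventQueue = new LinkedBlockingQueue[FetcherEvent]()
>   private val partitionStates = mutable.Map[String, Long]()
>
>   // Request handler threads only enqueue; they never block on fetcher-side locks.
>   def enqueue(event: FetcherEvent): Unit = eventQueue.put(event)
>
>   override def run(): Unit = {
>     eventQueue.put(Fetch)  // seed the infinite sequence of fetches
>     while (!isInterrupted) {
>       eventQueue.take() match {
>         case AddPartitions(ps)    => ps.foreach(p => partitionStates.put(p, 0L))
>         case RemovePartitions(ps) => ps.foreach(partitionStates.remove)
>         case Fetch =>
>           // build and process one fetch for all current partitions (elided),
>           // then enqueue the next Fetch so fetching continues
>           eventQueue.put(Fetch)
>       }
>     }
>   }
> }
> {code}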
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)