[
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15766182#comment-15766182
]
ASF GitHub Bot commented on KAFKA-4485:
---------------------------------------
Github user asfgit closed the pull request at:
https://github.com/apache/kafka/pull/2208
> Follower should be in the isr if its FetchRequest has fetched up to the
> logEndOffset of leader
> ----------------------------------------------------------------------------------------------
>
> Key: KAFKA-4485
> URL: https://issues.apache.org/jira/browse/KAFKA-4485
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.1.0
> Reporter: Dong Lin
> Assignee: Dong Lin
> Fix For: 0.10.2.0
>
>
> In the current implementation, we exclude a follower from the ISR if the begin
> offset of the FetchRequest from this follower has been smaller than the
> logEndOffset of the leader for more than replicaLagTimeMaxMs. Also, we add a
> follower to the ISR if the begin offset of the FetchRequest from this follower
> is equal to or larger than the high watermark of this partition.
> This is problematic for the following reasons:
> 1) The criteria for ISR membership are inconsistent between maybeExpandIsr()
> and maybeShrinkIsr(). Therefore a follower may be repeatedly removed from and
> added to the ISR (e.g. in the scenario described below).
> 2) A follower may be removed from the ISR even if its fetch rate can keep up
> with the produce rate. Suppose a producer keeps sending a lot of small
> requests at a high request rate but a low byte rate (e.g. many mirror makers),
> and the follower is always able to read all the available data at the time the
> leader receives it. However, the begin offset of each fetch request will
> always be smaller than the logEndOffset of the leader. Thus the follower will
> be removed from the ISR after replicaLagTimeMaxMs.
> In the following we describe the solution to this problem.
> Terminology:
> - Definition of replica lag: we say a replica lags behind the leader by X ms
> if its current log end offset is equal to the log end offset of the leader
> X ms ago.
> - Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica |
> replica belongs to the given partition AND replica's lag <=
> replicaLagTimeMaxMs}
> - Definition of high-watermark of a partition: high-watermark of a partition
> is the max(current high-watermark of the partition, min(log end offset of
> replicas in the pseudo-ISR set of this partition))
> - Definition of ISR set: ISR set of a partition = {replica | replica is in
> pseudo-ISR set of the given partition AND log end offset of replica >=
> high-watermark of the given partition}
> Guarantee:
> 1) If a follower is close enough to the leader in the sense that its replica
> lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set.
> Thus the high-watermark will stop increasing until this follower's log end
> offset >= high-watermark, at which moment this follower will be added to the
> ISR set. This allows us to solve the 2nd problem described above.
> 2) If a follower lags behind the leader by more than replicaLagTimeMaxMs, it
> will be removed from the ISR set.
> 3) High watermark of a partition will never decrease.
> 4) For any replica in ISR set, its log end offset >= high-watermark.
> Implementation:
> 1) For each follower, the leader keeps track of the time of the last fetch
> request from this follower. Let's call it lastFetchTime. In addition, the
> leader also maintains the log end offset of the leader at the lastFetchTime
> for each follower. Let's call it lastFetchLeaderLEO. Both variables will be
> updated after leader has processed a FetchRequest from a follower.
> 2) When the leader receives a FetchRequest from a follower, if the begin
> offset of the FetchRequest >= the leader's current LEO, the follower's
> lastCatchUpTimeMs will be set to the current system time. Otherwise, if the
> begin offset of the FetchRequest >= lastFetchLeaderLEO, the follower's
> lastCatchUpTimeMs will be set to lastFetchTime. The replica's lag = current
> system time - lastCatchUpTimeMs.
> 3) The leader can update pseudo-ISR set, high-watermark and ISR set of the
> partition based on the lag of replicas of this partition, according to the
> definition described above.
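
The implementation steps quoted above can be sketched roughly as follows. This
is only an illustration in Python, not the code from the pull request: the
class FollowerState, the functions update_partition and
simulate_steady_follower, and all parameter names are hypothetical, and the
leader is assumed to appear in the replica maps with a lag of 0.

```python
from dataclasses import dataclass


@dataclass
class FollowerState:
    """Per-follower bookkeeping kept by the leader (hypothetical names)."""
    last_fetch_time_ms: int = 0      # lastFetchTime in the description
    last_fetch_leader_leo: int = 0   # lastFetchLeaderLEO in the description
    last_catch_up_time_ms: int = 0   # lastCatchUpTimeMs in the description

    def on_fetch(self, fetch_offset: int, leader_leo: int, now_ms: int) -> None:
        # Step 2: decide what this fetch says about when the follower was caught up.
        if fetch_offset >= leader_leo:
            self.last_catch_up_time_ms = now_ms
        elif fetch_offset >= self.last_fetch_leader_leo:
            self.last_catch_up_time_ms = self.last_fetch_time_ms
        # Step 1: record this fetch only after it has been processed.
        self.last_fetch_time_ms = now_ms
        self.last_fetch_leader_leo = leader_leo

    def lag_ms(self, now_ms: int) -> int:
        return now_ms - self.last_catch_up_time_ms


def update_partition(current_hw, leos, lags, max_lag_ms):
    """Step 3: derive (high_watermark, isr) from per-replica LEO and lag."""
    pseudo_isr = {r for r in leos if lags[r] <= max_lag_ms}
    if not pseudo_isr:
        return current_hw, set()
    # The high watermark never decreases.
    hw = max(current_hw, min(leos[r] for r in pseudo_isr))
    isr = {r for r in pseudo_isr if leos[r] >= hw}
    return hw, isr


def simulate_steady_follower(rounds=50, interval_ms=100, batch=10):
    """Follower that always reads all available data; returns its worst lag."""
    state = FollowerState()
    leader_leo = follower_leo = worst = 0
    for i in range(1, rounds + 1):
        now = i * interval_ms
        leader_leo += batch                  # new data arrives before the fetch
        state.on_fetch(follower_leo, leader_leo, now)
        follower_leo = leader_leo            # follower reads everything available
        worst = max(worst, state.lag_ms(now))
    return worst
```

In this sketch, a follower that always fetches from the leader's previous log
end offset keeps a lag of one fetch interval, even though its begin offset is
always below the leader's current LEO, which is the scenario in problem 2. A
follower whose begin offset stops advancing sees its lag grow with every fetch.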