[ https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551848#comment-16551848 ]

ASF GitHub Bot commented on KAFKA-7152:
---------------------------------------

lindong28 closed pull request #5412: KAFKA-7152: Avoid moving a replica out of isr if its LEO equals leader's LEO
URL: https://github.com/apache/kafka/pull/5412
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index b80c34475d3..154a8f969c5 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -553,7 +553,8 @@ class Partition(val topic: String,
 
   def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
     /**
-     * there are two cases that will be handled here -
+     * If the follower already has the same leo as the leader, it will not be considered as out-of-sync,
+     * otherwise there are two cases that will be handled here -
      * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
      *                     the follower is stuck and should be removed from the ISR
      * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
@@ -565,7 +566,8 @@ class Partition(val topic: String,
      **/
     val candidateReplicas = inSyncReplicas - leaderReplica
 
-    val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
+    val laggingReplicas = candidateReplicas.filter(r =>
+      r.logEndOffset.messageOffset != leaderReplica.logEndOffset.messageOffset && (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
     if (laggingReplicas.nonEmpty)
       debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))
 
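For readers skimming the patch, the following is a minimal, self-contained sketch of the new condition. FollowerState and outOfSyncFollowers are illustrative stand-ins (not Kafka classes); only the boolean logic mirrors the filter above.

object IsrCheckSketch extends App {
  // Illustrative model of the per-follower state the filter looks at.
  case class FollowerState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // A follower is reported as out of sync only if its LEO differs from the
  // leader's LEO AND its last caught-up time is older than maxLagMs.
  def outOfSyncFollowers(followers: Set[FollowerState],
                         leaderLogEndOffset: Long,
                         nowMs: Long,
                         maxLagMs: Long): Set[FollowerState] =
    followers.filter { f =>
      f.logEndOffset != leaderLogEndOffset &&
        (nowMs - f.lastCaughtUpTimeMs) > maxLagMs
    }

  // Broker 1 is fully caught up but has not fetched for 15 s; broker 2 lags.
  val followers = Set(
    FollowerState(brokerId = 1, logEndOffset = 20L, lastCaughtUpTimeMs = 0L),
    FollowerState(brokerId = 2, logEndOffset = 15L, lastCaughtUpTimeMs = 0L))

  val osr = outOfSyncFollowers(followers, leaderLogEndOffset = 20L, nowMs = 15000L, maxLagMs = 10000L)
  assert(osr.map(_.brokerId) == Set(2))   // only the genuinely lagging follower is out of sync
}

Because the LEO-equality guard short-circuits the lag-time comparison, a fully caught-up follower can no longer be shrunk out of the ISR merely because its last fetch is old.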
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 8212ed680c5..c90a5b97a1a 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -40,6 +40,7 @@ class IsrExpirationTest {
   var topicPartitionIsr: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
   val replicaLagTimeMaxMs = 100L
   val replicaFetchWaitMaxMs = 100
+  val leaderLogEndOffset = 20
 
   val overridingProps = new Properties()
   overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, replicaLagTimeMaxMs.toString)
@@ -81,12 +82,12 @@ class IsrExpirationTest {
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
-    // let the follower catch up to the Leader logEndOffset (15)
+    // let the follower catch up to the Leader logEndOffset - 1
     for (replica <- partition0.assignedReplicas - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                                                    highWatermark = 15L,
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+                                                    highWatermark = leaderLogEndOffset - 1,
                                                     leaderLogStartOffset = 0L,
-                                                    leaderLogEndOffset = 15L,
+                                                    leaderLogEndOffset = leaderLogEndOffset,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1,
@@ -138,10 +139,10 @@ class IsrExpirationTest {
 
     // Make the remote replica not read to the end of log. It should be not be out of sync for at least 100 ms
     for (replica <- partition0.assignedReplicas - leaderReplica)
-      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(10L), MemoryRecords.EMPTY),
-                                                    highWatermark = 10L,
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 2), MemoryRecords.EMPTY),
+                                                    highWatermark = leaderLogEndOffset - 2,
                                                     leaderLogStartOffset = 0L,
-                                                    leaderLogEndOffset = 15L,
+                                                    leaderLogEndOffset = leaderLogEndOffset,
                                                     followerLogStartOffset = 0L,
                                                     fetchTimeMs = time.milliseconds,
                                                     readSize = -1,
@@ -155,10 +156,10 @@ class IsrExpirationTest {
     time.sleep(75)
 
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
-      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(11L), MemoryRecords.EMPTY),
-                            highWatermark = 11L,
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset - 1), MemoryRecords.EMPTY),
+                            highWatermark = leaderLogEndOffset - 1,
                             leaderLogStartOffset = 0L,
-                            leaderLogEndOffset = 15L,
+                            leaderLogEndOffset = leaderLogEndOffset,
                             followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1,
@@ -175,10 +176,10 @@ class IsrExpirationTest {
 
     // Now actually make a fetch to the end of the log. The replicas should be back in ISR
     (partition0.assignedReplicas - leaderReplica).foreach { r =>
-      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(15L), MemoryRecords.EMPTY),
-                            highWatermark = 15L,
+      r.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+                            highWatermark = leaderLogEndOffset,
                             leaderLogStartOffset = 0L,
-                            leaderLogEndOffset = 15L,
+                            leaderLogEndOffset = leaderLogEndOffset,
                             followerLogStartOffset = 0L,
                             fetchTimeMs = time.milliseconds,
                             readSize = -1,
@@ -190,6 +191,40 @@ class IsrExpirationTest {
     EasyMock.verify(log)
   }
 
+  /*
+   * Test the case where a follower has already caught up with same log end offset with the leader. This follower should not be considered as out-of-sync
+   */
+  @Test
+  def testIsrExpirationForCaughtUpFollowers() {
+    val log = logMock
+
+    // create one partition and all replicas
+    val partition0 = getPartitionWithAllReplicasInIsr(topic, 0, time, configs.head, log)
+    assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
+    val leaderReplica = partition0.getReplica(configs.head.brokerId).get
+
+    // let the follower catch up to the Leader logEndOffset
+    for (replica <- partition0.assignedReplicas - leaderReplica)
+      replica.updateLogReadResult(new LogReadResult(info = FetchDataInfo(new LogOffsetMetadata(leaderLogEndOffset), MemoryRecords.EMPTY),
+        highWatermark = leaderLogEndOffset,
+        leaderLogStartOffset = 0L,
+        leaderLogEndOffset = leaderLogEndOffset,
+        followerLogStartOffset = 0L,
+        fetchTimeMs = time.milliseconds,
+        readSize = -1,
+        lastStableOffset = None))
+    var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+
+    // let some time pass
+    time.sleep(150)
+
+    // even though follower hasn't pulled any data for > replicaMaxLagTimeMs ms, the follower has already caught up. So it is not out-of-sync.
+    partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs)
+    assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
+    EasyMock.verify(log)
+  }
+
   private def getPartitionWithAllReplicasInIsr(topic: String, partitionId: Int, time: Time, config: KafkaConfig,
                                                localLog: Log): Partition = {
     val leaderId = config.brokerId
@@ -222,6 +257,7 @@ class IsrExpirationTest {
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
+    EasyMock.expect(log.logEndOffsetMetadata).andReturn(LogOffsetMetadata(leaderLogEndOffset)).anyTimes()
     EasyMock.replay(log)
     log
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> replica should be in-sync if its LEO equals leader's LEO
> --------------------------------------------------------
>
>                 Key: KAFKA-7152
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7152
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Major
>             Fix For: 2.1.0
>
>
> Currently a replica will be moved out of the ISR if the follower has not
> fetched from the leader for 10 sec (the default replica.lag.time.max.ms). This
> causes a problem in the following scenario:
> Say the follower's ReplicaFetcherThread needs to fetch 2k partitions from the
> leader broker. Only 100 out of the 2k partitions are actively being produced
> to, and therefore the total bytes-in rate for those 2k partitions is small.
> The following will happen:
>  
> 1) The follower's ReplicaFetcherThread sends a FetchRequest for those 2k
> partitions.
> 2) Because the total bytes-in rate for those 2k partitions is very small, the
> follower is able to catch up and the leader broker adds these 2k partitions
> to the ISR. The follower's lastCaughtUpTimeMs for all partitions is updated
> to the current time T0.
> 3) Since the follower has caught up on all 2k partitions, the leader updates
> 2k partition znodes to include the follower in the ISR. It may take 20 seconds
> to write 2k partition znodes if each znode write operation takes 10 ms.
> 4) At T0 + 15, maybeShrinkIsr() is invoked on the leader broker. Since there
> has been no FetchRequest from the follower for more than 10 seconds after T0,
> all those 2k partitions will be considered out of sync and the follower will
> be removed from the ISR.
> 5) The follower receives the FetchResponse at least 20 seconds after T0. That
> means the next FetchRequest from the follower to the leader will be sent after
> T0 + 20.
> The sequence of events described above will loop over time. There will be
> constant URP churn in the cluster even if the follower can keep up with the
> leader's bytes-in rate. This reduces cluster availability.
>  
> In order to address this problem, one simple approach is to keep the follower
> in the ISR as long as the follower's LEO equals the leader's LEO, regardless
> of the follower's lastCaughtUpTimeMs. This is particularly useful if there
> are a lot of inactive partitions in the cluster.
>  
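As a quick sanity check on the arithmetic in the scenario above, here is a minimal sketch using the figures from the description (2k partition znodes, ~10 ms per znode write, the 10 s default replica.lag.time.max.ms); the object and value names are illustrative.

object IsrChurnTimeline extends App {
  val partitions = 2000                 // partitions fetched by the follower
  val znodeWriteMs = 10L                // per-znode write latency assumed in the description
  val replicaLagTimeMaxMs = 10000L      // default replica.lag.time.max.ms

  // The description estimates ~20 s of znode writes after T0 before the
  // follower receives its FetchResponse, so its next FetchRequest arrives no
  // earlier than this many milliseconds after T0.
  val nextFetchAfterT0Ms = partitions * znodeWriteMs

  // maybeShrinkIsr() runs at T0 + 15 s (per the description), before that next
  // fetch, so the follower is dropped from the ISR again even though it was
  // fully caught up at T0.
  assert(nextFetchAfterT0Ms > replicaLagTimeMaxMs)
  println(s"Next fetch ~$nextFetchAfterT0Ms ms after T0 > lag limit $replicaLagTimeMaxMs ms")
}

With the LEO-equality guard in the patch above, such a fully caught-up follower simply stays in the ISR during this window, so the shrink/expand loop never starts.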



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
