hachikuji commented on code in PR #12085:
URL: https://github.com/apache/kafka/pull/12085#discussion_r866948293


##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2105,6 +2106,203 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
+  @Test
+  def testDoNotResetReplicaStateIfLeaderEpochIsNotBumped(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+
+    // Follower's state is initialized with unknown offset because it is not
+    // in the ISR.
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = 0L,
+      logStartOffset = UnifiedLog.UnknownOffset,
+      logEndOffset = UnifiedLog.UnknownOffset
+    )
+
+    // Follower fetches and updates its replica state.
+    partition.updateFollowerFetchState(
+      followerId = followerId,
+      followerFetchOffsetMetadata = LogOffsetMetadata(0L),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = partition.localLogOrException.logEndOffset
+    )
+
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = 0L
+    )
+
+    // makeLeader is called again with the same leader epoch but with
+    // a newer partition epoch.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()

Review Comment:
   I think a legitimate case where this can happen in KRaft is a partition 
reassignment. The leader epoch is not bumped when we add replicas (only later 
when we remove them).



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2105,6 +2106,203 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
+  @Test
+  def testDoNotResetReplicaStateIfLeaderEpochIsNotBumped(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+
+    // Follower's state is initialized with unknown offset because it is not
+    // in the ISR.
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = 0L,
+      logStartOffset = UnifiedLog.UnknownOffset,
+      logEndOffset = UnifiedLog.UnknownOffset
+    )
+
+    // Follower fetches and updates its replica state.
+    partition.updateFollowerFetchState(
+      followerId = followerId,
+      followerFetchOffsetMetadata = LogOffsetMetadata(0L),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = partition.localLogOrException.logEndOffset
+    )
+
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = 0L
+    )
+
+    // makeLeader is called again with the same leader epoch but with
+    // a newer partition epoch.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(2)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(false)
+
+    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(2, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+
+    // Follower's state has not been reset.
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = 0L
+    )
+  }
+
+  @Test
+  def testDoNotUpdateEpochStartOffsetIfLeaderEpochIsNotBumped(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+    assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
+
+    val leaderLog = partition.localLogOrException
+    assertEquals(Some(EpochEntry(leaderEpoch, 0L)), 
leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+
+    // Write to the log to increment the log end offset.
+    leaderLog.appendAsLeader(MemoryRecords.withRecords(0L, 
CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k1".getBytes, "v1".getBytes)
+    ), leaderEpoch = leaderEpoch)
+
+    // makeLeader is called again with the same leader epoch but with
+    // a newer partition epoch.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(2)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(false)
+
+    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, 
Some(topicId)))
+    assertEquals(2, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+    assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
+    assertEquals(Some(EpochEntry(leaderEpoch, 0L)), 
leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+  }
+
+  @Test
+  def testIgnoreLeaderPartitionStateChangeWithOlderEpoch(): Unit = {

Review Comment:
   nit: ....WithOlderPartitionEpoch?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -548,21 +548,35 @@ class Partition(val topicPartition: TopicPartition,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid]): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-      // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper 
path
+      // Partition state changes are expected to have an partition epoch 
larger or equal
+      // to the current partition epoch. The later is allowed because the 
partition epoch

Review Comment:
   nit: latter



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -548,21 +548,35 @@ class Partition(val topicPartition: TopicPartition,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid]): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-      // record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper 
path
+      // Partition state changes are expected to have an partition epoch 
larger or equal
+      // to the current partition epoch. The later is allowed because the 
partition epoch
+      // is also updated by the AlterPartition response so the new epoch might 
be known
+      // before a LeaderAndIsr request is received or before an update is 
received via
+      // the metadata log.
+      if (partitionState.partitionEpoch < partitionEpoch) {
+        stateChangeLogger.info(s"Skipped the become-leader state change for 
$topicPartition with topic id $topicId " +
+          s"and partition state $partitionState since the leader is already at 
a newer partition epoch $partitionEpoch.")
+        return false
+      }
+
+      // Record the epoch of the controller that made the leadership decision. 
This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper 
path.
       controllerEpoch = partitionState.controllerEpoch
 
+      val currentTimeMs = time.milliseconds
+      val isNewLeader = !isLeader
+      val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
       val isr = partitionState.isr.asScala.map(_.toInt).toSet
       val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
       val removingReplicas = 
partitionState.removingReplicas.asScala.map(_.toInt)
 
-      if (partitionState.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING.value()) {
-        stateChangeLogger.info(
-          s"The topic partition $topicPartition was marked as RECOVERING. 
Leader log recovery is not implemented. " +
-          "Marking the topic partition as RECOVERED."
-        )
+      if (partitionState.leaderRecoveryState == 
LeaderRecoveryState.RECOVERING.value) {
+        stateChangeLogger.info(s"The topic partition $topicPartition was 
marked as RECOVERING. Leader log recovery " +

Review Comment:
   A little outside the scope here, but I think this message might cause 
confusion. Could we leave out the bit about recovery not being implemented?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -576,51 +590,56 @@ class Partition(val topicPartition: TopicPartition,
       } catch {
         case e: ZooKeeperClientException =>
           stateChangeLogger.error(s"A ZooKeeper client exception has occurred 
and makeLeader will be skipping the " +
-            s"state change for the partition $topicPartition with leader 
epoch: $leaderEpoch ", e)
-
+            s"state change for the partition $topicPartition with leader 
epoch: $leaderEpoch.", e)
           return false
       }
 
       val leaderLog = localLogOrException
-      val leaderEpochStartOffset = leaderLog.logEndOffset
-      stateChangeLogger.info(s"Leader $topicPartition starts at leader epoch 
${partitionState.leaderEpoch} from " +
-        s"offset $leaderEpochStartOffset with high watermark 
${leaderLog.highWatermark} " +
-        s"ISR ${isr.mkString("[", ",", "]")} addingReplicas 
${addingReplicas.mkString("[", ",", "]")} " +
-        s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. 
Previous leader epoch was $leaderEpoch.")
 
-      // We cache the leader epoch here, persisting it only if it's local 
(hence having a log dir)
-      leaderEpoch = partitionState.leaderEpoch
-      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
-      partitionEpoch = partitionState.partitionEpoch
-
-      // In the case of successive leader elections in a short time period, a 
follower may have
-      // entries in its log from a later epoch than any entry in the new 
leader's log. In order
-      // to ensure that these followers can truncate to the right offset, we 
must cache the new
-      // leader epoch and the start offset since it should be larger than any 
epoch that a follower
-      // would try to query.
-      leaderLog.maybeAssignEpochStartOffset(leaderEpoch, 
leaderEpochStartOffset)
-
-      val isNewLeader = !isLeader
-      val currentTimeMs = time.milliseconds
+      // We update the epoch start offset and the replicas' state only if the 
leader epoch
+      // has changed.
+      if (isNewLeaderEpoch) {
+        val leaderEpochStartOffset = leaderLog.logEndOffset
+        stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId 
starts at leader epoch ${partitionState.leaderEpoch} from " +
+          s"offset $leaderEpochStartOffset with high watermark 
${leaderLog.highWatermark} " +
+          s"ISR ${isr.mkString("[", ",", "]")} addingReplicas 
${addingReplicas.mkString("[", ",", "]")} " +
+          s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. 
Previous leader epoch was $leaderEpoch.")
+
+        // In the case of successive leader elections in a short time period, 
a follower may have
+        // entries in its log from a later epoch than any entry in the new 
leader's log. In order
+        // to ensure that these followers can truncate to the right offset, we 
must cache the new
+        // leader epoch and the start offset since it should be larger than 
any epoch that a follower
+        // would try to query.
+        leaderLog.maybeAssignEpochStartOffset(partitionState.leaderEpoch, 
leaderEpochStartOffset)
+
+        // Initialize lastCaughtUpTime of replicas as well as their 
lastFetchTimeMs and
+        // lastFetchLeaderLogEndOffset.
+        remoteReplicas.foreach { replica =>
+          replica.resetReplicaState(
+            currentTimeMs = currentTimeMs,
+            leaderEndOffset = leaderEpochStartOffset,
+            isNewLeader = isNewLeader,
+            isFollowerInSync = partitionState.isr.contains(replica.brokerId)
+          )
+        }
 
-      // Initialize lastCaughtUpTime of replicas as well as their 
lastFetchTimeMs and
-      // lastFetchLeaderLogEndOffset.
-      remoteReplicas.foreach { replica =>
-        replica.resetReplicaState(
-          currentTimeMs = currentTimeMs,
-          leaderEndOffset = leaderEpochStartOffset,
-          isNewLeader = isNewLeader,
-          isFollowerInSync = partitionState.isr.contains(replica.brokerId)
-        )
+        // We update the leader epoch and the leader epoch start offset iff the
+        // leader epoch changed.
+        leaderEpoch = partitionState.leaderEpoch
+        leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
+      } else {
+        stateChangeLogger.info(s"Skipped the become-leader state change for 
$topicPartition with topic id $topicId " +

Review Comment:
   As long as we're logging at info, I think it would be good to include a bit 
more information (such as ISR and partition epoch).



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -576,51 +590,56 @@ class Partition(val topicPartition: TopicPartition,
       } catch {
         case e: ZooKeeperClientException =>
           stateChangeLogger.error(s"A ZooKeeper client exception has occurred 
and makeLeader will be skipping the " +
-            s"state change for the partition $topicPartition with leader 
epoch: $leaderEpoch ", e)
-
+            s"state change for the partition $topicPartition with leader 
epoch: $leaderEpoch.", e)
           return false
       }
 
       val leaderLog = localLogOrException
-      val leaderEpochStartOffset = leaderLog.logEndOffset
-      stateChangeLogger.info(s"Leader $topicPartition starts at leader epoch 
${partitionState.leaderEpoch} from " +
-        s"offset $leaderEpochStartOffset with high watermark 
${leaderLog.highWatermark} " +
-        s"ISR ${isr.mkString("[", ",", "]")} addingReplicas 
${addingReplicas.mkString("[", ",", "]")} " +
-        s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. 
Previous leader epoch was $leaderEpoch.")
 
-      // We cache the leader epoch here, persisting it only if it's local 
(hence having a log dir)
-      leaderEpoch = partitionState.leaderEpoch
-      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
-      partitionEpoch = partitionState.partitionEpoch
-
-      // In the case of successive leader elections in a short time period, a 
follower may have
-      // entries in its log from a later epoch than any entry in the new 
leader's log. In order
-      // to ensure that these followers can truncate to the right offset, we 
must cache the new
-      // leader epoch and the start offset since it should be larger than any 
epoch that a follower
-      // would try to query.
-      leaderLog.maybeAssignEpochStartOffset(leaderEpoch, 
leaderEpochStartOffset)
-
-      val isNewLeader = !isLeader
-      val currentTimeMs = time.milliseconds
+      // We update the epoch start offset and the replicas' state only if the 
leader epoch
+      // has changed.
+      if (isNewLeaderEpoch) {
+        val leaderEpochStartOffset = leaderLog.logEndOffset
+        stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId 
starts at leader epoch ${partitionState.leaderEpoch} from " +

Review Comment:
   Could we add partition epoch to this message? Similarly in the follower 
message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to