This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 991a10c19fd KAFKA-18486 Migrate 
ReplicaManagerTest#testOffsetOutOfRangeExceptionWhenFetchMessages to use 
applyDelta (#19952)
991a10c19fd is described below

commit 991a10c19fde604f734c83ed7f52c5b75633b05f
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Jun 13 15:47:51 2025 +0800

    KAFKA-18486 Migrate 
ReplicaManagerTest#testOffsetOutOfRangeExceptionWhenFetchMessages to use 
applyDelta (#19952)
    
    Change `becomeLeaderOrFollower` to `applyDelta` in following test cases
    * testOffsetOutOfRangeExceptionWhenFetchMessages
    * testOffsetOutOfRangeExceptionWhenReadFromLog
    * testOldFollowerLosesMetricsWhenReassignPartitions
    * testOldLeaderLosesMetricsWhenReassignPartitions
    
    Reviewers: Bolin Lin <[email protected]>, Lan Ding
     <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 223 +++++++--------------
 1 file changed, 69 insertions(+), 154 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0499dd2f6f2..1cd340aa329 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3418,11 +3418,8 @@ class ReplicaManagerTest {
 
   @Test
   def testOldLeaderLosesMetricsWhenReassignPartitions(): Unit = {
-    val controllerEpoch = 0
     val leaderEpoch = 0
     val leaderEpochIncrement = 1
-    val correlationId = 0
-    val controllerId = 0
     val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats])
     val (rm0, rm1) = 
prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]), 
mockTopicStats1)
 
@@ -3435,64 +3432,38 @@ class ReplicaManagerTest {
       val partition1Replicas = Seq[Integer](1, 0).asJava
       val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> 
Uuid.randomUuid()).asJava
 
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(controllerId, 
0, brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(0)
-            .setLeaderEpoch(leaderEpoch)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true),
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp1.topic)
-            .setPartitionIndex(tp1.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(1)
-            .setLeaderEpoch(leaderEpoch)
-            .setIsr(partition1Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition1Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
-
-      rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) 
=> ())
-      rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) 
=> ())
+      val delta1 = createLeaderDelta(topicIds.get(topic), tp0, 
partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch)
+      delta1.replay(new PartitionRecord()
+        .setPartitionId(tp1.partition)
+        .setTopicId(topicIds.get(topic))
+        .setIsr(partition1Replicas)
+        .setReplicas(partition1Replicas)
+        .setRemovingReplicas(util.List.of())
+        .setAddingReplicas(util.List.of())
+        .setLeader(partition1Replicas.get(0))
+        .setLeaderEpoch(leaderEpoch)
+        .setPartitionEpoch(0)
+      )
+      val leaderMetadataImage1 = imageFromTopics(delta1.apply())
+      rm0.applyDelta(delta1, leaderMetadataImage1)
+      rm1.applyDelta(delta1, leaderMetadataImage1)
 
       // make broker 0 the leader of partition 1 so broker 1 loses its 
leadership position
-      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder( 
controllerId, controllerEpoch, brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(0)
-            .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true),
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp1.topic)
-            .setPartitionIndex(tp1.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(0)
-            .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
-            .setIsr(partition1Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition1Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
-
-      rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) 
=> ())
-      rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) 
=> ())
+      val delta2 = createLeaderDelta(topicIds.get(topic), tp0, 
partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch 
+ leaderEpochIncrement)
+      delta2.replay(new PartitionRecord()
+        .setPartitionId(tp1.partition)
+        .setTopicId(topicIds.get(topic))
+        .setIsr(partition1Replicas)
+        .setReplicas(partition1Replicas)
+        .setRemovingReplicas(util.List.of())
+        .setAddingReplicas(util.List.of())
+        .setLeader(partition1Replicas.get(1))
+        .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
+        .setPartitionEpoch(0)
+      )
+      val leaderMetadataImage2 = imageFromTopics(delta2.apply())
+      rm0.applyDelta(delta2, leaderMetadataImage2)
+      rm1.applyDelta(delta2, leaderMetadataImage2)
     } finally {
       Utils.tryAll(util.Arrays.asList[Callable[Void]](
         () => {
@@ -3512,11 +3483,8 @@ class ReplicaManagerTest {
 
   @Test
   def testOldFollowerLosesMetricsWhenReassignPartitions(): Unit = {
-    val controllerEpoch = 0
     val leaderEpoch = 0
     val leaderEpochIncrement = 1
-    val correlationId = 0
-    val controllerId = 0
     val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats])
     val (rm0, rm1) = 
prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]), 
mockTopicStats1)
 
@@ -3529,65 +3497,38 @@ class ReplicaManagerTest {
       val partition1Replicas = Seq[Integer](1, 0).asJava
       val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> 
Uuid.randomUuid()).asJava
 
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(controllerId, 
0, brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(1)
-            .setLeaderEpoch(leaderEpoch)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true),
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp1.topic)
-            .setPartitionIndex(tp1.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(1)
-            .setLeaderEpoch(leaderEpoch)
-            .setIsr(partition1Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition1Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
-
-      rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) 
=> ())
-      rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) 
=> ())
+      val delta = createLeaderDelta(topicIds.get(topic), tp0, 
partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch)
+      delta.replay(new PartitionRecord()
+        .setPartitionId(tp1.partition)
+        .setTopicId(topicIds.get(topic))
+        .setIsr(partition1Replicas)
+        .setReplicas(partition1Replicas)
+        .setRemovingReplicas(util.List.of())
+        .setAddingReplicas(util.List.of())
+        .setLeader(partition1Replicas.get(0))
+        .setLeaderEpoch(leaderEpoch)
+        .setPartitionEpoch(0)
+      )
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+      rm0.applyDelta(delta, leaderMetadataImage)
+      rm1.applyDelta(delta, leaderMetadataImage)
 
       // make broker 0 the leader of partition 1 so broker 1 loses its 
leadership position
-      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(controllerId,
-        controllerEpoch, brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(0)
-            .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true),
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp1.topic)
-            .setPartitionIndex(tp1.partition)
-            .setControllerEpoch(controllerEpoch)
-            .setLeader(0)
-            .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
-            .setIsr(partition1Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition1Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
-
-      rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) 
=> ())
-      rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) 
=> ())
+      val delta2 = createLeaderDelta(topicIds.get(topic), tp0, 
partition0Replicas.get(1), partition0Replicas, partition0Replicas, leaderEpoch 
+ leaderEpochIncrement)
+      delta2.replay(new PartitionRecord()
+        .setPartitionId(tp1.partition)
+        .setTopicId(topicIds.get(topic))
+        .setIsr(partition1Replicas)
+        .setReplicas(partition1Replicas)
+        .setRemovingReplicas(util.List.of())
+        .setAddingReplicas(util.List.of())
+        .setLeader(partition1Replicas.get(1))
+        .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
+        .setPartitionEpoch(0)
+      )
+      val leaderMetadataImage2 = imageFromTopics(delta2.apply())
+      rm0.applyDelta(delta2, leaderMetadataImage2)
+      rm1.applyDelta(delta2, leaderMetadataImage2)
     } finally {
       Utils.tryAll(util.Arrays.asList[Callable[Void]](
         () => {
@@ -3603,7 +3544,6 @@ class ReplicaManagerTest {
 
     // verify that broker 1 did remove its metrics when no longer being the 
leader of partition 1
     verify(mockTopicStats1).removeOldLeaderMetrics(topic)
-    verify(mockTopicStats1).removeOldFollowerMetrics(topic)
   }
 
   private def prepareDifferentReplicaManagers(brokerTopicStats1: 
BrokerTopicStats,
@@ -3669,22 +3609,9 @@ class ReplicaManagerTest {
       val partition0Replicas = Seq[Integer](0, 1).asJava
       val topicIds = Map(tp0.topic -> topicId).asJava
       val leaderEpoch = 0
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(0)
-            .setLeader(leaderEpoch)
-            .setLeaderEpoch(0)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+      val delta = createLeaderDelta(topicIds.get(topic), tp0, 
partition0Replicas.get(0), partition0Replicas, partition0Replicas)
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, leaderMetadataImage)
 
       val params = new FetchParams(replicaId, 1, 1000, 0, 100, 
FetchIsolation.LOG_END, Optional.empty)
       // when reading log, it'll throw OffsetOutOfRangeException, which will 
be handled separately
@@ -3713,6 +3640,7 @@ class ReplicaManagerTest {
   @ParameterizedTest
   @ValueSource(booleans = Array(true, false))
   def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): 
Unit = {
+    val brokerList = Seq[Integer](0, 1).asJava
     val replicaId = if (isFromFollower) 1 else -1
     val tp0 = new TopicPartition(topic, 0)
     val tidp0 = new TopicIdPartition(topicId, tp0)
@@ -3721,25 +3649,11 @@ class ReplicaManagerTest {
     try {
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
-      val partition0Replicas = Seq[Integer](0, 1).asJava
       val topicIds = Map(tp0.topic -> topicId).asJava
       val leaderEpoch = 0
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(0)
-            .setLeader(leaderEpoch)
-            .setLeaderEpoch(0)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+      val delta = createLeaderDelta(topicIds.get(topic), tp0, 
brokerList.get(0), brokerList, brokerList)
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, leaderMetadataImage)
 
       val params = new FetchParams(replicaId, 1, 1000, 10, 100, 
FetchIsolation.LOG_END, Optional.empty)
       val fetchOffset = 1
@@ -4087,7 +4001,8 @@ class ReplicaManagerTest {
       assertEquals(0, 
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
       assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
 
-      val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 
0), 1, util.List.of(0, 1), util.List.of(0, 1))
+      val brokerList = Seq[Integer](0, 1).asJava
+      val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 
0), brokerList.get(1), brokerList, brokerList)
       val leaderMetadataImage = imageFromTopics(delta.apply())
       replicaManager.applyDelta(delta, leaderMetadataImage)
 

Reply via email to