This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7c715c02c06 KAFKA-18486 Update testClearPurgatoryOnBecomingFollower 
etc with KRaft mechanism in ReplicaManagerTest (#19924)
7c715c02c06 is described below

commit 7c715c02c06f16475faff8aa72048cddb7382c8a
Author: Ken Huang <[email protected]>
AuthorDate: Wed Jun 11 18:52:08 2025 +0800

    KAFKA-18486 Update testClearPurgatoryOnBecomingFollower etc with KRaft 
mechanism in ReplicaManagerTest (#19924)
    
    Update the following tests to avoid using `becomeLeaderOrFollower`:
    - testClearPurgatoryOnBecomingFollower
    - testDelayedFetchIncludesAbortedTransactions
    - testDisabledTransactionVerification
    - testFailedBuildRemoteLogAuxStateMetrics
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 81 +++++-----------------
 1 file changed, 17 insertions(+), 64 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 04a1677b474..ee2115c635f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -445,26 +445,15 @@ class ReplicaManagerTest {
 
     try {
       val brokerList = Seq[Integer](0, 1).asJava
-      val topicIds = Collections.singletonMap(topic, topicId)
 
-      val partition = rm.createPartition(new TopicPartition(topic, 0))
+      val topicPartition = new TopicPartition(topic, 0)
+      val partition = rm.createPartition(topicPartition)
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+      val delta = createLeaderDelta(topicIds(topic), topicPartition, 
brokerList.get(0), brokerList, brokerList)
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+      rm.applyDelta(delta, leaderMetadataImage)
       rm.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
@@ -474,20 +463,9 @@ class ReplicaManagerTest {
       }
 
       // Make this replica the follower
-      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(1)
-          .setLeaderEpoch(1)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      val delta1 = createLeaderDelta(topicIds(topic), topicPartition, 
brokerList.get(1), brokerList, brokerList, 1)
+      val followerMetadataImage = imageFromTopics(delta1.apply())
+      rm.applyDelta(delta1, followerMetadataImage)
 
       assertTrue(appendResult.hasFired)
     } finally {
@@ -945,20 +923,9 @@ class ReplicaManagerTest {
         new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
 
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => 
())
+      val delta = topicsCreateDelta(brokerList.get(0), isStartIdLeader = true, 
partitions = List(0), List.empty, topic, topicIds(topic))
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, leaderMetadataImage)
       replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
@@ -2548,8 +2515,9 @@ class ReplicaManagerTest {
     val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp), config = config)
 
     try {
-      val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), 
tp, Seq(0, 1), new LeaderAndIsr(0, List(0, 1).map(Int.box).asJava))
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => 
())
+      val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = 
List(0), List.empty, topic, topicIds(topic))
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, leaderMetadataImage)
 
       val transactionalRecords = 
MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, 
producerEpoch, sequence,
         new SimpleRecord(s"message $sequence".getBytes))
@@ -4110,23 +4078,6 @@ class ReplicaManagerTest {
     try {
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
-      val partition0Replicas = Seq[Integer](0, 1).asJava
-      val topicIds = Map(tp0.topic -> topicId).asJava
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(0)
-            .setLeader(1)
-            .setLeaderEpoch(0)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
 
       // Verify the metrics for build remote log state and for failures is 
zero before replicas start to fetch
       assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
@@ -4135,7 +4086,9 @@ class ReplicaManagerTest {
       assertEquals(0, 
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
       assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
 
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+      val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 
0), 1, util.List.of(0, 1), util.List.of(0, 1))
+      val leaderMetadataImage = imageFromTopics(delta.apply())
+      replicaManager.applyDelta(delta, leaderMetadataImage)
 
       // Replicas fetch from the leader periodically, therefore we check that 
the metric value is increasing
       // We expect failedBuildRemoteLogAuxStateRate to increase because there 
is no remoteLogSegmentMetadata

Reply via email to