This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd702906331 KAFKA-18486 Remove ReplicaManager#becomeLeaderOrFollower 
from testFencedErrorCausedByBecomeLeader and other similar methods (#19966)
fd702906331 is described below

commit fd70290633191b6f53a9d4ddb24e3a8b619fcd3f
Author: Nick Guo <[email protected]>
AuthorDate: Mon Jun 16 21:43:41 2025 +0800

    KAFKA-18486 Remove ReplicaManager#becomeLeaderOrFollower from 
testFencedErrorCausedByBecomeLeader and other similar methods (#19966)
    
    The included tests are as follows:
    
    - testFencedErrorCausedByBecomeLeader
    - testFetchBeyondHighWatermark
    - testFetchFollowerNotAllowedForOlderClients
    - testFetchFromFollowerShouldNotRunPreferLeaderSelect
    - testFetchFromLeaderAlwaysAllowed
    - testFetchMessagesWhenNotFollowerForOnePartition
    - testFetchRequestRateMetrics
    - testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined
    - testFollowerFetchWithDefaultSelectorNoForcedHwPropagation
    - testFollowerStateNotUpdatedIfLogReadFails
    
    I removed `testFetchMessagesWithInconsistentTopicId ` as it's no longer
    needed, the "topicId" is now mandatory and cannot be null in our new
    implementation.
    
    Reviewers: Jhen-Yung Hsu <[email protected]>, Lan Ding
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 326 ++++-----------------
 1 file changed, 65 insertions(+), 261 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index d5d500c87c7..e3b37a8df2e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -512,37 +512,21 @@ class ReplicaManagerTest {
     }
   }
 
-  @Test
-  def testFencedErrorCausedByBecomeLeader(): Unit = {
-    testFencedErrorCausedByBecomeLeader(0)
-    testFencedErrorCausedByBecomeLeader(1)
-    testFencedErrorCausedByBecomeLeader(10)
-  }
-
-  private[this] def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): 
Unit = {
+  @ParameterizedTest
+  @ValueSource(ints = Array(0, 1, 10))
+  def testFencedErrorCausedByBecomeLeader(loopEpochChange: Int): Unit = {
+    val localId = 0
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
     try {
-      val brokerList = Seq[Integer](0, 1).asJava
       val topicPartition = new TopicPartition(topic, 0)
       replicaManager.createPartition(topicPartition)
         .createLogIfNotExists(isNew = false, isFutureReplica = false,
           new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
 
-      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new 
LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(epoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
topicName = topic, topicId = topicIds(topic))
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
 
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) 
=> ())
       val partition = replicaManager.getPartitionOrException(new 
TopicPartition(topic, 0))
       assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == 
partition.log.get.dir.getParentFile).size)
 
@@ -554,7 +538,12 @@ class ReplicaManagerTest {
       // make sure the future log is created
       replicaManager.futureLocalLogOrException(topicPartition)
       assertEquals(1, 
replicaManager.replicaAlterLogDirsManager.fetcherThreadMap.size)
-      (1 to loopEpochChange).foreach(epoch => 
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(epoch), (_, _) => 
()))
+      (1 to loopEpochChange).foreach(
+        epoch => {
+          val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
topicName = topic, topicId = topicIds(topic), leaderEpoch = epoch)
+          replicaManager.applyDelta(leaderDelta, 
imageFromTopics(leaderDelta.apply()))
+        }
+      )
       // wait for the ReplicaAlterLogDirsThread to complete
       TestUtils.waitUntilTrue(() => {
         replicaManager.replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
@@ -974,25 +963,16 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1, 2).asJava
 
-      val partition = rm.createPartition(new TopicPartition(topic, 0))
+      val tp = new TopicPartition(topic, 0)
+      val partition = rm.createPartition(tp)
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
 
       // Make this replica the leader.
-      val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, 
"host2", 2)).asJava).build()
-      rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
+      val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, replicas 
= brokerList, isr = brokerList)
+      val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+      rm.applyDelta(leaderDelta, leaderMetadataImage)
+
       rm.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
@@ -1030,6 +1010,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFollowerStateNotUpdatedIfLogReadFails(): Unit = {
+    val localId = 0
     val maxFetchBytes = 1024 * 1024
     val aliveBrokersIds = Seq(0, 1)
     val leaderEpoch = 5
@@ -1038,25 +1019,11 @@ class ReplicaManagerTest {
     try {
       val tp = new TopicPartition(topic, 0)
       val tidp = new TopicIdPartition(topicId, tp)
-      val replicas = aliveBrokersIds.toList.map(Int.box).asJava
 
       // Broker 0 becomes leader of the partition
-      val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(0)
-        .setControllerEpoch(0)
-        .setLeader(0)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(replicas)
-        .setPartitionEpoch(0)
-        .setReplicas(replicas)
-        .setIsNew(true)
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(leaderAndIsrPartitionState).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest, (_, _) => ())
-      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
 
       // Follower replica state is initialized, but initial state is not known
       assertTrue(replicaManager.onlinePartition(tp).isDefined)
@@ -1129,6 +1096,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchMessagesWithInconsistentTopicId(): Unit = {
+    val localId = 0
     val maxFetchBytes = 1024 * 1024
     val aliveBrokersIds = Seq(0, 1)
     val leaderEpoch = 5
@@ -1137,25 +1105,11 @@ class ReplicaManagerTest {
     try {
       val tp = new TopicPartition(topic, 0)
       val tidp = new TopicIdPartition(topicId, tp)
-      val replicas = aliveBrokersIds.toList.map(Int.box).asJava
 
       // Broker 0 becomes leader of the partition
-      val leaderAndIsrPartitionState = new LeaderAndIsrRequest.PartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(0)
-        .setControllerEpoch(0)
-        .setLeader(0)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(replicas)
-        .setPartitionEpoch(0)
-        .setReplicas(replicas)
-        .setIsNew(true)
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(leaderAndIsrPartitionState).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      val leaderAndIsrResponse = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest, (_, _) => ())
-      assertEquals(Errors.NONE, leaderAndIsrResponse.error)
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
 
       assertEquals(Some(topicId), 
replicaManager.getPartitionOrException(tp).topicId)
 
@@ -1195,54 +1149,6 @@ class ReplicaManagerTest {
       val fetch2 = successfulFetch.headOption.filter(_._1 == 
zeroTidp).map(_._2)
       assertTrue(fetch2.isDefined)
       assertEquals(Errors.NONE, fetch2.get.error)
-
-      // Next create a topic without a topic ID written in the log.
-      val tp2 = new TopicPartition("noIdTopic", 0)
-      val tidp2 = new TopicIdPartition(Uuid.randomUuid(), tp2)
-
-      // Broker 0 becomes leader of the partition
-      val leaderAndIsrPartitionState2 = new 
LeaderAndIsrRequest.PartitionState()
-        .setTopicName("noIdTopic")
-        .setPartitionIndex(0)
-        .setControllerEpoch(0)
-        .setLeader(0)
-        .setLeaderEpoch(leaderEpoch)
-        .setIsr(replicas)
-        .setPartitionEpoch(0)
-        .setReplicas(replicas)
-        .setIsNew(true)
-      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(leaderAndIsrPartitionState2).asJava,
-        Collections.emptyMap(),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest2, (_, _) => ())
-      assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
-
-      assertEquals(None, replicaManager.getPartitionOrException(tp2).topicId)
-
-      // Fetch messages simulating the request containing a topic ID. We 
should not have an error.
-      fetchPartitions(
-        replicaManager,
-        replicaId = 1,
-        fetchInfos = Seq(tidp2 -> validFetchPartitionData),
-        responseCallback = callback
-      )
-      val fetch3 = successfulFetch.headOption.filter(_._1 == tidp2).map(_._2)
-      assertTrue(fetch3.isDefined)
-      assertEquals(Errors.NONE, fetch3.get.error)
-
-      // Fetch messages simulating the request not containing a topic ID. We 
should not have an error.
-      val zeroTidp2 = new TopicIdPartition(Uuid.ZERO_UUID, 
tidp2.topicPartition)
-      fetchPartitions(
-        replicaManager,
-        replicaId = 1,
-        fetchInfos = Seq(zeroTidp2 -> validFetchPartitionData),
-        responseCallback = callback
-      )
-      val fetch4 = successfulFetch.headOption.filter(_._1 == 
zeroTidp2).map(_._2)
-      assertTrue(fetch4.isDefined)
-      assertEquals(Errors.NONE, fetch4.get.error)
-
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -1257,6 +1163,7 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2))
 
     try {
+      val leaderEpoch = 0
       // Create 2 partitions, assign replica 0 as the leader for both a 
different follower (1 and 2) for each
       val tp0 = new TopicPartition(topic, 0)
       val tp1 = new TopicPartition(topic, 1)
@@ -1267,34 +1174,14 @@ class ReplicaManagerTest {
       replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
       val partition1Replicas = Seq[Integer](0, 2).asJava
-      val topicIds = Map(tp0.topic -> topicId, tp1.topic -> topicId).asJava
-      val leaderEpoch = 0
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(0)
-            .setLeader(leaderEpoch)
-            .setLeaderEpoch(0)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true),
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp1.topic)
-            .setPartitionIndex(tp1.partition)
-            .setControllerEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(leaderEpoch)
-            .setIsr(partition1Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition1Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+
+      val leaderDelta0 = createLeaderDelta(topicIds(topic), tp0, 0, 
partition0Replicas, partition0Replicas)
+      val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+      replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+
+      val leaderDelta1 = createLeaderDelta(topicIds(topic), tp1, 0, 
partition1Replicas, partition1Replicas)
+      val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+      replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
 
       // Append a couple of messages.
       for (i <- 1 to 2) {
@@ -1642,27 +1529,13 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
       propsModifier = props => 
props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, 
classOf[MockReplicaSelector].getName))
     try {
-      val leaderBrokerId = 0
-      val followerBrokerId = 1
-      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
       val tp0 = new TopicPartition(topic, 0)
       val tidp0 = new TopicIdPartition(topicId, tp0)
 
       // Make this replica the follower
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(1)
-          .setLeaderEpoch(1)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => 
())
+      val followerDelta = createFollowerDelta(topicId, tp0, 0, 1, 1)
+      val followerMetadataImage = imageFromTopics(followerDelta.apply())
+      replicaManager.applyDelta(followerDelta, followerMetadataImage)
 
       val metadata = new DefaultClientMetadata("rack-a", "client-id",
         InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default")
@@ -1687,13 +1560,13 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchShouldReturnImmediatelyWhenPreferredReadReplicaIsDefined(): 
Unit = {
+    val localId = 0
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time),
       propsModifier = props => 
props.put(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG, 
"org.apache.kafka.common.replica.RackAwareReplicaSelector"))
 
     try {
       val leaderBrokerId = 0
       val followerBrokerId = 1
-      val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava
       val tp0 = new TopicPartition(topic, 0)
       val tidp0 = new TopicIdPartition(topicId, tp0)
 
@@ -1707,20 +1580,9 @@ class ReplicaManagerTest {
 
       // Make this replica the leader
       val leaderEpoch = 1
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(leaderEpoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => 
())
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
topicName = topic, topicId = topicIds(topic), leaderEpoch = leaderEpoch)
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
 
       // The leader must record the follower's fetch offset to make it 
eligible for follower fetch selection
       val followerFetchData = new PartitionData(topicId, 0L, 0L, Int.MaxValue, 
Optional.of(Int.box(leaderEpoch)), Optional.empty[Integer])
@@ -1767,27 +1629,15 @@ class ReplicaManagerTest {
       topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
       leaderBrokerId, countDownLatch, expectTruncation = true, topicId = 
Optional.of(topicId))
     try {
-
-      val brokerList = Seq[Integer](0, 1).asJava
-
       val tp0 = new TopicPartition(topic, 0)
       val tidp0 = new TopicIdPartition(topicId, tp0)
 
+      val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
       // Make this replica the follower
-      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(1)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => 
())
+      val followerDelta = createFollowerDelta(topicId, tp0, 1, 0, 1)
+      val followerImage = imageFromTopics(followerDelta.apply())
+      replicaManager.applyDelta(followerDelta, followerImage)
 
       val simpleRecords = Seq(new SimpleRecord("a".getBytes), new 
SimpleRecord("b".getBytes))
       val appendResult = appendRecords(replicaManager, tp0,
@@ -1873,21 +1723,10 @@ class ReplicaManagerTest {
       val tidp0 = new TopicIdPartition(topicId, tp0)
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
-      val partition0Replicas = Seq[Integer](0, 1).asJava
-      val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(1)
-          .setLeaderEpoch(0)
-          .setIsr(partition0Replicas)
-          .setPartitionEpoch(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) 
=> ())
+
+      val followerDelta = createFollowerDelta(topicId, tp0, 0, 1)
+      val followerImage = imageFromTopics(followerDelta.apply())
+      replicaManager.applyDelta(followerDelta, followerImage)
 
       // Fetch from follower, with non-empty ClientMetadata (FetchRequest v11+)
       val clientMetadata = new DefaultClientMetadata("", "", null, 
KafkaPrincipal.ANONYMOUS, "")
@@ -1909,6 +1748,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchRequestRateMetrics(): Unit = {
+    val localId = 0
     val mockTimer = new MockTimer(time)
     val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer, 
aliveBrokerIds = Seq(0, 1))
 
@@ -1917,22 +1757,10 @@ class ReplicaManagerTest {
       val tidp0 = new TopicIdPartition(topicId, tp0)
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
-      val partition0Replicas = Seq[Integer](0, 1).asJava
 
-      val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(1)
-          .setIsr(partition0Replicas)
-          .setPartitionEpoch(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => 
())
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
topicName = topic, topicId = topicIds(topic), leaderEpoch = 1)
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
 
       def assertMetricCount(expected: Int): Unit = {
         assertEquals(expected, 
replicaManager.brokerTopicStats.allTopicsStats.totalFetchRequestRate.count)
@@ -2026,6 +1854,7 @@ class ReplicaManagerTest {
 
   @Test
   def testFetchFromLeaderAlwaysAllowed(): Unit = {
+    val localId = 0
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1))
 
     try {
@@ -2033,22 +1862,10 @@ class ReplicaManagerTest {
       val tidp0 = new TopicIdPartition(topicId, tp0)
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
-      val partition0Replicas = Seq[Integer](0, 1).asJava
 
-      val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(tp0.topic)
-          .setPartitionIndex(tp0.partition)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(1)
-          .setIsr(partition0Replicas)
-          .setPartitionEpoch(0)
-          .setReplicas(partition0Replicas)
-          .setIsNew(true)).asJava,
-        topicIds.asJava,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => 
())
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
topicName = topic, topicId = topicIds(topic), leaderEpoch = 1)
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
 
       val clientMetadata = new DefaultClientMetadata("", "", null, 
KafkaPrincipal.ANONYMOUS, "")
       var partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 
0L, 100,
@@ -2828,6 +2645,7 @@ class ReplicaManagerTest {
     allLogs.put(topicPartitionObj, mockLog)
     when(mockLogMgr.allLogs).thenReturn(allLogs.values.asScala)
     when(mockLogMgr.isLogDirOnline(anyString)).thenReturn(true)
+    when(mockLogMgr.directoryId(anyString)).thenReturn(None)
 
     val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
     val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, 
s"host$brokerId", brokerId))
@@ -3414,8 +3232,6 @@ class ReplicaManagerTest {
         .setTopicId(topicIds.get(topic))
         .setIsr(partition1Replicas)
         .setReplicas(partition1Replicas)
-        .setRemovingReplicas(util.List.of())
-        .setAddingReplicas(util.List.of())
         .setLeader(partition1Replicas.get(0))
         .setLeaderEpoch(leaderEpoch)
         .setPartitionEpoch(0)
@@ -3431,8 +3247,6 @@ class ReplicaManagerTest {
         .setTopicId(topicIds.get(topic))
         .setIsr(partition1Replicas)
         .setReplicas(partition1Replicas)
-        .setRemovingReplicas(util.List.of())
-        .setAddingReplicas(util.List.of())
         .setLeader(partition1Replicas.get(1))
         .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
         .setPartitionEpoch(0)
@@ -3479,8 +3293,6 @@ class ReplicaManagerTest {
         .setTopicId(topicIds.get(topic))
         .setIsr(partition1Replicas)
         .setReplicas(partition1Replicas)
-        .setRemovingReplicas(util.List.of())
-        .setAddingReplicas(util.List.of())
         .setLeader(partition1Replicas.get(0))
         .setLeaderEpoch(leaderEpoch)
         .setPartitionEpoch(0)
@@ -3496,8 +3308,6 @@ class ReplicaManagerTest {
         .setTopicId(topicIds.get(topic))
         .setIsr(partition1Replicas)
         .setReplicas(partition1Replicas)
-        .setRemovingReplicas(util.List.of())
-        .setAddingReplicas(util.List.of())
         .setLeader(partition1Replicas.get(1))
         .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
         .setPartitionEpoch(0)
@@ -4321,8 +4131,6 @@ class ReplicaManagerTest {
       .setTopicId(topicId)
       .setReplicas(effectiveReplicas)
       .setIsr(effectiveIsr)
-      .setRemovingReplicas(Collections.emptyList())
-      .setAddingReplicas(Collections.emptyList())
       .setLeader(leaderId)
       .setLeaderEpoch(leaderEpoch)
       .setPartitionEpoch(0)
@@ -4349,8 +4157,6 @@ class ReplicaManagerTest {
       .setTopicId(topicId)
       .setReplicas(util.Arrays.asList(followerId, leaderId))
       .setIsr(util.Arrays.asList(followerId, leaderId))
-      .setRemovingReplicas(Collections.emptyList())
-      .setAddingReplicas(Collections.emptyList())
       .setLeader(leaderId)
       .setLeaderEpoch(leaderEpoch)
       .setPartitionEpoch(0)
@@ -5539,13 +5345,13 @@ class ReplicaManagerTest {
     }
   }
 
-  private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, 
partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, 
topicName: String = "foo", topicId: Uuid = FOO_UUID): TopicsDelta = {
+  private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, 
partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, 
topicName: String = "foo", topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0): 
TopicsDelta = {
     val leader = if (isStartIdLeader) startId else startId + 1
     val delta = new TopicsDelta(TopicsImage.EMPTY)
     delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId))
 
     partitions.foreach { partition =>
-      val record = partitionRecord(startId, leader, partition, topicId)
+      val record = partitionRecord(startId, leader, partition, topicId, 
leaderEpoch)
       if (directoryIds.nonEmpty) {
         record.setDirectories(directoryIds.asJava)
       }
@@ -5555,16 +5361,14 @@ class ReplicaManagerTest {
     delta
   }
 
-  private def partitionRecord(startId: Int, leader: Int, partition: Int = 0, 
topicId: Uuid = FOO_UUID) = {
+  private def partitionRecord(startId: Int, leader: Int, partition: Int = 0, 
topicId: Uuid = FOO_UUID, leaderEpoch: Int = 0) = {
     new PartitionRecord()
       .setPartitionId(partition)
       .setTopicId(topicId)
       .setReplicas(util.Arrays.asList(startId, startId + 1))
       .setIsr(util.Arrays.asList(startId, startId + 1))
-      .setRemovingReplicas(Collections.emptyList())
-      .setAddingReplicas(Collections.emptyList())
       .setLeader(leader)
-      .setLeaderEpoch(0)
+      .setLeaderEpoch(leaderEpoch)
       .setPartitionEpoch(0)
   }
 

Reply via email to