This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e23df0c8da KAFKA-18486 Migrate tests to use applyDelta instead of 
becomeLeaderOrFollower for testInconsistentIdReturnsError and others (#20014)
5e23df0c8da is described below

commit 5e23df0c8daf11f478f63c9f50aefa5b7e636d7f
Author: Jing-Jia Hung <[email protected]>
AuthorDate: Wed Jun 25 08:02:27 2025 -0400

    KAFKA-18486 Migrate tests to use applyDelta instead of 
becomeLeaderOrFollower for testInconsistentIdReturnsError and others (#20014)
    
    continues the migration effort for KAFKA-18486 by replacing usage of the
    deprecated `becomeLeaderOrFollower` API with `applyDelta` in several
    test cases.
    
    #### Updated tests:
    - `testInconsistentIdReturnsError`
    - `testMaybeAddLogDirFetchers`
    - `testMaybeAddLogDirFetchersPausingCleaning`
    - `testSuccessfulBuildRemoteLogAuxStateMetrics`
    - `testVerificationForTransactionalPartitionsOnly`
    - `testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate`
    
    Reviewers: Jhen-Yung Hsu <[email protected]>, TaiJuWu
     <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 259 +++++++--------------
 1 file changed, 78 insertions(+), 181 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 8e7c6181dd3..12f7d23164a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -55,7 +55,7 @@ import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, 
TransactionLogConfig}
 import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
-import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
+import org.apache.kafka.metadata.MetadataCache
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, 
MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerLogConfigs}
@@ -133,7 +133,6 @@ class ReplicaManagerTest {
 
   // Constants defined for readability
   private val zkVersion = 0
-  private val correlationId = 0
   private val controllerEpoch = 0
   private val brokerEpoch = 0L
 
@@ -312,38 +311,26 @@ class ReplicaManagerTest {
       alterPartitionManager = alterPartitionManager)
 
     try {
-      val partition = rm.createPartition(new TopicPartition(topic, 0))
-      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
-        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
+      val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = 
List(0), topicName = topic, topicId = topicIds(topic))
+      val image = imageFromTopics(delta.apply())
+      rm.applyDelta(delta, image)
+      val partition = rm.getPartitionOrException(topicPartition)
 
-      rm.becomeLeaderOrFollower(0, new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(Seq[Integer](0).asJava)
-          .setPartitionEpoch(0)
-          .setReplicas(Seq[Integer](0).asJava)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
-      appendRecords(rm, new TopicPartition(topic, 0),
+      appendRecords(rm, topicPartition,
         MemoryRecords.withRecords(Compression.NONE, new SimpleRecord("first 
message".getBytes()), new SimpleRecord("second message".getBytes())))
-      logManager.maybeUpdatePreferredLogDir(new TopicPartition(topic, 0), 
dir2.getAbsolutePath)
+      logManager.maybeUpdatePreferredLogDir(topicPartition, 
dir2.getAbsolutePath)
 
       partition.createLogIfNotExists(isNew = true, isFutureReplica = true,
         new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
 
       // this method should use hw of future log to create log dir fetcher. 
Otherwise, it causes offset mismatch error
       rm.maybeAddLogDirFetchers(Set(partition), new 
LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), _ => None)
-      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => 
t.fetchState(new TopicPartition(topic, 0)).foreach(s => assertEquals(0L, 
s.fetchOffset)))
+      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => 
t.fetchState(topicPartition).foreach(s => assertEquals(0L, s.fetchOffset)))
       // make sure alter log dir thread has processed the data
       rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => 
t.doWork())
       assertEquals(Set.empty, 
rm.replicaAlterLogDirsManager.failedPartitions.partitions())
       // the future log becomes the current log, so the partition state should 
get removed
-      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => 
assertEquals(None, t.fetchState(new TopicPartition(topic, 0))))
+      rm.replicaAlterLogDirsManager.fetcherThreadMap.values.foreach(t => 
assertEquals(None, t.fetchState(topicPartition)))
     } finally {
       rm.shutdown(checkpointHW = false)
     }
@@ -362,7 +349,6 @@ class ReplicaManagerTest {
     val metadataCache: MetadataCache = mock(classOf[MetadataCache])
     mockGetAliveBrokerFunctions(metadataCache, Seq(new Node(0, "host0", 0)))
     
when(metadataCache.metadataVersion()).thenReturn(MetadataVersion.MINIMUM_VERSION)
-    val tp0 = new TopicPartition(topic, 0)
     val rm = new ReplicaManager(
       metrics = metrics,
       config = config,
@@ -375,28 +361,13 @@ class ReplicaManagerTest {
       alterPartitionManager = alterPartitionManager)
 
     try {
-      val partition = rm.createPartition(tp0)
-      partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
-        new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), 
Option.apply(topicId))
-
-      val response = rm.becomeLeaderOrFollower(0, new 
LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(0)
-          .setIsr(Seq[Integer](0).asJava)
-          .setPartitionEpoch(0)
-          .setReplicas(Seq[Integer](0).asJava)
-          .setIsNew(false)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(0, "host1", 0)).asJava).build(), (_, _) => ())
-      // expect the errorCounts only has 1 entry with Errors.NONE
-      val errorCounts = response.errorCounts()
-      assertEquals(1, response.errorCounts().size())
-      assertNotNull(errorCounts.get(Errors.NONE))
-      spyLogManager.maybeUpdatePreferredLogDir(tp0, dir2.getAbsolutePath)
+      val delta = topicsCreateDelta(startId = 0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicId)
+      val image = imageFromTopics(delta.apply())
+      rm.applyDelta(delta, image)
+      val partition = rm.getPartitionOrException(topicPartition)
+
+      spyLogManager.maybeUpdatePreferredLogDir(topicPartition, 
dir2.getAbsolutePath)
 
       if (futureLogCreated) {
         // create future log before maybeAddLogDirFetchers invoked
@@ -404,7 +375,7 @@ class ReplicaManagerTest {
           new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
       } else {
         val mockLog = mock(classOf[UnifiedLog])
-        when(spyLogManager.getLog(tp0, isFuture = 
true)).thenReturn(Option.apply(mockLog))
+        when(spyLogManager.getLog(topicPartition, isFuture = 
true)).thenReturn(Option.apply(mockLog))
         when(mockLog.topicId).thenReturn(Optional.of(topicId))
         when(mockLog.parentDir).thenReturn(dir2.getAbsolutePath)
       }
@@ -1225,65 +1196,51 @@ class ReplicaManagerTest {
     }
   }
 
-  @Test
-  def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
-    verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(new 
Properties, expectTruncation = false)
-  }
-
   /**
    * If a partition becomes a follower and the leader is unchanged it should 
check for truncation
    * if the epoch has increased by more than one (which suggests it has missed 
an update). For
    * IBP version 2.7 onwards, we don't require this since we can truncate at 
any time based
    * on diverging epochs returned in fetch responses.
+   * This test assumes IBP >= 2.7 behavior, so `expectTruncation` is set to 
false and truncation is not expected.
    */
-  private def 
verifyBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(extraProps: 
Properties,
-                                                                             
expectTruncation: Boolean): Unit = {
-    val topicPartition = 0
+  @Test
+  def testBecomeFollowerWhenLeaderIsUnchangedButMissedLeaderUpdate(): Unit = {
+    val extraProps = new Properties
     val followerBrokerId = 0
     val leaderBrokerId = 1
-    val controllerId = 0
-    val controllerEpoch = 0
     var leaderEpoch = 1
     val leaderEpochIncrement = 2
-    val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
     val countDownLatch = new CountDownLatch(1)
     val offsetFromLeader = 5
-
     // Prepare the mocked components for the test
     val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new 
MockTimer(time),
-      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, 
leaderBrokerId, countDownLatch,
-      expectTruncation = expectTruncation, localLogOffset = Optional.of(10), 
offsetFromLeader = offsetFromLeader, extraProps = extraProps, topicId = 
Optional.of(topicId))
+      topicPartition.partition(), leaderEpoch + leaderEpochIncrement, 
followerBrokerId, leaderBrokerId, countDownLatch,
+      expectTruncation = false, localLogOffset = Optional.of(10), 
offsetFromLeader = offsetFromLeader, extraProps = extraProps, topicId = 
Optional.of(topicId))
 
     try {
       // Initialize partition state to follower, with leader = 1, leaderEpoch 
= 1
-      val tp = new TopicPartition(topic, topicPartition)
-      val partition = replicaManager.createPartition(tp)
+      val partition = replicaManager.createPartition(topicPartition)
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
       partition.createLogIfNotExists(isNew = false, isFutureReplica = false, 
offsetCheckpoints, None)
-      partition.makeFollower(
-        leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, 
aliveBrokerIds),
-        offsetCheckpoints,
-        None)
+      val followerDelta = topicsCreateDelta(startId = followerBrokerId, 
isStartIdLeader = false, partitions = List(topicPartition.partition()), 
List.empty, topic, topicIds(topic), leaderEpoch)
+      replicaManager.applyDelta(followerDelta, 
imageFromTopics(followerDelta.apply()))
+
+      // Verify log created and partition is hosted
+      val localLog = replicaManager.localLog(topicPartition)
+      assertTrue(localLog.isDefined, "Log should be created for follower after 
applyDelta")
+      val hostedPartition = replicaManager.getPartition(topicPartition)
+      assertTrue(hostedPartition.isInstanceOf[HostedPartition.Online])
 
       // Make local partition a follower - because epoch increased by more 
than 1, truncation should
       // trigger even though leader does not change
       leaderEpoch += leaderEpochIncrement
-      val leaderAndIsrRequest0 = new LeaderAndIsrRequest.Builder(
-        controllerId, controllerEpoch, brokerEpoch,
-        Seq(leaderAndIsrPartitionState(tp, leaderEpoch, leaderBrokerId, 
aliveBrokerIds)).asJava,
-        Collections.singletonMap(topic, topicId),
-        Set(new Node(followerBrokerId, "host1", 0),
-          new Node(leaderBrokerId, "host2", 1)).asJava).build()
-      replicaManager.becomeLeaderOrFollower(correlationId, 
leaderAndIsrRequest0,
-        (_, followers) => assertEquals(followerBrokerId, 
followers.head.partitionId))
+      val epochJumpDelta = topicsCreateDelta(startId = followerBrokerId, 
isStartIdLeader = false, partitions = List(topicPartition.partition()), 
List.empty, topic, topicIds(topic), leaderEpoch)
+      replicaManager.applyDelta(epochJumpDelta, 
imageFromTopics(epochJumpDelta.apply()))
+
       assertTrue(countDownLatch.await(1000L, TimeUnit.MILLISECONDS))
 
-      // Truncation should have happened once
-      if (expectTruncation) {
-        verify(mockLogMgr).truncateTo(Map(tp -> offsetFromLeader), isFuture = 
false)
-      }
 
-      verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(tp), 
any())
+      
verify(mockLogMgr).finishedInitializingLog(ArgumentMatchers.eq(topicPartition), 
any())
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -1859,16 +1816,16 @@ class ReplicaManagerTest {
     val producerEpoch = 0.toShort
     val sequence = 0
     val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
-
     val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0, tp1))
+    val brokerList = Seq[Integer](0, 1).asJava
     try {
-      replicaManager.becomeLeaderOrFollower(1,
-        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new 
LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
-        (_, _) => ())
+      val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 1, 
replicas = brokerList, isr = brokerList)
+      val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 1, 
replicas = brokerList, isr = brokerList)
+      val image0 = imageFromTopics(leaderDelta0.apply())
+      replicaManager.applyDelta(leaderDelta0, image0)
 
-      replicaManager.becomeLeaderOrFollower(1,
-        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new 
LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
-        (_, _) => ())
+      val image1 = imageFromTopics(leaderDelta1.apply())
+      replicaManager.applyDelta(leaderDelta1, image1)
 
       // If we supply no transactional ID and idempotent records, we do not 
verify.
       val idempotentRecords = 
MemoryRecords.withIdempotentRecords(Compression.NONE, producerId, 
producerEpoch, sequence,
@@ -3651,8 +3608,6 @@ class ReplicaManagerTest {
 
   @Test
   def testSuccessfulBuildRemoteLogAuxStateMetrics(): Unit = {
-    val tp0 = new TopicPartition(topic, 0)
-
     val remoteLogManager = mock(classOf[RemoteLogManager])
     val remoteLogSegmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
     when(remoteLogManager.fetchRemoteLogSegmentMetadata(any(), anyInt(), 
anyLong())).thenReturn(
@@ -3664,40 +3619,25 @@ class ReplicaManagerTest {
 
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true, remoteLogManager = Some(remoteLogManager), 
buildRemoteLogAuxState = true)
     try {
-
       val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
-      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
+      
replicaManager.createPartition(topicPartition).createLogIfNotExists(isNew = 
false, isFutureReplica = false, offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
-      val topicIds = Map(tp0.topic -> topicId).asJava
-      val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, 
brokerEpoch,
-        Seq(
-          new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(tp0.topic)
-            .setPartitionIndex(tp0.partition)
-            .setControllerEpoch(0)
-            .setLeader(1)
-            .setLeaderEpoch(0)
-            .setIsr(partition0Replicas)
-            .setPartitionEpoch(0)
-            .setReplicas(partition0Replicas)
-            .setIsNew(true)
-        ).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
 
       // Verify the metrics for build remote log state and for failures is 
zero before replicas start to fetch
-      assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
-      assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      assertEquals(0, 
brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, 
brokerTopicStats.topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate.count)
       // Verify aggregate metrics
       assertEquals(0, 
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
       assertEquals(0, 
brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
 
-      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => 
())
+      val leaderDelta = createLeaderDelta(topicId, topicPartition, leaderId = 
1, replicas = partition0Replicas, isr = partition0Replicas)
+      val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
 
       // Replicas fetch from the leader periodically, therefore we check that 
the metric value is increasing
-      waitUntilTrue(() => 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count
 > 0,
-        "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
-      assertEquals(0, 
brokerTopicStats.topicStats(tp0.topic()).failedBuildRemoteLogAuxStateRate.count)
+      waitUntilTrue(() => 
brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count
 > 0,
+        "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + 
brokerTopicStats.topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate.count)
+      assertEquals(0, 
brokerTopicStats.topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate.count)
       // Verify aggregate metrics
       waitUntilTrue(() => 
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count > 0,
         "Should have all topic buildRemoteLogAuxStateRequestRate count > 0, 
but got:" + 
brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
@@ -3881,41 +3821,35 @@ class ReplicaManagerTest {
   def testInconsistentIdReturnsError(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time))
     try {
-      val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
-      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-
-      val invalidTopicIds = Collections.singletonMap(topic, Uuid.randomUuid())
-      val invalidTopicNames = invalidTopicIds.asScala.map(_.swap).asJava
-
-      def leaderAndIsrRequest(epoch: Int, topicIds: java.util.Map[String, 
Uuid]): LeaderAndIsrRequest =
-        new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-        Seq(new LeaderAndIsrRequest.PartitionState()
-          .setTopicName(topic)
-          .setPartitionIndex(0)
-          .setControllerEpoch(0)
-          .setLeader(0)
-          .setLeaderEpoch(epoch)
-          .setIsr(brokerList)
-          .setPartitionEpoch(0)
-          .setReplicas(brokerList)
-          .setIsNew(true)).asJava,
-        topicIds,
-        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      val response = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(0, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, 
response.partitionErrors(topicNames).get(topicPartition))
-
-      val response2 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, topicIds), (_, _) => ())
-      assertEquals(Errors.NONE, 
response2.partitionErrors(topicNames).get(topicPartition))
+      val invalidTopicId = Uuid.randomUuid()
+
+      val initialDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic))
+      val initialImage = imageFromTopics(initialDelta.apply())
+      replicaManager.applyDelta(initialDelta, initialImage)
+
+      val updateDelta = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = topicIds(topic), 
leaderEpoch = 1)
+      val updateImage = imageFromTopics(updateDelta.apply())
+      replicaManager.applyDelta(updateDelta, updateImage)
 
       // Send request with inconsistent ID.
-      val response3 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(1, invalidTopicIds), (_, _) => ())
-      assertEquals(Errors.INCONSISTENT_TOPIC_ID, 
response3.partitionErrors(invalidTopicNames).get(topicPartition))
+      val inconsistentDelta1 = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = invalidTopicId, 
leaderEpoch = 1)
+      val inconsistentImage1 = imageFromTopics(inconsistentDelta1.apply())
+      val exception1 = assertThrows(classOf[IllegalStateException], () => {
+        replicaManager.applyDelta(inconsistentDelta1, inconsistentImage1)
+      })
+      assertEquals(s"Topic ${topic}-0 exists, but its ID is ${topicId}, not 
${invalidTopicId} as expected", exception1.getMessage)
+
+      val inconsistentDelta2 = topicsCreateDelta(0, isStartIdLeader = true,
+        partitions = List(0), topicName = topic, topicId = invalidTopicId, 
leaderEpoch = 2)
+      val inconsistentImage2 = imageFromTopics(inconsistentDelta2.apply())
+      val exception2 = assertThrows(classOf[IllegalStateException], () => {
+        replicaManager.applyDelta(inconsistentDelta2, inconsistentImage2)
+      })
+      assertEquals(s"Topic ${topic}-0 exists, but its ID is ${topicId}, not 
${invalidTopicId} as expected", exception2.getMessage)
 
-      val response4 = replicaManager.becomeLeaderOrFollower(0, 
leaderAndIsrRequest(2, invalidTopicIds), (_, _) => ())
-      assertEquals(Errors.INCONSISTENT_TOPIC_ID, 
response4.partitionErrors(invalidTopicNames).get(topicPartition))
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }
@@ -3984,43 +3918,6 @@ class ReplicaManagerTest {
     }
   }
 
-  private def makeLeaderAndIsrRequest(
-    topicId: Uuid,
-    topicPartition: TopicPartition,
-    replicas: Seq[Int],
-    leaderAndIsr: LeaderAndIsr,
-    isNew: Boolean = true,
-    brokerEpoch: Int = 0,
-    controllerId: Int = 0,
-    controllerEpoch: Int = 0
-  ): LeaderAndIsrRequest = {
-    val partitionState = new LeaderAndIsrRequest.PartitionState()
-      .setTopicName(topicPartition.topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leaderAndIsr.leader)
-      .setLeaderEpoch(leaderAndIsr.leaderEpoch)
-      .setIsr(leaderAndIsr.isr)
-      .setPartitionEpoch(leaderAndIsr.partitionEpoch)
-      .setReplicas(replicas.map(Int.box).asJava)
-      .setIsNew(isNew)
-
-    def mkNode(replicaId: Int): Node = {
-      new Node(replicaId, s"host-$replicaId", 9092)
-    }
-
-    val nodes = Set(mkNode(controllerId)) ++ replicas.map(mkNode).toSet
-
-    new LeaderAndIsrRequest.Builder(
-      controllerId,
-      controllerEpoch,
-      brokerEpoch,
-      Seq(partitionState).asJava,
-      Map(topicPartition.topic -> topicId).asJava,
-      nodes.asJava
-    ).build()
-  }
-
   @Test
   def testActiveProducerState(): Unit = {
     val brokerId = 0

Reply via email to