Yunyung commented on code in PR #20009: URL: https://github.com/apache/kafka/pull/20009#discussion_r2160128739
##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4148,43 +4113,25 @@ class ReplicaManagerTest {
   }
   @Test
-  def testPartitionMetadataFileNotCreated(): Unit = {
+  def testPartitionMetadataFileCreated(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val topicPartition = new TopicPartition(topic, 0)
-      val topicPartitionFake = new TopicPartition("fakeTopic", 0)
-      val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-      def leaderAndIsrRequest(epoch: Int, name: String): LeaderAndIsrRequest =
-        new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-          Seq(new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(name)
-            .setPartitionIndex(0)
-            .setControllerEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(epoch)
-            .setIsr(brokerList)
-            .setPartitionEpoch(0)
-            .setReplicas(brokerList)
-            .setIsNew(true)).asJava,
-          topicIds,
-          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      // There is no file if the topic does not have an associated topic ID.
-      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic"), (_, _) => ())
-      assertTrue(replicaManager.localLog(topicPartitionFake).isDefined)
-      val log = replicaManager.localLog(topicPartitionFake).get
-      assertFalse(log.partitionMetadataFile.get.exists())
-      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
+      val leaderDelta = createLeaderDelta(
+        topicId = Uuid.ZERO_UUID,
+        partition = topicPartition,
+        leaderId = 0,
+        replicas = brokerList,
+        isr = brokerList,
+      )
-      // There is no file if the topic has the default UUID.
-      val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic), (_, _) => ())
+      // The file exists if the topic has the default UUID.
+      replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply()))
       assertTrue(replicaManager.localLog(topicPartition).isDefined)
       val log2 = replicaManager.localLog(topicPartition).get

Review Comment:
```suggestion
      val log = replicaManager.localLog(topicPartition).get
```

##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -635,35 +635,27 @@ class ReplicaManagerTest {
     try {
       val brokerList = Seq[Integer](0, 1).asJava
+      val tp0 = new TopicPartition(topic, 0)
+      val tp1 = new TopicPartition(topic, 1)
       // Create a couple partition for the topic.
-      val partition0 = replicaManager.createPartition(new TopicPartition(topic, 0))
+      val partition0 = replicaManager.createPartition(tp0)
       partition0.createLogIfNotExists(isNew = false, isFutureReplica = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
-      val partition1 = replicaManager.createPartition(new TopicPartition(topic, 1))
+      val partition1 = replicaManager.createPartition(tp1)
       partition1.createLogIfNotExists(isNew = false, isFutureReplica = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
       // Make this replica the leader for the partitions.
-      Seq(0, 1).foreach { partition =>
-        val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-          Seq(new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(topic)
-            .setPartitionIndex(partition)
-            .setControllerEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(0)
-            .setIsr(brokerList)
-            .setPartitionEpoch(0)
-            .setReplicas(brokerList)
-            .setIsNew(true)).asJava,
-          Collections.singletonMap(topic, topicId),
-          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava,
-          LeaderAndIsrRequest.Type.UNKNOWN
-        ).build()
-        replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
-        replicaManager.getPartitionOrException(new TopicPartition(topic, partition))

Review Comment:
Let's keep this line.

##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4148,43 +4113,25 @@ class ReplicaManagerTest {
   }
   @Test
-  def testPartitionMetadataFileNotCreated(): Unit = {
+  def testPartitionMetadataFileCreated(): Unit = {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
     try {
       val brokerList = Seq[Integer](0, 1).asJava
       val topicPartition = new TopicPartition(topic, 0)
-      val topicPartitionFake = new TopicPartition("fakeTopic", 0)
-      val topicIds = Map(topic -> Uuid.ZERO_UUID, "foo" -> Uuid.randomUuid()).asJava
-      val topicNames = topicIds.asScala.map(_.swap).asJava
-      def leaderAndIsrRequest(epoch: Int, name: String): LeaderAndIsrRequest =
-        new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
-          Seq(new LeaderAndIsrRequest.PartitionState()
-            .setTopicName(name)
-            .setPartitionIndex(0)
-            .setControllerEpoch(0)
-            .setLeader(0)
-            .setLeaderEpoch(epoch)
-            .setIsr(brokerList)
-            .setPartitionEpoch(0)
-            .setReplicas(brokerList)
-            .setIsNew(true)).asJava,
-          topicIds,
-          Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
-
-      // There is no file if the topic does not have an associated topic ID.
-      val response = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic"), (_, _) => ())
-      assertTrue(replicaManager.localLog(topicPartitionFake).isDefined)
-      val log = replicaManager.localLog(topicPartitionFake).get
-      assertFalse(log.partitionMetadataFile.get.exists())
-      assertEquals(Errors.NONE, response.partitionErrors(topicNames).get(topicPartition))
+      val leaderDelta = createLeaderDelta(
+        topicId = Uuid.ZERO_UUID,
+        partition = topicPartition,
+        leaderId = 0,
+        replicas = brokerList,
+        isr = brokerList,
+      )
-      // There is no file if the topic has the default UUID.
-      val response2 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, topic), (_, _) => ())
+      // The file exists if the topic has the default UUID.
+      replicaManager.applyDelta(leaderDelta, imageFromTopics(leaderDelta.apply()))
       assertTrue(replicaManager.localLog(topicPartition).isDefined)
       val log2 = replicaManager.localLog(topicPartition).get
-      assertFalse(log2.partitionMetadataFile.get.exists())
-      assertEquals(Errors.NONE, response2.partitionErrors(topicNames).get(topicPartition))
+      assertTrue(log2.partitionMetadataFile.get.exists())

Review Comment:
```suggestion
      assertTrue(log.partitionMetadataFile.get.exists())
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org