rajinisivaram commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r545009813



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -2176,4 +2212,84 @@ class ReplicaManagerTest {
       replicaManager.shutdown(false)
     }
   }
+
+  @Test
+  def testPartitionMetadataFile() = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+
+      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      assertTrue(!replicaManager.localLog(topicPartition).isEmpty)
+      val id = topicIds.get(topicPartition.topic())
+      val log = replicaManager.localLog(topicPartition).get
+      assertTrue(!log.partitionMetadataFile.isEmpty)
+      assertTrue(!log.partitionMetadataFile.get.isEmpty())
+      val partitionMetadata = log.partitionMetadataFile.get.read()
+
+      // Current version of PartitionMetadataFile is 0.
+      assertEquals(0, partitionMetadata.version)
+      assertEquals(id, partitionMetadata.topicId)
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
+  def testPartitionMetadataFileNotCreated() = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.ZERO_UUID)
+
+      def leaderAndIsrRequest(epoch: Int, name: String): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(name)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      // The file has no contents if the topic does not have an associated topic ID.
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic"), (_, _) => ())
+      assertTrue(!replicaManager.localLog(topicPartition).isEmpty)

Review comment:
       nit: use `assertFalse(...)` instead of `assertTrue(!...)`. There are a few more of these below as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to