hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605109950



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -341,9 +341,13 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica, topicId)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      // When running a ZK controller, we may get a log that does not have a 
topic ID. Assign it here.
+      if (log.topicId == None) {

Review comment:
       nit: use `isEmpty`?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -341,9 +341,13 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica, topicId)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      // When running a ZK controller, we may get a log that does not have a 
topic ID. Assign it here.
+      if (log.topicId == None) {
+        topicId.foreach(topicId => log.writeTopicIdToExistingLog(topicId))

Review comment:
       nit: how about `assignTopicId` instead of `writeTopicIdToExistingLog`?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic 
IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition 
metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent 
with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID 
$topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       Ok, I think I get it.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,21 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic 
IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition 
metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent 
with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && !topicId.contains(fileTopicId))
+            throw new IllegalStateException(s"Tried to assign topic ID 
$topicId to log for topic partition $topicPartition," +
+              s"but log already contained topic ID $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {
+      partitionMetadataFile.write(topicId.get)

Review comment:
       nit: we can use `foreach`
   ```scala
   } else if (keepPartitionMetadataFile) {
     topicId.foreach(partitionMetadataFile.write)
   }
   ```

##########
File path: core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
##########
@@ -72,7 +72,8 @@ class RaftReplicaChangeDelegate(helper: 
RaftReplicaChangeDelegateHelper) {
   def makeLeaders(prevPartitionsAlreadyExisting: Set[MetadataPartition],
                   partitionStates: Map[Partition, MetadataPartition],
                   highWatermarkCheckpoints: OffsetCheckpoints,
-                  metadataOffset: Option[Long]): Set[Partition] = {
+                  metadataOffset: Option[Long],
+                  topicIds: String => Option[Uuid]): Set[Partition] = {

Review comment:
       I guess we could use a strong type here since topicIds are required for 
KIP-500, but maybe not worth it since we are delegating to `Partition` in the 
end.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -775,9 +776,10 @@ class LogManager(logDirs: Seq[File],
    * @param topicPartition The partition whose log needs to be returned or 
created
    * @param isNew Whether the replica should have existed on the broker or not
    * @param isFuture True if the future log of the specified partition should 
be returned or created
+   * @param topicId The topic ID of the topic used in the case of log creation.
    * @throws KafkaStorageException if isNew=false, log is not found in the 
cache and there is offline log directory on the broker
    */
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false): Log = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, 
isFuture: Boolean = false, topicId: Option[Uuid]): Log = {

Review comment:
       It seems useful to validate here that when the `Log` already exists, its 
topicId does not conflict with the parameter.

##########
File path: 
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
##########
@@ -145,7 +145,7 @@ public void setup() {
         OffsetCheckpoints checkpoints = (logDir, topicPartition) -> 
Option.apply(0L);
         for (TopicPartition topicPartition : topicPartitions) {
             final Partition partition = 
this.replicaManager.createPartition(topicPartition);
-            partition.createLogIfNotExists(true, false, checkpoints);
+            partition.createLogIfNotExists(true, false, checkpoints, 
Option.empty());

Review comment:
       Seems no harm either way, but why not set one anyway since that will be 
the default behavior going forward?

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -374,4 +375,30 @@ class RaftReplicaManager(config: KafkaConfig,
     metadataImage.partitions.topicPartition(partition.topic, 
partition.partitionId).getOrElse(
       throw new IllegalStateException(s"Partition has metadata changes but 
does not exist in the metadata cache: ${partition.topicPartition}"))
   }
+
+  /**
+   * Checks if the topic ID received from the MetadataPartitionsBuilder is 
consistent with the topic ID in the log.
+   * If the log does not exist, logTopicIdOpt will be None. In this case, the 
ID is not inconsistent.
+   *
+   * @param receivedTopicIdOpt the topic ID received from the MetadataRecords 
if it exists
+   * @param logTopicIdOpt the topic ID in the log if the log exists
+   * @param topicPartition the topicPartition for the Partition being checked
+   * @throws InconsistentTopicIdException if the topic ids are not consistent
+   * @throws IllegalArgumentException if the MetadataPartitionsBuilder did not 
have a topic ID associated with the topic
+   */
+  private def checkTopicId(receivedTopicIdOpt: Option[Uuid], logTopicIdOpt: 
Option[Uuid], topicPartition: TopicPartition): Unit = {
+    receivedTopicIdOpt match {
+      case Some(receivedTopicId) =>
+        logTopicIdOpt.foreach(logTopicId => {

Review comment:
       nit: usually we write this like this:
   ```scala
   logTopicIdOpt.foreach { logTopicId => 
   ```

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1633,10 +1633,102 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeLeader(leaderState, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints, None)
     assertEquals(4, partition.localLogOrException.highWatermark)
   }
 
+  @Test
+  def testTopicIdAndPartitionMetadataFileForLeader(): Unit = {
+    val controllerEpoch = 3
+    val leaderEpoch = 5
+    val topicId = Uuid.randomUuid()
+    val replicas = List[Integer](brokerId, brokerId + 1).asJava
+    val leaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(brokerId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(false)
+    partition.makeLeader(leaderState, offsetCheckpoints, Some(topicId))
+
+    checkTopicId(topicId, partition)
+
+    // Create new Partition object for same topicPartition
+    val partition2 = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      localBrokerId = brokerId,
+      time,
+      isrChangeListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterIsrManager)
+
+    // partition2 should not yet be associated with the log, but should be 
able to get ID
+    assertTrue(partition2.topicId.isDefined)
+    assertEquals(topicId, partition2.topicId.get)
+    assertFalse(partition2.log.isDefined)
+
+    // Calling makeLeader with a new topic ID should not overwrite the old 
topic ID. We should get the same log.

Review comment:
       Do we have test cases for the InconsistentTopicId scenario as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to