jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597910287



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       Yeah, we could run into issues with a downgrade followed by a re-upgrade.
If ZK brokers are downgraded below 2.8, we may overwrite the topic ID on
re-upgrade, but the partition metadata file will still have the old version.
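
For illustration only, here is a minimal, self-contained sketch of the consistency check in the hunk above. It is not the code in `Log.scala`: `InMemoryMetadataFile`, `TopicIdReconciliation`, and `reconcile` are hypothetical names, `java.util.UUID` stands in for Kafka's topic ID type, and the write in the final branch is inferred from the comment in the diff, since that branch's body is not shown.

```scala
import java.util.UUID

// Hypothetical in-memory stand-in for the partition metadata file.
// Assumption: the real file is on disk; this only models exists/read/write/delete.
final class InMemoryMetadataFile(private var stored: Option[UUID] = None) {
  def exists: Boolean = stored.isDefined
  def read(): UUID = stored.get
  def write(id: UUID): Unit = { stored = Some(id) }
  def delete(): Unit = { stored = None }
}

object TopicIdReconciliation {
  // Returns the topic ID the log should end up with.
  // `provided` is the ID handed in when the log is created (may be absent);
  // `keepFile` mirrors keepPartitionMetadataFile from the diff.
  def reconcile(file: InMemoryMetadataFile,
                provided: Option[UUID],
                keepFile: Boolean): Option[UUID] = {
    if (file.exists) {
      if (!keepFile) {
        // Version does not support topic IDs: drop the file, keep whatever was provided.
        file.delete()
        provided
      } else {
        val fileTopicId = file.read()
        // A provided ID that disagrees with the ID on file is an error, not an overwrite.
        provided.foreach { id =>
          if (id != fileTopicId)
            throw new IllegalStateException(
              s"Tried to assign topic ID $id to log, but log already contained topic ID $fileTopicId")
        }
        Some(fileTopicId)
      }
    } else {
      // No file yet: persist the provided ID if metadata files are supported.
      if (keepFile) provided.foreach(file.write)
      provided
    }
  }
}
```

Under these assumptions, a re-upgrade that supplies a topic ID different from the one left on file would hit the `IllegalStateException` branch rather than silently overwriting the file.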




