jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r598142353



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic 
IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition 
metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent 
with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID 
$topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       We would rewrite in ZK, not the file. So we would always get an 
inconsistent topic ID.
   
   We can overwrite in ZK when we add partitions/change ISR since this 
information is stored in the ZNode along with topic ID. When we write to it 
using an older version, we can't keep the topic ID field and it is lost.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to