jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r594558352

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,

     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")

Review comment:
I don't know whether this error message can actually be reached. In most cases, I think a log that already exists is fetched in the makeLeader/makeFollower path rather than created again, and in the log loading path the topicId should be None. I thought it would be good to throw this error so we would know that something was wrong with the code, but maybe there is a better way (for example, checking whether topicId is defined at all).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org