hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597863289



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -341,9 +341,14 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica, topicId)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      // When running a ZK controller, we may get a log that does not have a 
topic ID. Assign it here.
+      if (log.topicId == None && topicId.isDefined) {

Review comment:
       nit: we can do `topicId.foreach`

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -374,4 +375,31 @@ class RaftReplicaManager(config: KafkaConfig,
     metadataImage.partitions.topicPartition(partition.topic, 
partition.partitionId).getOrElse(
       throw new IllegalStateException(s"Partition has metadata changes but 
does not exist in the metadata cache: ${partition.topicPartition}"))
   }
+
+  /**
+   * Checks if the topic ID received from the MetadataPartitionsBuilder is 
consistent with the topic ID in the log.
+   * If the log does not exist, logTopicIdOpt will be None. In this case, the 
ID is not inconsistent.
+   *
+   * @param receivedTopicIdOpt the topic ID received from the MetadataRecords 
if it exists
+   * @param logTopicIdOpt the topic ID in the log if the log exists
+   * @param topicPartition the topicPartition for the Partition being checked
+   * @throws InconsistentTopicIdException if the topic ids are not consistent
+   * @throws IllegalArgumentException if the MetadataPartitionsBuilder did not 
have a topic ID associated with the topic
+   */
+  private def checkTopicId(receivedTopicIdOpt: Option[Uuid], logTopicIdOpt: 
Option[Uuid], topicPartition: TopicPartition): Unit = {
+    receivedTopicIdOpt match {
+      case Some(receivedTopicId) =>
+        logTopicIdOpt.foreach(logTopicId => {
+          if (receivedTopicId != logTopicId) {
+            // not sure if we need both the logger and the error thrown
+            stateChangeLogger.error(s"Topic Id in memory: $logTopicId does 
not" +

Review comment:
       nit: can we be consistent about naming topicId? Here we use "Topic Id" 
while in the message below we use "Topic ID." I slightly prefer the more 
compact camel-case "topicId," but I don't feel strongly about it.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic 
IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition 
metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent 
with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)

Review comment:
       You should be able to do something like `!topicId.contains(fileTopicId)`

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -374,4 +375,31 @@ class RaftReplicaManager(config: KafkaConfig,
     metadataImage.partitions.topicPartition(partition.topic, 
partition.partitionId).getOrElse(
       throw new IllegalStateException(s"Partition has metadata changes but 
does not exist in the metadata cache: ${partition.topicPartition}"))
   }
+
+  /**
+   * Checks if the topic ID received from the MetadataPartitionsBuilder is 
consistent with the topic ID in the log.
+   * If the log does not exist, logTopicIdOpt will be None. In this case, the 
ID is not inconsistent.
+   *
+   * @param receivedTopicIdOpt the topic ID received from the MetadataRecords 
if it exists
+   * @param logTopicIdOpt the topic ID in the log if the log exists
+   * @param topicPartition the topicPartition for the Partition being checked
+   * @throws InconsistentTopicIdException if the topic ids are not consistent
+   * @throws IllegalArgumentException if the MetadataPartitionsBuilder did not 
have a topic ID associated with the topic
+   */
+  private def checkTopicId(receivedTopicIdOpt: Option[Uuid], logTopicIdOpt: 
Option[Uuid], topicPartition: TopicPartition): Unit = {
+    receivedTopicIdOpt match {
+      case Some(receivedTopicId) =>
+        logTopicIdOpt.foreach(logTopicId => {
+          if (receivedTopicId != logTopicId) {
+            // not sure if we need both the logger and the error thrown
+            stateChangeLogger.error(s"Topic Id in memory: $logTopicId does 
not" +
+              s" match the topic Id for partition $topicPartition received: " +
+              s"$receivedTopicId.")
+            throw new InconsistentTopicIdException(s"Topic partition 
$topicPartition had an inconsistent topic ID.")
+          }
+        })
+      case None => throw new IllegalStateException(

Review comment:
       nit: it looks a little weird that we indent here, but we don't for 
`InconsistentTopicIdException` above

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1473,6 +1484,30 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Checks if the topic ID provided in the LeaderAndIsr request is consistent 
with the topic ID in the log.
+   *
+   * If the request had an invalid topic ID (null or zero), then we assume 
that topic IDs are not supported.
+   * The topic ID was not inconsistent, so return true.
+   * If the log does not exist or the topic ID is not yet set, logTopicIdOpt 
will be None.
+   * In both cases, the ID is not inconsistent so return true.
+   *
+   * @param requestTopicId the topic ID from the LeaderAndIsr request
+   * @param logTopicIdOpt the topic ID in the log if the log and the topic ID 
exist
+   * @return true if the request topic id is consistent, false otherwise
+   */
+  private def checkTopicId(requestTopicId: Uuid, logTopicIdOpt: Option[Uuid]): 
Boolean = {

Review comment:
       It will read a little easier if we name this `hasConsistentTopicId`.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic 
IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition 
metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent 
with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID 
$topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       My feeling is that we should be able to get rid of 
`keepPartitionMetadataFile`. We should just create the file whenever a topicId 
is provided. Not sure if there is anything I'm missing. I don't think the file 
would cause any harm if we downgraded, would it?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -341,9 +341,14 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, 
isFutureReplica, topicId)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      // When running a ZK controller, we may get a log that does not have a 
topic ID. Assign it here.
+      if (log.topicId == None && topicId.isDefined) {
+        log.partitionMetadataFile.write(topicId.get)
+        log.topicId = Some(topicId.get)

Review comment:
       We can create a method in `Log` to update the topicId which does both of 
these operations.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -262,6 +266,7 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           private val hadCleanShutdown: Boolean = true,
+          @volatile var topicId : Option[Uuid] = None,

Review comment:
       Let's make this a mandatory parameter. Optional parameters are dangerous 
because they can be overlooked. As an example of the danger, the optional 
argument just above this one led to this bug: 
https://issues.apache.org/jira/browse/KAFKA-12504.
   
   Also nit: remove space before the colon.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1326,6 +1326,14 @@ class ReplicaManager(val config: KafkaConfig,
             s"epoch ${leaderAndIsrRequest.controllerEpoch}")
         }
       val topicIds = leaderAndIsrRequest.topicIds()
+      def getTopicId (topicName: String): Option[Uuid] = {

Review comment:
       nit: remove space before parenthesis

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster 
with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =

Review comment:
       I was looking into the uses of this and found this one in 
`KafkaApis.handleDeleteTopicsRequest`:
   
   ```scala
           if (!config.usesTopicId && 
topicIdsFromRequest.contains(topic.topicId)) {
             topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
             topic.setErrorMessage("Topic IDs are not supported on the server.")
   ```
   
   I'm having a little trouble understanding why this check is necessary. If we 
are not using topic ids, then it would seem reasonable to return 
UNKNOWN_TOPIC_ID instead. 
   
   My concern is the following case. Say I am doing a roll of the cluster to 
upgrade the IBP to a version which supports topicIds. The controller may get 
upgraded first and begin creating topics with topicIds before the rest of the 
cluster has an upgraded IBP. Now if the user tries to delete the topic using 
the topicId, it could be sent to one of the brokers which does not have the 
bumped IBP. This would cause an UNSUPPORTED_VERSION error, which would be very 
surprising.
   
   If you agree this is a problem, we can open a separate issue to address it.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1473,6 +1484,30 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Checks if the topic ID provided in the LeaderAndIsr request is consistent 
with the topic ID in the log.
+   *
+   * If the request had an invalid topic ID (null or zero), then we assume 
that topic IDs are not supported.
+   * The topic ID was not inconsistent, so return true.
+   * If the log does not exist or the topic ID is not yet set, logTopicIdOpt 
will be None.
+   * In both cases, the ID is not inconsistent so return true.
+   *
+   * @param requestTopicId the topic ID from the LeaderAndIsr request
+   * @param logTopicIdOpt the topic ID in the log if the log and the topic ID 
exist
+   * @return true if the request topic id is consistent, false otherwise
+   */
+  private def checkTopicId(requestTopicId: Uuid, logTopicIdOpt: Option[Uuid]): 
Boolean = {
+    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) {
+      true
+    } else
+      logTopicIdOpt match {

Review comment:
       You can use `logTopicIdOpt.contains(requestTopicId)`

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic 
IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition 
metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent 
with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID 
$topicId to log, but log already contained topicId $fileTopicId")

Review comment:
       Can you include the topic partition in this error message?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to