hachikuji commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578575340



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,11 +1364,19 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
+              val requestTopicId = topicIds.get(topicPartition.topic)
+              val (consistentTopicId, logTopicId) = 
partition.checkOrSetTopicId(requestTopicId)

Review comment:
       The return type here is a little awkward. It looks like we only need 
`logTopicId` for the log message below. Since `Partition` also has access to 
the state change logger, how about we move it into `checkOrSetTopicId`?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -428,6 +428,35 @@ class Partition(val topicPartition: TopicPartition,
       this.log = Some(log)
   }
 
+  /**
+   * This method checks if the topic ID provided in the request is consistent 
with the topic ID in the log.
+   * If a valid topic ID is provided, but the log has no ID set, set the log 
ID to be the request ID.
+   * Returns a boolean representing whether the topic ID was consistent and 
the final log ID if it exists.
+   */
+  def checkOrSetTopicId(requestTopicId: Uuid): (Boolean, Option[Uuid]) = {
+    // If the request had an invalid topic ID, then we assume that topic IDs 
are not supported.
+    // The topic ID was not inconsistent, so return true.
+    // If the log is empty, then we can not say that topic ID is inconsistent, 
so return true.
+    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
+      (true, None)
+    else if (log.isEmpty)

Review comment:
       nit: whenever you see an `isEmpty` or `isDefined` followed by a `get`, 
it's a good sign that we could probably simplify with a `match`. For example:
   ```scala
   else {
    log match {
      case None => (true, None)
      case Some(log) => 
   ...
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to