jolshan commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578010946



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the 
controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the 
decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from 
controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for 
partition $topicPartition as itself is not " +
-                    s"in assigned replica list 
${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to 
the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to 
file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the 
log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: 
${log.topicId.toString} does not" +
+                      s" match the topic Id provided in the request: " +
+                      s"${id.toString}.")
+                    responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
+                    invalidId = true
+                  }
+                }
+              }
+
+              // If we found an invalid ID, we don't need to check the leader 
epoch
+              if (!invalidId) {
+                if (requestLeaderEpoch > currentLeaderEpoch) {

Review comment:
       In this case, if log is None, does it make sense to error here? If it is 
None, we are unable to check the topic ID




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to