jolshan commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r677854946



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1367,12 +1367,25 @@ class ReplicaManager(val config: KafkaConfig,
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIdFromRequest(topicPartition.topic)
+              val logTopicId = partition.topicId
 
-              if (!hasConsistentTopicId(requestTopicId, partition.topicId)) {
-                stateChangeLogger.error(s"Topic ID in memory: ${partition.topicId.get} does not" +
+              // When running a ZK controller and upgrading to topic IDs we may receive a request with leader epoch
+              // that is equal to the current leader epoch. In this case, we want to assign topic ID to the log.
+              def isUpgradingToTopicIdWithExistingLog: Boolean = {
+                requestLeaderEpoch == currentLeaderEpoch &&

Review comment:
       Ah yeah. That would work. I felt it was a bit odd too, but didn't think 
of this alternative. This placement is better.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


