jolshan commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r676920282
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1367,13 +1367,23 @@ class ReplicaManager(val config: KafkaConfig,
           val currentLeaderEpoch = partition.getLeaderEpoch
           val requestLeaderEpoch = partitionState.leaderEpoch
           val requestTopicId = topicIdFromRequest(topicPartition.topic)
+          val logTopicId = partition.topicId
+
+          // We propagate the partition state down if:
+          // 1. The leader epoch is higher than the current leader epoch of the partition
+          // 2. The leader epoch is same as the current leader epoch but a new topic id is being assigned. This is
+          //    needed to handle the case where a topic id is assigned for the first time after upgrade.
+          def propagatePartitionState(requestLeaderEpoch: Int, currentLeaderEpoch: Int): Boolean = {
+            requestLeaderEpoch > currentLeaderEpoch ||
+              (requestLeaderEpoch == currentLeaderEpoch && logTopicId.isEmpty && requestTopicId.isDefined)
+          }
 
-          if (!hasConsistentTopicId(requestTopicId, partition.topicId)) {
-            stateChangeLogger.error(s"Topic ID in memory: ${partition.topicId.get} does not" +
+          if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
+            stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +
               s" match the topic ID for partition $topicPartition received: " +
               s"${requestTopicId.get}.")
             responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
-          } else if (requestLeaderEpoch > currentLeaderEpoch) {
+          } else if (propagatePartitionState(requestLeaderEpoch, currentLeaderEpoch)) {

Review comment:
I took a closer look at the code in makeLeader, and it seems that we do not reassign the start offset for an epoch once it has been assigned.

```
  /**
    * Assigns the supplied Leader Epoch to the supplied Offset
    * Once the epoch is assigned it cannot be reassigned
    */
  def assign(epoch: Int, startOffset: Long): Unit = {
    val entry = EpochEntry(epoch, startOffset)
    if (assign(entry)) {
      debug(s"Appended new epoch entry $entry. Cache now contains ${epochs.size} entries.")
      flush()
    }
  }
```

It is true that we may not assign a value initially if there is no epoch cache, but I'm not sure that is a case we need to consider (i.e., the cache is absent on the first request but present on the second).

```
  def maybeAssignEpochStartOffset(leaderEpoch: Int, startOffset: Long): Unit = {
    leaderEpochCache.foreach { cache =>
      cache.assign(leaderEpoch, startOffset)
    }
  }
```
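
To make the same-epoch case concrete, here is a minimal, self-contained sketch of the "assign once" behaviour described above. It is not the real leader epoch cache: `EpochCacheSketch`, `SimpleEpochCache`, and `startOffsetFor` are hypothetical names introduced for illustration, and the sketch leaves out the flushing, locking, and truncation handling that the actual cache performs. It only assumes what the doc comment states, namely that an epoch cannot be reassigned once it has been assigned.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the leader epoch cache.
// This is an illustration only, not Kafka's implementation.
object EpochCacheSketch {

  final case class EpochEntry(epoch: Int, startOffset: Long)

  final class SimpleEpochCache {
    private val epochs = mutable.Map.empty[Int, EpochEntry]

    // Records the start offset for an epoch the first time it is seen.
    // Returns true if a new entry was added, false if the epoch already
    // had an entry (in which case the existing start offset is kept).
    def assign(epoch: Int, startOffset: Long): Boolean = {
      require(epoch >= 0 && startOffset >= 0, "epoch and startOffset must be non-negative")
      if (epochs.contains(epoch)) {
        false
      } else {
        epochs.put(epoch, EpochEntry(epoch, startOffset))
        true
      }
    }

    // Hypothetical accessor used only to inspect the sketch's state.
    def startOffsetFor(epoch: Int): Option[Long] =
      epochs.get(epoch).map(_.startOffset)
  }

  // Mirrors the shape of maybeAssignEpochStartOffset from the comment:
  // when there is no cache, the call is a no-op.
  def maybeAssignEpochStartOffset(cache: Option[SimpleEpochCache],
                                  leaderEpoch: Int,
                                  startOffset: Long): Unit =
    cache.foreach(_.assign(leaderEpoch, startOffset))
}
```

Under these assumptions, a second request that carries the same leader epoch (for example, one that only assigns a topic ID after an upgrade) leaves the recorded start offset untouched. The one case the sketch does not protect against is the one raised above: if the cache is `None` on the first request and present on the second, the first assignment happens on the second request.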