hachikuji commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578575340
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,11 +1364,19 @@ class ReplicaManager(val config: KafkaConfig,
Some(partition)
}
- // Next check partition's leader epoch
+ // Next check the topic ID and the partition's leader epoch
partitionOpt.foreach { partition =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
- if (requestLeaderEpoch > currentLeaderEpoch) {
+ val requestTopicId = topicIds.get(topicPartition.topic)
+ val (consistentTopicId, logTopicId) = partition.checkOrSetTopicId(requestTopicId)
Review comment:
The return type here is a little awkward. It looks like we only need
`logTopicId` for the log message below. Since `Partition` also has access to
the state change logger, how about we move it into `checkOrSetTopicId`?
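A minimal sketch of what that could look like, assuming the method logs the mismatch itself and returns only the consistency flag. Everything here is a hypothetical stand-in for illustration: `java.util.UUID` replaces Kafka's `Uuid`, `FakeLog` and `FakePartition` replace the real classes, the `logger` function replaces the state change logger, and the log message wording is invented.

```scala
import java.util.UUID

// Hypothetical stand-ins; none of these are Kafka's real classes.
object TopicIdLoggingSketch {
  // Plays the role of Uuid.ZERO_UUID in this sketch.
  val ZeroUuid: UUID = new UUID(0L, 0L)

  // Simplified log that only tracks an optional topic ID.
  final class FakeLog(var topicId: Option[UUID])

  // `logger` stands in for the state change logger the partition can reach.
  final class FakePartition(val log: Option[FakeLog], logger: String => Unit) {

    // Returns only whether the request ID is consistent with the log's ID.
    // The mismatch is logged here, so callers no longer need the log's ID.
    def checkOrSetTopicId(requestTopicId: UUID): Boolean = {
      if (requestTopicId == null || requestTopicId == ZeroUuid) true
      else log match {
        case None => true
        case Some(l) =>
          l.topicId match {
            case None =>
              // The log has no ID yet, so adopt the one from the request.
              l.topicId = Some(requestTopicId)
              true
            case Some(existing) if existing == requestTopicId => true
            case Some(existing) =>
              logger(s"Topic ID in log ($existing) does not match request topic ID ($requestTopicId)")
              false
          }
      }
    }
  }
}
```

With this shape the call site in `ReplicaManager` would only branch on a `Boolean`, and the tuple destructuring goes away.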
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -428,6 +428,35 @@ class Partition(val topicPartition: TopicPartition,
this.log = Some(log)
}
+ /**
+ * This method checks if the topic ID provided in the request is consistent with the topic ID in the log.
+ * If a valid topic ID is provided, but the log has no ID set, set the log ID to be the request ID.
+ * Returns a boolean representing whether the topic ID was consistent and the final log ID if it exists.
+ */
+ def checkOrSetTopicId(requestTopicId: Uuid): (Boolean, Option[Uuid]) = {
+ // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
+ // The topic ID was not inconsistent, so return true.
+ // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
+ if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
+ (true, None)
+ else if (log.isEmpty)
Review comment:
nit: whenever you see an `isEmpty` or `isDefined` followed by a `get`,
it's a good sign that we could probably simplify with a `match`. For example:
```scala
else {
log match {
case None => (true, None)
case Some(log) =>
...
```
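For comparison, here is a small self-contained sketch of the two shapes side by side. The body of the `Some` branch is elided in the snippet above, so the comparison logic below is only an assumption for illustration; `FakeLog` is a hypothetical stand-in and `java.util.UUID` replaces Kafka's `Uuid`.

```scala
import java.util.UUID

object OptionMatchSketch {
  // Hypothetical stand-in for the partition's log; not Kafka's real class.
  final case class FakeLog(topicId: Option[UUID])

  // The shape the nit is about: an isEmpty check followed by get.
  def checkWithGet(log: Option[FakeLog], requestId: UUID): (Boolean, Option[UUID]) =
    if (log.isEmpty) (true, None)
    else {
      val id = log.get.topicId
      // A log without an ID is treated as consistent (assumed logic).
      (id.forall(_ == requestId), id)
    }

  // The same logic with a match: the Some case binds the value directly,
  // so there is no get that could throw if the check above it changes.
  def checkWithMatch(log: Option[FakeLog], requestId: UUID): (Boolean, Option[UUID]) =
    log match {
      case None => (true, None)
      case Some(l) => (l.topicId.forall(_ == requestId), l.topicId)
    }
}
```

Both functions return the same result for every input; the `match` version just makes the empty and non-empty cases explicit.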