hachikuji commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r577972328
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
Some(partition)
}
- // Next check partition's leader epoch
+ // Next check the topic ID and the partition's leader epoch
partitionOpt.foreach { partition =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
- if (requestLeaderEpoch > currentLeaderEpoch) {
- // If the leader epoch is valid record the epoch of the
controller that made the leadership decision.
- // This is useful while updating the isr to maintain the
decision maker controller's epoch in the zookeeper path
- if (partitionState.replicas.contains(localBrokerId))
- partitionStates.put(partition, partitionState)
- else {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from
controller $controllerId with " +
- s"correlation id $correlationId epoch $controllerEpoch for
partition $topicPartition as itself is not " +
- s"in assigned replica list
${partitionState.replicas.asScala.mkString(",")}")
- responseMap.put(topicPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ val id = topicIds.get(topicPartition.topic())
+ var invalidId = false
+
+ // Ensure we have not received a request from an older protocol
+ if (id != null && id != Uuid.ZERO_UUID) {
+ partition.log.foreach { log =>
+ // Check if topic ID is in memory, if not, it must be new to
the broker and does not have a metadata file.
+ // This is because if the broker previously wrote it to
file, it would be recovered on restart after failure.
+ if (log.topicId == Uuid.ZERO_UUID) {
+ log.partitionMetadataFile.write(id)
+ log.topicId = id
+ // Warn if the topic ID in the request does not match the
log.
+ } else if (log.topicId != id) {
+ stateChangeLogger.warn(s"Topic Id in memory:
${log.topicId.toString} does not" +
Review comment:
I think we can even log this at error level. The current state machine in the
controller should prevent this from happening; when it does not, it is a bug.
Also, can we mention the partition name in this log message?
Finally, one nit: it is not necessary to call `toString` explicitly in log
statements, since string interpolation calls it implicitly. So we can replace
`${id.toString}` with `$id`, for example.
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
Some(partition)
}
- // Next check partition's leader epoch
+ // Next check the topic ID and the partition's leader epoch
partitionOpt.foreach { partition =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
- if (requestLeaderEpoch > currentLeaderEpoch) {
- // If the leader epoch is valid record the epoch of the
controller that made the leadership decision.
- // This is useful while updating the isr to maintain the
decision maker controller's epoch in the zookeeper path
- if (partitionState.replicas.contains(localBrokerId))
- partitionStates.put(partition, partitionState)
- else {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from
controller $controllerId with " +
- s"correlation id $correlationId epoch $controllerEpoch for
partition $topicPartition as itself is not " +
- s"in assigned replica list
${partitionState.replicas.asScala.mkString(",")}")
- responseMap.put(topicPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ val id = topicIds.get(topicPartition.topic())
Review comment:
nit: unnecessary parentheses after `topic`. Also, can we name this
explicitly as `topicId`? Perhaps even `requestTopicId` to emphasize that this
is the one from the request.
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
Some(partition)
}
- // Next check partition's leader epoch
+ // Next check the topic ID and the partition's leader epoch
partitionOpt.foreach { partition =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
- if (requestLeaderEpoch > currentLeaderEpoch) {
- // If the leader epoch is valid record the epoch of the
controller that made the leadership decision.
- // This is useful while updating the isr to maintain the
decision maker controller's epoch in the zookeeper path
- if (partitionState.replicas.contains(localBrokerId))
- partitionStates.put(partition, partitionState)
- else {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from
controller $controllerId with " +
- s"correlation id $correlationId epoch $controllerEpoch for
partition $topicPartition as itself is not " +
- s"in assigned replica list
${partitionState.replicas.asScala.mkString(",")}")
- responseMap.put(topicPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ val id = topicIds.get(topicPartition.topic())
+ var invalidId = false
+
+ // Ensure we have not received a request from an older protocol
+ if (id != null && id != Uuid.ZERO_UUID) {
+ partition.log.foreach { log =>
+ // Check if topic ID is in memory, if not, it must be new to
the broker and does not have a metadata file.
+ // This is because if the broker previously wrote it to
file, it would be recovered on restart after failure.
+ if (log.topicId == Uuid.ZERO_UUID) {
+ log.partitionMetadataFile.write(id)
+ log.topicId = id
+ // Warn if the topic ID in the request does not match the
log.
+ } else if (log.topicId != id) {
+ stateChangeLogger.warn(s"Topic Id in memory:
${log.topicId.toString} does not" +
+ s" match the topic Id provided in the request: " +
+ s"${id.toString}.")
+ responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
+ invalidId = true
+ }
+ }
+ }
+
+ // If we found an invalid ID, we don't need to check the leader
epoch
+ if (!invalidId) {
+ if (requestLeaderEpoch > currentLeaderEpoch) {
Review comment:
It's a minor thing, but we can avoid this nesting by restructuring the
checks a little bit. For example, it would be a good idea to have a helper in
`Partition` which encapsulates the update of the topicId state. Maybe something
like this:
```scala
class Partition {
// Update the topicId if necessary.
// Return false if the update failed because the topicId is inconsistent.
def maybeUpdateTopicId(topicId: Uuid): Boolean
}
// in ReplicaManager
if (!partition.maybeUpdateTopicId(requestTopicId)) {
error(...)
responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
} else if (requestLeaderEpoch > currentLeaderEpoch) {
...
```
##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
Some(partition)
}
- // Next check partition's leader epoch
+ // Next check the topic ID and the partition's leader epoch
partitionOpt.foreach { partition =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
- if (requestLeaderEpoch > currentLeaderEpoch) {
- // If the leader epoch is valid record the epoch of the
controller that made the leadership decision.
- // This is useful while updating the isr to maintain the
decision maker controller's epoch in the zookeeper path
- if (partitionState.replicas.contains(localBrokerId))
- partitionStates.put(partition, partitionState)
- else {
- stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from
controller $controllerId with " +
- s"correlation id $correlationId epoch $controllerEpoch for
partition $topicPartition as itself is not " +
- s"in assigned replica list
${partitionState.replicas.asScala.mkString(",")}")
- responseMap.put(topicPartition,
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ val id = topicIds.get(topicPartition.topic())
+ var invalidId = false
+
+ // Ensure we have not received a request from an older protocol
+ if (id != null && id != Uuid.ZERO_UUID) {
+ partition.log.foreach { log =>
+ // Check if topic ID is in memory, if not, it must be new to
the broker and does not have a metadata file.
+ // This is because if the broker previously wrote it to
file, it would be recovered on restart after failure.
+ if (log.topicId == Uuid.ZERO_UUID) {
+ log.partitionMetadataFile.write(id)
+ log.topicId = id
+ // Warn if the topic ID in the request does not match the
log.
+ } else if (log.topicId != id) {
+ stateChangeLogger.warn(s"Topic Id in memory:
${log.topicId.toString} does not" +
+ s" match the topic Id provided in the request: " +
+ s"${id.toString}.")
+ responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
Review comment:
I am a little doubtful about reusing this error code since this case
does not quite match the one it was intended for. I wonder if it would be worth
having an explicit `INCONSISTENT_TOPIC_ID` error code?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org