splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1268702825

##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1366,6 +1376,17 @@ class Partition(val topicPartition: TopicPartition,
         fetchParams.replicaId,
         fetchPartitionData
       )
+
+      // Fence the fetch request with stale broker epoch from a rebooted follower.
+      if (metadataCache.isInstanceOf[KRaftMetadataCache]) {
+        val brokerEpoch = fetchParams.replicaEpoch
+        val currentBrokerEpoch = replica.stateSnapshot.brokerEpoch.getOrElse(-1L)
+        if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) {
+          throw new StaleBrokerEpochException(s"Received fetch request for $topicPartition with stale broker " +
+            s"epoch=$brokerEpoch. The expected broker epoch= $currentBrokerEpoch.")
+        }
+      }

Review Comment:
We would also need to update the `Replica.updateFetchState` call to check the broker epoch before trying to apply a fetch update. Otherwise a request can pass this check and then race with another update to the replica's state.

I don't think we need to check whether we are in KRaft mode or not. The broker epoch should be monotonic in both ZooKeeper and KRaft mode.

I also don't think we can throw `StaleBrokerEpochException`, since this would introduce a new error code returned on fetch responses without a KIP.
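
To illustrate the race concern, here is a minimal sketch of doing the epoch check and the state update as one atomic step. It is not Kafka's actual `Replica` code: `FollowerState`, `FollowerReplica`, and `maybeUpdateFetchState` are hypothetical names, and the state is reduced to two fields. It returns a `Boolean` rather than throwing, so the choice of error is left to the caller.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, simplified stand-in for the follower state tracked by the
// leader; not Kafka's actual replica state class.
final case class FollowerState(brokerEpoch: Long, logEndOffset: Long)

final class FollowerReplica(initial: FollowerState) {
  private val state = new AtomicReference(initial)

  def snapshot: FollowerState = state.get()

  /** Applies the fetch update only if the request's broker epoch is not stale.
    * The check and the update happen inside one atomic read-modify-write, so
    * no other update can slip in between them.
    * An epoch of -1 is treated as "not provided" and skips the check,
    * mirroring the condition in the diff above.
    * Returns true if the update was applied, false if it was rejected. */
  def maybeUpdateFetchState(requestEpoch: Long, logEndOffset: Long): Boolean = {
    var applied = false
    // updateAndGet may re-run the function if another thread wins the CAS;
    // `applied` reflects the attempt that finally succeeded.
    state.updateAndGet { current =>
      val stale = requestEpoch != -1L && requestEpoch < current.brokerEpoch
      applied = !stale
      if (stale) current
      else current.copy(
        brokerEpoch = math.max(current.brokerEpoch, requestEpoch),
        logEndOffset = logEndOffset
      )
    }
    applied
  }
}
```

With a replica whose recorded epoch is 5, a call carrying epoch 4 is rejected and leaves the state untouched, while a call carrying epoch 6 is applied and advances the recorded epoch.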