splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1268702825


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1366,6 +1376,17 @@ class Partition(val topicPartition: TopicPartition,
           fetchParams.replicaId,
           fetchPartitionData
         )
+
+        // Fence the fetch request with stale broker epoch from a rebooted 
follower.
+        if (metadataCache.isInstanceOf[KRaftMetadataCache]) {
+          val brokerEpoch = fetchParams.replicaEpoch
+          val currentBrokerEpoch = 
replica.stateSnapshot.brokerEpoch.getOrElse(-1L)
+          if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) {
+            throw new StaleBrokerEpochException(s"Received fetch request for 
$topicPartition with stale broker " +
+              s"epoch=$brokerEpoch. The expected broker epoch= 
$currentBrokerEpoch.")
+          }
+        }

Review Comment:
   We would also need to update the `Replica.updateFetchState` call to check 
the brokerEpoch before trying to apply a fetch update. Otherwise we can pass 
this check and race for an update.
   
   I don't think we need to check whether we are in KRaft mode or not. The 
broker epoch should be monotonic in both zookeeper and kraft mode.
   
   I also don't think we can throw `StaleBrokerEpochException` since this would 
introduce a new error code returned on fetch responses without a KIP. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to