splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1268889660


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -858,13 +858,28 @@ class Partition(val topicPartition: TopicPartition,
     // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
     val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
     val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
-    replica.updateFetchState(
-      followerFetchOffsetMetadata,
-      followerStartOffset,
-      followerFetchTimeMs,
-      leaderEndOffset,
-      brokerEpoch
-    )
+
+    // Acquire the lock for the fetch state update. A race can happen between fetch requests from a rebooted broker.
+    // The requests before and after the reboot can carry different fetch metadata, especially offsets and broker epoch.
+    // It can particularly affect ISR expansion, where we decide to expand based on a stale fetch request but use the
+    // latest broker epoch to fill in the AlterPartition request.
+    inReadLock(leaderIsrUpdateLock) {
+      // Fence fetch requests carrying a stale broker epoch from a rebooted follower.
+      val currentBrokerEpoch = replica.stateSnapshot.brokerEpoch.getOrElse(-1L)
+      if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) {
+        error(s"Received fetch request for $topicPartition with stale broker epoch=$brokerEpoch. The expected" +
+          s" broker epoch is $currentBrokerEpoch.")
+        return
+      }

Review Comment:
   my understanding is that the epoch check needs to be done in the `updateFetchState` call, under the atomic `updateAndGet`.
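   
   A minimal sketch of what that could look like, assuming `Replica` keeps its fetch state in an `AtomicReference` and runs the stale-epoch check inside `updateAndGet`; the `ReplicaState` fields, constructor, and exception type here are illustrative, not the actual `Replica` API:
   
   ```scala
   import java.util.concurrent.atomic.AtomicReference
   
   // Illustrative stand-in for the replica's fetch state; field names are assumptions.
   final case class ReplicaState(logEndOffset: Long, brokerEpoch: Option[Long])
   
   class Replica {
     private val replicaState = new AtomicReference(ReplicaState(logEndOffset = -1L, brokerEpoch = None))
   
     def stateSnapshot: ReplicaState = replicaState.get()
   
     // Doing the stale-epoch check inside updateAndGet makes check-and-update a single
     // atomic step, so a fetch sent before the reboot cannot slip in between a separate
     // check under the read lock and the subsequent state write.
     def updateFetchState(followerLogEndOffset: Long, brokerEpoch: Long): Unit = {
       replicaState.updateAndGet { current =>
         val currentBrokerEpoch = current.brokerEpoch.getOrElse(-1L)
         if (brokerEpoch != -1L && brokerEpoch < currentBrokerEpoch)
           throw new IllegalStateException(
             s"Received stale broker epoch=$brokerEpoch, current broker epoch=$currentBrokerEpoch")
         ReplicaState(followerLogEndOffset, Some(brokerEpoch))
       }
     }
   }
   ```
   
   With that shape, `Partition.updateFollowerFetchState` would let the exception surface to the fetch handler rather than logging and returning early, keeping the fencing decision and the state transition in one atomic operation.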


