splett2 commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1268889660
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########

@@ -858,13 +858,28 @@ class Partition(val topicPartition: TopicPartition,
       // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
       val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
       val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
-      replica.updateFetchState(
-        followerFetchOffsetMetadata,
-        followerStartOffset,
-        followerFetchTimeMs,
-        leaderEndOffset,
-        brokerEpoch
-      )
+
+      // Acquire the lock for the fetch state update. A race can happen between fetch requests from a rebooted broker.
+      // The requests before and after the reboot can carry different fetch metadata, especially offsets and broker epoch.
+      // This can particularly affect ISR expansion, where we decide to expand based on a stale fetch request but use the
+      // latest broker epoch to fill in the AlterPartition request.
+      inReadLock(leaderIsrUpdateLock) {
+        // Fence fetch requests that carry a stale broker epoch from a rebooted follower.
+        val currentBrokerEpoch = replica.stateSnapshot.brokerEpoch.getOrElse(-1L)
+        if (brokerEpoch != -1 && brokerEpoch < currentBrokerEpoch) {
+          error(s"Received fetch request for $topicPartition with stale broker epoch=$brokerEpoch." +
+            s" The expected broker epoch is $currentBrokerEpoch.")
+          return
+        }

Review Comment:
   My understanding is that the epoch check needs to be done in the `updateFetchState` call, under the atomic `updateAndGet`, so that the fence and the fetch-state write happen in a single atomic step rather than as a check under the read lock followed by a separate update.
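   For illustration, a minimal sketch of that approach, assuming a simplified replica state held in an `AtomicReference` (the names `ReplicaState`, `StaleBrokerEpochException`, and `ReplicaSketch` are placeholders for this sketch, not Kafka's actual definitions): running the fence inside `updateAndGet` means the epoch check always sees the exact state that the write replaces.

   ```scala
   import java.util.concurrent.atomic.AtomicReference

   // Simplified stand-ins for illustration only; not the real kafka.cluster.Replica types.
   case class ReplicaState(brokerEpoch: Option[Long], logEndOffset: Long)

   class StaleBrokerEpochException(msg: String) extends RuntimeException(msg)

   class ReplicaSketch {
     private val replicaState =
       new AtomicReference(ReplicaState(brokerEpoch = None, logEndOffset = 0L))

     // Fences and updates in one atomic step: the stale-epoch check runs against
     // the exact state that the write replaces, so a racing fetch from the
     // rebooted broker cannot slip in between the check and the update.
     def updateFetchState(fetchOffset: Long, brokerEpoch: Long): ReplicaState = {
       replicaState.updateAndGet { current =>
         val currentEpoch = current.brokerEpoch.getOrElse(-1L)
         if (brokerEpoch != -1L && brokerEpoch < currentEpoch)
           throw new StaleBrokerEpochException(
             s"Fetch carries stale broker epoch=$brokerEpoch, current epoch=$currentEpoch")
         // The update function may be retried under contention, so it must stay
         // side-effect free apart from the thrown exception.
         ReplicaState(brokerEpoch = Some(brokerEpoch), logEndOffset = fetchOffset)
       }
     }
   }
   ```

   Under this scheme a fetch that still carries the pre-reboot epoch fails the update outright instead of silently clobbering the newer state.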