dajac commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1269078528


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -858,13 +858,22 @@ class Partition(val topicPartition: TopicPartition,
     // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
     val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
     val prevFollowerEndOffset = replica.stateSnapshot.logEndOffset
-    replica.updateFetchState(
-      followerFetchOffsetMetadata,
-      followerStartOffset,
-      followerFetchTimeMs,
-      leaderEndOffset,
-      brokerEpoch
-    )
+
+    // Acquire the lock for the fetch state update. A race can happen between fetch requests from a rebooted broker.
+    // The requests before and after the reboot can carry different fetch metadata especially offsets and broker epoch.
+    // It can particularly affect the ISR expansion where we decide to expand based on stale fetch request but use the
+    // latest broker epoch to fill in the AlterPartition request.
+    inReadLock(leaderIsrUpdateLock) {
+      if (!replica.maybeUpdateFetchState(
+        followerFetchOffsetMetadata,
+        followerStartOffset,
+        followerFetchTimeMs,
+        leaderEndOffset,
+        brokerEpoch
+      )) {
+        return

Review Comment:
   Should we return an error here? `followerReplicaOrThrow` may be a good inspiration for the errors that we could use here.
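
To make the question above concrete, here is a minimal, self-contained sketch of surfacing a rejected fetch state update as an error rather than a silent `return`. It is not the Kafka implementation: `ReplicaSketch`, `FetchState`, `FetchUpdateSketch`, and `StaleBrokerEpochSketchException` are hypothetical stand-ins, the rule that an update is rejected when its broker epoch is older than the recorded one is an assumption, and the exception type the real code should use is left open.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical stand-ins for illustration only; these are not the Kafka classes.
final case class FetchState(logEndOffset: Long, brokerEpoch: Long)

// Hypothetical error type; the real code would reuse an existing Kafka exception.
final class StaleBrokerEpochSketchException(msg: String) extends RuntimeException(msg)

final class ReplicaSketch(initial: FetchState) {
  @volatile private var state: FetchState = initial

  def stateSnapshot: FetchState = state

  // Assumed rule: reject (return false, state untouched) when the fetch
  // carries an older broker epoch than the one already recorded.
  def maybeUpdateFetchState(logEndOffset: Long, brokerEpoch: Long): Boolean = synchronized {
    if (brokerEpoch < state.brokerEpoch) false
    else {
      state = FetchState(logEndOffset, brokerEpoch)
      true
    }
  }
}

object FetchUpdateSketch {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()

  // Small helper mirroring the shape of the inReadLock call in the diff.
  private def inReadLock[T](lock: ReentrantReadWriteLock)(body: => T): T = {
    val readLock = lock.readLock()
    readLock.lock()
    try body finally readLock.unlock()
  }

  // Throws instead of returning silently when the update is rejected.
  def updateFollowerFetchState(replica: ReplicaSketch, logEndOffset: Long, brokerEpoch: Long): Unit =
    inReadLock(leaderIsrUpdateLock) {
      if (!replica.maybeUpdateFetchState(logEndOffset, brokerEpoch)) {
        throw new StaleBrokerEpochSketchException(
          s"Rejected fetch with broker epoch $brokerEpoch; " +
            s"current broker epoch is ${replica.stateSnapshot.brokerEpoch}")
      }
    }
}
```

With this shape the caller handling the fetch request sees the failure and can map it to an error response, instead of continuing as if the follower state had been updated.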