Hangleton commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1102707234
########## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ########## @@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String, * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset. * - * In the first case, the follower's current log end offset is smaller than the leader's log start offset - * (or leader's local log start offset). - * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current - * leader's log start offset(or leader's local log start offset). + * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the + * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log + * start offset. * In the second case, the follower should just keep the current log segments and retry the fetch. In the second * case, there will be some inconsistency of data between old and new leader. We are not solving it here. * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both * brokers and producers. * * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset - * and the current leader's (local-log-start-offset or) log start offset. + * and the current leader's log start offset. */ - val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset) - leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else - leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch) - + val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch) Review Comment: IIUC, this is correct. We should try to fetch from the leader log start offset instead of the local leader log start offset so that in the case where the leader log start offset < local leader log start offset, the leader returns an offset-move-to-tiered-storage error and the follower takes the related code path to reconstruct the local replica log prefix. ########## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ########## @@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String, * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset. * - * In the first case, the follower's current log end offset is smaller than the leader's log start offset - * (or leader's local log start offset). - * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current - * leader's log start offset(or leader's local log start offset). + * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the + * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log + * start offset. Review Comment: I think this case refers to line 674. This comment reverts back to the original one, before the change introduced for TS. Agreed it could be made clearer though, perhaps by referencing explicitly the case below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org