jsancio commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r566430269
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -983,48 +999,88 @@ private FetchResponseData tryCompleteFetchRequest( FetchRequestData.FetchPartition request, long currentTimeMs ) { - Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); - if (errorOpt.isPresent()) { - return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); - } + try { + Optional<Errors> errorOpt = validateLeaderOnlyRequest(request.currentLeaderEpoch()); + if (errorOpt.isPresent()) { + return buildEmptyFetchResponse(errorOpt.get(), Optional.empty()); + } - long fetchOffset = request.fetchOffset(); - int lastFetchedEpoch = request.lastFetchedEpoch(); - LeaderState state = quorum.leaderStateOrThrow(); - Optional<OffsetAndEpoch> divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - - if (divergingEpochOpt.isPresent()) { - Optional<FetchResponseData.EpochEndOffset> divergingEpoch = - divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset() - .setEpoch(offsetAndEpoch.epoch) - .setEndOffset(offsetAndEpoch.offset)); - return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark()); - } else { - LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); + long fetchOffset = request.fetchOffset(); + int lastFetchedEpoch = request.lastFetchedEpoch(); + LeaderState state = quorum.leaderStateOrThrow(); + ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); + + final Records records; + if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) { + LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); - if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { - onUpdateLeaderHighWatermark(state, currentTimeMs); + if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { + 
onUpdateLeaderHighWatermark(state, currentTimeMs); + } + + records = info.records; + } else { + records = MemoryRecords.EMPTY; } - return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark()); + return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark()); + } catch (Exception e) { + logger.error("Caught unexpected error in fetch completion of request {}", request, e); + return buildEmptyFetchResponse(Errors.UNKNOWN_SERVER_ERROR, Optional.empty()); } } /** * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which * is the largest epoch such that subsequent records are known to diverge. */ - private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { - if (fetchOffset == 0 && lastFetchedEpoch == 0) { - return Optional.empty(); + private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { + if (log.startOffset() == 0 && fetchOffset == 0) { + if (lastFetchedEpoch != 0) { + logger.warn( + "Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero", + fetchOffset, + lastFetchedEpoch + ); + } + return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); } - OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch) - .orElse(new OffsetAndEpoch(-1L, -1)); - if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) { - return Optional.of(endOffsetAndEpoch); + + OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch).orElseThrow(() -> { + return new IllegalStateException( + String.format( + "Expected to find an end offset for epoch %s since it must be less than the current epoch %s", + lastFetchedEpoch, + quorum.epoch() + ) + ); + }); + + if (log.oldestSnapshotId().isPresent() && + ((fetchOffset < log.startOffset()) || + (fetchOffset == log.startOffset() && 
lastFetchedEpoch != log.oldestSnapshotId().get().epoch) || + (lastFetchedEpoch < log.oldestSnapshotId().get().epoch))) { Review comment: Hmmm. The case that I have in mind is this one: ``` oldestSnapshotId: OffsetAndEpoch(10, 2) leaderEpochCache: (epoch: 4, startOffset: 10), (epoch: 5, startOffset: 20), ... ``` If the follower fetches `(fetchOffset: ..., lastFetchedEpoch: 3)`, then the leader should return `diverging(10, 2)`. If the follower fetches `(fetchOffset: 15, lastFetchedEpoch: 2)`, then the leader should return `diverging(10, 2)`. I think we can only do this if we have the check against the `oldestSnapshotId`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org