hachikuji commented on a change in pull request #9816: URL: https://github.com/apache/kafka/pull/9816#discussion_r553598153
########## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ########## @@ -970,40 +996,98 @@ private FetchResponseData tryCompleteFetchRequest( long fetchOffset = request.fetchOffset(); int lastFetchedEpoch = request.lastFetchedEpoch(); LeaderState state = quorum.leaderStateOrThrow(); - Optional<OffsetAndEpoch> divergingEpochOpt = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); - - if (divergingEpochOpt.isPresent()) { - Optional<FetchResponseData.EpochEndOffset> divergingEpoch = - divergingEpochOpt.map(offsetAndEpoch -> new FetchResponseData.EpochEndOffset() - .setEpoch(offsetAndEpoch.epoch) - .setEndOffset(offsetAndEpoch.offset)); - return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark()); - } else { + ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch); + + final Records records; + if (validatedOffsetAndEpoch.type() == ValidatedFetchOffsetAndEpoch.Type.VALID) { LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED); if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) { onUpdateLeaderHighWatermark(state, currentTimeMs); } - return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark()); + records = info.records; + } else { + records = MemoryRecords.EMPTY; } + + return buildFetchResponse(Errors.NONE, records, validatedOffsetAndEpoch, state.highWatermark()); } /** * Check whether a fetch offset and epoch is valid. Return the diverging epoch, which * is the largest epoch such that subsequent records are known to diverge. 
*/ - private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { - if (fetchOffset == 0 && lastFetchedEpoch == 0) { - return Optional.empty(); + private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long fetchOffset, int lastFetchedEpoch) { + if (log.startOffset() == 0 && fetchOffset == 0) { + if (lastFetchedEpoch != 0) { + logger.warn( + "Replica sent a zero fetch offset ({}) but the last fetched epoch ({}) was not zero", + fetchOffset, + lastFetchedEpoch + ); + } + return ValidatedFetchOffsetAndEpoch.valid(new OffsetAndEpoch(fetchOffset, lastFetchedEpoch)); + } + + if (fetchOffset < log.startOffset() || fetchOffset == log.startOffset()) { + // Snapshot must be present if start offset is non zero. + OffsetAndEpoch latestSnapshotId = log.latestSnapshotId().orElseThrow(() -> { + return new IllegalStateException( + String.format( + "The log start offset (%s) was greater than zero but no snapshot was found", + log.startOffset() + ) + ); + }); + + if (fetchOffset < log.startOffset() || lastFetchedEpoch != latestSnapshotId.epoch) { + return ValidatedFetchOffsetAndEpoch.snapshot(latestSnapshotId); + } } OffsetAndEpoch endOffsetAndEpoch = log.endOffsetForEpoch(lastFetchedEpoch) .orElse(new OffsetAndEpoch(-1L, -1)); if (endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset) { - return Optional.of(endOffsetAndEpoch); - } else { - return Optional.empty(); + // TODO: Investiage this. Can the diverging offset be less than log start offset? If so, then we might as well Review comment: If `lastFetchedEpoch` is lower than the epoch of the first data in the log, then I believe `endOffsetForEpoch` will return `lastFetchedEpoch` with the end offset set to the current log start offset. In this case, I think we probably would prefer to return the latest snapshot. You can let me know if it makes sense, but I think the way I'd try to write the order of the checks in this method is the following: 1. 
First, call `endOffsetForEpoch` with `lastFetchedEpoch`. 2. If the end offset is undefined, then we need to read from the latest snapshot. 3. If the end offset is defined but is less than or equal to the log start offset, then we also need the latest snapshot. 4. Next, check `endOffsetAndEpoch.epoch != lastFetchedEpoch || endOffsetAndEpoch.offset < fetchOffset` as we do today. If this is true, then we return the diverging epoch. 5. Otherwise, we have a valid fetch. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org