hachikuji commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r553598153



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -970,40 +996,98 @@ private FetchResponseData tryCompleteFetchRequest(
         long fetchOffset = request.fetchOffset();
         int lastFetchedEpoch = request.lastFetchedEpoch();
         LeaderState state = quorum.leaderStateOrThrow();
-        Optional<OffsetAndEpoch> divergingEpochOpt = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
-
-        if (divergingEpochOpt.isPresent()) {
-            Optional<FetchResponseData.EpochEndOffset> divergingEpoch =
-                divergingEpochOpt.map(offsetAndEpoch -> new 
FetchResponseData.EpochEndOffset()
-                    .setEpoch(offsetAndEpoch.epoch)
-                    .setEndOffset(offsetAndEpoch.offset));
-            return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, 
divergingEpoch, state.highWatermark());
-        } else {
+        ValidatedFetchOffsetAndEpoch validatedOffsetAndEpoch = 
validateFetchOffsetAndEpoch(fetchOffset, lastFetchedEpoch);
+
+        final Records records;
+        if (validatedOffsetAndEpoch.type() == 
ValidatedFetchOffsetAndEpoch.Type.VALID) {
             LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
 
             if (state.updateReplicaState(replicaId, currentTimeMs, 
info.startOffsetMetadata)) {
                 onUpdateLeaderHighWatermark(state, currentTimeMs);
             }
 
-            return buildFetchResponse(Errors.NONE, info.records, 
Optional.empty(), state.highWatermark());
+            records = info.records;
+        } else {
+            records = MemoryRecords.EMPTY;
         }
+
+        return buildFetchResponse(Errors.NONE, records, 
validatedOffsetAndEpoch, state.highWatermark());
     }
 
     /**
      * Check whether a fetch offset and epoch is valid. Return the diverging 
epoch, which
      * is the largest epoch such that subsequent records are known to diverge.
      */
-    private Optional<OffsetAndEpoch> validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
-        if (fetchOffset == 0 && lastFetchedEpoch == 0) {
-            return Optional.empty();
+    private ValidatedFetchOffsetAndEpoch validateFetchOffsetAndEpoch(long 
fetchOffset, int lastFetchedEpoch) {
+        if (log.startOffset() == 0 && fetchOffset == 0) {
+            if (lastFetchedEpoch != 0) {
+                logger.warn(
+                    "Replica sent a zero fetch offset ({}) but the last 
fetched epoch ({}) was not zero",
+                    fetchOffset,
+                    lastFetchedEpoch
+                );
+            }
+            return ValidatedFetchOffsetAndEpoch.valid(new 
OffsetAndEpoch(fetchOffset, lastFetchedEpoch));
+        }
+
+        if (fetchOffset < log.startOffset() || fetchOffset == 
log.startOffset()) {
+            // Snapshot must be present if start offset is non zero.
+            OffsetAndEpoch latestSnapshotId = 
log.latestSnapshotId().orElseThrow(() -> {
+                return new IllegalStateException(
+                    String.format(
+                        "The log start offset (%s) was greater than zero but 
no snapshot was found",
+                        log.startOffset()
+                    )
+                );
+            });
+
+            if (fetchOffset < log.startOffset() || lastFetchedEpoch != 
latestSnapshotId.epoch) {
+                return ValidatedFetchOffsetAndEpoch.snapshot(latestSnapshotId);
+            }
         }
 
         OffsetAndEpoch endOffsetAndEpoch = 
log.endOffsetForEpoch(lastFetchedEpoch)
             .orElse(new OffsetAndEpoch(-1L, -1));
         if (endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset) {
-            return Optional.of(endOffsetAndEpoch);
-        } else {
-            return Optional.empty();
+            // TODO: Investiage this. Can the diverging offset be less than 
log start offset? If so, then we might as well

Review comment:
       If `lastFetchedEpoch` is lower than the epoch of the first record in the 
log, then I believe `endOffsetForEpoch` will return epoch `lastFetchedEpoch` 
with the end offset set to the current log start offset. In that case, I think 
we would prefer to return the latest snapshot.
   
   Let me know if this makes sense, but I would order the checks in this 
method as follows:
   
   1. First call `endOffsetForEpoch` with `lastFetchedEpoch`
   2. If the end offset is undefined, then we need to read from the latest 
snapshot.
   3. If the end offset is defined, but it is less than or equal to the log 
start offset, then we also need the latest snapshot.
   4. Next check `endOffsetAndEpoch.epoch != lastFetchedEpoch || 
endOffsetAndEpoch.offset < fetchOffset` as we do today. If this is true, then 
we return the diverging epoch.
   5. Otherwise, we have a valid fetch.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to