junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1612186225
########## core/src/main/scala/kafka/server/DelayedFetch.scala: ########## @@ -103,7 +103,7 @@ class DelayedFetch( // We will not force complete the fetch request if a replica should be throttled. if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) return forceComplete() - } else if (fetchOffset.messageOffset < endOffset.messageOffset) { + } else if (fetchOffset.onSameSegment(endOffset) && fetchOffset.messageOffset < endOffset.messageOffset) { Review Comment: Hmm, we assume that `fetchOffset.messageOffset > endOffset.messageOffset` is the truncated leader case. In that case, we should always call `forceComplete()` immediately, right? The current code only calls `forceComplete()` when the offset metadata is available, but it's more consistent to do that regardless of the availability of the offset metadata. Should we do sth like the following? ``` if (fetchOffset.messageOffset > endOffset.messageOffset) { // Case F, this can happen when the new fetch operation is on a truncated leader debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") return forceComplete() } else if (fetchOffset.messageOffset < endOffset.messageOffset) { if (fetchOffset.onOlderSegment(endOffset)) { // Case F, this can happen when the fetch operation is falling behind the current segment // or the partition has just rolled a new segment debug(s"Satisfying fetch $this immediately since it is fetching older segments.") // We will not force complete the fetch request if a replica should be throttled. if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) return forceComplete() } else if (fetchOffset.onSameSegment(endOffset)) { // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition) val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes) if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId)) accumulatedSize += bytesAvailable } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org