junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612186225


##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -103,7 +103,7 @@ class DelayedFetch(
                 // We will not force complete the fetch request if a replica 
should be throttled.
                 if (!params.isFromFollower || 
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
                   return forceComplete()
-              } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              } else if (fetchOffset.onSameSegment(endOffset) && 
fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
   Hmm, we assume that `fetchOffset.messageOffset > endOffset.messageOffset` is 
the truncated leader case. In that case, we should always call 
`forceComplete()` immediately, right? The current code only calls 
`forceComplete()` when the offset metadata is available, but it's more 
consistent to do that regardless of the availability of the offset metadata. 
Should we do sth like the following?
   
   ```
               if (fetchOffset.messageOffset > endOffset.messageOffset) {
                 // Case F, this can happen when the new fetch operation is on 
a truncated leader
                 debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
                 return forceComplete()
               } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
                 if (fetchOffset.onOlderSegment(endOffset)) {
                   // Case F, this can happen when the fetch operation is 
falling behind the current segment
                   // or the partition has just rolled a new segment
                   debug(s"Satisfying fetch $this immediately since it is 
fetching older segments.")
                   // We will not force complete the fetch request if a replica 
should be throttled.
                   if (!params.isFromFollower || 
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
                     return forceComplete()
                 } else if (fetchOffset.onSameSegment(endOffset)) {
                   // we take the partition fetch size as upper bound when 
accumulating the bytes (skip if a throttled partition)
                   val bytesAvailable = 
math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
                   if (!params.isFromFollower || 
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
                     accumulatedSize += bytesAvailable
                 }
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to