junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612025298


##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contain the complete metadata
+    val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    // 1. When `endOffset` is 0, it refers to the truncation case
+    // 2. When `endOffset` is 500, it refers to the normal case
+    val expected = endOffset == 0

Review Comment:
   Got it. We won't complete when `endOffset` is 500 because it doesn't contain offset metadata. But this is not the normal case. So, we want to adjust "it refers to the normal case" accordingly.
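
   For context, a rough sketch (not from the PR; it assumes the test goes on to assert on `tryComplete()` using the `expected` flag from the diff above) of how the two parameterized values are meant to behave:

   ```scala
   // endOffset = 0:   fetchOffset (450) is ahead of the leader's end offset, i.e. the
   //                  truncated-leader case, so tryComplete() is expected to force-complete.
   // endOffset = 500: fetchOffset is behind the end offset, but the high watermark carries only
   //                  a message offset (segment base offset and position are -1), so no byte
   //                  progress can be measured and the fetch is expected to stay pending.
   val expected = endOffset == 0
   assertEquals(expected, delayedFetch.tryComplete())
   assertEquals(expected, delayedFetch.isCompleted)
   ```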



##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -91,19 +91,23 @@ class DelayedFetch(
            // Go directly to the check for Case G if the message offsets are the same. If the log segment
            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
            // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (fetchOffset.messageOffsetOnly() || endOffset.messageOffsetOnly()) {

Review Comment:
   Yes, since `onOlderSegment()` and `onSameSegment()` do the `messageOffsetOnly()` check already. We could just get rid of this condition.
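
   To make the redundancy concrete, a self-contained sketch (simplified stand-in types, not Kafka's actual `LogOffsetMetadata`; it assumes both helpers return false when either side is message-offset-only, as described above):

   ```scala
   object OffsetMetaSketch extends App {
     // Minimal stand-in for LogOffsetMetadata: -1 marks unknown segment metadata.
     case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long = -1L, relativePosition: Int = -1) {
       def messageOffsetOnly: Boolean = segmentBaseOffset < 0
       // Both comparisons bail out (return false) when either side lacks segment metadata,
       // which is the guard the helpers are said to provide already.
       def onOlderSegment(that: OffsetMeta): Boolean =
         !messageOffsetOnly && !that.messageOffsetOnly && segmentBaseOffset < that.segmentBaseOffset
       def onSameSegment(that: OffsetMeta): Boolean =
         !messageOffsetOnly && !that.messageOffsetOnly && segmentBaseOffset == that.segmentBaseOffset
     }

     val fetchOffset = OffsetMeta(messageOffset = 450L, segmentBaseOffset = 400L, relativePosition = 128)
     val messageOnlyHw = OffsetMeta(messageOffset = 500L) // message-only high watermark, as in the test

     // With the guard inside the helpers, the caller needs no extra messageOffsetOnly() branch:
     // neither comparison fires, so no byte-position diff is attempted against incomplete metadata.
     assert(!messageOnlyHw.onOlderSegment(fetchOffset))
     assert(!messageOnlyHw.onSameSegment(fetchOffset))
     println("message-only metadata is never compared by segment position")
   }
   ```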


