kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769


##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}")
+  @ValueSource(longs = Array(0, 500))
+  def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // Note that the high-watermark does not contain the complete metadata
+    val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE)
+
+    // 1. When `endOffset` is 0, it refers to the truncation case
+    // 2. When `endOffset` is 500, it refers to the normal case
+    val expected = endOffset == 0

Review Comment:
   When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion.
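   
   For illustration, the check described above effectively reduces to something of this shape (a sketch of the described behavior, not the exact refactored code):
   
   ```
   // sketch only: any fetch-offset ahead of the end-offset now force-completes,
   // regardless of which segment the end-offset sits on
   if (fetchOffset.messageOffset > endOffset.messageOffset)
     return forceComplete()
   ```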
   
   This is the only behavior change after the refactor. Previously, when fetchOffset > endOffset:
   
   1. If the offsets lie on the same segment, then we wait for the end-offset to move, or the request times out.
   2. If the endOffset lies on an older segment than the fetch-offset, then we complete the request by calling force-complete.
   
   We can retain the same behavior if required.
   
   ```
   if (fetchOffset.messageOffset > endOffset.messageOffset) {
     if (endOffset.onOlderSegment(fetchOffset)) {
       // Case F, this can happen when the new fetch operation is on a truncated leader
       debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
       return forceComplete()
     }
   } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
                  ...
   ```
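   
   If we retain that branching, the new parameterized test could also pin the "same segment" case down by giving the mocked offsets real segment info instead of the message-only metadata. A rough sketch, reusing the helpers and mocks from the test above; the offsets and segment base offsets are invented for illustration, and the imports assume the trunk package for LogOffsetMetadata/LogOffsetSnapshot:
   
   ```
   // Hypothetical variant: fetchOffset 450 > endOffset 400, but both carry segment
   // info and sit on the same segment (base offset 0), so the legacy branching
   // keeps the fetch pending instead of force-completing it.
   import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, LogOffsetSnapshot}
   
   val fetchStatus = FetchPartitionStatus(
     startOffsetMetadata = new LogOffsetMetadata(450L, 0L, 4096),  // not message-only
     fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 450L, 5L, maxBytes, currentLeaderEpoch))
   
   val endOffsetMetadata = new LogOffsetMetadata(400L, 0L, 2048)   // same segment as the fetch-offset
   when(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true))
     .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata))
   
   // ... build the DelayedFetch with this fetchStatus exactly as in the test above ...
   
   // With the legacy behavior the fetch waits for the end-offset to move (or times out):
   assertFalse(delayedFetch.tryComplete())
   assertFalse(delayedFetch.isCompleted)
   ```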


