kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769
########## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ########## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + + val fetchResult = fetchResultOpt.get + assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}") + @ValueSource(longs = Array(0, 500)) + def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = { + val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") + val fetchOffset = 450L + val logStartOffset = 5L + val currentLeaderEpoch = Optional.of[Integer](10) + val replicaId = 1 + + val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) + + var fetchResultOpt: Option[FetchPartitionData] = None + def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) + } + + val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback + ) + + val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) + // Note that the high-watermark does not contain the complete metadata + val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1) + when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + when(replicaManager.isAddingReplica(any(), 
anyInt())).thenReturn(false) + expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + + // 1. When `endOffset` is 0, it refers to the truncation case + // 2. When `endOffset` is 500, it refers to the normal case + val expected = endOffset == 0 Review Comment: When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org