kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610893769
########## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ########## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + + val fetchResult = fetchResultOpt.get + assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithMessageOnlyHighWatermark endOffset={0}") + @ValueSource(longs = Array(0, 500)) + def testDelayedFetchWithMessageOnlyHighWatermark(endOffset: Long): Unit = { + val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") + val fetchOffset = 450L + val logStartOffset = 5L + val currentLeaderEpoch = Optional.of[Integer](10) + val replicaId = 1 + + val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) + + var fetchResultOpt: Option[FetchPartitionData] = None + def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) + } + + val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback + ) + + val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) + // Note that the high-watermark does not contain the complete metadata + val endOffsetMetadata = new LogOffsetMetadata(endOffset, -1L, -1) + when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + when(replicaManager.isAddingReplica(any(), 
anyInt())).thenReturn(false) + expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + + // 1. When `endOffset` is 0, it refers to the truncation case + // 2. When `endOffset` is 500, it refers to the normal case + val expected = endOffset == 0 Review Comment: When fetchOffset > endOffset, `forceComplete` gets called. In this case, 450 > 0, so it triggers force completion. This is the only behavior change after the refactor. Previously when fetchOffset > endOffset: 1. If the offsets lie on the same segment, then we wait for the end-offset to move or time out the request. 2. If the endOffset lies on an older segment than the fetchOffset, then we complete the request by calling `forceComplete`. We can retain the same behavior if required. ``` if (fetchOffset.messageOffset > endOffset.messageOffset) { if (endOffset.onOlderSegment(fetchOffset)) { // Case F, this can happen when the new fetch operation is on a truncated leader debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") return forceComplete() } } else if (fetchOffset.messageOffset < endOffset.messageOffset) { ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org