kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1598832131
########## core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala: ########## @@ -164,18 +169,71 @@ class DelayedFetchTest { assertTrue(delayedFetch.tryComplete()) assertTrue(delayedFetch.isCompleted) assertTrue(fetchResultOpt.isDefined) + + val fetchResult = fetchResultOpt.get + assertEquals(Errors.NONE, fetchResult.error) + } + + @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark minBytes={0}") + @ValueSource(ints = Array(1, 2)) + def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = { + val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") + val fetchOffset = 450L + val logStartOffset = 5L + val currentLeaderEpoch = Optional.of[Integer](10) + val replicaId = 1 + + val fetchStatus = FetchPartitionStatus( + startOffsetMetadata = new LogOffsetMetadata(fetchOffset), + fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, minBytes = minBytes) + + var fetchResultOpt: Option[FetchPartitionData] = None + def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { + fetchResultOpt = Some(responses.head._2) + } + + val delayedFetch = new DelayedFetch( + params = fetchParams, + fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus), + replicaManager = replicaManager, + quota = replicaQuota, + responseCallback = callback + ) + + val partition: Partition = mock(classOf[Partition]) + when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition) + // high-watermark is lesser than the log-start-offset + val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0) + when(partition.fetchOffsetSnapshot( + currentLeaderEpoch, + fetchOnlyFromLeader = true)) + .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, endOffsetMetadata, endOffsetMetadata)) + when(replicaManager.isAddingReplica(any(), 
anyInt())).thenReturn(false) + expectReadFromReplica(fetchParams, topicIdPartition, fetchStatus.fetchInfo, Errors.NONE) + + val expected = minBytes == 1 + assertEquals(expected, delayedFetch.tryComplete()) + assertEquals(expected, delayedFetch.isCompleted) Review Comment: In the test, the [LogOffsetSnapshot](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java) contains message-only offsets for logEndOffset, highWatermark, and lastStableOffset in DelayedFetchTest.scala#207. So, the test passed with the newly added condition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org