kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598832131


##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark 
minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, 
minBytes = minBytes)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // high-watermark is lesser than the log-start-offset
+    val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+    val expected = minBytes == 1
+    assertEquals(expected, delayedFetch.tryComplete())
+    assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
   In the test, the 
[LogOffsetSnapshot](https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetSnapshot.java)
 contains message-only offset for logEndOffset, highWatermark, and 
lastStableOffset in DelayedFetchTest.java#207. So, the test passed with the 
newly added condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to