satishd commented on code in PR #20045:
URL: https://github.com/apache/kafka/pull/20045#discussion_r2203811713


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3586,6 +3586,109 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testMultipleRemoteFetchesInOneFetchRequest(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp1 = new TopicIdPartition(topicId, tp1)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
+
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+      val leaderEpoch = 0
+      val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
+      val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 0, leaderEpoch = leaderEpoch)
+      val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+      val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+      replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+      replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
+
+      val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
+      val fetchOffsetTp0 = 1
+      val fetchOffsetTp1 = 2
+
+      val responseSeq = new AtomicReference[Seq[(TopicIdPartition, FetchPartitionData)]]()
+      val responseLatch = new CountDownLatch(1)
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        responseSeq.set(responseStatus)
+        responseLatch.countDown()
+      }
+
+      val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new util.HashSet[Consumer[RemoteLogReadResult]]()
+      when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
+        callbacks.add(ans.getArgument(1, classOf[Consumer[RemoteLogReadResult]]))
+        mock(classOf[Future[Void]])
+      })
+
+      // Start the fetch request for both partitions - this should trigger remote fetches since
+      // the default mocked log behavior throws OffsetOutOfRangeException
+      replicaManager.fetchMessages(params, Seq(
+        tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
+      ), UNBOUNDED_QUOTA, fetchCallback)
+
+      // Verify that exactly two asyncRead calls were made (one for each partition)
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      verify(mockRemoteLogManager, times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+
+      // Verify that remote fetch operations were properly set up for both partitions
+      assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2, "DelayedRemoteFetch purgatory should have operations")
+
+      // Verify both partitions were captured in the remote fetch requests
+      val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
+      assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
+
+      val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
+      assertTrue(capturedTopicPartitions.contains(tp0), "Should contain tp0")
+      assertTrue(capturedTopicPartitions.contains(tp1), "Should contain tp1")
+
+      // Verify the fetch info details are correct for both partitions
+      capturedFetchInfos.foreach { fetchInfo =>
+        assertEquals(topicId, fetchInfo.fetchInfo.topicId)
+        assertEquals(startOffset, fetchInfo.fetchInfo.logStartOffset)
+        assertEquals(leaderEpoch, fetchInfo.fetchInfo.currentLeaderEpoch.get())
+        if (fetchInfo.topicIdPartition.topicPartition == tp0) {
+          assertEquals(fetchOffsetTp0, fetchInfo.fetchInfo.fetchOffset)
+        } else {
+          assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+        }
+      }
+
+      // Complete the 2 asyncRead tasks
+      callbacks.forEach(callback => callback.accept(buildRemoteReadResult(Errors.NONE)))
+
+      // Wait for the fetch callback to complete and verify responseSeq content
+      assertTrue(responseLatch.await(5, TimeUnit.SECONDS), "Fetch callback should complete")
+
+      val responseData = responseSeq.get()
+      assertNotNull(responseData, "Response sequence should not be null")
+      assertEquals(2, responseData.size, "Response should contain data for both partitions")
+
+      // Verify that response contains both tidp0 and tidp1 and have no errors
+      val responseTopicIdPartitions = responseData.map(_._1).toSet
+      assertTrue(responseTopicIdPartitions.contains(tidp0), "Response should contain tidp0")
+      assertTrue(responseTopicIdPartitions.contains(tidp1), "Response should contain tidp1")

Review Comment:
   The assertion message should include the topic partition: "Response should contain tidp1" -> "Response should contain " + tidp1



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3586,6 +3586,109 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testMultipleRemoteFetchesInOneFetchRequest(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp1 = new TopicIdPartition(topicId, tp1)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
+
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+      val leaderEpoch = 0
+      val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
+      val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 0, leaderEpoch = leaderEpoch)
+      val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+      val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+      replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+      replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
+
+      val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
+      val fetchOffsetTp0 = 1
+      val fetchOffsetTp1 = 2
+
+      val responseSeq = new AtomicReference[Seq[(TopicIdPartition, FetchPartitionData)]]()
+      val responseLatch = new CountDownLatch(1)
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        responseSeq.set(responseStatus)
+        responseLatch.countDown()
+      }
+
+      val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new util.HashSet[Consumer[RemoteLogReadResult]]()
+      when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
+        callbacks.add(ans.getArgument(1, classOf[Consumer[RemoteLogReadResult]]))
+        mock(classOf[Future[Void]])
+      })
+
+      // Start the fetch request for both partitions - this should trigger remote fetches since
+      // the default mocked log behavior throws OffsetOutOfRangeException
+      replicaManager.fetchMessages(params, Seq(
+        tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
+      ), UNBOUNDED_QUOTA, fetchCallback)
+
+      // Verify that exactly two asyncRead calls were made (one for each partition)
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      verify(mockRemoteLogManager, times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+
+      // Verify that remote fetch operations were properly set up for both partitions
+      assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2, "DelayedRemoteFetch purgatory should have operations")
+
+      // Verify both partitions were captured in the remote fetch requests
+      val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
+      assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
+
+      val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
+      assertTrue(capturedTopicPartitions.contains(tp0), "Should contain tp0")

Review Comment:
   The assertion message should include the topic partition: "Should contain tp0" -> "Should contain " + tp0



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3586,6 +3586,109 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testMultipleRemoteFetchesInOneFetchRequest(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp1 = new TopicIdPartition(topicId, tp1)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
+
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+      val leaderEpoch = 0
+      val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
+      val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 0, leaderEpoch = leaderEpoch)
+      val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+      val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+      replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+      replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
+
+      val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
+      val fetchOffsetTp0 = 1
+      val fetchOffsetTp1 = 2
+
+      val responseSeq = new AtomicReference[Seq[(TopicIdPartition, FetchPartitionData)]]()
+      val responseLatch = new CountDownLatch(1)
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        responseSeq.set(responseStatus)
+        responseLatch.countDown()
+      }
+
+      val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new util.HashSet[Consumer[RemoteLogReadResult]]()
+      when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
+        callbacks.add(ans.getArgument(1, classOf[Consumer[RemoteLogReadResult]]))
+        mock(classOf[Future[Void]])
+      })
+
+      // Start the fetch request for both partitions - this should trigger remote fetches since
+      // the default mocked log behavior throws OffsetOutOfRangeException
+      replicaManager.fetchMessages(params, Seq(
+        tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
+      ), UNBOUNDED_QUOTA, fetchCallback)
+
+      // Verify that exactly two asyncRead calls were made (one for each partition)
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      verify(mockRemoteLogManager, times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+
+      // Verify that remote fetch operations were properly set up for both partitions
+      assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2, "DelayedRemoteFetch purgatory should have operations")
+
+      // Verify both partitions were captured in the remote fetch requests
+      val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
+      assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
+
+      val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
+      assertTrue(capturedTopicPartitions.contains(tp0), "Should contain tp0")
+      assertTrue(capturedTopicPartitions.contains(tp1), "Should contain tp1")
+
+      // Verify the fetch info details are correct for both partitions
+      capturedFetchInfos.foreach { fetchInfo =>
+        assertEquals(topicId, fetchInfo.fetchInfo.topicId)
+        assertEquals(startOffset, fetchInfo.fetchInfo.logStartOffset)
+        assertEquals(leaderEpoch, fetchInfo.fetchInfo.currentLeaderEpoch.get())
+        if (fetchInfo.topicIdPartition.topicPartition == tp0) {
+          assertEquals(fetchOffsetTp0, fetchInfo.fetchInfo.fetchOffset)
+        } else {
+          assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+        }
+      }
+
+      // Complete the 2 asyncRead tasks
+      callbacks.forEach(callback => callback.accept(buildRemoteReadResult(Errors.NONE)))
+
+      // Wait for the fetch callback to complete and verify responseSeq content
+      assertTrue(responseLatch.await(5, TimeUnit.SECONDS), "Fetch callback should complete")
+
+      val responseData = responseSeq.get()
+      assertNotNull(responseData, "Response sequence should not be null")
+      assertEquals(2, responseData.size, "Response should contain data for both partitions")
+
+      // Verify that response contains both tidp0 and tidp1 and have no errors
+      val responseTopicIdPartitions = responseData.map(_._1).toSet
+      assertTrue(responseTopicIdPartitions.contains(tidp0), "Response should contain tidp0")

Review Comment:
   The assertion message should include the topic partition: "Response should contain tidp0" -> "Response should contain " + tidp0



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3586,6 +3586,109 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testMultipleRemoteFetchesInOneFetchRequest(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp1 = new TopicIdPartition(topicId, tp1)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
+
+    try {
+      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+
+      val leaderEpoch = 0
+      val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch)
+      val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 0, leaderEpoch = leaderEpoch)
+      val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+      val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+      replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+      replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
+
+      val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
+      val fetchOffsetTp0 = 1
+      val fetchOffsetTp1 = 2
+
+      val responseSeq = new AtomicReference[Seq[(TopicIdPartition, FetchPartitionData)]]()
+      val responseLatch = new CountDownLatch(1)
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+        responseSeq.set(responseStatus)
+        responseLatch.countDown()
+      }
+
+      val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new util.HashSet[Consumer[RemoteLogReadResult]]()
+      when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
+        callbacks.add(ans.getArgument(1, classOf[Consumer[RemoteLogReadResult]]))
+        mock(classOf[Future[Void]])
+      })
+
+      // Start the fetch request for both partitions - this should trigger remote fetches since
+      // the default mocked log behavior throws OffsetOutOfRangeException
+      replicaManager.fetchMessages(params, Seq(
+        tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
+      ), UNBOUNDED_QUOTA, fetchCallback)
+
+      // Verify that exactly two asyncRead calls were made (one for each partition)
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      verify(mockRemoteLogManager, times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+
+      // Verify that remote fetch operations were properly set up for both partitions
+      assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2, "DelayedRemoteFetch purgatory should have operations")
+
+      // Verify both partitions were captured in the remote fetch requests
+      val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
+      assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage fetch info calls")
+
+      val capturedTopicPartitions = capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
+      assertTrue(capturedTopicPartitions.contains(tp0), "Should contain tp0")
+      assertTrue(capturedTopicPartitions.contains(tp1), "Should contain tp1")

Review Comment:
   The assertion message should include the topic partition: "Should contain tp1" -> "Should contain " + tp1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
