kamalcph commented on code in PR #20045:
URL: https://github.com/apache/kafka/pull/20045#discussion_r2182287587


##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -235,7 +471,8 @@ class DelayedRemoteFetchTest {
                               highWatermark: Int = 0,
                               leaderLogStartOffset: Int = 0): LogReadResult = {
     new LogReadResult(
-      new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+      new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY, false, Optional.empty(),
+        Optional.of(mock(classOf[RemoteStorageFetchInfo]))),

Review Comment:
   Thanks for making this change! Previously, it was reading the results from `localReadResults`.



##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -81,17 +82,23 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
             return forceComplete()
         }
     }
-    if (remoteFetchResult.isDone) // Case c
-      forceComplete()
-    else
+    // Case c
+    if (remoteFetchResults.values().stream().anyMatch(taskResult => !taskResult.isDone))

Review Comment:
   nit: subjective
   Can the condition be inverted?
   
   ```
   // Case c
   if (remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone))
     forceComplete()
   else
     false
   ```
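
   (The two forms are equivalent by De Morgan's law; the inverted version just puts `forceComplete()` on the positive path, which reads more directly.)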



##########
core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala:
##########
@@ -170,52 +206,252 @@ class DelayedRemoteFetchTest {
 
   @Test
   def testRequestExpiry(): Unit = {
-    var actualTopicPartition: Option[TopicIdPartition] = None
-    var fetchResultOpt: Option[FetchPartitionData] = None
+    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
 
-    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
-      assertEquals(1, responses.size)
-      actualTopicPartition = Some(responses.head._1)
-      fetchResultOpt = Some(responses.head._2)
+    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      responseSeq.foreach { case (tp, data) =>
+        responses.put(tp, data)
+      }
     }
 
+    def expiresPerSecValue(): Double = {
+      val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      val metric = allMetrics.find { case (n, _) => n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec") }
+
+      if (metric.isEmpty)
+        0
+      else
+        metric.get._2.asInstanceOf[Meter].count
+    }
+
+    val remoteFetchTaskExpired = mock(classOf[Future[Void]])
+    val remoteFetchTask2 = mock(classOf[Future[Void]])
+    // complete the 2nd task, and keep the 1st one expired
+    when(remoteFetchTask2.isDone).thenReturn(true)
+
+    // Create futures - one completed, one not
+    val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    // Only complete one remote fetch
+    future2.complete(buildRemoteReadResult(Errors.NONE))
+
+    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
+    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
+
     val highWatermark = 100
     val leaderLogStartOffset = 10
 
-    val remoteFetchTask = mock(classOf[Future[Void]])
-    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null)
-    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
-
-    val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, fetchInfo, remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+    val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
+    val logReadInfo2 = buildReadResult(Errors.NONE)
+
+    val fetchStatus1 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus2 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+    // Set up maps for multiple partitions
+    val remoteFetchTasks = new java.util.HashMap[TopicIdPartition, Future[Void]]()
+    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
+    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+
+    remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired)
+    remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2)
+    remoteFetchResults.put(topicIdPartition, future1)
+    remoteFetchResults.put(topicIdPartition2, future2)
+    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      remoteFetchTasks,
+      remoteFetchResults,
+      remoteFetchInfos,
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+      replicaManager,
+      callback)
 
     when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
       .thenReturn(mock(classOf[Partition]))
+    when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
 
     // Verify that the ExpiresPerSec metric is zero before fetching
-    val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
-    assertEquals(0, metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+    val existingMetricVal = expiresPerSecValue()
+    // Verify the delayedRemoteFetch is not completed yet
+    assertFalse(delayedRemoteFetch.isCompleted)
 
     // Force the delayed remote fetch to expire
     delayedRemoteFetch.run()
 
-    // Check that the task was cancelled and force-completed
-    verify(remoteFetchTask).cancel(false)
+    // Check that the expired task was cancelled and force-completed
+    verify(remoteFetchTaskExpired).cancel(anyBoolean())
+    verify(remoteFetchTask2, never()).cancel(anyBoolean())
     assertTrue(delayedRemoteFetch.isCompleted)
 
     // Check that the ExpiresPerSec metric was incremented
-    assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName == "kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+    assertTrue(expiresPerSecValue() > existingMetricVal)
 
-    // Fetch results should still include local read results
-    assertTrue(actualTopicPartition.isDefined)
-    assertEquals(topicIdPartition, actualTopicPartition.get)
-    assertTrue(fetchResultOpt.isDefined)
+    // Fetch results should include 2 results and the expired one should return local read results
+    assertEquals(2, responses.size)
+    assertTrue(responses.contains(topicIdPartition))
+    assertTrue(responses.contains(topicIdPartition2))
 
-    val fetchResult = fetchResultOpt.get
-    assertEquals(Errors.NONE, fetchResult.error)
-    assertEquals(highWatermark, fetchResult.highWatermark)
-    assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+    assertEquals(Errors.NONE, responses(topicIdPartition).error)
+    assertEquals(highWatermark, responses(topicIdPartition).highWatermark)
+    assertEquals(leaderLogStartOffset, responses(topicIdPartition).logStartOffset)
+
+    assertEquals(Errors.NONE, responses(topicIdPartition2).error)
+  }
+
+  @Test
+  def testMultiplePartitions(): Unit = {
+    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
+
+    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      responseSeq.foreach { case (tp, data) =>
+        responses.put(tp, data)
+      }
+    }
+
+    // Create futures - one completed, one not
+    val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    // Only complete one remote fetch
+    future1.complete(buildRemoteReadResult(Errors.NONE))
+
+    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null)
+    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null)
+
+    val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
+    val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
+
+    val fetchStatus1 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus2 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+    // Set up maps for multiple partitions
+    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]()
+    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
+
+    remoteFetchResults.put(topicIdPartition, future1)
+    remoteFetchResults.put(topicIdPartition2, future2)
+    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      Collections.emptyMap[TopicIdPartition, Future[Void]](),
+      remoteFetchResults,
+      remoteFetchInfos,
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+      replicaManager,
+      callback)
+
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+    when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    // Should not complete since future2 is not done
+    assertFalse(delayedRemoteFetch.tryComplete())
+    assertFalse(delayedRemoteFetch.isCompleted)
+
+    // Complete the other future
+    future2.complete(buildRemoteReadResult(Errors.NONE))
+
+    // Now it should complete
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+
+    // Verify both partitions were processed without error
+    assertEquals(2, responses.size)
+    assertTrue(responses.contains(topicIdPartition))
+    assertTrue(responses.contains(topicIdPartition2))
+
+    assertEquals(Errors.NONE, responses(topicIdPartition).error)
+    assertEquals(Errors.NONE, responses(topicIdPartition2).error)

Review Comment:
   Can the `highWatermark` and `logStartOffset` also be asserted? They should also be filled in the response.
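
   For example, a sketch of the extra assertions, reusing the values already built in this test (`logReadInfo1` carries high watermark 100 / log start offset 10, `logReadInfo2` carries 200 / 20):
   ```
   assertEquals(100, responses(topicIdPartition).highWatermark)
   assertEquals(10, responses(topicIdPartition).logStartOffset)
   assertEquals(200, responses(topicIdPartition2).highWatermark)
   assertEquals(20, responses(topicIdPartition2).logStartOffset)
   ```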



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1598,31 +1601,40 @@ class ReplicaManager(val config: KafkaConfig,
       })
     } catch {
       case e: RejectedExecutionException =>
-        // Return the error if any in scheduling the remote fetch task
-        warn("Unable to fetch data from remote storage", e)
-        return Some(createLogReadResult(e))
+        warn(s"Unable to fetch data from remote storage for remoteFetchInfo: $remoteFetchInfo", e)
+        // Store the error in RemoteLogReadResult if any occurs while scheduling the remote fetch task.
+        // It will be sent back to the client in DelayedRemoteFetch along with other successful remote fetch results.
+        remoteFetchResult.complete(new RemoteLogReadResult(Optional.empty, Optional.of(e)))
+        delayedRemoteFetchPurgatory.checkAndComplete(key)

Review Comment:
   DelayedRemoteFetch is not instantiated at this place. Calling the purgatory#checkAndComplete might complete an existing delayed operation in the purgatory (which is fine). Could you explain why we need this change?
   ```
   delayedRemoteFetchPurgatory.checkAndComplete(key)
   ```
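
   For context, a minimal sketch of how this reads under the usual purgatory contract (assumed here, not re-verified for this PR): `checkAndComplete(key)` only re-runs `tryComplete()` on operations already watching the key and never creates one.
   ```
   // Record the scheduling error for later delivery to the client.
   remoteFetchResult.complete(new RemoteLogReadResult(Optional.empty, Optional.of(e)))
   // Re-check any DelayedRemoteFetch already watching `key`; the operation
   // for this request has not been created yet, so only a pre-existing
   // watcher could complete here.
   delayedRemoteFetchPurgatory.checkAndComplete(key)
   ```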


