This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e1ff3876054 KAFKA-14915: Allow reading from remote storage for multiple partitions in one fetchRequest (#20045)
e1ff3876054 is described below

commit e1ff38760545b472267aaa60a3422bd8cbf74b6a
Author: Luke Chen <[email protected]>
AuthorDate: Mon Jul 14 22:12:08 2025 +0800

    KAFKA-14915: Allow reading from remote storage for multiple partitions in one fetchRequest (#20045)
    
    This PR enables reading from remote storage for multiple partitions in one
    fetchRequest. The main changes are:
    1. `DelayedRemoteFetch` now accepts multiple remote fetch tasks and the
    related metadata for each partition.
    2. `DelayedRemoteFetch` waits until all remote fetches are done, whether
    they succeeded or failed (see the sketch below).
    3. `ReplicaManager#fetchMessages` creates a single `DelayedRemoteFetch`,
    passes all remote fetch metadata to it, and watches all of the affected
    partitions.
    4. Added tests.
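
    For context, a minimal, self-contained sketch of the new completion rule
    follows. The types below (`Partition`, `ReadResult`,
    `MultiPartitionRemoteFetch`) are illustrative placeholders, not the actual
    Kafka classes, which operate on `TopicIdPartition`, `Future[Void]` and
    `RemoteLogReadResult`.

    ```scala
    import java.util.concurrent.{CompletableFuture, Future}

    // Placeholder types standing in for TopicIdPartition / RemoteLogReadResult.
    final case class Partition(topic: String, id: Int)
    final case class ReadResult(error: Option[Throwable])

    // Sketch of the rule DelayedRemoteFetch now follows: the delayed operation
    // may complete only once every pending remote read has finished, whether
    // it succeeded or failed.
    class MultiPartitionRemoteFetch(
        tasks: Map[Partition, Future[_]],
        results: Map[Partition, CompletableFuture[ReadResult]]) {

      // Mirrors "Case c" in tryComplete: all remote reads are done.
      def allReadsDone: Boolean = results.values.forall(_.isDone)

      // Mirrors onExpiration: cancel only tasks that have not finished yet,
      // without interrupting tasks that are already running.
      def cancelPending(): Unit =
        tasks.foreach { case (_, task) => if (!task.isDone) task.cancel(false) }
    }

    object MultiPartitionRemoteFetchExample extends App {
      val p0 = Partition("topic", 0)
      val p1 = Partition("topic", 1)
      val r0 = new CompletableFuture[ReadResult]()
      val r1 = new CompletableFuture[ReadResult]()
      val fetch = new MultiPartitionRemoteFetch(Map.empty, Map(p0 -> r0, p1 -> r1))

      r0.complete(ReadResult(None))
      println(fetch.allReadsDone) // false: p1 is still pending
      r1.complete(ReadResult(Some(new RuntimeException("remote read failed"))))
      println(fetch.allReadsDone) // true: a failure also counts as done
    }
    ```

    With this rule, a single failed or slow remote read no longer determines the
    whole response on its own: each partition's data or error is returned
    together once every remote read has finished or the remote fetch wait time
    expires.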
    
    Reviewers: Kamal Chandraprakash <[email protected]>, Federico Valeri <[email protected]>, Satish Duggana <[email protected]>
---
 .../kafka/clients/consumer/ConsumerConfig.java     |   8 +-
 .../scala/kafka/server/DelayedRemoteFetch.scala    |  24 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  94 +++---
 .../kafka/server/DelayedRemoteFetchTest.scala      | 340 ++++++++++++++++++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 111 ++++++-
 docs/ops.html                                      |   1 -
 .../purgatory/DelayedOperationPurgatory.java       |   2 +-
 .../log/remote/storage/RemoteLogManager.java       |   4 +-
 .../log/remote/storage/RemoteLogManagerConfig.java |   4 +-
 .../server/log/remote/storage/RemoteLogReader.java |  12 +-
 .../internals/log/RemoteStorageFetchInfo.java      |  10 +-
 .../log/remote/storage/RemoteLogManagerTest.java   |   9 +-
 .../log/remote/storage/RemoteLogReaderTest.java    |   7 +-
 13 files changed, 496 insertions(+), 130 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7700090ccef..3fcdf20953c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -197,9 +197,7 @@ public class ConsumerConfig extends AbstractConfig {
             "this value, the record batch will still be returned to ensure 
that the consumer can make progress. As such, this is not a absolute maximum. " 
+
             "The maximum record batch size accepted by the broker is defined 
via <code>message.max.bytes</code> (broker config) or " +
             "<code>max.message.bytes</code> (topic config). A fetch request 
consists of many partitions, and there is another setting that controls how 
much " +
-            "data is returned for each partition in a fetch request - see 
<code>max.partition.fetch.bytes</code>. Note that there is a current limitation 
when " +
-            "performing remote reads from tiered storage (KIP-405) - only one 
partition out of the fetch request is fetched from the remote store 
(KAFKA-14915). " +
-            "Note also that the consumer performs multiple fetches in 
parallel.";
+            "data is returned for each partition in a fetch request - see 
<code>max.partition.fetch.bytes</code>. Note that the consumer performs 
multiple fetches in parallel.";
     public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
 
     /**
@@ -224,9 +222,7 @@ public class ConsumerConfig extends AbstractConfig {
             "partition of the fetch is larger than this limit, the " +
             "batch will still be returned to ensure that the consumer can make 
progress. The maximum record batch size " +
             "accepted by the broker is defined via 
<code>message.max.bytes</code> (broker config) or " +
-            "<code>max.message.bytes</code> (topic config). See " + 
FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size. " +
-            "Consider increasing <code>max.partition.fetch.bytes</code> 
especially in the cases of remote storage reads (KIP-405), because currently 
only " +
-            "one partition per fetch request is served from the remote store 
(KAFKA-14915).";
+            "<code>max.message.bytes</code> (topic config). See " + 
FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size.";
     public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 
1024;
 
     /** <code>send.buffer.bytes</code> */
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala 
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 317c8dd4ac9..cb14a14b3e9 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, 
RemoteLogReadResult, RemoteStorageFetchInfo}
 
+import java.util
 import java.util.concurrent.{CompletableFuture, Future, TimeUnit}
 import java.util.{Optional, OptionalInt, OptionalLong}
 import scala.collection._
@@ -36,9 +37,9 @@ import scala.collection._
  * A remote fetch operation that can be created by the replica manager and 
watched
  * in the remote fetch operation purgatory
  */
-class DelayedRemoteFetch(remoteFetchTask: Future[Void],
-                         remoteFetchResult: 
CompletableFuture[RemoteLogReadResult],
-                         remoteFetchInfo: RemoteStorageFetchInfo,
+class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, 
Future[Void]],
+                         remoteFetchResults: util.Map[TopicIdPartition, 
CompletableFuture[RemoteLogReadResult]],
+                         remoteFetchInfos: util.Map[TopicIdPartition, 
RemoteStorageFetchInfo],
                          remoteFetchMaxWaitMs: Long,
                          fetchPartitionStatus: Seq[(TopicIdPartition, 
FetchPartitionStatus)],
                          fetchParams: FetchParams,
@@ -56,7 +57,7 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
    *
    * Case a: This broker is no longer the leader of the partition it tries to 
fetch
    * Case b: This broker does not know the partition it tries to fetch
-   * Case c: The remote storage read request completed (succeeded or failed)
+   * Case c: All the remote storage read requests completed (succeeded or failed)
    * Case d: The partition is in an offline log directory on this broker
    *
    * Upon completion, should return whatever data is available for each valid 
partition
@@ -81,7 +82,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
             return forceComplete()
         }
     }
-    if (remoteFetchResult.isDone) // Case c
+    // Case c
+    if (remoteFetchResults.values().stream().allMatch(taskResult => 
taskResult.isDone))
       forceComplete()
     else
       false
@@ -90,8 +92,13 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
   override def onExpiration(): Unit = {
     // cancel the remote storage read task, if it has not been executed yet and
     // avoid interrupting the task if it is already running as it may force 
closing opened/cached resources as transaction index.
-    val cancelled = remoteFetchTask.cancel(false)
-    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: 
$remoteFetchInfo could not be cancelled and its isDone value is 
${remoteFetchTask.isDone}")
+    remoteFetchTasks.forEach { (topicIdPartition, task) =>
+      if (task != null && !task.isDone) {
+         if (!task.cancel(false)) {
+           debug(s"Remote fetch task for remoteFetchInfo: 
${remoteFetchInfos.get(topicIdPartition)} could not be cancelled.")
+         }
+      }
+    }
 
     DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
   }
@@ -101,7 +108,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
    */
   override def onComplete(): Unit = {
     val fetchPartitionData = localReadResults.map { case (tp, result) =>
-      if (tp.topicPartition().equals(remoteFetchInfo.topicPartition)
+      val remoteFetchResult = remoteFetchResults.get(tp)
+      if (remoteFetchInfos.containsKey(tp)
         && remoteFetchResult.isDone
         && result.error == Errors.NONE
         && result.info.delayedRemoteStorageFetch.isPresent) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e70a4726216..448ec1cf264 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1579,15 +1579,18 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo 
could not be scheduled successfully
-   * else returns [[None]].
+   * Initiates an asynchronous remote storage fetch operation for the given 
remote fetch information.
+   *
+   * This method schedules a remote fetch task with the remote log manager and 
sets up the necessary
+   * completion handling for the operation. The remote fetch result will be 
used to populate the
+   * delayed remote fetch purgatory when completed.
+   *
+   * @param remoteFetchInfo The remote storage fetch information
+   *
+   * @return A tuple containing the remote fetch task and the remote fetch 
result
    */
-  private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
-                                 params: FetchParams,
-                                 responseCallback: Seq[(TopicIdPartition, 
FetchPartitionData)] => Unit,
-                                 logReadResults: Seq[(TopicIdPartition, 
LogReadResult)],
-                                 fetchPartitionStatus: Seq[(TopicIdPartition, 
FetchPartitionStatus)]): Option[LogReadResult] = {
-    val key = new 
TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), 
remoteFetchInfo.topicPartition.partition())
+  private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo): 
(Future[Void], CompletableFuture[RemoteLogReadResult]) = {
+    val key = new TopicPartitionOperationKey(remoteFetchInfo.topicIdPartition)
     val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
     var remoteFetchTask: Future[Void] = null
     try {
@@ -1597,31 +1600,39 @@ class ReplicaManager(val config: KafkaConfig,
       })
     } catch {
       case e: RejectedExecutionException =>
-        // Return the error if any in scheduling the remote fetch task
-        warn("Unable to fetch data from remote storage", e)
-        return Some(createLogReadResult(e))
+        warn(s"Unable to fetch data from remote storage for remoteFetchInfo: 
$remoteFetchInfo", e)
+        // If scheduling the remote fetch task fails, store the error in the RemoteLogReadResult.
+        // It will be sent back to the client in DelayedRemoteFetch along with 
other successful remote fetch results.
+        remoteFetchResult.complete(new RemoteLogReadResult(Optional.empty, 
Optional.of(e)))
     }
 
+    (remoteFetchTask, remoteFetchResult)
+  }
+
+  /**
+   * Process all remote fetches by creating async read tasks and handling them 
in DelayedRemoteFetch collectively.
+   */
+  private def processRemoteFetches(remoteFetchInfos: 
util.HashMap[TopicIdPartition, RemoteStorageFetchInfo],
+                                   params: FetchParams,
+                                   responseCallback: Seq[(TopicIdPartition, 
FetchPartitionData)] => Unit,
+                                   logReadResults: Seq[(TopicIdPartition, 
LogReadResult)],
+                                   remoteFetchPartitionStatus: 
Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
+    val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
+    val remoteFetchResults = new util.HashMap[TopicIdPartition, 
CompletableFuture[RemoteLogReadResult]]
+    
+    remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
+      val (task, result) = processRemoteFetch(remoteFetchInfo)
+      remoteFetchTasks.put(topicIdPartition, task)
+      remoteFetchResults.put(topicIdPartition, result)
+    }
+    
     val remoteFetchMaxWaitMs = 
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
-    val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, 
remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
-      fetchPartitionStatus, params, logReadResults, this, responseCallback)
-    delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, 
util.Collections.singletonList(key))
-    None
-  }
-
-  private def buildPartitionToFetchPartitionData(logReadResults: 
Seq[(TopicIdPartition, LogReadResult)],
-                                                 remoteFetchTopicPartition: 
TopicPartition,
-                                                 error: LogReadResult): 
Seq[(TopicIdPartition, FetchPartitionData)] = {
-    logReadResults.map { case (tp, result) =>
-      val fetchPartitionData = {
-        if (tp.topicPartition().equals(remoteFetchTopicPartition))
-          error
-        else
-          result
-      }.toFetchPartitionData(false)
+    val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, 
remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
+      remoteFetchPartitionStatus, params, logReadResults, this, 
responseCallback)
 
-      tp -> fetchPartitionData
-    }
+    // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
+    val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => 
new TopicPartitionOperationKey(tp) }.toList
+    delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, 
delayedFetchKeys.asJava)
   }
 
   /**
@@ -1639,8 +1650,8 @@ class ReplicaManager(val config: KafkaConfig,
     var bytesReadable: Long = 0
     var errorReadingData = false
 
-    // The 1st topic-partition that has to be read from remote storage
-    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
+    // topic-partitions that have to be read from remote storage
+    val remoteFetchInfos = new util.HashMap[TopicIdPartition, 
RemoteStorageFetchInfo]()
 
     var hasDivergingEpoch = false
     var hasPreferredReadReplica = false
@@ -1651,8 +1662,8 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
       if (logReadResult.error != Errors.NONE)
         errorReadingData = true
-      if (!remoteFetchInfo.isPresent && 
logReadResult.info.delayedRemoteStorageFetch.isPresent) {
-        remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
+      if (logReadResult.info.delayedRemoteStorageFetch.isPresent) {
+        remoteFetchInfos.put(topicIdPartition, 
logReadResult.info.delayedRemoteStorageFetch.get())
       }
       if (logReadResult.divergingEpoch.isPresent)
         hasDivergingEpoch = true
@@ -1669,7 +1680,7 @@ class ReplicaManager(val config: KafkaConfig,
     //                        4) some error happens while reading data
     //                        5) we found a diverging epoch
     //                        6) has a preferred read replica
-    if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || 
fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
+    if (remoteFetchInfos.isEmpty && (params.maxWaitMs <= 0 || 
fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
       hasDivergingEpoch || hasPreferredReadReplica)) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
         val isReassignmentFetch = params.isFromFollower && 
isAddingReplica(tp.topicPartition, params.replicaId)
@@ -1686,15 +1697,8 @@ class ReplicaManager(val config: KafkaConfig,
         })
       }
 
-      if (remoteFetchInfo.isPresent) {
-        val maybeLogReadResultWithError = 
processRemoteFetch(remoteFetchInfo.get(), params, responseCallback, 
logReadResults, fetchPartitionStatus)
-        if (maybeLogReadResultWithError.isDefined) {
-          // If there is an error in scheduling the remote fetch task, return 
what we currently have
-          // (the data read from local log segment for the other 
topic-partitions) and an error for the topic-partition
-          // that we couldn't read from remote storage
-          val partitionToFetchPartitionData = 
buildPartitionToFetchPartitionData(logReadResults, 
remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
-          responseCallback(partitionToFetchPartitionData)
-        }
+      if (!remoteFetchInfos.isEmpty) {
+        processRemoteFetches(remoteFetchInfos, params, responseCallback, 
logReadResults, fetchPartitionStatus.toSeq)
       } else {
         // If there is not enough data to respond and there is no remote data, 
we will let the fetch request
         // wait for new data.
@@ -1902,9 +1906,9 @@ class ReplicaManager(val config: KafkaConfig,
           )
         } else {
           // For consume fetch requests, create a dummy FetchDataInfo with the 
remote storage fetch information.
-          // For the first topic-partition that needs remote data, we will use 
this information to read the data in another thread.
+          // For the topic-partitions that need remote data, we will use this 
information to read the data in another thread.
           new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false, Optional.empty(),
-            Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, 
minOneMessage, tp.topicPartition(),
+            Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, 
minOneMessage, tp,
               fetchInfo, params.isolation)))
         }
 
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index b65de12182e..23b4b32b0d7 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -16,6 +16,7 @@
  */
 package kafka.server
 
+import com.yammer.metrics.core.Meter
 import kafka.cluster.Partition
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.protocol.Errors
@@ -28,9 +29,10 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, 
FetchParams, FetchPa
 import org.apache.kafka.storage.internals.log._
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
-import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.ArgumentMatchers.anyBoolean
+import org.mockito.Mockito.{mock, never, verify, when}
 
-import java.util.{Optional, OptionalLong}
+import java.util.{Collections, Optional, OptionalLong}
 import java.util.concurrent.{CompletableFuture, Future}
 import scala.collection._
 import scala.jdk.CollectionConverters._
@@ -39,6 +41,7 @@ class DelayedRemoteFetchTest {
   private val maxBytes = 1024
   private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
   private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, 
"topic")
+  private val topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, 
"topic2")
   private val fetchOffset = 500L
   private val logStartOffset = 0L
   private val currentLeaderEpoch = Optional.of[Integer](10)
@@ -61,14 +64,22 @@ class DelayedRemoteFetchTest {
     }
 
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
-    future.complete(null)
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
+    future.complete(buildRemoteReadResult(Errors.NONE))
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null)
     val highWatermark = 100
     val leaderLogStartOffset = 10
     val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
 
-    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback)
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+      java.util.Collections.singletonMap(topicIdPartition, future),
+      java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo),
+      replicaManager,
+      callback)
 
     
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
       .thenReturn(mock(classOf[Partition]))
@@ -97,14 +108,23 @@ class DelayedRemoteFetchTest {
     }
 
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
-    future.complete(null)
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
+    future.complete(buildRemoteReadResult(Errors.NONE))
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null)
     val highWatermark = 100
     val leaderLogStartOffset = 10
     val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
     val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
-    assertThrows(classOf[IllegalStateException], () => new 
DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback))
+
+    assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(
+      java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+      java.util.Collections.singletonMap(topicIdPartition, future),
+      java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo),
+      replicaManager,
+      callback))
   }
 
   @Test
@@ -123,12 +143,20 @@ class DelayedRemoteFetchTest {
       .thenThrow(new NotLeaderOrFollowerException(s"Replica for 
$topicIdPartition not available"))
 
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null)
 
     val logReadInfo = buildReadResult(Errors.NONE)
 
-    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback)
+    val delayedRemoteFetch =  new DelayedRemoteFetch(
+      java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+      java.util.Collections.singletonMap(topicIdPartition, future),
+      java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo),
+      replicaManager,
+      callback)
 
     // delayed remote fetch should still be able to complete
     assertTrue(delayedRemoteFetch.tryComplete())
@@ -152,14 +180,22 @@ class DelayedRemoteFetchTest {
       .thenReturn(mock(classOf[Partition]))
 
     val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
-    future.complete(null)
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
+    future.complete(buildRemoteReadResult(Errors.NONE))
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition, null, null)
 
     // build a read result with error
     val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
 
-    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, 
remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback)
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+      java.util.Collections.singletonMap(topicIdPartition, future),
+      java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo),
+      replicaManager,
+      callback)
 
     assertTrue(delayedRemoteFetch.tryComplete())
     assertTrue(delayedRemoteFetch.isCompleted)
@@ -170,52 +206,262 @@ class DelayedRemoteFetchTest {
 
   @Test
   def testRequestExpiry(): Unit = {
-    var actualTopicPartition: Option[TopicIdPartition] = None
-    var fetchResultOpt: Option[FetchPartitionData] = None
+    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
 
-    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
-      assertEquals(1, responses.size)
-      actualTopicPartition = Some(responses.head._1)
-      fetchResultOpt = Some(responses.head._2)
+    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): 
Unit = {
+      responseSeq.foreach { case (tp, data) =>
+        responses.put(tp, data)
+      }
+    }
+
+    def expiresPerSecValue(): Double = {
+      val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+      val metric = allMetrics.find { case (n, _) => 
n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec")
 }
+
+      if (metric.isEmpty)
+        0
+      else
+        metric.get._2.asInstanceOf[Meter].count
     }
 
+    val remoteFetchTaskExpired = mock(classOf[Future[Void]])
+    val remoteFetchTask2 = mock(classOf[Future[Void]])
+    // complete the 2nd task, and leave the 1st one pending so that it expires
+    when(remoteFetchTask2.isDone).thenReturn(true)
+
+    // Create futures - one completed, one not
+    val future1: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    val future2: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    // Only complete one remote fetch
+    future2.complete(buildRemoteReadResult(Errors.NONE))
+
+    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, 
null, null)
+    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, 
null, null)
+
     val highWatermark = 100
     val leaderLogStartOffset = 10
 
-    val remoteFetchTask = mock(classOf[Future[Void]])
-    val future: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
-    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, 
false, topicIdPartition.topicPartition(), null, null)
-    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
-
-    val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future, 
fetchInfo, remoteFetchMaxWaitMs,
-      Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition 
-> logReadInfo), replicaManager, callback)
+    val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, 
leaderLogStartOffset)
+    val logReadInfo2 = buildReadResult(Errors.NONE)
+
+    val fetchStatus1 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus2 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 
100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+    // Set up maps for multiple partitions
+    val remoteFetchTasks = new java.util.HashMap[TopicIdPartition, 
Future[Void]]()
+    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, 
CompletableFuture[RemoteLogReadResult]]()
+    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, 
RemoteStorageFetchInfo]()
+
+    remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired)
+    remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2)
+    remoteFetchResults.put(topicIdPartition, future1)
+    remoteFetchResults.put(topicIdPartition2, future2)
+    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      remoteFetchTasks,
+      remoteFetchResults,
+      remoteFetchInfos,
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+      replicaManager,
+      callback)
 
     
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
       .thenReturn(mock(classOf[Partition]))
+    
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
 
     // Verify that the ExpiresPerSec metric is zero before fetching
-    val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
-    assertEquals(0, metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+    val existingMetricVal = expiresPerSecValue()
+    // Verify the delayedRemoteFetch is not completed yet
+    assertFalse(delayedRemoteFetch.isCompleted)
 
     // Force the delayed remote fetch to expire
     delayedRemoteFetch.run()
 
-    // Check that the task was cancelled and force-completed
-    verify(remoteFetchTask).cancel(false)
+    // Check that the expired task was cancelled and force-completed
+    verify(remoteFetchTaskExpired).cancel(anyBoolean())
+    verify(remoteFetchTask2, never()).cancel(anyBoolean())
     assertTrue(delayedRemoteFetch.isCompleted)
 
     // Check that the ExpiresPerSec metric was incremented
-    assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+    assertTrue(expiresPerSecValue() > existingMetricVal)
 
-    // Fetch results should still include local read results
-    assertTrue(actualTopicPartition.isDefined)
-    assertEquals(topicIdPartition, actualTopicPartition.get)
-    assertTrue(fetchResultOpt.isDefined)
+    // Fetch results should include 2 results and the expired one should 
return local read results
+    assertEquals(2, responses.size)
+    assertTrue(responses.contains(topicIdPartition))
+    assertTrue(responses.contains(topicIdPartition2))
 
-    val fetchResult = fetchResultOpt.get
-    assertEquals(Errors.NONE, fetchResult.error)
-    assertEquals(highWatermark, fetchResult.highWatermark)
-    assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+    assertEquals(Errors.NONE, responses(topicIdPartition).error)
+    assertEquals(highWatermark, responses(topicIdPartition).highWatermark)
+    assertEquals(leaderLogStartOffset, 
responses(topicIdPartition).logStartOffset)
+
+    assertEquals(Errors.NONE, responses(topicIdPartition2).error)
+  }
+
+  @Test
+  def testMultiplePartitions(): Unit = {
+    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
+
+    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): 
Unit = {
+      responseSeq.foreach { case (tp, data) =>
+        responses.put(tp, data)
+      }
+    }
+
+    // Create futures - one completed, one not
+    val future1: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    val future2: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    // Only complete one remote fetch
+    future1.complete(buildRemoteReadResult(Errors.NONE))
+
+    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, 
null, null)
+    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, 
null, null)
+
+    val highWatermark1 = 100
+    val leaderLogStartOffset1 = 10
+    val highWatermark2 = 200
+    val leaderLogStartOffset2 = 20
+
+    val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
+    val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
+
+    val fetchStatus1 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus2 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 
100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+    // Set up maps for multiple partitions
+    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, 
CompletableFuture[RemoteLogReadResult]]()
+    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, 
RemoteStorageFetchInfo]()
+
+    remoteFetchResults.put(topicIdPartition, future1)
+    remoteFetchResults.put(topicIdPartition2, future2)
+    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      Collections.emptyMap[TopicIdPartition, Future[Void]](),
+      remoteFetchResults,
+      remoteFetchInfos,
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+      replicaManager,
+      callback)
+
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+    
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    // Should not complete since future2 is not done
+    assertFalse(delayedRemoteFetch.tryComplete())
+    assertFalse(delayedRemoteFetch.isCompleted)
+
+    // Complete future2
+    future2.complete(buildRemoteReadResult(Errors.NONE))
+
+    // Now it should complete
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+
+    // Verify both partitions were processed without error
+    assertEquals(2, responses.size)
+    assertTrue(responses.contains(topicIdPartition))
+    assertTrue(responses.contains(topicIdPartition2))
+
+    assertEquals(Errors.NONE, responses(topicIdPartition).error)
+    assertEquals(highWatermark1, responses(topicIdPartition).highWatermark)
+    assertEquals(leaderLogStartOffset1, 
responses(topicIdPartition).logStartOffset)
+
+    assertEquals(Errors.NONE, responses(topicIdPartition2).error)
+    assertEquals(highWatermark2, responses(topicIdPartition2).highWatermark)
+    assertEquals(leaderLogStartOffset2, 
responses(topicIdPartition2).logStartOffset)
+  }
+
+  @Test
+  def testMultiplePartitionsWithFailedResults(): Unit = {
+    val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
+
+    def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): 
Unit = {
+      responseSeq.foreach { case (tp, data) =>
+        responses.put(tp, data)
+      }
+    }
+
+    // Create futures - one successful, one with error
+    val future1: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+    val future2: CompletableFuture[RemoteLogReadResult] = new 
CompletableFuture[RemoteLogReadResult]()
+
+    // Created 1 successful result and 1 failed result
+    future1.complete(buildRemoteReadResult(Errors.NONE))
+    future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR))
+
+    val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, 
null, null)
+    val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, 
null, null)
+
+    val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
+    val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
+
+    val fetchStatus1 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, 
logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchStatus2 = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+      fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 
100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+    // Set up maps for multiple partitions
+    val remoteFetchResults = new java.util.HashMap[TopicIdPartition, 
CompletableFuture[RemoteLogReadResult]]()
+    val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, 
RemoteStorageFetchInfo]()
+
+    remoteFetchResults.put(topicIdPartition, future1)
+    remoteFetchResults.put(topicIdPartition2, future2)
+    remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+    remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(
+      Collections.emptyMap[TopicIdPartition, Future[Void]](),
+      remoteFetchResults,
+      remoteFetchInfos,
+      remoteFetchMaxWaitMs,
+      Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+      fetchParams,
+      Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+      replicaManager,
+      callback)
+
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+    
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+
+    // Verify both partitions were processed
+    assertEquals(2, responses.size)
+    assertTrue(responses.contains(topicIdPartition))
+    assertTrue(responses.contains(topicIdPartition2))
+
+    // First partition should be successful
+    val fetchResult1 = responses(topicIdPartition)
+    assertEquals(Errors.NONE, fetchResult1.error)
+
+    // Second partition should have an error due to remote fetch failure
+    val fetchResult2 = responses(topicIdPartition2)
+    assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error)
   }
 
   private def buildFetchParams(replicaId: Int,
@@ -235,7 +481,8 @@ class DelayedRemoteFetchTest {
                               highWatermark: Int = 0,
                               leaderLogStartOffset: Int = 0): LogReadResult = {
     new LogReadResult(
-      new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
MemoryRecords.EMPTY),
+      new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
MemoryRecords.EMPTY, false, Optional.empty(),
+        Optional.of(mock(classOf[RemoteStorageFetchInfo]))),
       Optional.empty(),
       highWatermark,
       leaderLogStartOffset,
@@ -246,4 +493,9 @@ class DelayedRemoteFetchTest {
       if (error != Errors.NONE) Optional.of[Throwable](error.exception) else 
Optional.empty[Throwable]())
   }
 
+  private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
+    new RemoteLogReadResult(
+      Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
MemoryRecords.EMPTY)),
+      if (error != Errors.NONE) Optional.of[Throwable](error.exception) else 
Optional.empty[Throwable]())
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 761fc49b2ea..843a0d60342 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, 
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, 
ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, 
UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, 
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, 
ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, 
RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -94,8 +94,8 @@ import java.net.InetAddress
 import java.nio.file.{Files, Paths}
 import java.util
 import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
-import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, 
CountDownLatch, TimeUnit}
-import java.util.function.BiConsumer
+import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, 
CountDownLatch, Future, TimeUnit}
+import java.util.function.{BiConsumer, Consumer}
 import java.util.stream.IntStream
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import scala.collection.{Map, Seq, mutable}
@@ -3390,7 +3390,7 @@ class ReplicaManagerTest {
       } else {
         
verify(mockRemoteLogManager).asyncRead(remoteStorageFetchInfoArg.capture(), 
any())
         val remoteStorageFetchInfo = remoteStorageFetchInfoArg.getValue
-        assertEquals(tp0, remoteStorageFetchInfo.topicPartition)
+        assertEquals(tp0, 
remoteStorageFetchInfo.topicIdPartition.topicPartition)
         assertEquals(fetchOffset, remoteStorageFetchInfo.fetchInfo.fetchOffset)
         assertEquals(topicId, remoteStorageFetchInfo.fetchInfo.topicId)
         assertEquals(startOffset, 
remoteStorageFetchInfo.fetchInfo.logStartOffset)
@@ -3594,6 +3594,109 @@ class ReplicaManagerTest {
     }
   }
 
+  @Test
+  def testMultipleRemoteFetchesInOneFetchRequest(): Unit = {
+    val replicaId = -1
+    val tp0 = new TopicPartition(topic, 0)
+    val tp1 = new TopicPartition(topic, 1)
+    val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp1 = new TopicIdPartition(topicId, tp1)
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
+
+    try {
+      val offsetCheckpoints = new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false, 
isFutureReplica = false, offsetCheckpoints, None)
+
+      val leaderEpoch = 0
+      val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0, 
leaderEpoch = leaderEpoch)
+      val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 0, 
leaderEpoch = leaderEpoch)
+      val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+      val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+      replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+      replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
+
+      val params = new FetchParams(replicaId, 1, 1000, 10, 100, 
FetchIsolation.LOG_END, Optional.empty)
+      val fetchOffsetTp0 = 1
+      val fetchOffsetTp1 = 2
+
+      val responseSeq = new AtomicReference[Seq[(TopicIdPartition, 
FetchPartitionData)]]()
+      val responseLatch = new CountDownLatch(1)
+
+      def fetchCallback(responseStatus: Seq[(TopicIdPartition, 
FetchPartitionData)]): Unit = {
+        responseSeq.set(responseStatus)
+        responseLatch.countDown()
+      }
+
+      val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new 
util.HashSet[Consumer[RemoteLogReadResult]]()
+      when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
+        callbacks.add(ans.getArgument(1, 
classOf[Consumer[RemoteLogReadResult]]))
+        mock(classOf[Future[Void]])
+      })
+
+      // Start the fetch request for both partitions - this should trigger 
remote fetches since
+      // the default mocked log behavior throws OffsetOutOfRangeException
+      replicaManager.fetchMessages(params, Seq(
+        tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset, 
100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset, 
100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
+      ), UNBOUNDED_QUOTA, fetchCallback)
+
+      // Verify that exactly two asyncRead calls were made (one for each 
partition)
+      val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] = 
ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+      verify(mockRemoteLogManager, 
times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+
+      // Verify that remote fetch operations were properly set up for both 
partitions
+      assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2, 
"DelayedRemoteFetch purgatory should have operations")
+
+      // Verify both partitions were captured in the remote fetch requests
+      val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
+      assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage 
fetch info calls")
+
+      val capturedTopicPartitions = 
capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
+      assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " + 
tp0)
+      assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " + 
tp1)
+
+      // Verify the fetch info details are correct for both partitions
+      capturedFetchInfos.foreach { fetchInfo =>
+        assertEquals(topicId, fetchInfo.fetchInfo.topicId)
+        assertEquals(startOffset, fetchInfo.fetchInfo.logStartOffset)
+        assertEquals(leaderEpoch, fetchInfo.fetchInfo.currentLeaderEpoch.get())
+        if (fetchInfo.topicIdPartition.topicPartition == tp0) {
+          assertEquals(fetchOffsetTp0, fetchInfo.fetchInfo.fetchOffset)
+        } else {
+          assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+        }
+      }
+
+      // Complete the 2 asyncRead tasks
+      callbacks.forEach(callback => 
callback.accept(buildRemoteReadResult(Errors.NONE)))
+
+      // Wait for the fetch callback to complete and verify responseSeq content
+      assertTrue(responseLatch.await(5, TimeUnit.SECONDS), "Fetch callback 
should complete")
+
+      val responseData = responseSeq.get()
+      assertNotNull(responseData, "Response sequence should not be null")
+      assertEquals(2, responseData.size, "Response should contain data for 
both partitions")
+
+      // Verify that response contains both tidp0 and tidp1 and have no errors
+      val responseTopicIdPartitions = responseData.map(_._1).toSet
+      assertTrue(responseTopicIdPartitions.contains(tidp0), "Response should 
contain " + tidp0)
+      assertTrue(responseTopicIdPartitions.contains(tidp1), "Response should 
contain " + tidp1)
+      responseData.foreach { case (_, fetchPartitionData) =>
+        assertEquals(Errors.NONE, fetchPartitionData.error)
+      }
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
+    new RemoteLogReadResult(
+      Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, 
MemoryRecords.EMPTY)),
+      if (error != Errors.NONE) Optional.of[Throwable](error.exception) else 
Optional.empty[Throwable]())
+  }
+
   private def yammerMetricValue(name: String): Any = {
     val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
     val (_, metric) = allMetrics.find { case (n, _) => 
n.getMBeanName.endsWith(name) }
diff --git a/docs/ops.html b/docs/ops.html
index 09748a9686d..0b1e8fa6880 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -4358,7 +4358,6 @@ $ bin/kafka-topics.sh --create --topic tieredTopic 
--bootstrap-server localhost:
   <li>Disabling tiered storage on all topics where it is enabled is required 
before disabling tiered storage at the broker level</li>
   <li>Admin actions related to tiered storage feature are only supported on 
clients from version 3.0 onwards</li>
   <li>No support for log segments missing producer snapshot file. It can 
happen when topic is created before v2.8.0.</li>
-  <li>Only one partition per fetch request is served from the remote store. 
This limitation can become a bottleneck for consumer client throughput - 
consider configuring <code>max.partition.fetch.bytes</code> appropriately.</li>
 </ul>
 
 <p>For more information, please check <a 
href="https://cwiki.apache.org/confluence/x/9xDOEg";>Kafka Tiered Storage GA 
Release Notes</a>.
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
 
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
index bfb6d97f6d9..157fdcb885d 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
@@ -199,7 +199,7 @@ public class DelayedOperationPurgatory<T extends 
DelayedOperation> {
     }
 
     /**
-     * Return the total size of watch lists the purgatory. Since an operation 
may be watched
+     * Return the total size of watch lists in the purgatory. Since an 
operation may be watched
      * on multiple lists, and some of its watched entries may still be in the 
watch lists
      * even when it has been completed, this number may be larger than the 
number of real operations watched
      */
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 0970f6f0a34..852e1d4d1fb 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -1659,7 +1659,7 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
 
     public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
         int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
-        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        TopicPartition tp = 
remoteStorageFetchInfo.topicIdPartition.topicPartition();
         FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
 
         boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
@@ -1715,6 +1715,8 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
             //  - there is no minimum-one-message constraint and
             //  - the first batch size is more than maximum bytes that can be 
sent and
             if (!remoteStorageFetchInfo.minOneMessage && firstBatchSize > 
maxBytes) {
+                LOGGER.debug("Returning empty record for offset {} in 
partition {} because the first batch size {} " +
+                        "is greater than max fetch bytes {}", offset, tp, 
firstBatchSize, maxBytes);
                 return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY);
             }
 
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 586816dae29..0b1b049d766 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -192,8 +192,8 @@ public final class RemoteLogManagerConfig {
     public static final int 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;
 
     public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = 
"remote.fetch.max.wait.ms";
-    public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum 
amount of time the server will wait before answering the remote fetch request. 
" +
-        "Note that the broker currently only fetches one partition per fetch 
request from the remote store. (KAFKA-14915)";
+    public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum 
amount of time the server will wait before answering the fetch request 
containing remote fetch partitions. " +
+        "It's important to be aware that the request will only be responded 
after all remote partitions have been successfully fetched, have failed, or 
this timeout is exceeded.";
     public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
 
     public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP = 
"remote.list.offsets.request.timeout.ms";
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
index a23ee7207ae..898ff52760c 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
@@ -51,7 +51,7 @@ public class RemoteLogReader implements Callable<Void> {
         this.rlm = rlm;
         this.brokerTopicStats = brokerTopicStats;
         this.callback = callback;
-        
this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
+        
this.brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchRequestRate().mark();
         this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
         this.quotaManager = quotaManager;
         this.remoteReadTimer = remoteReadTimer;
@@ -61,20 +61,20 @@ public class RemoteLogReader implements Callable<Void> {
     public Void call() {
         RemoteLogReadResult result;
         try {
-            LOGGER.debug("Reading records from remote storage for topic 
partition {}", fetchInfo.topicPartition);
+            LOGGER.debug("Reading records from remote storage for topic 
partition {}", fetchInfo.topicIdPartition);
             FetchDataInfo fetchDataInfo = remoteReadTimer.time(() -> 
rlm.read(fetchInfo));
-            
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
+            
brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
             
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
             result = new RemoteLogReadResult(Optional.of(fetchDataInfo), 
Optional.empty());
         } catch (OffsetOutOfRangeException e) {
             result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
         } catch (Exception e) {
-            brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteFetchRequestRate().mark();
+            brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).failedRemoteFetchRequestRate().mark();
             brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark();
-            LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
+            LOGGER.error("Error occurred while reading the remote data for {}", fetchInfo.topicIdPartition, e);
             result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
         }
-        LOGGER.debug("Finished reading records from remote storage for topic 
partition {}", fetchInfo.topicPartition);
+        LOGGER.debug("Finished reading records from remote storage for topic 
partition {}", fetchInfo.topicIdPartition);
         quotaManager.record(result.fetchDataInfo.map(fetchDataInfo -> 
fetchDataInfo.records.sizeInBytes()).orElse(0));
         callback.accept(result);
         return null;
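
    For readers following the callback flow above, a minimal sketch of the kind of
    Consumer<RemoteLogReadResult> a caller could hand to RemoteLogReader is shown below.
    Only the fetchDataInfo field and records.sizeInBytes() accessor are taken from this
    diff; the import path, class name and logging are assumptions for illustration.

    import java.util.function.Consumer;

    import org.apache.kafka.storage.internals.log.RemoteLogReadResult;   // assumed package

    // Hypothetical callback: reacts once RemoteLogReader invokes callback.accept(result) as above.
    public class RemoteReadCallbackSketch {
        public static Consumer<RemoteLogReadResult> loggingCallback() {
            return result -> result.fetchDataInfo.ifPresentOrElse(
                    info -> System.out.println("remote read returned " + info.records.sizeInBytes() + " bytes"),
                    () -> System.out.println("remote read finished without data (error or empty result)"));
        }
    }
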
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
index c110e750d7c..e02fea8c0ef 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.storage.internals.log;
 
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 
@@ -24,15 +24,15 @@ public class RemoteStorageFetchInfo {
 
     public final int fetchMaxBytes;
     public final boolean minOneMessage;
-    public final TopicPartition topicPartition;
+    public final TopicIdPartition topicIdPartition;
     public final FetchRequest.PartitionData fetchInfo;
     public final FetchIsolation fetchIsolation;
 
-    public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicPartition topicPartition,
+    public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicIdPartition topicIdPartition,
                                   FetchRequest.PartitionData fetchInfo, FetchIsolation fetchIsolation) {
         this.fetchMaxBytes = fetchMaxBytes;
         this.minOneMessage = minOneMessage;
-        this.topicPartition = topicPartition;
+        this.topicIdPartition = topicIdPartition;
         this.fetchInfo = fetchInfo;
         this.fetchIsolation = fetchIsolation;
     }
@@ -42,7 +42,7 @@ public class RemoteStorageFetchInfo {
         return "RemoteStorageFetchInfo{" +
                 "fetchMaxBytes=" + fetchMaxBytes +
                 ", minOneMessage=" + minOneMessage +
-                ", topicPartition=" + topicPartition +
+                ", topicIdPartition=" + topicIdPartition +
                 ", fetchInfo=" + fetchInfo +
                 ", fetchIsolation=" + fetchIsolation +
                 '}';
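
    To make the API change above concrete, here is a minimal, hypothetical sketch of a
    caller constructing the new TopicIdPartition-based RemoteStorageFetchInfo. The
    constructor shape, the TopicIdPartition and FetchRequest.PartitionData usage, and the
    FetchIsolation value are taken from this diff and its tests; the topic name and
    numeric values are placeholders.

    import java.util.Optional;

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.requests.FetchRequest;
    import org.apache.kafka.server.storage.log.FetchIsolation;
    import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

    // Hypothetical caller of the updated constructor; values are illustrative only.
    public class RemoteStorageFetchInfoSketch {
        public static void main(String[] args) {
            TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("TestTopic", 5));
            FetchRequest.PartitionData partitionData =
                    new FetchRequest.PartitionData(tpId.topicId(), 0L, 0L, 1048576, Optional.empty());
            RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
                    1048576, true, tpId, partitionData, FetchIsolation.HIGH_WATERMARK);
            System.out.println(fetchInfo);   // relies on the toString() shown above
        }
    }
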
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index e82536d7233..182fda9abb9 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -205,6 +205,7 @@ public class RemoteLogManagerTest {
     private final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
     private final Map<String, Uuid> topicIds = new HashMap<>();
     private final TopicPartition tp = new TopicPartition("TestTopic", 5);
+    private final TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
     private final EpochEntry epochEntry0 = new EpochEntry(0, 0);
     private final EpochEntry epochEntry1 = new EpochEntry(1, 100);
     private final EpochEntry epochEntry2 = new EpochEntry(2, 200);
@@ -3100,7 +3101,7 @@ public class RemoteLogManagerTest {
         );
 
         RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
-                0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED
+                0, false, tpId, partitionData, FetchIsolation.TXN_COMMITTED
         );
 
         try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3180,7 +3181,7 @@ public class RemoteLogManagerTest {
         );
 
         RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
-                0, minOneMessage, tp, partitionData, FetchIsolation.HIGH_WATERMARK
+                0, minOneMessage, tpId, partitionData, FetchIsolation.HIGH_WATERMARK
         );
 
         try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3266,7 +3267,7 @@ public class RemoteLogManagerTest {
         when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
         doNothing().when(firstBatch).writeTo(capture.capture());
         RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
-                0, true, tp, partitionData, FetchIsolation.HIGH_WATERMARK
+                0, true, tpId, partitionData, FetchIsolation.HIGH_WATERMARK
         );
 
 
@@ -3651,7 +3652,7 @@ public class RemoteLogManagerTest {
         FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
                 Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
         RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
-                1048576, true, leaderTopicIdPartition.topicPartition(),
+                1048576, true, leaderTopicIdPartition,
                 partitionData, FetchIsolation.HIGH_WATERMARK);
         FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
         // firstBatch baseOffset may not be equal to the fetchOffset
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
index 6c7026a52d5..efb6bb76e05 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
@@ -18,7 +18,8 @@ package org.apache.kafka.server.log.remote.storage;
 
 import kafka.utils.TestUtils;
 
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
@@ -69,7 +70,7 @@ public class RemoteLogReaderTest {
         when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
 
         Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
-        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC), null, null);
         RemoteLogReader remoteLogReader =
                 new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager, timer);
         remoteLogReader.call();
@@ -102,7 +103,7 @@ public class RemoteLogReaderTest {
         when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new RuntimeException("error"));
 
         Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
-        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC), null, null);
         RemoteLogReader remoteLogReader =
                 new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats, mockQuotaManager, timer);
         remoteLogReader.call();
