This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new e1ff3876054 KAFKA-14915: Allow reading from remote storage for multiple partitions in one fetchRequest (#20045)
e1ff3876054 is described below
commit e1ff38760545b472267aaa60a3422bd8cbf74b6a
Author: Luke Chen <[email protected]>
AuthorDate: Mon Jul 14 22:12:08 2025 +0800
KAFKA-14915: Allow reading from remote storage for multiple partitions in one fetchRequest (#20045)

This PR enables reading from remote storage for multiple partitions in one fetchRequest. The main changes are:

1. In `DelayedRemoteFetch`, we now accept multiple remoteFetchTasks and the related per-partition metadata.
2. In `DelayedRemoteFetch`, we wait until all remote fetches are done, whether they succeeded or failed (see the sketch after this list).
3. In `ReplicaManager#fetchMessages`, we create one `DelayedRemoteFetch`, pass all the remote fetch metadata to it, and watch all of the affected partitions.
4. Added tests.
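
For illustration only, a minimal Scala sketch (not the committed code) of the completion rule in item 2: the delayed operation may only force-complete once every per-partition remote read future has finished, and a failed read also counts as finished, so one bad partition cannot block the others. The object and helper names are hypothetical.

    import java.util.concurrent.CompletableFuture
    import scala.jdk.CollectionConverters._

    object RemoteFetchCompletion {
      // Hypothetical helper: true only when every per-partition remote read future
      // has finished, whether it completed successfully or exceptionally.
      def allRemoteReadsDone[K, V](results: java.util.Map[K, CompletableFuture[V]]): Boolean =
        results.values().asScala.forall(_.isDone)
    }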
Reviewers: Kamal Chandraprakash <[email protected]>, Federico Valeri <[email protected]>, Satish Duggana <[email protected]>
---
.../kafka/clients/consumer/ConsumerConfig.java | 8 +-
.../scala/kafka/server/DelayedRemoteFetch.scala | 24 +-
.../main/scala/kafka/server/ReplicaManager.scala | 94 +++---
.../kafka/server/DelayedRemoteFetchTest.scala | 340 ++++++++++++++++++---
.../unit/kafka/server/ReplicaManagerTest.scala | 111 ++++++-
docs/ops.html | 1 -
.../purgatory/DelayedOperationPurgatory.java | 2 +-
.../log/remote/storage/RemoteLogManager.java | 4 +-
.../log/remote/storage/RemoteLogManagerConfig.java | 4 +-
.../server/log/remote/storage/RemoteLogReader.java | 12 +-
.../internals/log/RemoteStorageFetchInfo.java | 10 +-
.../log/remote/storage/RemoteLogManagerTest.java | 9 +-
.../log/remote/storage/RemoteLogReaderTest.java | 7 +-
13 files changed, 496 insertions(+), 130 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7700090ccef..3fcdf20953c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -197,9 +197,7 @@ public class ConsumerConfig extends AbstractConfig {
             "this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. " +
             "The maximum record batch size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
             "<code>max.message.bytes</code> (topic config). A fetch request consists of many partitions, and there is another setting that controls how much " +
-            "data is returned for each partition in a fetch request - see <code>max.partition.fetch.bytes</code>. Note that there is a current limitation when " +
-            "performing remote reads from tiered storage (KIP-405) - only one partition out of the fetch request is fetched from the remote store (KAFKA-14915). " +
-            "Note also that the consumer performs multiple fetches in parallel.";
+            "data is returned for each partition in a fetch request - see <code>max.partition.fetch.bytes</code>. Note that the consumer performs multiple fetches in parallel.";
public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
/**
@@ -224,9 +222,7 @@ public class ConsumerConfig extends AbstractConfig {
             "partition of the fetch is larger than this limit, the " +
             "batch will still be returned to ensure that the consumer can make progress. The maximum record batch size " +
             "accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
-            "<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size. " +
-            "Consider increasing <code>max.partition.fetch.bytes</code> especially in the cases of remote storage reads (KIP-405), because currently only " +
-            "one partition per fetch request is served from the remote store (KAFKA-14915).";
+            "<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size.";
    public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
/** <code>send.buffer.bytes</code> */
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 317c8dd4ac9..cb14a14b3e9 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
+import java.util
import java.util.concurrent.{CompletableFuture, Future, TimeUnit}
import java.util.{Optional, OptionalInt, OptionalLong}
import scala.collection._
@@ -36,9 +37,9 @@ import scala.collection._
* A remote fetch operation that can be created by the replica manager and watched
* in the remote fetch operation purgatory
*/
-class DelayedRemoteFetch(remoteFetchTask: Future[Void],
-                         remoteFetchResult: CompletableFuture[RemoteLogReadResult],
-                         remoteFetchInfo: RemoteStorageFetchInfo,
+class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Void]],
+                         remoteFetchResults: util.Map[TopicIdPartition, CompletableFuture[RemoteLogReadResult]],
+                         remoteFetchInfos: util.Map[TopicIdPartition, RemoteStorageFetchInfo],
                          remoteFetchMaxWaitMs: Long,
                          fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
                          fetchParams: FetchParams,
@@ -56,7 +57,7 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
  *
  * Case a: This broker is no longer the leader of the partition it tries to fetch
  * Case b: This broker does not know the partition it tries to fetch
- * Case c: The remote storage read request completed (succeeded or failed)
+ * Case c: All the remote storage read requests completed (succeeded or failed)
  * Case d: The partition is in an offline log directory on this broker
  *
  * Upon completion, should return whatever data is available for each valid partition
@@ -81,7 +82,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
        return forceComplete()
      }
    }
-    if (remoteFetchResult.isDone) // Case c
+    // Case c
+    if (remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone))
      forceComplete()
    else
      false
@@ -90,8 +92,13 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
  override def onExpiration(): Unit = {
    // cancel the remote storage read task, if it has not been executed yet and
    // avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index.
-    val cancelled = remoteFetchTask.cancel(false)
-    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+    remoteFetchTasks.forEach { (topicIdPartition, task) =>
+      if (task != null && !task.isDone) {
+        if (!task.cancel(false)) {
+          debug(s"Remote fetch task for remoteFetchInfo: ${remoteFetchInfos.get(topicIdPartition)} could not be cancelled.")
+        }
+      }
+    }
DelayedRemoteFetchMetrics.expiredRequestMeter.mark()
}
@@ -101,7 +108,8 @@ class DelayedRemoteFetch(remoteFetchTask: Future[Void],
   */
  override def onComplete(): Unit = {
    val fetchPartitionData = localReadResults.map { case (tp, result) =>
-      if (tp.topicPartition().equals(remoteFetchInfo.topicPartition)
+      val remoteFetchResult = remoteFetchResults.get(tp)
+      if (remoteFetchInfos.containsKey(tp)
        && remoteFetchResult.isDone
        && result.error == Errors.NONE
        && result.info.delayedRemoteStorageFetch.isPresent) {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e70a4726216..448ec1cf264 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1579,15 +1579,18 @@ class ReplicaManager(val config: KafkaConfig,
}
/**
-   * Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
-   * else returns [[None]].
+   * Initiates an asynchronous remote storage fetch operation for the given remote fetch information.
+   *
+   * This method schedules a remote fetch task with the remote log manager and sets up the necessary
+   * completion handling for the operation. The remote fetch result will be used to populate the
+   * delayed remote fetch purgatory when completed.
+   *
+   * @param remoteFetchInfo The remote storage fetch information
+   *
+   * @return A tuple containing the remote fetch task and the remote fetch result
   */
-  private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
-                                 params: FetchParams,
-                                 responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
-                                 logReadResults: Seq[(TopicIdPartition, LogReadResult)],
-                                 fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
-    val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
+  private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo): (Future[Void], CompletableFuture[RemoteLogReadResult]) = {
+    val key = new TopicPartitionOperationKey(remoteFetchInfo.topicIdPartition)
val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
var remoteFetchTask: Future[Void] = null
try {
@@ -1597,31 +1600,39 @@ class ReplicaManager(val config: KafkaConfig,
})
} catch {
case e: RejectedExecutionException =>
-        // Return the error if any in scheduling the remote fetch task
-        warn("Unable to fetch data from remote storage", e)
-        return Some(createLogReadResult(e))
+        warn(s"Unable to fetch data from remote storage for remoteFetchInfo: $remoteFetchInfo", e)
+        // Store the error in RemoteLogReadResult if any in scheduling the remote fetch task.
+        // It will be sent back to the client in DelayedRemoteFetch along with other successful remote fetch results.
+        remoteFetchResult.complete(new RemoteLogReadResult(Optional.empty, Optional.of(e)))
}
+ (remoteFetchTask, remoteFetchResult)
+ }
+
+ /**
+   * Process all remote fetches by creating async read tasks and handling them in DelayedRemoteFetch collectively.
+   */
+  private def processRemoteFetches(remoteFetchInfos: util.HashMap[TopicIdPartition, RemoteStorageFetchInfo],
+                                   params: FetchParams,
+                                   responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
+                                   logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                   remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
+    val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
+    val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
+
+    remoteFetchInfos.forEach { (topicIdPartition, remoteFetchInfo) =>
+      val (task, result) = processRemoteFetch(remoteFetchInfo)
+      remoteFetchTasks.put(topicIdPartition, task)
+      remoteFetchResults.put(topicIdPartition, result)
+ }
+
    val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
-    val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
-      fetchPartitionStatus, params, logReadResults, this, responseCallback)
-    delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, util.Collections.singletonList(key))
-    None
-  }
-
-  private def buildPartitionToFetchPartitionData(logReadResults: Seq[(TopicIdPartition, LogReadResult)],
-                                                 remoteFetchTopicPartition: TopicPartition,
-                                                 error: LogReadResult): Seq[(TopicIdPartition, FetchPartitionData)] = {
-    logReadResults.map { case (tp, result) =>
-      val fetchPartitionData = {
-        if (tp.topicPartition().equals(remoteFetchTopicPartition))
-          error
-        else
-          result
-      }.toFetchPartitionData(false)
+    val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
+      remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)
-      tp -> fetchPartitionData
-    }
+    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
+    val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
+    delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, delayedFetchKeys.asJava)
}
/**
@@ -1639,8 +1650,8 @@ class ReplicaManager(val config: KafkaConfig,
var bytesReadable: Long = 0
var errorReadingData = false
- // The 1st topic-partition that has to be read from remote storage
- var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
+ // topic-partitions that have to be read from remote storage
+    val remoteFetchInfos = new util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]()
var hasDivergingEpoch = false
var hasPreferredReadReplica = false
@@ -1651,8 +1662,8 @@ class ReplicaManager(val config: KafkaConfig,
brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
if (logReadResult.error != Errors.NONE)
errorReadingData = true
-        if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
-          remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
+        if (logReadResult.info.delayedRemoteStorageFetch.isPresent) {
+          remoteFetchInfos.put(topicIdPartition, logReadResult.info.delayedRemoteStorageFetch.get())
}
if (logReadResult.divergingEpoch.isPresent)
hasDivergingEpoch = true
@@ -1669,7 +1680,7 @@ class ReplicaManager(val config: KafkaConfig,
// 4) some error happens while reading data
// 5) we found a diverging epoch
// 6) has a preferred read replica
-    if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
+    if (remoteFetchInfos.isEmpty && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
      hasDivergingEpoch || hasPreferredReadReplica)) {
      val fetchPartitionData = logReadResults.map { case (tp, result) =>
        val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
@@ -1686,15 +1697,8 @@ class ReplicaManager(val config: KafkaConfig,
})
}
-    if (remoteFetchInfo.isPresent) {
-      val maybeLogReadResultWithError = processRemoteFetch(remoteFetchInfo.get(), params, responseCallback, logReadResults, fetchPartitionStatus)
-      if (maybeLogReadResultWithError.isDefined) {
-        // If there is an error in scheduling the remote fetch task, return what we currently have
-        // (the data read from local log segment for the other topic-partitions) and an error for the topic-partition
-        // that we couldn't read from remote storage
-        val partitionToFetchPartitionData = buildPartitionToFetchPartitionData(logReadResults, remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
-        responseCallback(partitionToFetchPartitionData)
-      }
+    if (!remoteFetchInfos.isEmpty) {
+      processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq)
} else {
      // If there is not enough data to respond and there is no remote data, we will let the fetch request
      // wait for new data.
@@ -1902,9 +1906,9 @@ class ReplicaManager(val config: KafkaConfig,
)
} else {
        // For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
-        // For the first topic-partition that needs remote data, we will use this information to read the data in another thread.
+        // For the topic-partitions that need remote data, we will use this information to read the data in another thread.
        new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
-          Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
+          Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp,
            fetchInfo, params.isolation)))
}
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index b65de12182e..23b4b32b0d7 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -16,6 +16,7 @@
*/
package kafka.server
+import com.yammer.metrics.core.Meter
import kafka.cluster.Partition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.protocol.Errors
@@ -28,9 +29,10 @@ import org.apache.kafka.server.storage.log.{FetchIsolation,
FetchParams, FetchPa
import org.apache.kafka.storage.internals.log._
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
-import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.ArgumentMatchers.anyBoolean
+import org.mockito.Mockito.{mock, never, verify, when}
-import java.util.{Optional, OptionalLong}
+import java.util.{Collections, Optional, OptionalLong}
import java.util.concurrent.{CompletableFuture, Future}
import scala.collection._
import scala.jdk.CollectionConverters._
@@ -39,6 +41,7 @@ class DelayedRemoteFetchTest {
private val maxBytes = 1024
private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0,
"topic")
+ private val topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0,
"topic2")
private val fetchOffset = 500L
private val logStartOffset = 0L
private val currentLeaderEpoch = Optional.of[Integer](10)
@@ -61,14 +64,22 @@ class DelayedRemoteFetchTest {
}
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
- future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
+ future.complete(buildRemoteReadResult(Errors.NONE))
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null)
val highWatermark = 100
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo,
remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition
-> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+ java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+ java.util.Collections.singletonMap(topicIdPartition, future),
+ java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+ remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus),
+ fetchParams,
+ Seq(topicIdPartition -> logReadInfo),
+ replicaManager,
+ callback)
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
@@ -97,14 +108,23 @@ class DelayedRemoteFetchTest {
}
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
- future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
+ future.complete(buildRemoteReadResult(Errors.NONE))
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null)
val highWatermark = 100
val leaderLogStartOffset = 10
val logReadInfo = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500)
- assertThrows(classOf[IllegalStateException], () => new
DelayedRemoteFetch(null, future, fetchInfo, remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition
-> logReadInfo), replicaManager, callback))
+
+ assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch(
+ java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+ java.util.Collections.singletonMap(topicIdPartition, future),
+ java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+ remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus),
+ fetchParams,
+ Seq(topicIdPartition -> logReadInfo),
+ replicaManager,
+ callback))
}
@Test
@@ -123,12 +143,20 @@ class DelayedRemoteFetchTest {
.thenThrow(new NotLeaderOrFollowerException(s"Replica for
$topicIdPartition not available"))
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null)
val logReadInfo = buildReadResult(Errors.NONE)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo,
remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition
-> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+ java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+ java.util.Collections.singletonMap(topicIdPartition, future),
+ java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+ remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus),
+ fetchParams,
+ Seq(topicIdPartition -> logReadInfo),
+ replicaManager,
+ callback)
// delayed remote fetch should still be able to complete
assertTrue(delayedRemoteFetch.tryComplete())
@@ -152,14 +180,22 @@ class DelayedRemoteFetchTest {
.thenReturn(mock(classOf[Partition]))
val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
- future.complete(null)
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
+ future.complete(buildRemoteReadResult(Errors.NONE))
+ val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition, null, null)
// build a read result with error
val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
- val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo,
remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition
-> logReadInfo), replicaManager, callback)
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+ java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](),
+ java.util.Collections.singletonMap(topicIdPartition, future),
+ java.util.Collections.singletonMap(topicIdPartition, fetchInfo),
+ remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus),
+ fetchParams,
+ Seq(topicIdPartition -> logReadInfo),
+ replicaManager,
+ callback)
assertTrue(delayedRemoteFetch.tryComplete())
assertTrue(delayedRemoteFetch.isCompleted)
@@ -170,52 +206,262 @@ class DelayedRemoteFetchTest {
@Test
def testRequestExpiry(): Unit = {
- var actualTopicPartition: Option[TopicIdPartition] = None
- var fetchResultOpt: Option[FetchPartitionData] = None
+ val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
- def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit
= {
- assertEquals(1, responses.size)
- actualTopicPartition = Some(responses.head._1)
- fetchResultOpt = Some(responses.head._2)
+ def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]):
Unit = {
+ responseSeq.foreach { case (tp, data) =>
+ responses.put(tp, data)
+ }
+ }
+
+ def expiresPerSecValue(): Double = {
+ val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
+ val metric = allMetrics.find { case (n, _) =>
n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec")
}
+
+ if (metric.isEmpty)
+ 0
+ else
+ metric.get._2.asInstanceOf[Meter].count
}
+ val remoteFetchTaskExpired = mock(classOf[Future[Void]])
+ val remoteFetchTask2 = mock(classOf[Future[Void]])
+ // complete the 2nd task, and keep the 1st one expired
+ when(remoteFetchTask2.isDone).thenReturn(true)
+
+ // Create futures - one completed, one not
+ val future1: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ val future2: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ // Only complete one remote fetch
+ future2.complete(buildRemoteReadResult(Errors.NONE))
+
+ val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition,
null, null)
+ val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2,
null, null)
+
val highWatermark = 100
val leaderLogStartOffset = 10
- val remoteFetchTask = mock(classOf[Future[Void]])
- val future: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
- val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0,
false, topicIdPartition.topicPartition(), null, null)
- val logReadInfo = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
-
- val delayedRemoteFetch = new DelayedRemoteFetch(remoteFetchTask, future,
fetchInfo, remoteFetchMaxWaitMs,
- Seq(topicIdPartition -> fetchStatus), fetchParams, Seq(topicIdPartition
-> logReadInfo), replicaManager, callback)
+ val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark,
leaderLogStartOffset)
+ val logReadInfo2 = buildReadResult(Errors.NONE)
+
+ val fetchStatus1 = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+ fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset,
logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus2 = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+ fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset +
100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+ // Set up maps for multiple partitions
+ val remoteFetchTasks = new java.util.HashMap[TopicIdPartition,
Future[Void]]()
+ val remoteFetchResults = new java.util.HashMap[TopicIdPartition,
CompletableFuture[RemoteLogReadResult]]()
+ val remoteFetchInfos = new java.util.HashMap[TopicIdPartition,
RemoteStorageFetchInfo]()
+
+ remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired)
+ remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2)
+ remoteFetchResults.put(topicIdPartition, future1)
+ remoteFetchResults.put(topicIdPartition2, future2)
+ remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+ remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+ remoteFetchTasks,
+ remoteFetchResults,
+ remoteFetchInfos,
+ remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+ fetchParams,
+ Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+ replicaManager,
+ callback)
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
.thenReturn(mock(classOf[Partition]))
+
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+ .thenReturn(mock(classOf[Partition]))
// Verify that the ExpiresPerSec metric is zero before fetching
- val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
- assertEquals(0, metrics.keySet.asScala.count(_.getMBeanName ==
"kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+ val existingMetricVal = expiresPerSecValue()
+ // Verify the delayedRemoteFetch is not completed yet
+ assertFalse(delayedRemoteFetch.isCompleted)
// Force the delayed remote fetch to expire
delayedRemoteFetch.run()
- // Check that the task was cancelled and force-completed
- verify(remoteFetchTask).cancel(false)
+ // Check that the expired task was cancelled and force-completed
+ verify(remoteFetchTaskExpired).cancel(anyBoolean())
+ verify(remoteFetchTask2, never()).cancel(anyBoolean())
assertTrue(delayedRemoteFetch.isCompleted)
// Check that the ExpiresPerSec metric was incremented
- assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName ==
"kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+ assertTrue(expiresPerSecValue() > existingMetricVal)
- // Fetch results should still include local read results
- assertTrue(actualTopicPartition.isDefined)
- assertEquals(topicIdPartition, actualTopicPartition.get)
- assertTrue(fetchResultOpt.isDefined)
+ // Fetch results should include 2 results and the expired one should
return local read results
+ assertEquals(2, responses.size)
+ assertTrue(responses.contains(topicIdPartition))
+ assertTrue(responses.contains(topicIdPartition2))
- val fetchResult = fetchResultOpt.get
- assertEquals(Errors.NONE, fetchResult.error)
- assertEquals(highWatermark, fetchResult.highWatermark)
- assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+ assertEquals(Errors.NONE, responses(topicIdPartition).error)
+ assertEquals(highWatermark, responses(topicIdPartition).highWatermark)
+ assertEquals(leaderLogStartOffset,
responses(topicIdPartition).logStartOffset)
+
+ assertEquals(Errors.NONE, responses(topicIdPartition2).error)
+ }
+
+ @Test
+ def testMultiplePartitions(): Unit = {
+ val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
+
+ def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]):
Unit = {
+ responseSeq.foreach { case (tp, data) =>
+ responses.put(tp, data)
+ }
+ }
+
+ // Create futures - one completed, one not
+ val future1: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ val future2: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ // Only complete one remote fetch
+ future1.complete(buildRemoteReadResult(Errors.NONE))
+
+ val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition,
null, null)
+ val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2,
null, null)
+
+ val highWatermark1 = 100
+ val leaderLogStartOffset1 = 10
+ val highWatermark2 = 200
+ val leaderLogStartOffset2 = 20
+
+ val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
+ val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
+
+ val fetchStatus1 = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+ fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset,
logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus2 = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+ fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset +
100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+ // Set up maps for multiple partitions
+ val remoteFetchResults = new java.util.HashMap[TopicIdPartition,
CompletableFuture[RemoteLogReadResult]]()
+ val remoteFetchInfos = new java.util.HashMap[TopicIdPartition,
RemoteStorageFetchInfo]()
+
+ remoteFetchResults.put(topicIdPartition, future1)
+ remoteFetchResults.put(topicIdPartition2, future2)
+ remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+ remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+ Collections.emptyMap[TopicIdPartition, Future[Void]](),
+ remoteFetchResults,
+ remoteFetchInfos,
+ remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+ fetchParams,
+ Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+ replicaManager,
+ callback)
+
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+ .thenReturn(mock(classOf[Partition]))
+
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+ .thenReturn(mock(classOf[Partition]))
+
+ // Should not complete since future2 is not done
+ assertFalse(delayedRemoteFetch.tryComplete())
+ assertFalse(delayedRemoteFetch.isCompleted)
+
+ // Complete future2
+ future2.complete(buildRemoteReadResult(Errors.NONE))
+
+ // Now it should complete
+ assertTrue(delayedRemoteFetch.tryComplete())
+ assertTrue(delayedRemoteFetch.isCompleted)
+
+ // Verify both partitions were processed without error
+ assertEquals(2, responses.size)
+ assertTrue(responses.contains(topicIdPartition))
+ assertTrue(responses.contains(topicIdPartition2))
+
+ assertEquals(Errors.NONE, responses(topicIdPartition).error)
+ assertEquals(highWatermark1, responses(topicIdPartition).highWatermark)
+ assertEquals(leaderLogStartOffset1,
responses(topicIdPartition).logStartOffset)
+
+ assertEquals(Errors.NONE, responses(topicIdPartition2).error)
+ assertEquals(highWatermark2, responses(topicIdPartition2).highWatermark)
+ assertEquals(leaderLogStartOffset2,
responses(topicIdPartition2).logStartOffset)
+ }
+
+ @Test
+ def testMultiplePartitionsWithFailedResults(): Unit = {
+ val responses = mutable.Map[TopicIdPartition, FetchPartitionData]()
+
+ def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]):
Unit = {
+ responseSeq.foreach { case (tp, data) =>
+ responses.put(tp, data)
+ }
+ }
+
+ // Create futures - one successful, one with error
+ val future1: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+ val future2: CompletableFuture[RemoteLogReadResult] = new
CompletableFuture[RemoteLogReadResult]()
+
+ // Created 1 successful result and 1 failed result
+ future1.complete(buildRemoteReadResult(Errors.NONE))
+ future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR))
+
+ val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition,
null, null)
+ val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2,
null, null)
+
+ val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10)
+ val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20)
+
+ val fetchStatus1 = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+ fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset,
logStartOffset, maxBytes, currentLeaderEpoch))
+ val fetchStatus2 = FetchPartitionStatus(
+ startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100),
+ fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset +
100, logStartOffset, maxBytes, currentLeaderEpoch))
+
+ // Set up maps for multiple partitions
+ val remoteFetchResults = new java.util.HashMap[TopicIdPartition,
CompletableFuture[RemoteLogReadResult]]()
+ val remoteFetchInfos = new java.util.HashMap[TopicIdPartition,
RemoteStorageFetchInfo]()
+
+ remoteFetchResults.put(topicIdPartition, future1)
+ remoteFetchResults.put(topicIdPartition2, future2)
+ remoteFetchInfos.put(topicIdPartition, fetchInfo1)
+ remoteFetchInfos.put(topicIdPartition2, fetchInfo2)
+
+ val delayedRemoteFetch = new DelayedRemoteFetch(
+ Collections.emptyMap[TopicIdPartition, Future[Void]](),
+ remoteFetchResults,
+ remoteFetchInfos,
+ remoteFetchMaxWaitMs,
+ Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2),
+ fetchParams,
+ Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2),
+ replicaManager,
+ callback)
+
+
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+ .thenReturn(mock(classOf[Partition]))
+
when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition))
+ .thenReturn(mock(classOf[Partition]))
+
+ assertTrue(delayedRemoteFetch.tryComplete())
+ assertTrue(delayedRemoteFetch.isCompleted)
+
+ // Verify both partitions were processed
+ assertEquals(2, responses.size)
+ assertTrue(responses.contains(topicIdPartition))
+ assertTrue(responses.contains(topicIdPartition2))
+
+ // First partition should be successful
+ val fetchResult1 = responses(topicIdPartition)
+ assertEquals(Errors.NONE, fetchResult1.error)
+
+ // Second partition should have an error due to remote fetch failure
+ val fetchResult2 = responses(topicIdPartition2)
+ assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error)
}
private def buildFetchParams(replicaId: Int,
@@ -235,7 +481,8 @@ class DelayedRemoteFetchTest {
highWatermark: Int = 0,
leaderLogStartOffset: Int = 0): LogReadResult = {
new LogReadResult(
- new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
MemoryRecords.EMPTY),
+ new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
MemoryRecords.EMPTY, false, Optional.empty(),
+ Optional.of(mock(classOf[RemoteStorageFetchInfo]))),
Optional.empty(),
highWatermark,
leaderLogStartOffset,
@@ -246,4 +493,9 @@ class DelayedRemoteFetchTest {
if (error != Errors.NONE) Optional.of[Throwable](error.exception) else
Optional.empty[Throwable]())
}
+ private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
+ new RemoteLogReadResult(
+ Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
MemoryRecords.EMPTY)),
+ if (error != Errors.NONE) Optional.of[Throwable](error.exception) else
Optional.empty[Throwable]())
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 761fc49b2ea..843a0d60342 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.server.util.timer.MockTimer
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -94,8 +94,8 @@ import java.net.InetAddress
import java.nio.file.{Files, Paths}
import java.util
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
-import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, TimeUnit}
-import java.util.function.BiConsumer
+import java.util.concurrent.{Callable, CompletableFuture, ConcurrentHashMap, CountDownLatch, Future, TimeUnit}
+import java.util.function.{BiConsumer, Consumer}
import java.util.stream.IntStream
import java.util.{Collections, Optional, OptionalLong, Properties}
import scala.collection.{Map, Seq, mutable}
@@ -3390,7 +3390,7 @@ class ReplicaManagerTest {
} else {
        verify(mockRemoteLogManager).asyncRead(remoteStorageFetchInfoArg.capture(), any())
        val remoteStorageFetchInfo = remoteStorageFetchInfoArg.getValue
-        assertEquals(tp0, remoteStorageFetchInfo.topicPartition)
+        assertEquals(tp0, remoteStorageFetchInfo.topicIdPartition.topicPartition)
        assertEquals(fetchOffset, remoteStorageFetchInfo.fetchInfo.fetchOffset)
        assertEquals(topicId, remoteStorageFetchInfo.fetchInfo.topicId)
        assertEquals(startOffset, remoteStorageFetchInfo.fetchInfo.logStartOffset)
@@ -3594,6 +3594,109 @@ class ReplicaManagerTest {
}
}
+ @Test
+ def testMultipleRemoteFetchesInOneFetchRequest(): Unit = {
+ val replicaId = -1
+ val tp0 = new TopicPartition(topic, 0)
+ val tp1 = new TopicPartition(topic, 1)
+ val tidp0 = new TopicIdPartition(topicId, tp0)
+ val tidp1 = new TopicIdPartition(topicId, tp1)
+
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true,
shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
+
+ try {
+ val offsetCheckpoints = new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
+ replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
+ replicaManager.createPartition(tp1).createLogIfNotExists(isNew = false,
isFutureReplica = false, offsetCheckpoints, None)
+
+ val leaderEpoch = 0
+ val leaderDelta0 = createLeaderDelta(topicId, tp0, leaderId = 0,
leaderEpoch = leaderEpoch)
+ val leaderDelta1 = createLeaderDelta(topicId, tp1, leaderId = 0,
leaderEpoch = leaderEpoch)
+ val leaderMetadataImage0 = imageFromTopics(leaderDelta0.apply())
+ val leaderMetadataImage1 = imageFromTopics(leaderDelta1.apply())
+ replicaManager.applyDelta(leaderDelta0, leaderMetadataImage0)
+ replicaManager.applyDelta(leaderDelta1, leaderMetadataImage1)
+
+ val params = new FetchParams(replicaId, 1, 1000, 10, 100,
FetchIsolation.LOG_END, Optional.empty)
+ val fetchOffsetTp0 = 1
+ val fetchOffsetTp1 = 2
+
+ val responseSeq = new AtomicReference[Seq[(TopicIdPartition,
FetchPartitionData)]]()
+ val responseLatch = new CountDownLatch(1)
+
+ def fetchCallback(responseStatus: Seq[(TopicIdPartition,
FetchPartitionData)]): Unit = {
+ responseSeq.set(responseStatus)
+ responseLatch.countDown()
+ }
+
+ val callbacks: util.Set[Consumer[RemoteLogReadResult]] = new
util.HashSet[Consumer[RemoteLogReadResult]]()
+ when(mockRemoteLogManager.asyncRead(any(), any())).thenAnswer(ans => {
+ callbacks.add(ans.getArgument(1,
classOf[Consumer[RemoteLogReadResult]]))
+ mock(classOf[Future[Void]])
+ })
+
+ // Start the fetch request for both partitions - this should trigger
remote fetches since
+ // the default mocked log behavior throws OffsetOutOfRangeException
+ replicaManager.fetchMessages(params, Seq(
+ tidp0 -> new PartitionData(topicId, fetchOffsetTp0, startOffset,
100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+ tidp1 -> new PartitionData(topicId, fetchOffsetTp1, startOffset,
100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))
+ ), UNBOUNDED_QUOTA, fetchCallback)
+
+ // Verify that exactly two asyncRead calls were made (one for each
partition)
+ val remoteStorageFetchInfoArg: ArgumentCaptor[RemoteStorageFetchInfo] =
ArgumentCaptor.forClass(classOf[RemoteStorageFetchInfo])
+ verify(mockRemoteLogManager,
times(2)).asyncRead(remoteStorageFetchInfoArg.capture(), any())
+
+ // Verify that remote fetch operations were properly set up for both
partitions
+ assertTrue(replicaManager.delayedRemoteFetchPurgatory.watched == 2,
"DelayedRemoteFetch purgatory should have operations")
+
+ // Verify both partitions were captured in the remote fetch requests
+ val capturedFetchInfos = remoteStorageFetchInfoArg.getAllValues.asScala
+ assertEquals(2, capturedFetchInfos.size, "Should have 2 remote storage
fetch info calls")
+
+ val capturedTopicPartitions =
capturedFetchInfos.map(_.topicIdPartition.topicPartition).toSet
+ assertTrue(capturedTopicPartitions.contains(tp0), "Should contain " +
tp0)
+ assertTrue(capturedTopicPartitions.contains(tp1), "Should contain " +
tp1)
+
+ // Verify the fetch info details are correct for both partitions
+ capturedFetchInfos.foreach { fetchInfo =>
+ assertEquals(topicId, fetchInfo.fetchInfo.topicId)
+ assertEquals(startOffset, fetchInfo.fetchInfo.logStartOffset)
+ assertEquals(leaderEpoch, fetchInfo.fetchInfo.currentLeaderEpoch.get())
+ if (fetchInfo.topicIdPartition.topicPartition == tp0) {
+ assertEquals(fetchOffsetTp0, fetchInfo.fetchInfo.fetchOffset)
+ } else {
+ assertEquals(fetchOffsetTp1, fetchInfo.fetchInfo.fetchOffset)
+ }
+ }
+
+ // Complete the 2 asyncRead tasks
+ callbacks.forEach(callback =>
callback.accept(buildRemoteReadResult(Errors.NONE)))
+
+ // Wait for the fetch callback to complete and verify responseSeq content
+ assertTrue(responseLatch.await(5, TimeUnit.SECONDS), "Fetch callback
should complete")
+
+ val responseData = responseSeq.get()
+ assertNotNull(responseData, "Response sequence should not be null")
+ assertEquals(2, responseData.size, "Response should contain data for
both partitions")
+
+ // Verify that response contains both tidp0 and tidp1 and have no errors
+ val responseTopicIdPartitions = responseData.map(_._1).toSet
+ assertTrue(responseTopicIdPartitions.contains(tidp0), "Response should
contain " + tidp0)
+ assertTrue(responseTopicIdPartitions.contains(tidp1), "Response should
contain " + tidp1)
+ responseData.foreach { case (_, fetchPartitionData) =>
+ assertEquals(Errors.NONE, fetchPartitionData.error)
+ }
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
+ private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = {
+ new RemoteLogReadResult(
+ Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA,
MemoryRecords.EMPTY)),
+ if (error != Errors.NONE) Optional.of[Throwable](error.exception) else
Optional.empty[Throwable]())
+ }
+
private def yammerMetricValue(name: String): Any = {
val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
val (_, metric) = allMetrics.find { case (n, _) =>
n.getMBeanName.endsWith(name) }
diff --git a/docs/ops.html b/docs/ops.html
index 09748a9686d..0b1e8fa6880 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -4358,7 +4358,6 @@ $ bin/kafka-topics.sh --create --topic tieredTopic --bootstrap-server localhost:
  <li>Disabling tiered storage on all topics where it is enabled is required before disabling tiered storage at the broker level</li>
  <li>Admin actions related to tiered storage feature are only supported on clients from version 3.0 onwards</li>
  <li>No support for log segments missing producer snapshot file. It can happen when topic is created before v2.8.0.</li>
- <li>Only one partition per fetch request is served from the remote store. This limitation can become a bottleneck for consumer client throughput - consider configuring <code>max.partition.fetch.bytes</code> appropriately.</li>
</ul>
<p>For more information, please check <a
href="https://cwiki.apache.org/confluence/x/9xDOEg">Kafka Tiered Storage GA
Release Notes</a>.
diff --git a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
index bfb6d97f6d9..157fdcb885d 100644
--- a/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
+++ b/server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedOperationPurgatory.java
@@ -199,7 +199,7 @@ public class DelayedOperationPurgatory<T extends DelayedOperation> {
    }

    /**
-     * Return the total size of watch lists the purgatory. Since an operation may be watched
+     * Return the total size of watch lists in the purgatory. Since an operation may be watched
     * on multiple lists, and some of its watched entries may still be in the watch lists
     * even when it has been completed, this number may be larger than the number of real operations watched
     */
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 0970f6f0a34..852e1d4d1fb 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -1659,7 +1659,7 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
-        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        TopicPartition tp = remoteStorageFetchInfo.topicIdPartition.topicPartition();
        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
@@ -1715,6 +1715,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
                // - there is no minimum-one-message constraint and
                // - the first batch size is more than maximum bytes that can be sent and
                if (!remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes) {
+                    LOGGER.debug("Returning empty record for offset {} in partition {} because the first batch size {} " +
+                            "is greater than max fetch bytes {}", offset, tp, firstBatchSize, maxBytes);
                    return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 586816dae29..0b1b049d766 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -192,8 +192,8 @@ public final class RemoteLogManagerConfig {
    public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS = 1;
    public static final String REMOTE_FETCH_MAX_WAIT_MS_PROP = "remote.fetch.max.wait.ms";
-    public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the remote fetch request. " +
-        "Note that the broker currently only fetches one partition per fetch request from the remote store. (KAFKA-14915)";
+    public static final String REMOTE_FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will wait before answering the fetch request containing remote fetch partitions. " +
+        "It's important to be aware that the request will only be answered after all remote partitions have been successfully fetched, have failed, or this timeout is exceeded.";
    public static final int DEFAULT_REMOTE_FETCH_MAX_WAIT_MS = 500;
    public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP = "remote.list.offsets.request.timeout.ms";
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
index a23ee7207ae..898ff52760c 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogReader.java
@@ -51,7 +51,7 @@ public class RemoteLogReader implements Callable<Void> {
this.rlm = rlm;
this.brokerTopicStats = brokerTopicStats;
this.callback = callback;
-        this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchRequestRate().mark();
+        this.brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchRequestRate().mark();
this.brokerTopicStats.allTopicsStats().remoteFetchRequestRate().mark();
this.quotaManager = quotaManager;
this.remoteReadTimer = remoteReadTimer;
@@ -61,20 +61,20 @@ public class RemoteLogReader implements Callable<Void> {
public Void call() {
RemoteLogReadResult result;
try {
- LOGGER.debug("Reading records from remote storage for topic
partition {}", fetchInfo.topicPartition);
+ LOGGER.debug("Reading records from remote storage for topic
partition {}", fetchInfo.topicIdPartition);
FetchDataInfo fetchDataInfo = remoteReadTimer.time(() ->
rlm.read(fetchInfo));
-
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
+
brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
brokerTopicStats.allTopicsStats().remoteFetchBytesRate().mark(fetchDataInfo.records.sizeInBytes());
result = new RemoteLogReadResult(Optional.of(fetchDataInfo),
Optional.empty());
} catch (OffsetOutOfRangeException e) {
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
} catch (Exception e) {
-
brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteFetchRequestRate().mark();
+
brokerTopicStats.topicStats(fetchInfo.topicIdPartition.topic()).failedRemoteFetchRequestRate().mark();
brokerTopicStats.allTopicsStats().failedRemoteFetchRequestRate().mark();
- LOGGER.error("Error occurred while reading the remote data for
{}", fetchInfo.topicPartition, e);
+ LOGGER.error("Error occurred while reading the remote data for
{}", fetchInfo.topicIdPartition, e);
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
}
- LOGGER.debug("Finished reading records from remote storage for topic
partition {}", fetchInfo.topicPartition);
+ LOGGER.debug("Finished reading records from remote storage for topic
partition {}", fetchInfo.topicIdPartition);
quotaManager.record(result.fetchDataInfo.map(fetchDataInfo ->
fetchDataInfo.records.sizeInBytes()).orElse(0));
callback.accept(result);
return null;
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
index c110e750d7c..e02fea8c0ef 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
@@ -16,7 +16,7 @@
*/
package org.apache.kafka.storage.internals.log;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.server.storage.log.FetchIsolation;
@@ -24,15 +24,15 @@ public class RemoteStorageFetchInfo {
public final int fetchMaxBytes;
public final boolean minOneMessage;
- public final TopicPartition topicPartition;
+ public final TopicIdPartition topicIdPartition;
public final FetchRequest.PartitionData fetchInfo;
public final FetchIsolation fetchIsolation;
-    public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicPartition topicPartition,
+    public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicIdPartition topicIdPartition,
                                  FetchRequest.PartitionData fetchInfo,
                                  FetchIsolation fetchIsolation) {
this.fetchMaxBytes = fetchMaxBytes;
this.minOneMessage = minOneMessage;
- this.topicPartition = topicPartition;
+ this.topicIdPartition = topicIdPartition;
this.fetchInfo = fetchInfo;
this.fetchIsolation = fetchIsolation;
}
@@ -42,7 +42,7 @@ public class RemoteStorageFetchInfo {
return "RemoteStorageFetchInfo{" +
"fetchMaxBytes=" + fetchMaxBytes +
", minOneMessage=" + minOneMessage +
- ", topicPartition=" + topicPartition +
+ ", topicIdPartition=" + topicIdPartition +
", fetchInfo=" + fetchInfo +
", fetchIsolation=" + fetchIsolation +
'}';
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index e82536d7233..182fda9abb9 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -205,6 +205,7 @@ public class RemoteLogManagerTest {
private final TopicIdPartition followerTopicIdPartition = new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0));
private final Map<String, Uuid> topicIds = new HashMap<>();
private final TopicPartition tp = new TopicPartition("TestTopic", 5);
+    private final TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
private final EpochEntry epochEntry0 = new EpochEntry(0, 0);
private final EpochEntry epochEntry1 = new EpochEntry(1, 100);
private final EpochEntry epochEntry2 = new EpochEntry(2, 200);
@@ -3100,7 +3101,7 @@ public class RemoteLogManagerTest {
);
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, false, tp, partitionData, FetchIsolation.TXN_COMMITTED
+ 0, false, tpId, partitionData, FetchIsolation.TXN_COMMITTED
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3180,7 +3181,7 @@ public class RemoteLogManagerTest {
);
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, minOneMessage, tp, partitionData,
FetchIsolation.HIGH_WATERMARK
+ 0, minOneMessage, tpId, partitionData,
FetchIsolation.HIGH_WATERMARK
);
try (RemoteLogManager remoteLogManager = new RemoteLogManager(
@@ -3266,7 +3267,7 @@ public class RemoteLogManagerTest {
when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
doNothing().when(firstBatch).writeTo(capture.capture());
RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
- 0, true, tp, partitionData, FetchIsolation.HIGH_WATERMARK
+ 0, true, tpId, partitionData, FetchIsolation.HIGH_WATERMARK
);
@@ -3651,7 +3652,7 @@ public class RemoteLogManagerTest {
FetchRequest.PartitionData partitionData = new
FetchRequest.PartitionData(
Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
RemoteStorageFetchInfo remoteStorageFetchInfo = new
RemoteStorageFetchInfo(
- 1048576, true, leaderTopicIdPartition.topicPartition(),
+ 1048576, true, leaderTopicIdPartition,
partitionData, FetchIsolation.HIGH_WATERMARK);
FetchDataInfo fetchDataInfo =
remoteLogManager.read(remoteStorageFetchInfo);
// firstBatch baseOffset may not be equal to the fetchOffset
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
index 6c7026a52d5..efb6bb76e05 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogReaderTest.java
@@ -18,7 +18,8 @@ package org.apache.kafka.server.log.remote.storage;
import kafka.utils.TestUtils;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.log.remote.quota.RLMQuotaManager;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
@@ -69,7 +70,7 @@ public class RemoteLogReaderTest {
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
-        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC), null, null);
RemoteLogReader remoteLogReader =
new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback,
brokerTopicStats, mockQuotaManager, timer);
remoteLogReader.call();
@@ -102,7 +103,7 @@ public class RemoteLogReaderTest {
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new
RuntimeException("error"));
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
-        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC), null, null);
RemoteLogReader remoteLogReader =
new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback,
brokerTopicStats, mockQuotaManager, timer);
remoteLogReader.call();