This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eb378da99cb KAFKA-19462: Count fetch size when remote fetch (#20088)
eb378da99cb is described below

commit eb378da99cb3ee39188a1dc24e2d9a30e439a161
Author: Luke Chen <[email protected]>
AuthorDate: Thu Jul 3 10:45:59 2025 +0800

    KAFKA-19462: Count fetch size when remote fetch (#20088)
    
    Estimate the fetch size for remote fetches so the response does not
    exceed the `fetch.max.bytes` config. We don't want to query the
    remoteLogMetadata during API handling, so we assume a remote fetch can
    return up to `max.partition.fetch.bytes` bytes. Tests added.
    
    Reviewers: Kamal Chandraprakash <[email protected]>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  6 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 61 ++++++++++++++--------
 2 files changed, 44 insertions(+), 23 deletions(-)
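
A minimal sketch of the budgeting idea from the commit message (the
chargeBudget name and its parameters are illustrative, not the actual
ReplicaManager API):

    // If a partition defers to remote storage, the local read returns 0 bytes,
    // so charge the shared fetch budget pessimistically with the per-partition
    // cap instead of charging nothing.
    def chargeBudget(remainingLimitBytes: Int,
                     localBatchSize: Int,
                     hasDelayedRemoteFetch: Boolean,
                     partitionMaxBytes: Int): Int = {
      val estimated =
        if (localBatchSize == 0 && hasDelayedRemoteFetch) partitionMaxBytes
        else localBatchSize
      math.max(0, remainingLimitBytes - estimated)
    }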

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 00efb5f7a07..e70a4726216 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1852,7 +1852,11 @@ class ReplicaManager(val config: KafkaConfig,
      // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
       if (recordBatchSize > 0)
         minOneMessage = false
-      limitBytes = math.max(0, limitBytes - recordBatchSize)
+      // Because we don't know how much data will be retrieved in a remote fetch yet, and we don't want to block the API call
+      // to query remoteLogMetadata, assume it will fetch up to the max bytes of data, to avoid exceeding the "fetch.max.bytes" setting.
+      val estimatedRecordBatchSize = if (recordBatchSize == 0 && readResult.info.delayedRemoteStorageFetch.isPresent)
+        readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else recordBatchSize
+      limitBytes = math.max(0, limitBytes - estimatedRecordBatchSize)
       result += (tp -> readResult)
     }
     result
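
For illustration, with the values used in the test below (fetch.max.bytes =
150, per-partition max = 100, two remote-only partitions), the remaining
budget shrinks like this; a sketch under the assumption that each remote
fetch is charged at most the remaining budget, as the assertions below
indicate:

    val fetchMaxBytes = 150
    val perPartitionMax = 100
    // Each remote partition is charged min(remaining, perPartitionMax):
    val budgets = Seq("tp0", "tp02").scanLeft(fetchMaxBytes) { (limit, _) =>
      math.max(0, limit - math.min(limit, perPartitionMax))
    }
    // budgets == Seq(150, 50, 0): the first remote fetch may use up to 100
    // bytes, the second only the remaining 50, keeping the total within 150.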
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 3f2753d3ab7..761fc49b2ea 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -112,7 +112,9 @@ object ReplicaManagerTest {
 class ReplicaManagerTest {
 
   private val topic = "test-topic"
+  private val topic2 = "test-topic2"
   private val topicId = Uuid.fromString("YK2ed2GaTH2JpgzUaJ8tgg")
+  private val topicId2 = Uuid.randomUuid()
   private val topicIds = scala.Predef.Map("test-topic" -> topicId)
   private val topicNames = topicIds.map(_.swap)
   private val topicPartition = new TopicPartition(topic, 0)
@@ -3294,38 +3296,53 @@ class ReplicaManagerTest {
   @ValueSource(booleans = Array(true, false))
  def testOffsetOutOfRangeExceptionWhenReadFromLog(isFromFollower: Boolean): Unit = {
     val replicaId = if (isFromFollower) 1 else -1
+    val fetchMaxBytes = 150
+    val partitionMaxBytes = 100
     val tp0 = new TopicPartition(topic, 0)
+    val tp02 = new TopicPartition(topic2, 0)
     val tidp0 = new TopicIdPartition(topicId, tp0)
+    val tidp02 = new TopicIdPartition(topicId2, tp02)
     // create a replicaManager with remoteLog enabled
    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteFetchQuotaExceeded = Some(false))
     try {
      val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
      replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
+      replicaManager.createPartition(tp02).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
       val partition0Replicas = Seq[Integer](0, 1).asJava
-      val topicIds = Map(tp0.topic -> topicId).asJava
+      val topicIds = Map(tp0.topic -> topicId, tp02.topic -> topicId2).asJava
       val leaderEpoch = 0
      val delta = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(0), partition0Replicas, partition0Replicas)
+      val delta2 = createLeaderDelta(topicIds.get(topic2), tp02, partition0Replicas.get(0), partition0Replicas, partition0Replicas)
       val leaderMetadataImage = imageFromTopics(delta.apply())
+      val leaderMetadataImage2 = imageFromTopics(delta2.apply())
       replicaManager.applyDelta(delta, leaderMetadataImage)
-
-      val params = new FetchParams(replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, Optional.empty)
-      // when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately
-      val result = replicaManager.readFromLog(params, Seq(tidp0 -> new PartitionData(topicId, 1, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
-
-      if (isFromFollower) {
-        // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log
-        assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, result.head._2.error)
-      } else {
-        assertEquals(Errors.NONE, result.head._2.error)
-      }
-      assertEquals(startOffset, result.head._2.leaderLogStartOffset)
-      assertEquals(endOffset, result.head._2.leaderLogEndOffset)
-      assertEquals(highHW, result.head._2.highWatermark)
-      if (isFromFollower) {
-        assertFalse(result.head._2.info.delayedRemoteStorageFetch.isPresent)
-      } else {
-        // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
-        assertTrue(result.head._2.info.delayedRemoteStorageFetch.isPresent)
+      replicaManager.applyDelta(delta2, leaderMetadataImage2)
+
+      val params = new FetchParams(replicaId, 1, 100, 0, fetchMaxBytes, FetchIsolation.LOG_END, Optional.empty)
+      // when reading logs from 2 partitions, they'll throw OffsetOutOfRangeException, which will be handled separately
+      val results = replicaManager.readFromLog(params, Seq(
+        tidp0 -> new PartitionData(topicId, 1, 0, partitionMaxBytes, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch)),
+        tidp02 -> new PartitionData(topicId2, 1, 0, partitionMaxBytes, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, false)
+
+      results.foreach { case (tidp, partitionData) =>
+        assertEquals(startOffset, partitionData.leaderLogStartOffset)
+        assertEquals(endOffset, partitionData.leaderLogEndOffset)
+        assertEquals(highHW, partitionData.highWatermark)
+        if (isFromFollower) {
+          // expect OFFSET_MOVED_TO_TIERED_STORAGE error returned if it's from follower, since the data is already available in remote log
+          assertEquals(Errors.OFFSET_MOVED_TO_TIERED_STORAGE, partitionData.error)
+          assertFalse(partitionData.info.delayedRemoteStorageFetch.isPresent)
+        } else {
+          assertEquals(Errors.NONE, partitionData.error)
+          // for consumer fetch, we should return a delayedRemoteStorageFetch to wait for remote fetch
+          assertTrue(partitionData.info.delayedRemoteStorageFetch.isPresent)
+          // verify the 1st partition will set the fetchMaxBytes to partitionMaxBytes,
+          // and the 2nd one will be set to the remaining (fetchMaxBytes - partitionMaxBytes) to stay within the "fetch.max.bytes" config.
+          if (tidp.topic == topic)
+            assertEquals(partitionMaxBytes, partitionData.info.delayedRemoteStorageFetch.get().fetchMaxBytes)
+          else
+            assertEquals(fetchMaxBytes - partitionMaxBytes, partitionData.info.delayedRemoteStorageFetch.get().fetchMaxBytes)
+        }
       }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -3723,8 +3740,8 @@ class ReplicaManagerTest {
     partitionDir.mkdir()
     when(mockLog.dir).thenReturn(partitionDir)
     when(mockLog.parentDir).thenReturn(path)
-    when(mockLog.topicId).thenReturn(Optional.of(topicId))
-    when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0))
+    when(mockLog.topicId).thenReturn(Optional.of(topicId)).thenReturn(Optional.of(topicId2))
+    when(mockLog.topicPartition).thenReturn(new TopicPartition(topic, 0)).thenReturn(new TopicPartition(topic2, 0))
     when(mockLog.highWatermark).thenReturn(highHW)
     when(mockLog.updateHighWatermark(anyLong())).thenReturn(0L)
     when(mockLog.logEndOffsetMetadata).thenReturn(new LogOffsetMetadata(10))
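
Note that the chained thenReturn calls above use Mockito's consecutive
stubbing: each invocation of the stubbed method returns the next value, and
the last value repeats for any further calls. A minimal self-contained
sketch (the Greeter trait is hypothetical, not part of the test):

    import org.mockito.Mockito.{mock, when}

    trait Greeter { def greet(): String }
    val g = mock(classOf[Greeter])
    when(g.greet()).thenReturn("first").thenReturn("second")
    assert(g.greet() == "first")   // 1st call
    assert(g.greet() == "second")  // 2nd call
    assert(g.greet() == "second")  // later calls repeat the last stubbed value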
