kamalcph commented on code in PR #20088:
URL: https://github.com/apache/kafka/pull/20088#discussion_r2412551590
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1852,7 +1852,11 @@ class ReplicaManager(val config: KafkaConfig,
// Once we read from a non-empty partition, we stop ignoring request and
partition level size limits
if (recordBatchSize > 0)
minOneMessage = false
- limitBytes = math.max(0, limitBytes - recordBatchSize)
+ // Because we don't know how much data will be retrieved in remote fetch
yet, and we don't want to block the API call
+ // to query remoteLogMetadata, assume it will fetch the max bytes size
of data to avoid to exceed the "fetch.max.bytes" setting.
+ val estimatedRecordBatchSize = if (recordBatchSize == 0 &&
readResult.info.delayedRemoteStorageFetch.isPresent)
+ readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else
recordBatchSize
Review Comment:
Revisiting this change, should we have to set the `minOneMessage` as false
from the second partition when the delayedRemoteFetch is present in the first
partition?
Assume that the FETCH request is configured with fetchMaxBytes as 50 MB and
max.partition.fetch.bytes as 1 MB. And, the broker is hosting 5 partitions as
leader for the topic. The FETCH request might try to read the data from remote
for all the 5 partitions.
If the size of the message in the topic is 30 MB, then we might be returning
back 150 MB of response instead of 30 MB due to `minOneMessage` set to true for
all the remote-read requests.
cc @satishd
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]