corkitse created KAFKA-19554:
--------------------------------
Summary: Add a Kafka client parameter to limit number of messages fetched
Key: KAFKA-19554
URL: https://issues.apache.org/jira/browse/KAFKA-19554
Project: Kafka
Issue Type: Improvement
Reporter: corkitse
h3. Description
Currently, Kafka fetch requests only support limiting the total size of the
data fetched ({{fetch.max.bytes}}) and the size fetched per partition
({{max.partition.fetch.bytes}}). There is no way to limit the *number of
messages* fetched per request, either globally or on a per-partition basis.
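For reference, a minimal sketch of the two existing byte-based knobs on the consumer side (values are arbitrary examples):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ByteLimitedFetchConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Both existing limits are expressed in bytes, not in record counts:
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);      // fetch.max.bytes
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024); // max.partition.fetch.bytes
    }
}
{code}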
While Kafka was originally designed as a high-throughput distributed messaging
platform and has traditionally focused more on throughput than individual
message control, its role has since evolved. Kafka is now not only a leading
message queue but also a core component in modern {*}data pipelines and stream
processing frameworks{*}.
In these newer use cases, especially for downstream services and streaming
applications, *rate-limiting by message count* is a common requirement.
Currently, the workaround is for clients to {*}fetch a batch of messages,
truncate it to the desired count, and then rewind the offsets for the
unprocessed remainder{*}, which is inefficient, error-prone, and significantly
reduces throughput. In practice, it forces developers to bring in external
tools such as Redis for additional buffering or rate control, adding
complexity and overhead.
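For concreteness, a minimal sketch of that workaround using only public consumer APIs (the helper method, record types, and {{process()}} placeholder are illustrative assumptions). Note that {{max.poll.records}} caps only what {{poll()}} returns to the application; the underlying fetch still pulls full byte-limited batches, so over-delivery plus {{seek()}} is unavoidable today:
{code:java}
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class CountLimitedPoller {

    /** Deliver at most {@code limit} records, rewinding whatever poll() over-delivered. */
    static void pollWithCountLimit(KafkaConsumer<String, String> consumer, int limit) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        int remaining = limit;
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(tp);
            if (remaining == 0) {
                // Budget exhausted: rewind this partition's entire batch.
                consumer.seek(tp, batch.get(0).offset());
            } else if (batch.size() > remaining) {
                // Process the first `remaining` records, rewind to the first unprocessed offset.
                batch.subList(0, remaining).forEach(CountLimitedPoller::process);
                consumer.seek(tp, batch.get(remaining).offset());
                remaining = 0;
            } else {
                batch.forEach(CountLimitedPoller::process);
                remaining -= batch.size();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // Application logic goes here (hypothetical placeholder).
    }
}
{code}
Even with this in place, the rewound records have already been shipped over the network and decompressed on the client, which is exactly the wasted work a broker-honored count limit would eliminate.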
Adding *native support* for a message count limit in fetch requests would offer
the following benefits:
h3. Benefits
# {*}Make Kafka a more mature and production-ready stream processing
platform{*}, by supporting more granular rate-limiting use cases.
# *Improve overall system throughput* for consumers that need to limit by
message count, by eliminating inefficient post-processing workarounds.
----
h3. Potential Challenges
# Due to compression and batching, Kafka consumers do not always have direct
access to message counts in a fetch response, so any solution would need to
{*}estimate or derive message counts indirectly{*}, most likely from
record-batch metadata (see the sketch after this list).
# The implementation must preserve {*}Kafka’s high-performance
characteristics{*}: any support for message-count limits must avoid excessive
decompression or deserialization on the broker side.
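On challenge 1: the message-format v2 batch header already stores a record count, so per-batch counts can be read (or at least bounded) without decompressing payloads. A minimal sketch, assuming the {{org.apache.kafka.common.record}} classes that ship with kafka-clients and a hypothetical log-segment file:
{code:java}
import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.RecordBatch;

public final class BatchCountEstimator {

    /** Count records in a segment by reading only batch headers (no decompression). */
    public static long countRecords(File segment) throws IOException {
        long total = 0;
        try (FileRecords records = FileRecords.open(segment)) {
            for (RecordBatch batch : records.batches()) {
                Integer count = batch.countOrNull(); // exact for message-format v2 batches
                if (count != null) {
                    total += count;
                } else {
                    // Older formats: fall back to the offset span as an upper bound.
                    total += batch.lastOffset() - batch.baseOffset() + 1;
                }
            }
        }
        return total;
    }
}
{code}
A broker-side limit could sum these header counts batch by batch and stop appending batches once the budget is reached, which would keep the zero-copy send path intact.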
Moreover, from what I have observed, {*}several companies have already
implemented internal forks or wrappers of Kafka to support this feature{*},
which underlines both the demand for and the practical importance of this
functionality. It would therefore be highly beneficial for Kafka to provide a
{*}unified, officially supported solution{*}.
For reference, here is the {{FetchRequest}} schema from our internal version, with the new {{MaxNum}} and {{PartitionMaxNum}} fields:
{code:java}
...
  "validVersions": "4-18",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
      "taggedVersions": "12+", "tag": 0, "ignorable": true,
      "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
    { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
      "about": "The broker ID of the follower, or -1 if this request is from a consumer." },
    { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1,
      "about": "The state of the replica in the follower.", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
        "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
      { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
        "about": "The epoch of this follower, or -1 if not available." }
    ]},
    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
      "about": "The maximum time in milliseconds to wait for the response." },
    { "name": "MinBytes", "type": "int32", "versions": "0+",
      "about": "The minimum bytes to accumulate in the response." },
    { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
      "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
    { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true,
      "about": "The maximum number of messages to fetch. -1 means no limit." },
    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
      "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": true,
      "about": "The fetch session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
      "about": "The fetch session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
        "about": "The name of the topic to fetch." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
        "about": "The unique topic ID." },
      { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
          "about": "The current leader epoch of the partition." },
        { "name": "FetchOffset", "type": "int64", "versions": "0+",
          "about": "The message offset." },
        { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
          "about": "The epoch of the last fetched record or -1 if there is none." },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower." },
        { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." },
        { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true,
          "about": "The maximum number of messages to fetch from this partition. -1 means no limit." },
        { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
          "about": "The directory id of the follower fetching." }
      ]}
    ]},
...
{code}
----
h3. Proposal
Introduce a new optional parameter, such as:
{{fetch.max.message.count}}
to specify the *maximum number of messages* to fetch globally (or perhaps per
partition via a separate config). This can be best-effort if precise control is
too expensive, but even a close approximation would provide great value to many
real-world use cases.
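A hypothetical usage sketch, assuming the proposed config name {{fetch.max.message.count}} and a possible per-partition companion {{max.partition.fetch.message.count}} (neither exists in any released Kafka version):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CountLimitedConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "count-limited-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Proposed (hypothetical): cap each FetchRequest at 500 records in total...
        props.put("fetch.max.message.count", 500);
        // ...and/or per partition, via a possible companion config:
        // props.put("max.partition.fetch.message.count", 100);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Normal subscribe/poll loop; the broker would enforce the count limit.
        }
    }
}
{code}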