[ 
https://issues.apache.org/jira/browse/KAFKA-19554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-19554:
------------------------------
    Component/s: clients
                 consumer

> Add a Kafka client parameter to limit number of messages fetched
> ----------------------------------------------------------------
>
>                 Key: KAFKA-19554
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19554
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, consumer
>            Reporter: corkitse
>            Priority: Major
>              Labels: kip-required
>
> h3. Description
> Currently, Kafka fetch requests only support limiting the total size of 
> messages fetched ({{fetch.max.bytes}}) and the size per partition 
> ({{max.partition.fetch.bytes}}). However, there is no way to limit the 
> *number of messages* fetched per request—neither globally nor on a 
> per-partition basis.
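> For context, the existing byte-based limits appear in a consumer 
> configuration roughly as follows (values shown are the documented defaults; 
> the class name is illustrative):
> {code:java}
> import java.util.Properties;
>
> public class FetchLimitsExample {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         // Existing byte-based limits (documented defaults):
>         props.put("fetch.max.bytes", "52428800");          // 50 MB per fetch request
>         props.put("max.partition.fetch.bytes", "1048576"); // 1 MB per partition
>         // There is no count-based counterpart to these settings today.
>     }
> }
> {code}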
> While Kafka was originally designed as a high-throughput distributed 
> messaging platform and has traditionally focused more on throughput than 
> individual message control, its role has since evolved. Kafka is now not only 
> a leading message queue but also a core component in modern *data pipelines 
> and stream processing frameworks*.
> In these newer use cases, especially for downstream services and streaming 
> applications, *rate-limiting by message count* is a common requirement.
> Currently, the workaround is for clients to *fetch a batch of messages, 
> truncate it to the desired count, and manually rewind the offsets*, which is 
> inefficient, error-prone, and significantly reduces throughput. In 
> practice, this forces developers to use external tools such as Redis to 
> implement additional buffering or rate control mechanisms—adding complexity 
> and overhead.
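> The truncate-and-rewind workaround amounts to the following offset 
> arithmetic (a minimal sketch; {{CountLimitWorkaround}} and 
> {{nextFetchOffset}} are illustrative names, not actual client API):
> {code:java}
> import java.util.List;
>
> public class CountLimitWorkaround {
>     /**
>      * Sketch of the client-side workaround: given the offsets of the records
>      * returned by one poll, consume at most maxNum of them and return the
>      * offset the consumer must seek() back to so the rest are re-fetched.
>      */
>     public static long nextFetchOffset(List<Long> batchOffsets, int maxNum) {
>         if (batchOffsets.isEmpty()) {
>             throw new IllegalArgumentException("empty batch");
>         }
>         if (maxNum >= batchOffsets.size()) {
>             // Whole batch fits under the limit: resume after the last record.
>             return batchOffsets.get(batchOffsets.size() - 1) + 1;
>         }
>         // Truncate: the next fetch must restart at the first dropped record,
>         // so the bytes for the tail of this batch were transferred for nothing.
>         return batchOffsets.get(maxNum);
>     }
> }
> {code}
> Each truncation discards bytes that were already transferred and forces them 
> to be fetched again—exactly the overhead a native count limit would remove.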
> Adding *native support* for a message count limit in fetch requests would 
> offer the following benefits:
> h3. Benefits
>  # *Make Kafka a more mature and production-ready stream processing 
> platform*, by supporting more granular rate-limiting use cases.
>  # *Improve overall system throughput* for consumers that need to limit by 
> message count, by eliminating inefficient post-processing workarounds.
> ----
> h3. Potential Challenges
>  # Due to compression and batching, Kafka consumers do not always have direct 
> access to message counts in a fetch response. This means any solution would 
> need to *estimate or calculate message counts indirectly*—possibly based 
> on batch metadata.
>  # Implementation must ensure that *Kafka’s high-performance 
> characteristics are preserved*. Any support for message-count limits must 
> avoid excessive decompression or deserialization on the broker side.
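> On challenge 1: for message format v2, the record count is stored 
> uncompressed in the batch header ({{DefaultRecordBatch}} keeps an int32 
> count at byte offset 57 of the batch), so in principle a count limit could 
> be enforced without decompressing payloads. A sketch under that assumption:
> {code:java}
> import java.nio.ByteBuffer;
>
> public class BatchRecordCount {
>     // Offsets per the message format v2 (DefaultRecordBatch) layout:
>     // baseOffset(8) + length(4) + partitionLeaderEpoch(4) = 16 -> magic byte;
>     // then crc(4) + attributes(2) + lastOffsetDelta(4) + firstTimestamp(8)
>     // + maxTimestamp(8) + producerId(8) + producerEpoch(2) + baseSequence(4)
>     // puts the int32 record count at byte offset 57.
>     private static final int MAGIC_OFFSET = 16;
>     private static final int RECORDS_COUNT_OFFSET = 57;
>
>     /** Read the record count of a v2 batch without decompressing its payload. */
>     public static int recordCount(ByteBuffer batch) {
>         byte magic = batch.get(batch.position() + MAGIC_OFFSET);
>         if (magic < 2) {
>             // v0/v1 message sets carry no count; it would have to be estimated.
>             return -1;
>         }
>         return batch.getInt(batch.position() + RECORDS_COUNT_OFFSET);
>     }
> }
> {code}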
> Moreover, from what I’ve observed, *many companies have already implemented 
> their own internal forks or wrappers of Kafka to support this feature*, which 
> highlights the demand for and practical importance of this functionality. It 
> would therefore be highly beneficial for Kafka to provide a *unified and 
> officially supported solution*.
>  
> For reference, below is the fetch request schema from our internal version, 
> with the proposed {{MaxNum}} and {{PartitionMaxNum}} fields added:
>  
> {code:java}
> ...
> "validVersions": "4-18",
> "flexibleVersions": "12+",
> "fields": [
>   { "name": "ClusterId", "type": "string", "versions": "12+", 
> "nullableVersions": "12+", "default": "null",
>     "taggedVersions": "12+", "tag": 0, "ignorable": true,
>     "about": "The clusterId if known. This is used to validate metadata 
> fetches prior to broker registration." },
>   { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": 
> "-1", "entityType": "brokerId",
>     "about": "The broker ID of the follower, or -1 if this request is from a 
> consumer." },
>   { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", 
> "taggedVersions": "15+", "tag": 1,
>     "about": "The state of the replica in the follower.", "fields": [
>     { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": 
> "-1", "entityType": "brokerId",
>       "about": "The replica ID of the follower, or -1 if this request is from 
> a consumer." },
>     { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": 
> "-1",
>       "about": "The epoch of this follower, or -1 if not available." }
>   ]},
>   { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
>     "about": "The maximum time in milliseconds to wait for the response." },
>   { "name": "MinBytes", "type": "int32", "versions": "0+",
>     "about": "The minimum bytes to accumulate in the response." },
>   { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": 
> "0x7fffffff", "ignorable": true,
>     "about": "The maximum bytes to fetch.  See KIP-74 for cases where this 
> limit may not be honored." },
>   { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", 
> "ignorable": true,
>     "about": "The maximum number of messages to fetch. -1 means no limit." },
>   { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
> "0", "ignorable": true,
>     "about": "This setting controls the visibility of transactional records. 
> Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With 
> READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED 
> transactional records are visible. To be more concrete, READ_COMMITTED 
> returns all data from offsets smaller than the current LSO (last stable 
> offset), and enables the inclusion of the list of aborted transactions in the 
> result, which allows consumers to discard ABORTED transactional records." },
>   { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", 
> "ignorable": true,
>     "about": "The fetch session ID." },
>   { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": 
> "-1", "ignorable": true,
>     "about": "The fetch session epoch, which is used for ordering requests in 
> a session." },
>   { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
>     "about": "The topics to fetch.", "fields": [
>     { "name": "Topic", "type": "string", "versions": "0-12", "entityType": 
> "topicName", "ignorable": true,
>       "about": "The name of the topic to fetch." },
>     { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": 
> true, "about": "The unique topic ID."},
>     { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
>       "about": "The partitions to fetch.", "fields": [
>       { "name": "Partition", "type": "int32", "versions": "0+",
>         "about": "The partition index." },
>       { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", 
> "default": "-1", "ignorable": true,
>         "about": "The current leader epoch of the partition." },
>       { "name": "FetchOffset", "type": "int64", "versions": "0+",
>         "about": "The message offset." },
>       { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", 
> "default": "-1", "ignorable": false,
>         "about": "The epoch of the last fetched record or -1 if there is 
> none."},
>       { "name": "LogStartOffset", "type": "int64", "versions": "5+", 
> "default": "-1", "ignorable": true,
>         "about": "The earliest available offset of the follower replica.  The 
> field is only used when the request is sent by the follower."},
>       { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
>         "about": "The maximum bytes to fetch from this partition.  See KIP-74 
> for cases where this limit may not be honored." },
>       { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", 
> "default": "-1", "ignorable": true,
>         "about": "The maximum number of messages to fetch from this 
> partition. -1 means no limit." },
>       { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", 
> "taggedVersions": "17+", "tag": 0, "ignorable": true,
>         "about": "The directory id of the follower fetching." }
>     ]}
>   ]}, 
> ...{code}
>  
> ----
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)