[ https://issues.apache.org/jira/browse/KAFKA-19554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True updated KAFKA-19554:
------------------------------
    Component/s: clients
                 consumer

> Add a Kafka client parameter to limit number of messages fetched
> ----------------------------------------------------------------
>
>                 Key: KAFKA-19554
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19554
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, consumer
>            Reporter: corkitse
>            Priority: Major
>              Labels: kip-required
>
> h3. Description
> Currently, Kafka fetch requests only support limiting the total size of
> messages fetched ({{fetch.max.bytes}}) and the size per partition
> ({{max.partition.fetch.bytes}}). However, there is no way to limit the
> *number of messages* fetched per request, either globally or on a
> per-partition basis.
> While Kafka was originally designed as a high-throughput distributed
> messaging platform and has traditionally focused more on throughput than
> individual message control, its role has since evolved. Kafka is now not only
> a leading message queue but also a core component in modern *data pipelines
> and stream processing frameworks*.
> In these newer use cases, especially for downstream services and streaming
> applications, *rate-limiting by message count* is a common requirement.
> Currently, the workaround is for clients to *fetch a batch of messages,
> manually truncate them based on count, and then adjust offsets manually*,
> which is inefficient, error-prone, and significantly reduces throughput. In
> practice, this forces developers to use external tools such as Redis to
> implement additional buffering or rate control mechanisms, adding complexity
> and overhead.
> Adding *native support* for a message count limit in fetch requests would
> offer the following benefits:
> h3. Benefits
> # *Make Kafka a more mature and production-ready stream processing
> platform*, by supporting more granular rate-limiting use cases.
> # *Improve overall system throughput* for consumers that need to limit by
> message count, by eliminating inefficient post-processing workarounds.
> ----
> h3. Potential Challenges
> # Due to compression and batching, Kafka consumers do not always have direct
> access to message counts in a fetch response. This means any solution would
> need to *estimate or calculate message counts indirectly*, possibly based
> on batch metadata.
> # Implementation must ensure that *Kafka’s high-performance
> characteristics are preserved*. Any support for message-count limits must
> avoid excessive decompression or deserialization on the broker side.
> Moreover, from what I’ve observed, *many capable companies have already
> implemented their own internal forks or wrappers of Kafka to support this
> feature*, highlighting the demand and practical importance of this
> functionality. Therefore, it would be highly beneficial for Kafka to provide
> a *unified and officially supported solution*.
>
> For reference, this is the fetch request schema from our internal version,
> which adds the {{MaxNum}} and {{PartitionMaxNum}} fields:
>
> {code:java}
> ...
>   "validVersions": "4-18",
>   "flexibleVersions": "12+",
>   "fields": [
>     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
>       "taggedVersions": "12+", "tag": 0, "ignorable": true,
>       "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
>     { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
>       "about": "The broker ID of the follower, or -1 if this request is from a consumer." },
>     { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1,
>       "about": "The state of the replica in the follower.", "fields": [
>       { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
>         "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
>       { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
>         "about": "The epoch of this follower, or -1 if not available." }
>     ]},
>     { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
>       "about": "The maximum time in milliseconds to wait for the response." },
>     { "name": "MinBytes", "type": "int32", "versions": "0+",
>       "about": "The minimum bytes to accumulate in the response." },
>     { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
>       "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
>     { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true,
>       "about": "The maximum number of messages to fetch. -1 means no limit." },
>     { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
>       "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." },
>     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": true,
>       "about": "The fetch session ID." },
>     { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
>       "about": "The fetch session epoch, which is used for ordering requests in a session." },
>     { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
>       "about": "The topics to fetch.", "fields": [
>       { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
>         "about": "The name of the topic to fetch." },
>       { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID."},
>       { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
>         "about": "The partitions to fetch.", "fields": [
>         { "name": "Partition", "type": "int32", "versions": "0+",
>           "about": "The partition index." },
>         { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
>           "about": "The current leader epoch of the partition." },
>         { "name": "FetchOffset", "type": "int64", "versions": "0+",
>           "about": "The message offset." },
>         { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
>           "about": "The epoch of the last fetched record or -1 if there is none."},
>         { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
>           "about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower."},
>         { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
>           "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." },
>         { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true,
>           "about": "The maximum number of messages to fetch from this partition. -1 means no limit." },
>         { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
>           "about": "The directory id of the follower fetching." }
>       ]}
>     ]},
> ...{code}
>
> ----
>

--
This message was sent by Atlassian Jira (v8.20.10#820010)
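
The first challenge in the description suggests deriving message counts from batch metadata rather than from decompressed records. A minimal sketch of that idea follows, assuming the per-batch record count can be read from the batch header without decompression. The names MaxNumSketch, BatchMeta, and selectBatches are hypothetical and are not part of Kafka. Always returning the first batch, even when it alone exceeds the limit, is also an assumption, chosen to mirror how KIP-74 treats byte limits so that a consumer can still make progress.

```java
import java.util.ArrayList;
import java.util.List;

public class MaxNumSketch {
    /** Hypothetical view of one record batch header: base offset and record count. */
    static final class BatchMeta {
        final long baseOffset;
        final int recordCount;

        BatchMeta(long baseOffset, int recordCount) {
            this.baseOffset = baseOffset;
            this.recordCount = recordCount;
        }
    }

    /**
     * Selects the leading batches whose combined record count stays within maxNum.
     * maxNum == -1 means no limit, as in the proposed MaxNum field.
     * Assumption: the first batch is always included so the fetch makes progress.
     */
    static List<BatchMeta> selectBatches(List<BatchMeta> batches, int maxNum) {
        List<BatchMeta> selected = new ArrayList<>();
        long total = 0;
        for (BatchMeta batch : batches) {
            boolean overLimit = maxNum >= 0 && total + batch.recordCount > maxNum;
            if (overLimit && !selected.isEmpty()) {
                break; // adding this batch would exceed the count limit
            }
            selected.add(batch);
            total += batch.recordCount;
        }
        return selected;
    }

    public static void main(String[] args) {
        List<BatchMeta> batches = List.of(
            new BatchMeta(0, 40), new BatchMeta(40, 50), new BatchMeta(90, 30));
        System.out.println(selectBatches(batches, 100).size()); // prints 2
        System.out.println(selectBatches(batches, 10).size());  // prints 1
        System.out.println(selectBatches(batches, -1).size());  // prints 3
    }
}
```

Because the selection works on whole batches, the limit in this sketch is approximate: a response can carry fewer records than maxNum, or more when the first batch alone exceeds it. An exact limit would require splitting a batch, which is the decompression cost the second challenge warns about.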