[ https://issues.apache.org/jira/browse/KAFKA-19554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
corkitse updated KAFKA-19554: ----------------------------- Description: h3. Description Currently, Kafka fetch requests only support limiting the total size of messages fetched ({{{}fetch.max.bytes{}}}) and the size per partition ({{{}max.partition.fetch.bytes{}}}). However, there is no way to limit the *number of messages* fetched per request—neither globally nor on a per-partition basis. While Kafka was originally designed as a high-throughput distributed messaging platform and has traditionally focused more on throughput than individual message control, its role has since evolved. Kafka is now not only a leading message queue but also a core component in modern {*}data pipelines and stream processing frameworks{*}. In these newer use cases, especially for downstream services and streaming applications, *rate-limiting by message count* is a common requirement. Currently, the workaround is for clients to {*}fetch a batch of messages, manually truncate them based on count, and then adjust offsets manually{*}, which is inefficient, error-prone, and significantly reduces throughput. In practice, this forces developers to use external tools such as Redis to implement additional buffering or rate control mechanisms—adding complexity and overhead. Adding *native support* for a message count limit in fetch requests would offer the following benefits: h3. Benefits # {*}Make Kafka a more mature and production-ready stream processing platform{*}, by supporting more granular rate-limiting use cases. # *Improve overall system throughput* for consumers that need to limit by message count, by eliminating inefficient post-processing workarounds. ---- h3. Potential Challenges # Due to compression and batching, Kafka consumers do not always have direct access to message counts in a fetch response. This means any solution would need to {*}estimate or calculate message counts indirectly{*}—possibly based on batch metadata. # Implementation must ensure that {*}Kafka’s high-performance characteristics are preserved{*}. Any support for message-count limits must avoid excessive decompression or deserialization on the broker side. Moreover, from what I’ve observed, {*}many capable companies have already implemented their own internal forks or wrappers of Kafka to support this feature{*}, highlighting the demand and practical importance of this functionality. Therefore, it would be highly beneficial for Kafka to provide a {*}unified and officially supported solution{*}. The parameter reference from our internal version. {code:java} ... "validVersions": "4-18", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1, "about": "The state of the replica in the follower.", "fields": [ { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId", "about": "The replica ID of the follower, or -1 if this request is from a consumer." }, { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1", "about": "The epoch of this follower, or -1 if not available." } ]}, { "name": "MaxWaitMs", "type": "int32", "versions": "0+", "about": "The maximum time in milliseconds to wait for the response." }, { "name": "MinBytes", "type": "int32", "versions": "0+", "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true, "about": "The maximum number of messages to fetch. -1 means no limit." }, { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true, "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." }, { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": true, "about": "The fetch session ID." }, { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true, "about": "The fetch session epoch, which is used for ordering requests in a session." }, { "name": "Topics", "type": "[]FetchTopic", "versions": "0+", "about": "The topics to fetch.", "fields": [ { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true, "about": "The name of the topic to fetch." }, { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true, "about": "The current leader epoch of the partition." }, { "name": "FetchOffset", "type": "int64", "versions": "0+", "about": "The message offset." }, { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false, "about": "The epoch of the last fetched record or -1 if there is none."}, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower."}, { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+", "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." }, { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true, "about": "The maximum number of messages to fetch from this partition. -1 means no limit." }, { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true, "about": "The directory id of the follower fetching." } ]} ]}, ...{code} ---- was: h3. Description Currently, Kafka fetch requests only support limiting the total size of messages fetched ({{{}fetch.max.bytes{}}}) and the size per partition ({{{}max.partition.fetch.bytes{}}}). However, there is no way to limit the *number of messages* fetched per request—neither globally nor on a per-partition basis. While Kafka was originally designed as a high-throughput distributed messaging platform and has traditionally focused more on throughput than individual message control, its role has since evolved. Kafka is now not only a leading message queue but also a core component in modern {*}data pipelines and stream processing frameworks{*}. In these newer use cases, especially for downstream services and streaming applications, *rate-limiting by message count* is a common requirement. Currently, the workaround is for clients to {*}fetch a batch of messages, manually truncate them based on count, and then adjust offsets manually{*}, which is inefficient, error-prone, and significantly reduces throughput. In practice, this forces developers to use external tools such as Redis to implement additional buffering or rate control mechanisms—adding complexity and overhead. Adding *native support* for a message count limit in fetch requests would offer the following benefits: h3. Benefits # {*}Make Kafka a more mature and production-ready stream processing platform{*}, by supporting more granular rate-limiting use cases. # *Improve overall system throughput* for consumers that need to limit by message count, by eliminating inefficient post-processing workarounds. ---- h3. Potential Challenges # Due to compression and batching, Kafka consumers do not always have direct access to message counts in a fetch response. This means any solution would need to {*}estimate or calculate message counts indirectly{*}—possibly based on batch metadata. # Implementation must ensure that {*}Kafka’s high-performance characteristics are preserved{*}. Any support for message-count limits must avoid excessive decompression or deserialization on the broker side. Moreover, from what I’ve observed, {*}many capable companies have already implemented their own internal forks or wrappers of Kafka to support this feature{*}, highlighting the demand and practical importance of this functionality. Therefore, it would be highly beneficial for Kafka to provide a {*}unified and officially supported solution{*}. The parameter reference from our internal version. {code:java} ... "validVersions": "4-18", "flexibleVersions": "12+", "fields": [ { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null", "taggedVersions": "12+", "tag": 0, "ignorable": true, "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." }, { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1, "about": "The state of the replica in the follower.", "fields": [ { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId", "about": "The replica ID of the follower, or -1 if this request is from a consumer." }, { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1", "about": "The epoch of this follower, or -1 if not available." } ]}, { "name": "MaxWaitMs", "type": "int32", "versions": "0+", "about": "The maximum time in milliseconds to wait for the response." }, { "name": "MinBytes", "type": "int32", "versions": "0+", "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true, "about": "The maximum number of messages to fetch. -1 means no limit." }, { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true, "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." }, { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": true, "about": "The fetch session ID." }, { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true, "about": "The fetch session epoch, which is used for ordering requests in a session." }, { "name": "Topics", "type": "[]FetchTopic", "versions": "0+", "about": "The topics to fetch.", "fields": [ { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true, "about": "The name of the topic to fetch." }, { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID."}, { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "Partition", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true, "about": "The current leader epoch of the partition." }, { "name": "FetchOffset", "type": "int64", "versions": "0+", "about": "The message offset." }, { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false, "about": "The epoch of the last fetched record or -1 if there is none."}, { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The earliest available offset of the follower replica. The field is only used when the request is sent by the follower."}, { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+", "about": "The maximum bytes to fetch from this partition. See KIP-74 for cases where this limit may not be honored." }, { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true, "about": "The maximum number of messages to fetch from this partition. -1 means no limit." }, { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true, "about": "The directory id of the follower fetching." } ]} ]}, ...{code} ---- h3. Proposal Introduce a new optional parameter, such as: java 复制编辑 {{fetch.max.message.count}} to specify the *maximum number of messages* to fetch globally (or perhaps per partition via a separate config). This can be best-effort if precise control is too expensive, but even a close approximation would provide great value to many real-world use cases. > Add a Kafka client parameter to limit number of messages fetched > ---------------------------------------------------------------- > > Key: KAFKA-19554 > URL: https://issues.apache.org/jira/browse/KAFKA-19554 > Project: Kafka > Issue Type: Improvement > Reporter: corkitse > Priority: Major > > h3. Description > Currently, Kafka fetch requests only support limiting the total size of > messages fetched ({{{}fetch.max.bytes{}}}) and the size per partition > ({{{}max.partition.fetch.bytes{}}}). However, there is no way to limit the > *number of messages* fetched per request—neither globally nor on a > per-partition basis. > While Kafka was originally designed as a high-throughput distributed > messaging platform and has traditionally focused more on throughput than > individual message control, its role has since evolved. Kafka is now not only > a leading message queue but also a core component in modern {*}data pipelines > and stream processing frameworks{*}. > In these newer use cases, especially for downstream services and streaming > applications, *rate-limiting by message count* is a common requirement. > Currently, the workaround is for clients to {*}fetch a batch of messages, > manually truncate them based on count, and then adjust offsets manually{*}, > which is inefficient, error-prone, and significantly reduces throughput. In > practice, this forces developers to use external tools such as Redis to > implement additional buffering or rate control mechanisms—adding complexity > and overhead. > Adding *native support* for a message count limit in fetch requests would > offer the following benefits: > h3. Benefits > # {*}Make Kafka a more mature and production-ready stream processing > platform{*}, by supporting more granular rate-limiting use cases. > # *Improve overall system throughput* for consumers that need to limit by > message count, by eliminating inefficient post-processing workarounds. > ---- > h3. Potential Challenges > # Due to compression and batching, Kafka consumers do not always have direct > access to message counts in a fetch response. This means any solution would > need to {*}estimate or calculate message counts indirectly{*}—possibly based > on batch metadata. > # Implementation must ensure that {*}Kafka’s high-performance > characteristics are preserved{*}. Any support for message-count limits must > avoid excessive decompression or deserialization on the broker side. > Moreover, from what I’ve observed, {*}many capable companies have already > implemented their own internal forks or wrappers of Kafka to support this > feature{*}, highlighting the demand and practical importance of this > functionality. Therefore, it would be highly beneficial for Kafka to provide > a {*}unified and officially supported solution{*}. > > The parameter reference from our internal version. > > > > {code:java} > ... > "validVersions": "4-18", > "flexibleVersions": "12+", > "fields": [ > { "name": "ClusterId", "type": "string", "versions": "12+", > "nullableVersions": "12+", "default": "null", > "taggedVersions": "12+", "tag": 0, "ignorable": true, > "about": "The clusterId if known. This is used to validate metadata > fetches prior to broker registration." }, > { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": > "-1", "entityType": "brokerId", > "about": "The broker ID of the follower, of -1 if this request is from a > consumer." }, > { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", > "taggedVersions": "15+", "tag": 1, > "about": "The state of the replica in the follower.", "fields": [ > { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": > "-1", "entityType": "brokerId", > "about": "The replica ID of the follower, or -1 if this request is from > a consumer." }, > { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": > "-1", > "about": "The epoch of this follower, or -1 if not available." } > ]}, > { "name": "MaxWaitMs", "type": "int32", "versions": "0+", > "about": "The maximum time in milliseconds to wait for the response." }, > { "name": "MinBytes", "type": "int32", "versions": "0+", > "about": "The minimum bytes to accumulate in the response." }, > { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": > "0x7fffffff", "ignorable": true, > "about": "The maximum bytes to fetch. See KIP-74 for cases where this > limit may not be honored." }, > { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", > "ignorable": true, > "about": "The maximum number of messages to fetch. -1 means no limit." }, > { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": > "0", "ignorable": true, > "about": "This setting controls the visibility of transactional records. > Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With > READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED > transactional records are visible. To be more concrete, READ_COMMITTED > returns all data from offsets smaller than the current LSO (last stable > offset), and enables the inclusion of the list of aborted transactions in the > result, which allows consumers to discard ABORTED transactional records." }, > { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", > "ignorable": true, > "about": "The fetch session ID." }, > { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": > "-1", "ignorable": true, > "about": "The fetch session epoch, which is used for ordering requests in > a session." }, > { "name": "Topics", "type": "[]FetchTopic", "versions": "0+", > "about": "The topics to fetch.", "fields": [ > { "name": "Topic", "type": "string", "versions": "0-12", "entityType": > "topicName", "ignorable": true, > "about": "The name of the topic to fetch." }, > { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": > true, "about": "The unique topic ID."}, > { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+", > "about": "The partitions to fetch.", "fields": [ > { "name": "Partition", "type": "int32", "versions": "0+", > "about": "The partition index." }, > { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", > "default": "-1", "ignorable": true, > "about": "The current leader epoch of the partition." }, > { "name": "FetchOffset", "type": "int64", "versions": "0+", > "about": "The message offset." }, > { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", > "default": "-1", "ignorable": false, > "about": "The epoch of the last fetched record or -1 if there is > none."}, > { "name": "LogStartOffset", "type": "int64", "versions": "5+", > "default": "-1", "ignorable": true, > "about": "The earliest available offset of the follower replica. The > field is only used when the request is sent by the follower."}, > { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+", > "about": "The maximum bytes to fetch from this partition. See KIP-74 > for cases where this limit may not be honored." }, > { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", > "default": "-1", "ignorable": true, > "about": "The maximum number of messages to fetch from this > partition. -1 means no limit." }, > { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", > "taggedVersions": "17+", "tag": 0, "ignorable": true, > "about": "The directory id of the follower fetching." } > ]} > ]}, > ...{code} > > ---- > -- This message was sent by Atlassian Jira (v8.20.10#820010)