[ 
https://issues.apache.org/jira/browse/KAFKA-19554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

corkitse updated KAFKA-19554:
-----------------------------
    Description: 
h3. Description

Currently, Kafka fetch requests can only limit the total size of 
messages fetched ({{fetch.max.bytes}}) and the size per partition 
({{max.partition.fetch.bytes}}). There is no way to limit the 
*number of messages* fetched per request, either globally or on a 
per-partition basis.

While Kafka was originally designed as a high-throughput distributed messaging 
platform and has traditionally focused more on throughput than individual 
message control, its role has since evolved. Kafka is now not only a leading 
message queue but also a core component in modern {*}data pipelines and stream 
processing frameworks{*}.

In these newer use cases, especially for downstream services and streaming 
applications, *rate-limiting by message count* is a common requirement.

Currently, the workaround is for clients to {*}fetch a batch of messages, 
manually truncate them based on count, and then adjust offsets manually{*}. 
This is inefficient and error-prone, and it reduces throughput because the 
discarded messages must be fetched again. In practice, it often pushes 
developers toward external tools such as Redis for additional buffering or 
rate control, which adds complexity and overhead.
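
To make the cost of this workaround concrete, here is a minimal sketch of the truncation step in plain Java. The {{Rec}} and {{Result}} types and the {{truncate}} helper are hypothetical stand-ins for illustration, not Kafka client API; a real consumer would apply the returned offsets with {{KafkaConsumer.seek}} before its next {{poll}}. The existing {{max.poll.records}} setting does not help here, since it only caps what {{poll()}} hands to the application and does not change how much data a fetch request pulls from the broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not Kafka client classes.
final class ClientSideTruncation {

    /** A fetched record, identified by its partition and offset. */
    record Rec(int partition, long offset) {}

    /** Records kept for processing, plus the offset to rewind each partition to. */
    record Result(List<Rec> kept, Map<Integer, Long> seekTo) {}

    /**
     * Keeps at most maxCount records in fetch order. For every partition that
     * had records dropped, remembers the offset of its first dropped record so
     * the caller can fetch from there again.
     */
    static Result truncate(List<Rec> fetched, int maxCount) {
        List<Rec> kept = new ArrayList<>();
        Map<Integer, Long> seekTo = new LinkedHashMap<>();
        for (Rec r : fetched) {
            if (kept.size() < maxCount) {
                kept.add(r);
            } else {
                // Over the limit: this record was transferred but will be discarded.
                seekTo.putIfAbsent(r.partition(), r.offset());
            }
        }
        return new Result(kept, seekTo);
    }
}
```

Every record that ends up behind a {{seekTo}} offset crosses the network twice, which is the wasted transfer described above.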

Adding *native support* for a message count limit in fetch requests would offer 
the following benefits:
h3. Benefits
 # {*}Make Kafka a more mature and production-ready stream processing 
platform{*}, by supporting more granular rate-limiting use cases.

 # *Improve overall system throughput* for consumers that need to limit by 
message count, by eliminating inefficient post-processing workarounds.

----
h3. Potential Challenges
 # Due to compression and batching, Kafka consumers do not always have direct 
access to message counts in a fetch response. Any solution would therefore 
need to {*}estimate or calculate message counts indirectly{*}, possibly based on 
batch metadata.

 # Implementation must ensure that {*}Kafka’s high-performance characteristics 
are preserved{*}. Any support for message-count limits must avoid excessive 
decompression or deserialization on the broker side.
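
One way to address both challenges is to count at batch granularity. In the current record batch format (magic v2) the batch header is stored uncompressed and carries a record count, so counts can be summed from headers without decompressing the payload. The sketch below assumes that approach; {{BatchMeta}} and {{selectBatches}} are hypothetical names, and the rule of always returning at least one batch is an assumption modeled on how KIP-74 treats byte limits. Because batches are returned whole, the result is a best-effort cap rather than an exact one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of count-limited batch selection; this is not broker code.
final class BatchCountLimiter {

    /** The header fields this sketch needs from one record batch. */
    record BatchMeta(long baseOffset, int recordCount) {}

    /**
     * Selects a prefix of batches whose total record count stays within
     * maxRecords. A negative maxRecords means no limit. The first batch is
     * always included so that a consumer can make progress even when a
     * single batch exceeds the limit.
     */
    static List<BatchMeta> selectBatches(List<BatchMeta> batches, int maxRecords) {
        List<BatchMeta> selected = new ArrayList<>();
        long total = 0;
        for (BatchMeta b : batches) {
            boolean overLimit = maxRecords >= 0 && total + b.recordCount() > maxRecords;
            if (overLimit && !selected.isEmpty()) {
                break;
            }
            selected.add(b);
            total += b.recordCount();
        }
        return selected;
    }
}
```

With batches of 100, 50, and 200 records and a limit of 160, the first two batches are selected; with a limit of 10, only the first batch is returned and the limit is exceeded.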

Moreover, from what I’ve observed, {*}many capable companies have already 
implemented their own internal forks or wrappers of Kafka to support this 
feature{*}, highlighting the demand and practical importance of this 
functionality. Therefore, it would be highly beneficial for Kafka to provide a 
{*}unified and officially supported solution{*}.

 
The fetch request schema from our internal version, which adds the {{MaxNum}} and {{PartitionMaxNum}} fields, is shown below for reference.

{code:java}
...
"validVersions": "4-18",
"flexibleVersions": "12+",
"fields": [
  { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
    "taggedVersions": "12+", "tag": 0, "ignorable": true,
    "about": "The clusterId if known. This is used to validate metadata fetches prior to broker registration." },
  { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", "entityType": "brokerId",
    "about": "The broker ID of the follower, or -1 if this request is from a consumer." },
  { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", "taggedVersions": "15+", "tag": 1,
    "about": "The state of the replica in the follower.", "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", "entityType": "brokerId",
      "about": "The replica ID of the follower, or -1 if this request is from a consumer." },
    { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": "-1",
      "about": "The epoch of this follower, or -1 if not available." }
  ]},
  { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
    "about": "The maximum time in milliseconds to wait for the response." },
  { "name": "MinBytes", "type": "int32", "versions": "0+",
    "about": "The minimum bytes to accumulate in the response." },
  { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
    "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
  // New in version 18 of this internal schema: request-wide message count limit.
  { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true,
    "about": "The maximum number of messages to fetch. -1 means no limit." },
  { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
    "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABORTED transactional records." },
  { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": true,
    "about": "The fetch session ID." },
  { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
    "about": "The fetch session epoch, which is used for ordering requests in a session." },
  { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
    "about": "The topics to fetch.", "fields": [
    { "name": "Topic", "type": "string", "versions": "0-12", "entityType": "topicName", "ignorable": true,
      "about": "The name of the topic to fetch." },
    { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true,
      "about": "The unique topic ID." },
    { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
      "about": "The partitions to fetch.", "fields": [
      { "name": "Partition", "type": "int32", "versions": "0+",
        "about": "The partition index." },
      { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
        "about": "The current leader epoch of the partition." },
      { "name": "FetchOffset", "type": "int64", "versions": "0+",
        "about": "The message offset." },
      { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", "default": "-1", "ignorable": false,
        "about": "The epoch of the last fetched record or -1 if there is none." },
      { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
        "about": "The earliest available offset of the follower replica.  The field is only used when the request is sent by the follower." },
      { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
        "about": "The maximum bytes to fetch from this partition.  See KIP-74 for cases where this limit may not be honored." },
      // New in version 18 of this internal schema: per-partition message count limit.
      { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", "default": "-1", "ignorable": true,
        "about": "The maximum number of messages to fetch from this partition. -1 means no limit." },
      { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", "taggedVersions": "17+", "tag": 0, "ignorable": true,
        "about": "The directory id of the follower fetching." }
    ]}
  ]},

...{code}
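
The schema does not say how the two new fields interact. Below is a minimal sketch of one plausible interpretation, modeled on how {{MaxBytes}} and {{PartitionMaxBytes}} combine: each partition is capped by the smaller of its own limit and whatever remains of the request-wide budget, with -1 meaning no limit. The {{effectiveLimit}} helper is hypothetical and is not taken from the internal version referenced above.

```java
// Hypothetical helper showing one way MaxNum and PartitionMaxNum could combine.
final class FetchCountLimits {

    static final int NO_LIMIT = -1;

    /**
     * Returns the record cap for the next partition, or NO_LIMIT if neither
     * limit applies.
     *
     * @param maxNum          request-wide limit (MaxNum), or -1 for none
     * @param alreadyReturned records already added to the response
     * @param partitionMaxNum per-partition limit (PartitionMaxNum), or -1 for none
     */
    static int effectiveLimit(int maxNum, int alreadyReturned, int partitionMaxNum) {
        if (maxNum < 0) {
            // No request-wide limit: only the partition limit can apply.
            return partitionMaxNum < 0 ? NO_LIMIT : partitionMaxNum;
        }
        // Remaining request-wide budget, never negative.
        int remaining = Math.max(0, maxNum - alreadyReturned);
        return partitionMaxNum < 0 ? remaining : Math.min(remaining, partitionMaxNum);
    }
}
```

For example, with {{MaxNum}} = 100, 90 records already returned, and {{PartitionMaxNum}} = 50, the next partition would be capped at 10 records under this interpretation.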
 
----

 

  was:
h3. Description

Currently, Kafka fetch requests can only limit the total size of 
messages fetched ({{fetch.max.bytes}}) and the size per partition 
({{max.partition.fetch.bytes}}). There is no way to limit the 
*number of messages* fetched per request, either globally or on a 
per-partition basis.

While Kafka was originally designed as a high-throughput distributed messaging 
platform and has traditionally focused more on throughput than individual 
message control, its role has since evolved. Kafka is now not only a leading 
message queue but also a core component in modern {*}data pipelines and stream 
processing frameworks{*}.

In these newer use cases, especially for downstream services and streaming 
applications, *rate-limiting by message count* is a common requirement.

Currently, the workaround is for clients to {*}fetch a batch of messages, 
manually truncate them based on count, and then adjust offsets manually{*}, 
which is inefficient, error-prone, and significantly reduces throughput. In 
practice, this forces developers to use external tools such as Redis to 
implement additional buffering or rate control mechanisms—adding complexity and 
overhead.

Adding *native support* for a message count limit in fetch requests would offer 
the following benefits:
h3. Benefits
 # {*}Make Kafka a more mature and production-ready stream processing 
platform{*}, by supporting more granular rate-limiting use cases.

 # *Improve overall system throughput* for consumers that need to limit by 
message count, by eliminating inefficient post-processing workarounds.

----
h3. Potential Challenges
 # Due to compression and batching, Kafka consumers do not always have direct 
access to message counts in a fetch response. This means any solution would 
need to {*}estimate or calculate message counts indirectly{*}—possibly based on 
batch metadata.

 # Implementation must ensure that {*}Kafka’s high-performance characteristics 
are preserved{*}. Any support for message-count limits must avoid excessive 
decompression or deserialization on the broker side.



Moreover, from what I’ve observed, {*}many capable companies have already 
implemented their own internal forks or wrappers of Kafka to support this 
feature{*}, highlighting the demand and practical importance of this 
functionality. Therefore, it would be highly beneficial for Kafka to provide a 
{*}unified and officially supported solution{*}.

 
The parameter reference from our internal version.
 
 
 
{code:java}
...
"validVersions": "4-18",
"flexibleVersions": "12+",
"fields": [
  { "name": "ClusterId", "type": "string", "versions": "12+", 
"nullableVersions": "12+", "default": "null",
    "taggedVersions": "12+", "tag": 0, "ignorable": true,
    "about": "The clusterId if known. This is used to validate metadata fetches 
prior to broker registration." },
  { "name": "ReplicaId", "type": "int32", "versions": "0-14", "default": "-1", 
"entityType": "brokerId",
    "about": "The broker ID of the follower, or -1 if this request is from a consumer." },
  { "name": "ReplicaState", "type": "ReplicaState", "versions": "15+", 
"taggedVersions": "15+", "tag": 1,
    "about": "The state of the replica in the follower.", "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "15+", "default": "-1", 
"entityType": "brokerId",
      "about": "The replica ID of the follower, or -1 if this request is from a 
consumer." },
    { "name": "ReplicaEpoch", "type": "int64", "versions": "15+", "default": 
"-1",
      "about": "The epoch of this follower, or -1 if not available." }
  ]},
  { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
    "about": "The maximum time in milliseconds to wait for the response." },
  { "name": "MinBytes", "type": "int32", "versions": "0+",
    "about": "The minimum bytes to accumulate in the response." },
  { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": 
"0x7fffffff", "ignorable": true,
    "about": "The maximum bytes to fetch.  See KIP-74 for cases where this 
limit may not be honored." },
  { "name": "MaxNum", "type": "int32", "versions": "18+", "default": "-1", 
"ignorable": true,
    "about": "The maximum number of messages to fetch. -1 means no limit." },
  { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", 
"ignorable": true,
    "about": "This setting controls the visibility of transactional records. 
Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With 
READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED 
transactional records are visible. To be more concrete, READ_COMMITTED returns 
all data from offsets smaller than the current LSO (last stable offset), and 
enables the inclusion of the list of aborted transactions in the result, which 
allows consumers to discard ABORTED transactional records." },
  { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", 
"ignorable": true,
    "about": "The fetch session ID." },
  { "name": "SessionEpoch", "type": "int32", "versions": "7+", "default": "-1", 
"ignorable": true,
    "about": "The fetch session epoch, which is used for ordering requests in a 
session." },
  { "name": "Topics", "type": "[]FetchTopic", "versions": "0+",
    "about": "The topics to fetch.", "fields": [
    { "name": "Topic", "type": "string", "versions": "0-12", "entityType": 
"topicName", "ignorable": true,
      "about": "The name of the topic to fetch." },
    { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, 
"about": "The unique topic ID."},
    { "name": "Partitions", "type": "[]FetchPartition", "versions": "0+",
      "about": "The partitions to fetch.", "fields": [
      { "name": "Partition", "type": "int32", "versions": "0+",
        "about": "The partition index." },
      { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "9+", 
"default": "-1", "ignorable": true,
        "about": "The current leader epoch of the partition." },
      { "name": "FetchOffset", "type": "int64", "versions": "0+",
        "about": "The message offset." },
      { "name": "LastFetchedEpoch", "type": "int32", "versions": "12+", 
"default": "-1", "ignorable": false,
        "about": "The epoch of the last fetched record or -1 if there is 
none."},
      { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": 
"-1", "ignorable": true,
        "about": "The earliest available offset of the follower replica.  The 
field is only used when the request is sent by the follower."},
      { "name": "PartitionMaxBytes", "type": "int32", "versions": "0+",
        "about": "The maximum bytes to fetch from this partition.  See KIP-74 
for cases where this limit may not be honored." },
      { "name": "PartitionMaxNum", "type": "int32", "versions": "18+", 
"default": "-1", "ignorable": true,
        "about": "The maximum number of messages to fetch from this partition. 
-1 means no limit." },
      { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "17+", 
"taggedVersions": "17+", "tag": 0, "ignorable": true,
        "about": "The directory id of the follower fetching." }
    ]}
  ]}, 

...{code}
 
----
h3. Proposal

Introduce a new optional parameter, such as {{fetch.max.message.count}}, 
to specify the *maximum number of messages* to fetch globally (or perhaps per 
partition via a separate config). This can be best-effort if precise control is 
too expensive, but even a close approximation would provide great value to many 
real-world use cases.
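
As a rough illustration of how such a setting might sit next to the existing byte limits, here is a small sketch using plain {{java.util.Properties}}. The {{fetch.max.message.count}} key is the proposed name from this issue and is not recognized by any released Kafka client; the value 500 and the {{ProposedConsumerConfig}} class are hypothetical.

```java
import java.util.Properties;

// Hypothetical example; fetch.max.message.count is a proposal, not an existing setting.
final class ProposedConsumerConfig {

    /** Builds consumer properties with the two existing byte limits plus the proposed count limit. */
    static Properties build() {
        Properties props = new Properties();
        // Existing consumer settings (values shown are the documented defaults).
        props.setProperty("fetch.max.bytes", "52428800");
        props.setProperty("max.partition.fetch.bytes", "1048576");
        // Proposed setting from this issue; the value is an arbitrary example.
        props.setProperty("fetch.max.message.count", "500");
        return props;
    }
}
```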


> Add a Kafka client parameter to limit number of messages fetched
> ----------------------------------------------------------------
>
>                 Key: KAFKA-19554
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19554
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: corkitse
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
