Hi Andrew,
Thanks for reading and responding to the KIP!

AS3 - AS8:
 All these comments make sense to me, and I have updated the KIP according to your
suggestions.

AS9:
> Return two entire batches 0-5 inclusive, but only acquire records 0-4
> inclusive. If a batch is "split" like this, the broker has returned record
> data which has not been acquired by this consumer.

From my point of view, the method mentioned above is the best way to implement 
the record-limit mode in Kafka. Although this will introduce overhead on either 
the client or server side, I believe it represents a necessary trade-off 
between the current Kafka log segment structure (which relies on producer 
batches) and the need for strict record control. I have moved the rest of the 
strategies you mentioned into the 'Rejected Alternatives' section, and I’ve 
added two examples for reference.
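
To make the split-batch case concrete, here is a minimal sketch of how a client 
could drop records that were returned but not acquired. It is only an 
illustration: the class and method names are hypothetical and are not part of 
the Kafka client or the KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: these names are illustrative, not Kafka APIs.
class SplitBatchFilterSketch {

    // Keeps only the offsets inside the acquired range [firstAcquired, lastAcquired].
    static List<Long> acquiredOnly(List<Long> returnedOffsets, long firstAcquired, long lastAcquired) {
        List<Long> result = new ArrayList<>();
        for (long offset : returnedOffsets) {
            if (offset >= firstAcquired && offset <= lastAcquired) {
                result.add(offset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two whole batches (0-2 and 3-5) were returned, but only 0-4 were acquired.
        List<Long> returned = List.of(0L, 1L, 2L, 3L, 4L, 5L);
        System.out.println(acquiredOnly(returned, 0L, 4L)); // prints [0, 1, 2, 3, 4]
    }
}
```

Offset 5 is the overhead mentioned above: it is transferred to the client but 
must be ignored because this consumer has not acquired it.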

I will continue refining the KIP and do my best to ensure this change is merged 
into Kafka 4.2. Please take another look before starting the vote thread.

Best,
Jimmy Wang

From: Andrew Schofield <[email protected]>
Date: Monday, October 6, 2025 at 23:30
To: [email protected] <[email protected]>
Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch

Hi Jimmy,
Sorry for the long delay responding to the KIP. I think it's important to be
careful with the refinements to KIP-932, so I'm taking my time to make sure
whatever we do is good.

There are good reasons why KIP-932 optimised for batch delivery, but I know
from talking to early users of the share consumer, that it's not always ideal.
I'm strongly in favour of providing a way of limiting the number of records
regardless of the batching, for applications which need it.

There is another improvement that I foresee here and that is for pre-fetching
of records. For example, an application could ask for 500 records, but actually
want to fetch 1000 so that it can overlap the request-response of acknowledging
the first 500 records with the processing of the next 500 already fetched.
However, getting pre-fetching to work in all situations while the acquisition
locks are ticking down is more work. I don't want to introduce pre-fetching in
this KIP.

As a result, I would prefer that this KIP introduces just one new config,
share.acquire.mode. I think we will use share.fetch.max.records or something
like it for prefetching when it's time. I know this contradicts an earlier comment
of mine, but I've changed my mind :)

I would also like to rename "strict" to "record limit" in the config. That
is a better description of the effect.

I hope this makes sense.

Here are my specific comments.

AS3: I would remove "strict" and "loose" from the description. Having
"Strict" in the KIP title is fine, but the modes should be "batch-optimised"
and "record-limit".

AS4: I suggest removing `share.max.fetch.records` from this KIP. It moves us
in the direction of having MaxRecords and BatchSize differing in the ShareFetch
request, and that's intended for pre-fetching when we get around to it.

AS5: I suggest changing "strict" to "record_limit" in the config for
`share.acquire.mode`. Of course, this affects the `ShareAcquireMode` enum too.

AS6: Since "BATCH_OPTIMIZED" is the default for the config, I would prefer
this enum value to be the first in the ShareAcquireMode enum.

AS7: The package and naming of the AcquireMode enum is inconsistent and
incorrect. The internals package is not intended to be public. So, I think
the enum is org.apache.kafka.clients.consumer.ShareAcquireMode.

AS8: In the ShareFetchRequest, I would make the AcquireMode field have
type int8. There's no need to make it string. The values should match the
equivalent enum values.
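
A rough sketch of what AS5 to AS8 could look like. The numeric ids 0 and 1 and
the forId helper are assumptions for illustration only; the KIP would need to
define the actual values.

```java
// Hypothetical sketch of the enum discussed in AS5-AS8.
// In Kafka it would live in org.apache.kafka.clients.consumer; the ids are assumed.
enum ShareAcquireMode {
    BATCH_OPTIMIZED((byte) 0), // default, so listed first (AS6)
    RECORD_LIMIT((byte) 1);

    // Value carried in the int8 AcquireMode field of the request (AS8).
    final byte id;

    ShareAcquireMode(byte id) {
        this.id = id;
    }

    // Maps the wire value back to the enum, rejecting unknown ids.
    static ShareAcquireMode forId(byte id) {
        for (ShareAcquireMode mode : values()) {
            if (mode.id == id) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unknown acquire mode id: " + id);
    }
}
```

Keeping the id on the enum means the client config and the request field
cannot drift apart.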

AS9: In the proposed changes, I suggest an example such as this.

Let's say that the batches produced onto a topic-partition each
contain 3 records. Then the batches would have offsets 0-2 inclusive,
3-5 inclusive, 6-8 inclusive, and so on.

Consider a share consumer with `max.poll.records=5` fetching records
from the topic. For batch-optimised mode, it will receive two whole
batches of records, from offsets 0 to 5 inclusive, which is 6 records
in total.

For record-limit mode, it will receive no more than 5 records. The broker
could choose to:

* Return just one entire batch 0-2 inclusive, which is 3 records in total.
* Return two entire batches 0-5 inclusive, but only acquire records 0-4
inclusive. If a batch is "split" like this, the broker has returned record
data which has not been acquired by this consumer.
* Adjust the record batching of the records returned so that a single
batch 0-4 inclusive is returned.
* Or any other strategy which acquires no more than 5 records.
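
The record counts in this example can be checked with a small sketch. It only
models the counting, not the broker's real acquisition code, and the class and
method names are hypothetical.

```java
// Hypothetical sketch: counts acquired records for batches given as {first, last} offsets.
class AcquireModeSketch {

    // Soft limit: keep acquiring whole batches until the limit is reached or passed.
    static int batchOptimised(long[][] batches, int maxRecords) {
        int acquired = 0;
        for (long[] b : batches) {
            if (acquired >= maxRecords) break;
            acquired += (int) (b[1] - b[0] + 1);
        }
        return acquired;
    }

    // Hard limit, whole batches only: stop before a batch that would exceed the limit.
    static int recordLimitWholeBatches(long[][] batches, int maxRecords) {
        int acquired = 0;
        for (long[] b : batches) {
            int size = (int) (b[1] - b[0] + 1);
            if (acquired + size > maxRecords) break;
            acquired += size;
        }
        return acquired;
    }

    // Hard limit, split allowed: acquire part of the last batch to hit the limit exactly.
    static int recordLimitSplit(long[][] batches, int maxRecords) {
        int acquired = 0;
        for (long[] b : batches) {
            int remaining = maxRecords - acquired;
            if (remaining <= 0) break;
            int size = (int) (b[1] - b[0] + 1);
            acquired += Math.min(size, remaining);
        }
        return acquired;
    }

    public static void main(String[] args) {
        long[][] batches = {{0, 2}, {3, 5}, {6, 8}};
        System.out.println(batchOptimised(batches, 5));          // 6 (offsets 0-5)
        System.out.println(recordLimitWholeBatches(batches, 5)); // 3 (offsets 0-2)
        System.out.println(recordLimitSplit(batches, 5));        // 5 (offsets 0-4)
    }
}
```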


I hope we can get this into Apache Kafka 4.2. I expect we will make it.

Thanks,
Andrew
________________________________
From: Wang Jimmy <[email protected]>
Sent: 10 September 2025 19:16
To: [email protected] <[email protected]>
Subject: RE: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch

Hi Andrew,
Thanks for your comments and sorry for the delayed response.

AS1:
I think your intention is to control the fetched records solely by 
share.max.fetch.records rather than introducing a new acquireMode parameter.
However, the concept of acquireMode is intended to achieve the following things:
* Disable batches (only one batch will be returned to the consumer in one fetch
in strict mode).
* Make a distinction from the current broker behavior where the maximum number of
records is set as a soft limit.

As for the first point, disabling batches would serve us better if we later
want to extend the lock timeout on a "record" basis rather than on a
"record batch" basis. And as Mittal suggested, we assume that the client
application cares more about the precise count of messages than about
throughput, so I think it makes sense that batching is not allowed in this
mode. What do you think?

AS2: I agree with your idea and have changed the client configuration to 
share.max.fetch.records. Thanks for your advice.

Best,
Jimmy Wang

On 2025/09/03 15:46:06 Andrew Schofield wrote:
> Hi Jimmy,
> Thank you for the KIP. I'm sure I'll have more comments yet as
> I think through how it will work in practice, and also the work that
> we are looking to do in the consumer as part of Kafka 4.2 around
> flow control and memory usage.
>
>
> The behaviour in KIP-932 is expecting that the consuming application
> will be able to consume the fetched records in a timely fashion so
> that it does not inadvertently breach the acquisition lock time-out.
> It lets the application limit the amount of memory used for buffered
> records and also limit the number of fetched records. The limit of
> the number of records is applied as a soft limit, meaning that
> complete record batches (as written to the log) will be acquired.
> Providing a way to control the number of records more strictly
> will be useful for some situations, at the expense of throughput.
>
> AS1: I suggest using `share.fetch.max.records` as the way to control
> the maximum number of records. If not specified, you would get what
> you get today, which is a soft limit based on `max.poll.records`.
> If specified, the number of acquired records would not exceed this
> number. The broker would return complete record batches to the
> consumer application (to prevent decompression in the broker to
> split batches), but the number of acquired records would not
> exceed the limit specified.
>
> I suggest `share.fetch.max.records` with the "share." at the start.
> KIP-1199 is looking to introduce a maximum number of records for
> regular fetches. Because the behaviour would be quite different,
> I think it's preferable to have a different configuration
> property.
>
>
> Thanks,
> Andrew
> ________________________________________
> From: Wang Jimmy <[email protected]>
> Sent: 31 August 2025 17:54
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
>
> Hi Mittal,
> Thanks for your thoughtful feedback on the KIP!
>
> AM1:
> I agree with your point. I have updated the KIP to explain the pros and cons 
> of the “strict” mode.
>
> AM2:
> Sure. Once strict mode is implemented, the share consumer can use
> max.poll.records (or fetch.max.records, as mentioned in AM4) to control the
> fetch rate of shareFetchManager. This prevents scenarios where one consumer
> fetches too many records while others starve, thereby ensuring
> balanced throughput among consumers.
>
> AM3:
> Thanks for pointing this out, I'll update the document. But I think this KIP 
> won't change behavior of acquisition lock timeout or session timeout, which 
> will stay the same as stated in KIP-932.
>
> AM4a:
> I overlooked this point and I think you are right. In “strict” mode, the 
> share fetch response will contain only one batch, with maximum records upper 
> bounded by max(BatchSize, MaxRecords).
>
> AM4b:
> From my point of view, it would be better to introduce a new
> max.fetch.records configuration since it has a different meaning compared to
> max.poll.records. Regarding the pre-fetch behavior, in the current
> implementation, for both implicit and explicit mode, all records should be
> acknowledged before sending the next fetch request. To achieve "pre-fetch",
> my initial thought is that the broker needs to allow the same member in a share
> group to send multiple shareFetch requests, but with an upper bound on the
> total number of delivered records set to max.fetch.records. I am not quite
> sure, but I think I could also finish it in this KIP. What do you think?
>
> AM5:
> Since “AcquireMode” is needed by both the share consumer (as a client
> configuration) and the broker (to determine the mode used), it should ideally
> be placed in two separate classes under the core and clients modules.
>
> Best,
> Jimmy Wang
>
>
> On August 27, 2025, at 04:01, Apoorv Mittal <[email protected]> wrote:
>
> Hi Jimmy,
> Thanks for the KIP. Please find some comments below:
>
> AM1: The KIP mentions the current behaviour of soft limit but it would be
> helpful to explain the reasoning as well in KIP. Else it seems like the
> "strict" should always be the preferred fetch behaviour. However, that's
> not true. The broker never reads the actual data records, rather sends back
> the batch of records as produced. Hence, say in strict mode the MaxRecords
> is set to 1 but the producer generates a single batch of 5 records on log
> then only 1 record will be acquired but the whole batch of 5 records will
> be sent to the client. This will have higher egress from the broker and
> wasted memory on the client. The strict behaviour is helpful in some
> scenarios but not always.
>
> AM2: When we say "Strict max fetch records enables clients to achieve
> predictable
> throughput", can you please help explain what is meant by it? An example
> could help here.
>
> AM3: As mentioned in the KIP "In scenarios where record processing is
> time-consuming" hence strict mode is advisable. The client connection shall
> be disconnected post session timeout configuration. Hence it means that if
> processing is taking longer than the session timeout then client sessions
> will be dropped and held records will be released. Shall we propose to
> handle the behaviour for such scenarios in the KIP as well?
>
> AM4: Currently, other than max and min bytes, there are 2 other parameters
> in the ShareFetch request: 1) MaxRecords 2) BatchSize. Both of these share fetch
> params currently use the max.poll.records client configuration, which means
> that a single batch of records will be fetched as per the max.poll.records
> client configuration. But MaxRecords and BatchSize were added for the
> following reasons: a) to have a predictable number of records returned
> from the broker, as the records are backed by the acquisition lock timeout, in
> case the client takes more time processing a higher number of records; b) to
> generate batches so the client can "pre-fetch" record batches which can be
> acknowledged individually (per batch) rather than waiting for all records to be
> processed by the client. Pre-fetch needs additional handling in the client and
> broker to renew the lock timeout for acquired-waiting record batches in the
> client, which currently does not exist. Questions:
>
> AM4-a: What would be the suggested behaviour with "strict" mode and
> BatchSize, i.e. should only a single batch ever be fetched in
> "strict" mode? Or could there be any reason to fetch multiple batches even
> in strict mode? I am assuming, as the KIP mentions as well, that applications
> will generally use strict mode when the processing time for records is higher
> on clients, so does it make sense to allow multiple batches?
>
> AM4-b: As defined in the KIP-1199
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1199%3A+Add+max+record+count+limit+to+FetchRequest>,
> there might be a separate config fetch.max.message.count (preferably
> fetch.max.records) which will be used for MaxRecords. Hence, should we
> introduce the fetch.max.records configuration in this KIP for ShareFetch
> and think about how prefetching will work? Or if we want to leave this for
> a separate KIP then do we want to define behaviour for MaxRecords in strict
> mode i.e. should MaxRecords be same as max.poll.records and pre-fetching
> should not be supported?
>
> AM5: AcquireMode is also used by clients, so should the enum AcquireMode reside
> in the server module or should it be in the clients module?
>
> Regards,
> Apoorv Mittal
>
>
> On Thu, Aug 21, 2025 at 6:55 PM Wang Jimmy <[email protected]> wrote:
>
> Hello all,
> I would like to start a discussion on KIP-1206: Strict max fetch records
> in share fetch.
> This KIP introduces the AcquireMode in ShareFetchRequest, which provides
> two options: Strict or loose.  When strict mode is selected, we should only
> acquire records till maxFetchRecords.
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch
>
> Thanks,
> Jimmy Wang
>
>
