Hi Andrew,
Thanks for your comments and sorry for the delayed response.

AS1: 
I think your intention is to control the number of fetched records solely through
share.max.fetch.records rather than introducing a new acquireMode parameter.
However, acquireMode is intended to achieve two things:
1. Disable batching (in strict mode, only one batch is returned to the consumer
   per fetch).
2. Distinguish strict mode from the current broker behavior, where the maximum
   number of records is a soft limit.

As for the first point, disabling batches would be preferable if we wanted to
extend the lock timeout on a "record" basis rather than on a "record batch"
basis. And as Mittal suggested, we assume that the client application cares
more about the precise count of messages than about throughput, so I think it
makes sense that batching is not allowed in this mode. What do you think?
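
To make the difference between the soft limit and a strict limit concrete,
here is a minimal sketch. It is a toy model, not the broker's implementation:
the class and method names are hypothetical, and a log is reduced to a list of
batch sizes. It assumes, as discussed in this thread, that whole record batches
are always returned to the client and that only the number of acquired records
differs between the two modes.

```java
import java.util.List;

// Toy model only: names are hypothetical and do not exist in Kafka.
final class AcquireSketch {

    /**
     * Walks the batches in log order and returns {acquired, sent}.
     * Soft limit: whole batches are acquired until the limit is reached,
     * so the acquired count may overshoot maxRecords.
     * Strict limit: the acquired count never exceeds maxRecords, but the
     * whole batch is still sent because batches are not split.
     */
    static int[] simulate(List<Integer> batchSizes, int maxRecords, boolean strict) {
        int acquired = 0;
        int sent = 0;
        for (int size : batchSizes) {
            if (acquired >= maxRecords) {
                break;
            }
            sent += size; // the batch is returned as written to the log
            acquired += strict ? Math.min(size, maxRecords - acquired) : size;
        }
        return new int[] {acquired, sent};
    }

    public static void main(String[] args) {
        int[] soft = simulate(List.of(5, 5, 5), 7, false);
        int[] strict = simulate(List.of(5, 5, 5), 7, true);
        int[] single = simulate(List.of(5), 1, true);
        System.out.println("soft:   acquired=" + soft[0] + " sent=" + soft[1]);     // acquired=10 sent=10
        System.out.println("strict: acquired=" + strict[0] + " sent=" + strict[1]); // acquired=7 sent=10
        System.out.println("single: acquired=" + single[0] + " sent=" + single[1]); // acquired=1 sent=5
    }
}
```

The last case is the one Mittal raised: with a limit of 1 and a single batch of
5 records, only 1 record is acquired but all 5 are sent to the client.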

AS2: I agree with your idea and have changed the client configuration to 
share.max.fetch.records. Thanks for your advice.
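
For illustration only, a share consumer's configuration might then look like
the sketch below. It uses plain java.util.Properties so that it runs without
the Kafka client library. share.max.fetch.records is the name proposed in this
thread and is not a released configuration; the class name, bootstrap address,
group name, and values are made up.

```java
import java.util.Properties;

// Hypothetical sketch: builds a Properties object only, no Kafka dependency.
final class ShareConsumerConfigSketch {

    static Properties sketch() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-share-group");     // placeholder group name
        // Existing consumer config; today's soft limit is based on it.
        props.setProperty("max.poll.records", "500");
        // Proposed in this thread, still under discussion, not a released config.
        props.setProperty("share.max.fetch.records", "100");
        return props;
    }

    public static void main(String[] args) {
        sketch().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```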

Best,
Jimmy Wang

On 2025/09/03 15:46:06 Andrew Schofield wrote:
> Hi Jimmy,
> Thank you for the KIP. I'm sure I'll have more comments yet as
> I think through how it will work in practice, and also the work that
> we are looking to do in the consumer as part of Kafka 4.2 around
> flow control and memory usage.
> 
> 
> The behaviour in KIP-932 is expecting that the consuming application
> will be able to consume the fetched records in a timely fashion so
> that it does not inadvertently breach the acquisition lock time-out.
> It lets the application limit the amount of memory used for buffered
> records and also limit the number of fetched records. The limit of
> the number of records is applied as a soft limit, meaning that
> complete record batches (as written to the log) will be acquired.
> Providing a way to control the number of records more strictly
> will be useful for some situations, at the expense of throughput.
> 
> AS1: I suggest using `share.fetch.max.records` as the way to control
> the maximum number of records. If not specified, you would get what
> you get today, which is a soft limit based on `max.poll.records`.
> If specified, the number of acquired records would not exceed this
> number. The broker would return complete record batches to the
> consumer application (to prevent decompression in the broker to
> split batches), but the number of acquired records would not
> exceed the limit specified.
> 
> I suggest `share.fetch.max.records` with the "share." at the start.
> KIP-1199 is looking to introduce a maximum number of records for
> regular fetches. Because the behaviour would be quite different,
> I think it's preferable to have a different configuration
> property.
> 
> 
> Thanks,
> Andrew
> ________________________________________
> From: Wang Jimmy <[email protected]>
> Sent: 31 August 2025 17:54
> To: [email protected] <[email protected]>
> Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> 
> Hi Mittal,
> Thanks for your thoughtful feedback on the KIP!
> 
> AM1:
> I agree with your point. I have updated the KIP to explain the pros and cons 
> of the “strict” mode.
> 
> AM2:
> Sure. Once strict mode is implemented, a share consumer can use
> max.poll.records (or fetch.max.records, as mentioned in AM4) to control the
> fetch rate of shareFetchManager. This prevents scenarios where one consumer
> fetches too many records while others starve, thereby ensuring balanced
> throughput among consumers.
> 
> AM3:
> Thanks for pointing this out; I'll update the document. However, I think this
> KIP won't change the behavior of the acquisition lock timeout or the session
> timeout, which will stay the same as stated in KIP-932.
> 
> AM4a:
> I overlooked this point and I think you are right. In “strict” mode, the 
> share fetch response will contain only one batch, with maximum records upper 
> bounded by max(BatchSize, MaxRecords).
> 
> AM4b:
> From my point of view, it would be better to introduce a new
> max.fetch.records configuration, since it has a different meaning from
> max.poll.records. Regarding the pre-fetch behavior: in the current
> implementation, in both implicit and explicit mode, all records should be
> acknowledged before the next fetch request is sent. To achieve "pre-fetch",
> my initial thought is that the broker needs to allow the same member of a
> share group to send multiple shareFetch requests, but with an upper bound on
> the total number of delivered records set to max.fetch.records. I am not
> quite sure, but I think I could also cover it in this KIP. What do you think?
> 
> AM5:
> Since “AcquireMode” is needed by both the share consumer (as a client
> configuration) and the broker (to determine the mode used), it should ideally
> be placed in two separate classes under the core and client modules.
> 
> Best,
> Jimmy Wang
> 
> 
> On 27 August 2025 at 04:01, Apoorv Mittal <[email protected]> wrote:
> 
> Hi Jimmy,
> Thanks for the KIP. Please find some comments below:
> 
> AM1: The KIP mentions the current behaviour of the soft limit, but it would
> be helpful to explain the reasoning in the KIP as well. Otherwise it seems
> like "strict" should always be the preferred fetch behaviour. However, that's
> not true. The broker never reads the actual data records; rather, it sends
> back the batch of records as produced. Hence, say MaxRecords is set to 1 in
> strict mode but the producer writes a single batch of 5 records to the log:
> then only 1 record will be acquired but the whole batch of 5 records will
> be sent to the client. This means higher egress from the broker and
> wasted memory on the client. The strict behaviour is helpful in some
> scenarios but not always.
> 
> AM2: When we say "Strict max fetch records enables clients to achieve
> predictable throughput", can you please help explain what is meant by it? An
> example could help here.
> 
> AM3: The KIP says strict mode is advisable "In scenarios where record
> processing is time-consuming". The client connection will be disconnected
> after the configured session timeout. This means that if processing takes
> longer than the session timeout, client sessions will be dropped and held
> records will be released. Shall we propose how to handle such scenarios in
> the KIP as well?
> 
> AM4: Currently, other than max and min bytes, there are 2 other parameters
> in the ShareFetch request: 1) MaxRecords 2) BatchSize. Both of these share
> fetch params currently use the max.poll.records client configuration, which
> means that a single batch of records will be fetched as per the
> max.poll.records client configuration. But MaxRecords and BatchSize were
> added for the following reasons: a) to have a predictable number of records
> returned from the broker, as the records are backed by the acquisition lock
> timeout, in case the client takes more time processing a higher number of
> records; b) to generate batches so the client can "pre-fetch" record batches
> which can be acknowledged individually (per batch) rather than waiting for
> all records to be processed by the client. Pre-fetch needs additional
> handling in the client and broker to renew the lock timeout for
> acquired-waiting record batches in the client, which currently does not
> exist. Questions:
> 
> AM4-a: What would be the suggested behaviour with "strict" mode and
> BatchSize, i.e. should only a single batch ever be fetched in "strict" mode?
> Or could there be a reason to fetch multiple batches even in strict mode? I
> am assuming, as the KIP also mentions, that applications will generally use
> strict mode when the processing time for records is higher on clients; does
> it then make sense to allow multiple batches?
> 
> AM4-b: As defined in the KIP-1199
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1199%3A+Add+max+record+count+limit+to+FetchRequest>,
> there might be a separate config fetch.max.message.count (preferably
> fetch.max.records) which will be used for MaxRecords. Hence, should we
> introduce the fetch.max.records configuration in this KIP for ShareFetch
> and think about how prefetching will work? Or if we want to leave this for
> a separate KIP then do we want to define behaviour for MaxRecords in strict
> mode i.e. should MaxRecords be same as max.poll.records and pre-fetching
> should not be supported?
> 
> AM5: AcquireMode is also used by clients, so should the enum AcquireMode
> reside in the server module or in the clients module?
> 
> Regards,
> Apoorv Mittal
> 
> 
> On Thu, Aug 21, 2025 at 6:55 PM Wang Jimmy <[email protected]> wrote:
> 
> Hello all,
> I would like to start a discussion on KIP-1206: Strict max fetch records
> in share fetch.
> This KIP introduces the AcquireMode in ShareFetchRequest, which provides
> two options: strict or loose. When strict mode is selected, we should only
> acquire records up to maxFetchRecords.
> 
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch
> 
> Thanks,
> Jimmy Wang
> 
> 
