Thanks for the answers and updates Jimmy, just one last question:

LM4: Under this new record_limit fetch mode, the fetch.max.bytes
client config would not be honoured, correct? (and it's always the
max.poll.records that approximately defines how much data we may
include in a single share fetch response).
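
To make the question concrete, here is a minimal sketch of the client
settings I have in mind. It assumes the share.acquire.mode name and the
record_limit value as proposed in this thread (not a released config); the
group id and the numeric values are placeholders.

```java
import java.util.Properties;

final class ShareConsumerConfigSketch {
    // Builds the settings discussed in LM4. Nothing here talks to a broker;
    // it only shows which keys are in play.
    static Properties recordLimitConfig() {
        Properties props = new Properties();
        props.setProperty("group.id", "my-share-group");          // hypothetical group name
        props.setProperty("share.acquire.mode", "record_limit");  // name and value as proposed in the KIP
        props.setProperty("max.poll.records", "500");             // example record bound
        props.setProperty("fetch.max.bytes", "52428800");         // example byte bound; LM4 asks whether it still applies
        return props;
    }

    public static void main(String[] args) {
        recordLimitConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The sketch does not answer the question; it only shows the two limits side
by side.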

Thanks!
Lianet

On Mon, Oct 13, 2025 at 4:20 AM Andrew Schofield
<[email protected]> wrote:
>
> Hi Jimmy,
> Thanks for the updates.
>
> AS15: I see that you have used the GroupProtocol class as your inspiration
> for the ShareAcquireMode. It is still the case that the documentation for the
> group.protocol config uses lower-case values. I would prefer that the
> documentation for share.acquire.mode also used lower-case “batch_optimized”
> and “record_limit”. You should apply case-insensitive matching in the code
> and the way that GroupProtocol does it is fine. In summary, apart from the
> documentation of the values for the share.acquire.mode which I think should
> be lower-case for consistency with other string-based enum configs,
> the description in the KIP is good.
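
A minimal sketch of what AS15 describes, under stated assumptions: lower-case
documented values, upper-case enum constants, case-insensitive parsing, and
the int8 ids 0 and 1 suggested in AS12. The field and method names (id,
configValue, of) are hypothetical and are not taken from the KIP or from
GroupProtocol.

```java
import java.util.Locale;

final class ShareAcquireModeDemo {
    public static void main(String[] args) {
        // Mixed-case inputs all resolve to the same constants.
        for (String raw : new String[] {"record_limit", "RECORD_LIMIT", "Batch_Optimized"}) {
            ShareAcquireMode mode = ShareAcquireMode.of(raw);
            System.out.println(raw + " -> " + mode + " (id " + mode.id + ")");
        }
    }
}

// Hypothetical shape of the enum; BATCH_OPTIMIZED comes first because it is
// the default (AS6).
enum ShareAcquireMode {
    BATCH_OPTIMIZED((byte) 0, "batch_optimized"),
    RECORD_LIMIT((byte) 1, "record_limit");

    final byte id;            // assumed wire value for the int8 field (AS8, AS12)
    final String configValue; // lower-case value shown in the config documentation

    ShareAcquireMode(byte id, String configValue) {
        this.id = id;
        this.configValue = configValue;
    }

    // Case-insensitive lookup; unknown names throw IllegalArgumentException.
    static ShareAcquireMode of(String name) {
        return valueOf(name.trim().toUpperCase(Locale.ROOT));
    }

    @Override
    public String toString() {
        return configValue;
    }
}
```

Upper-casing with Locale.ROOT keeps the lookup independent of the JVM's
default locale.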
>
>
> Apart from this small detail, it’s ready for voting from my point of view.
>
> Thanks,
> Andrew
>
> > On 10 Oct 2025, at 18:33, Wang Jimmy <[email protected]> wrote:
> >
> > Hi Andrew,
> > Thanks for your feedback!
> >
> > AS10:
> > You’re right. I have replaced the max.fetch.records with max.poll.records.
> > AS11:
> > Hmm, my intention was to illustrate the potential features that could be
> > enabled after implementing this KIP, such as better handling of poison
> > records. However, I agree with your point and have removed that part.
> > AS12 - AS14:
> > Thank you for your suggestions. I have made the corresponding changes based 
> > on your feedback.
> > AS15:
> > I understand what you mean, but I was referring to the GroupProtocol enum 
> > class. There may be some differences in the implementation, but I think 
> > both approaches are acceptable, so I have made the changes. I have also
> > renamed all AcquireMode variables to ShareAcquireMode for alignment
> > (including the ShareFetchRequest schema).
> >
> > Thanks again for your review. Please feel free to take another look when 
> > you have time.
> >
> > Best,
> > Jimmy Wang
> >
> > From: Andrew Schofield <[email protected]>
> > Date: Thursday, October 9, 2025 at 21:59
> > To: [email protected] <[email protected]>
> > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> >
> > Hi Jimmy,
> > Thanks for the updates. Just a few minor comments remain, but it's nearly
> > ready.
> >
> > AS10: There is no max.fetch.records parameter. I think you mean
> > max.poll.records. There might be share.max.fetch.records in the future
> > but not yet.
> >
> > AS11: I don't understand the point about poison records. The fix to
> > that issue will likely be for the broker independently to decide to
> > acquire only a subset of the records in a batch to ensure that any
> > bad records failing delivery do not impact the delivery count of
> > neighbouring records. This KIP is not required for that.
> >
> > AS12: On line 30 of the ShareFetchRequest schema, please put the
> > values for the AcquireMode in the about string such as
> > "The acquire mode to control the fetch behavior: 0 - batch-optimized, 1 - record-limit"
> > This will end up in the protocol documentation automatically so
> > having the values makes the documentation more complete.
> >
> > AS13: You should not include the details of the SharePartitionManager
> > or ShareFetch classes in the KIP. The KIP is a specification of the
> > protocol and public programming interfaces only.
> >
> > AS14: Thanks for adding the examples. These are just illustrations
> > of some permitted behaviour, but the broker is only required to
> > keep within the limits specified in the ShareFetch request. As a result,
> > your rejected alternative of "Only one entire batch...." is actually
> > incorrect. The broker could do that and the KIP is not prescriptive about
> > the details of broker behaviour. So, please remove this rejected 
> > alternative.
> >
> > AS15: The standard for string config values in Kafka is snake_case.
> > You are adding a new enum called ShareAcquireMode (in some places you've
> > called it AcquireMode). The values should be snake_case, such as
> > "batch_optimized", although the enum names would be capitalised like
> > BATCH_OPTIMIZED. Then the description of the values for the config
> > would also be snake_case.
> >
> > If you look at auto.offset.reset consumer config and the
> > related AutoOffsetResetStrategy class you'll see what I mean.
> >
> >
> > Thanks,
> > Andrew
> >
> > ________________________________________
> > From: Wang Jimmy
> > Sent: 09 October 2025 12:25
> > To: [email protected]
> > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> >
> > Hi Andrew,
> > Thanks for reading and responding to the KIP!
> >
> > AS3 - AS8:
> > All these comments make sense to me; I have updated the KIP according
> > to your suggestions.
> >
> > AS9:
> > > Return two entire batches 0-5 inclusive, but only acquire records 0-4
> > > inclusive. If a batch is "split" like this, the broker has returned record
> > > data which has not been acquired by this consumer.
> >
> > From my point of view, the method mentioned above is the best way to 
> > implement the record-limit mode in Kafka. Although this will introduce 
> > overhead on either the client or server side, I believe it represents a 
> > necessary trade-off between the current Kafka log segment structure (which 
> > relies on producer batches) and the need for strict record control. I have 
> > moved the rest of the strategies you mentioned into the 'Rejected 
> > Alternatives' section, and I’ve added two examples for reference.
> >
> > I will continue refining the KIP and do my best to ensure this change is 
> > merged into Kafka 4.2. Please take another look before starting the vote 
> > thread.
> >
> > Best,
> > Jimmy Wang
> >
> > From: Andrew Schofield
> > Date: Monday, October 6, 2025 at 23:30
> > To: [email protected]
> > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> >
> > Hi Jimmy,
> > Sorry for the long delay responding to the KIP. I think it's important to be
> > careful with the refinements to KIP-932, so I'm taking my time to make sure
> > whatever we do is good.
> >
> > There are good reasons why KIP-932 optimised for batch delivery, but I know
> > from talking to early users of the share consumer, that it's not always
> > ideal. I'm strongly in favour of providing a way of limiting the number of
> > records regardless of the batching, for applications which need it.
> >
> > There is another improvement that I foresee here and that is for
> > pre-fetching of records. For example, an application could ask for 500
> > records, but actually want to fetch 1000 so that it can overlap the
> > request-response of acknowledging the first 500 records with the processing
> > of the next 500 already fetched. However, getting pre-fetching to work in
> > all situations while the acquisition locks are ticking down is more work.
> > I don't want to introduce pre-fetching in this KIP.
> >
> > As a result, I would prefer that this KIP introduces just one new config,
> > share.acquire.mode. I think we will use share.fetch.max.records or something
> > like it for prefetching when it's time. I know this contradicts an earlier
> > comment of mine, but I've changed my mind :)
> >
> > I would also like to rename "strict" to "record limit" in the config. That
> > is a better description of the effect.
> >
> > I hope this makes sense.
> >
> > Here are my specific comments.
> >
> > AS3: I would remove "strict" and "loose" from the description. Having
> > "Strict" in the KIP title is fine, but the modes should be "batch-optimised"
> > and "record-limit".
> >
> > AS4: I suggest removing `share.max.fetch.records` from this KIP. It moves us
> > in the direction of having MaxRecords and BatchSize differing in the
> > ShareFetch request, and that's intended for pre-fetching when we get around
> > to it.
> >
> > AS5: I suggest changing "strict" to "record_limit" in the config for
> > `share.acquire.mode`. Of course, this affects the `ShareAcquireMode` enum 
> > too.
> >
> > AS6: Since "BATCH_OPTIMIZED" is the default for the config, I would prefer
> > this enum value to be the first in the ShareAcquireMode enum.
> >
> > AS7: The package and naming of the AcquireMode enum is inconsistent and
> > incorrect. The internals package is not intended to be public. So, I think
> > the enum is org.apache.kafka.clients.consumer.ShareAcquireMode.
> >
> > AS8: In the ShareFetchRequest, I would make the AcquireMode field have
> > type int8. There's no need to make it string. The values should match the
> > equivalent enum values.
> >
> > AS9: In the proposed changes, I suggest an example such as this.
> >
> > Let's say that the batches produced onto a topic-partition each
> > contain 3 records. Then the batches would have offsets 0-2 inclusive,
> > 3-5 inclusive, 6-8 inclusive, and so on.
> >
> > Consider a share consumer with `max.poll.records=5` fetching records
> > from the topic. For batch-optimised mode, it will receive two whole
> > batches of records, from offsets 0 to 5 inclusive, which is 6 records
> > in total.
> >
> > For record-limit mode, it will receive no more than 5 records. The broker
> > could choose to:
> >
> > * Return just one entire batch 0-2 inclusive, which is 3 records in total.
> > * Return two entire batches 0-5 inclusive, but only acquire records 0-4
> > inclusive. If a batch is "split" like this, the broker has returned record
> > data which has not been acquired by this consumer.
> > * Adjust the record batching of the records returned so that a single
> > batch 0-4 inclusive is returned.
> > * Or any other strategy which acquires no more than 5 records.
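
The arithmetic in the AS9 example can be sketched as below. This is only an
illustration under simplifying assumptions: every batch has the same size,
the fetch starts on a batch boundary, and enough records are available. The
class and method names are hypothetical, and recordLimit models just the
"split batch" option from the list; the broker is free to pick any other
strategy that stays within the limit. It needs Java 16 or later for records.

```java
final class AcquireSketch {
    // Inclusive range of acquired offsets.
    record Range(long first, long last) {
        long count() {
            return last - first + 1;
        }
    }

    // Soft limit: whole batches are acquired until maxRecords is covered,
    // so the acquired count can overshoot maxRecords.
    static Range batchOptimized(long start, int batchSize, int maxRecords) {
        long batches = (maxRecords + batchSize - 1) / batchSize; // ceiling division
        return new Range(start, start + batches * batchSize - 1);
    }

    // Hard limit: acquisition stops at maxRecords, even mid-batch.
    static Range recordLimit(long start, int maxRecords) {
        return new Range(start, start + maxRecords - 1);
    }

    public static void main(String[] args) {
        Range soft = batchOptimized(0, 3, 5);
        Range hard = recordLimit(0, 5);
        System.out.println("batch-optimised: " + soft.first() + "-" + soft.last()
                + " (" + soft.count() + " records)");
        System.out.println("record-limit:    " + hard.first() + "-" + hard.last()
                + " (" + hard.count() + " records)");
    }
}
```

With batches of 3 and max.poll.records=5 this gives offsets 0-5 (6 records)
for the soft limit and 0-4 (5 records) for the hard limit, matching the
example above.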
> >
> >
> > I hope we can get this into Apache Kafka 4.2. I expect we will make it.
> >
> > Thanks,
> > Andrew
> > ________________________________
> > From: Wang Jimmy
> > Sent: 10 September 2025 19:16
> > To: [email protected]
> > Subject: RE: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> >
> > Hi Andrew,
> > Thanks for your comments and sorry for the delayed response.
> >
> > AS1:
> > I think your intention is to control the fetched records solely by 
> > share.max.fetch.records rather than introducing a new acquireMode parameter.
> > However, the concept of acquireMode is intended to achieve the following 
> > things:
> > * Disable batches (only one batch will be returned to the consumer in one
> > fetch in strict mode).
> > * Make a distinction from the current broker behavior where the maximum
> > number of records is set as a soft limit.
> >
> > As for the first point, disabling batches would help if we later want to
> > extend the lock timeout on a "record" basis rather than on a "record batch"
> > basis. And as Mittal suggested, we assume that the client application cares
> > more about the precise count of messages than about throughput, so I think
> > it makes sense that batching is not allowed in this mode. What do you think?
> >
> > AS2: I agree with your idea and have changed the client configuration to 
> > share.max.fetch.records. Thanks for your advice.
> >
> > Best,
> > Jimmy Wang
> >
> > On 2025/09/03 15:46:06 Andrew Schofield wrote:
> > > Hi Jimmy,
> > > Thank you for the KIP. I'm sure I'll have more comments yet as
> > > I think through how it will work in practice, and also the work that
> > > we are looking to do in the consumer as part of Kafka 4.2 around
> > > flow control and memory usage.
> > >
> > >
> > > The behaviour in KIP-932 is expecting that the consuming application
> > > will be able to consume the fetched records in a timely fashion so
> > > that it does not inadvertently breach the acquisition lock time-out.
> > > It lets the application limit the amount of memory used for buffered
> > > records and also limit the number of fetched records. The limit of
> > > the number of records is applied as a soft limit, meaning that
> > > complete record batches (as written to the log) will be acquired.
> > > Providing a way to control the number of records more strictly
> > > will be useful for some situations, at the expense of throughput.
> > >
> > > AS1: I suggest using `share.fetch.max.records` as the way to control
> > > the maximum number of records. If not specified, you would get what
> > > you get today, which is a soft limit based on `max.poll.records`.
> > > If specified, the number of acquired records would not exceed this
> > > number. The broker would return complete record batches to the
> > > consumer application (to prevent decompression in the broker to
> > > split batches), but the number of acquired records would not
> > > exceed the limit specified.
> > >
> > > I suggest `share.fetch.max.records` with the "share." at the start.
> > > KIP-1199 is looking to introduce a maximum number of records for
> > > regular fetches. Because the behaviour would be quite different,
> > > I think it's preferable to have a different configuration
> > > property.
> > >
> > >
> > > Thanks,
> > > Andrew
> > > ________________________________________
> > > From: Wang Jimmy
> > > Sent: 31 August 2025 17:54
> > > To: [email protected]
> > > Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
> > >
> > > Hi Mittal,
> > > Thanks for your thoughtful feedback on the KIP!
> > >
> > > AM1:
> > > I agree with your point. I have updated the KIP to explain the pros
> > > and cons of the "strict" mode.
> > >
> > > AM2:
> > > Sure. After implementing strict mode, the share consumer can leverage
> > > max.poll.records (or fetch.max.records, as mentioned in AM4) to control
> > > the fetch rate of shareFetchManager. This prevents scenarios where one
> > > consumer fetches too many records while others suffer from starvation,
> > > thereby ensuring balanced throughput among different consumers.
> > >
> > > AM3:
> > > Thanks for pointing this out; I'll update the document. But I think
> > > this KIP won't change the behavior of the acquisition lock timeout or
> > > session timeout, which will stay the same as stated in KIP-932.
> > >
> > > AM4a:
> > > I overlooked this point and I think you are right. In "strict" mode,
> > > the share fetch response will contain only one batch, with maximum records
> > > upper bounded by max(BatchSize, MaxRecords).
> > >
> > > AM4b:
> > > From my point of view, it would be better to introduce a new
> > > max.fetch.records configuration since it has a different meaning compared
> > > to max.poll.records. Regarding the pre-fetch behavior, regardless of the
> > > current implementation for implicit or explicit mode, all records should be
> > > acknowledged before sending the next fetch request. To achieve "pre-fetch",
> > > my initial thought is that the broker needs to allow the same member in a
> > > share group to send multiple shareFetch requests, but with an upper bound
> > > on the total number of delivered records set to max.fetch.records. I am not
> > > quite sure, but I think I could also finish it in this KIP. What do you
> > > think?
> > >
> > > AM5:
> > > Since "AcquireMode" is needed for both the share consumer (as client
> > > configuration) and the broker (to determine the mode used), it should
> > > ideally be placed in two separate classes under the core and client modules.
> > >
> > > Best,
> > > Jimmy Wang
> > >
> > >
> > > On 27 August 2025 at 04:01, Apoorv Mittal wrote:
> > >
> > > Hi Jimmy,
> > > Thanks for the KIP. Please find some comments below:
> > >
> > > AM1: The KIP mentions the current behaviour of soft limit but it would be
> > > helpful to explain the reasoning as well in the KIP. Else it seems like
> > > "strict" should always be the preferred fetch behaviour. However, that's
> > > not true. The broker never reads the actual data records, rather sends back
> > > the batch of records as produced. Hence, say in strict mode the MaxRecords
> > > is set to 1 but the producer generates a single batch of 5 records on the
> > > log, then only 1 record will be acquired but the whole batch of 5 records
> > > will be sent to the client. This will have higher egress from the broker
> > > and wasted memory on the client. The strict behaviour is helpful in some
> > > scenarios but not always.
> > >
> > > AM2: When we say "Strict max fetch records enables clients to achieve
> > > predictable throughput", can you please help explain what is meant by it?
> > > An example could help here.
> > >
> > > AM3: As mentioned in the KIP "In scenarios where record processing is
> > > time-consuming" hence strict mode is advisable. The client connection shall
> > > be disconnected post session timeout configuration. Hence it means that if
> > > processing is taking longer than the session timeout then client sessions
> > > will be dropped and held records will be released. Shall we propose to
> > > handle the behaviour for such scenarios in the KIP as well?
> > >
> > > AM4: Currently, other than max and min bytes, there are 2 other parameters
> > > in the ShareFetch request: 1) MaxRecords 2) BatchSize. Both of these share
> > > fetch params currently use the max.poll.records client configuration, which
> > > means that a single batch of records will be fetched as per the
> > > max.poll.records client configuration. But MaxRecords and BatchSize were
> > > added for the following reasons: a) Have some predictable number of records
> > > returned from the broker as the records are backed by the acquisition lock
> > > timeout, in case the client takes more time in processing a higher number
> > > of records b) Generate batches so the client can "pre-fetch" record batches
> > > which can be acknowledged individually (batch) rather than waiting for all
> > > records to be processed by the client. Pre-fetch needs additional handling
> > > in client and broker to renew the lock timeout for acquired-waiting record
> > > batches in the client, which currently does not exist. Questions:
> > >
> > > AM4-a: What would be the suggested behaviour with "strict" mode and
> > > BatchSize i.e. shall always only a single batch be allowed to fetch in
> > > "strict" mode? Or there could be any reason to fetch multiple batches even
> > > in strict mode? I am assuming, and as the KIP mentions as well, applications
> > > will generally use strict mode when the processing time is higher on
> > > clients for records, then does it make sense to allow multiple batches?
> > >
> > > AM4-b: As defined in KIP-1199,
> > > there might be a separate config fetch.max.message.count (preferably
> > > fetch.max.records) which will be used for MaxRecords. Hence, should we
> > > introduce the fetch.max.records configuration in this KIP for ShareFetch
> > > and think about how prefetching will work? Or if we want to leave this for
> > > a separate KIP then do we want to define behaviour for MaxRecords in strict
> > > mode i.e. should MaxRecords be same as max.poll.records and pre-fetching
> > > should not be supported?
> > >
> > > AM5: AcquireMode is also used by clients so should the enum AcquireMode
> > > reside in the server module or should it be in the clients module?
> > >
> > > Regards,
> > > Apoorv Mittal
> > >
> > >
> > > On Thu, Aug 21, 2025 at 6:55 PM Wang Jimmy wrote:
> > >
> > > Hello all,
> > > I would like to start a discussion on KIP-1206: Strict max fetch records
> > > in share fetch.
> > > This KIP introduces the AcquireMode in ShareFetchRequest, which provides
> > > two options: Strict or loose. When strict mode is selected, we should only
> > > acquire records till maxFetchRecords.
> > >
> > > KIP:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch
> > >
> > > Thanks,
> > > Jimmy Wang
> > >
> > >
>
