Thanks for the answers and updates Jimmy, just one last question: LM4: Under this new record_limit fetch mode, the fetch.max.bytes client config would not be honoured, correct? (and it's always the max.poll.records that approximately defines how much data we may include in a single share fetch response).
Thanks! Lianet On Mon, Oct 13, 2025 at 4:20 AM Andrew Schofield <[email protected]> wrote: > > Hi Jimmy, > Thanks for the updates. > > AS15: I see that you have used the GroupProtocol class as your inspiration > for the ShareAcquireMode. It is still the case that the documentation for the > group.protocol config uses lower-case values. I would prefer that the > documentation for share.acquire.mode also used lower-case “batch_optimized” > and “record_limit”. You should apply case-insensitive matching in the code > and the way that GroupProtocol does it is fine. In summary, apart from the > documentation of the values for the share.acquire.mode which I think should > be lower-case for consistency with other string-based enum configs, > the description in the KIP is good. > > > Apart from this small detail, it’s ready for voting from my point of view. > > Thanks, > Andrew > > > On 10 Oct 2025, at 18:33, Wang Jimmy <[email protected]> wrote: > > > > Hi Andrew, > > Thanks for your feedback! > > > > AS10: > > You’re right. I have replaced the max.fetch.records with max.poll.records. > > AS11: > > Hmm, my intention was to illustrate the potential features that could be > > enabled after implementing this KIP, such as better handling poison > > records. However, I agree with your point and have removed that part. > > AS12 - AS14: > > Thank you for your suggestions. I have made the corresponding changes based > > on your feedback. > > AS15: > > I understand what you mean, but I was referring to the GroupProtocol enum > > class. There may be some differences in the implementation, but I think > > both approaches are acceptable, so I have made the changes. Additionally, > > I have also renamed all AcquireMode variables to ShareAcquireMode for > > alignment (including the ShareFetchRequest schema). > > > > Thanks again for your review. Please feel free to take another look when > > you have time. > > > > Best, > > Jimmy Wang > > > > From: Andrew Schofield <[email protected]> > > Date: Thursday, October 9, 2025 at 21:59 > > To: [email protected] <[email protected]> > > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch > > > > Hi Jimmy, > > Thanks for the updates. Just a few minor comments remain, but it's nearly > > ready. > > > > AS10: There is no max.fetch.records parameter. I think you mean > > max.poll.records. There might be share.max.fetch.records in the future > > but not yet. > > > > AS11: I don't understand the point about poison records. The fix to > > that issue will likely be for the broker independently to decide to > > acquire only a subset of the records in a batch to ensure that any > > bad records failing delivery do not impact the delivery count of > > neighbouring records. This KIP is not required for that. > > > > AS12: On line 30 of the ShareFetchRequest schema, please put the > > values for the AcquireMode in the about string such as > > "The acquire mode to control the fetch behavior: 0 - batch-optimized, 1 - > > record-limit" > > This will end up in the protocol documentation automatically so > > having the values makes the documentation more complete. > > > > AS13: You should not include the details of the SharePartitionManager > > or ShareFetch classes in the KIP. The KIP is a specification of the > > protocol and public programming interfaces only. > > > > AS14: Thanks for adding the examples. These are just illustrations > > of some permitted behaviour, but the broker is only required to > > keep within the limits specified in the ShareFetch request. As a result, > > your rejected alternative of "Only one entire batch...." is actually > > incorrect. The broker could do that and the KIP is not prescriptive about > > the details of broker behaviour. So, please remove this rejected > > alternative. > > > > AS15: The standard for string config values in Kafka is snake_case. > > You are adding a new enum called ShareAcquireMode (in some places you've > > called it AcquireMode). The values should be snake_case, such as > > "batch_optimized", although the enum names would be capitalised like > > BATCH_OPTIMIZED. Then the description of the values for the config > > would also be snake_case. > > > > If you look at auto.offset.reset consumer config and the > > related AutoOffsetResetStrategy class you'll see what I mean. > > > > > > Thanks, > > Andrew > > > > ________________________________________ > > From: Wang Jimmy > > Sent: 09 October 2025 12:25 > > To: [email protected] > > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share > > fetch > > > > Hi Andrew, > > Thanks for reading and responding to the KIP! > > > > AS3 - AS8: > > All these comments make sense to me, I have updated the KIP according > > to your suggestions. > > > > AS9: > > > Return two entire batches 0-5 inclusive, but only acquire records 0-4 > > > inclusive. If a batch is "split" like this, the broker has returned > > record > > > data which has not been acquired by this consumer. > > > > From my point of view, the method mentioned above is the best way to > > implement the record-limit mode in Kafka. Although this will introduce > > overhead on either the client or server side, I believe it represents a > > necessary trade-off between the current Kafka log segment structure (which > > relies on producer batches) and the need for strict record control. I have > > moved the rest of the strategies you mentioned into the 'Rejected > > Alternatives' section, and I’ve added two examples for reference. > > > > I will continue refining the KIP and do my best to ensure this change is > > merged into Kafka 4.2. Please take another look before starting the vote > > thread. > > > > Best, > > Jimmy Wang > > > > From: Andrew Schofield > > Date: Monday, October 6, 2025 at 23:30 > > To: [email protected] > > Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch > > > > Hi Jimmy, > > Sorry for the long delay responding to the KIP. I think it's important to be > > careful with the refinements to KIP-932, so I'm taking my time to make sure > > whatever we do is good. > > > > There are good reasons why KIP-932 optimised for batch delivery, but I know > > from talking to early users of the share consumer, that it's not always > > ideal. > > I'm strongly in favour of providing a way of limiting the number of records > > regardless of the batching, for applications which need it. > > > > There is another improvement that I foresee here and that is for > > pre-fetching > > of records. For example, an application could ask for 500 records, but > > actually > > want to fetch 1000 so that it can overlap the request-response of > > acknowledging > > the first 500 records with the processing of the next 500 already fetched. > > However, getting pre-fetching to work in all situations while the > > acquisition > > locks are ticking down is more work. I don't want to introduce pre-fetching > > in > > this KIP. > > > > As a result, I would prefer that this KIP introduces just one new config, > > share.acquire.mode. I think we will use share.fetch.max.records or something > > like it for prefetching when it's time. I know this contradicts an comment > > of mine, but I've changed my mind :) > > > > I would also like to rename "strict" to "record limit" in the config. That > > is a better description of the effect. > > > > I hope this makes sense. > > > > Here are my specific comments. > > > > AS3: I would remove "strict" and "loose" from the description. Having > > "Strict" in the KIP title is fine, but the modes should be "batch-optimised" > > and "record-limit". > > > > AS4: I suggest removing `share.max.fetch.records` from this KIP. It moves us > > in the direction of having MaxRecords and BatchSize differing in the > > ShareFetch > > request, and that's intended for pre-fetching when we get around to it. > > > > AS5: I suggest changing "strict" to "record_limit" in the config for > > `share.acquire.mode`. Of course, this affects the `ShareAcquireMode` enum > > too. > > > > AS6: Since "BATCH_OPTIMIZED" is the default for the config, I would prefer > > this enum value to be the first in the ShareAcquireMode enum. > > > > AS7: The package and naming of the AcquireMode enum is inconsistent and > > incorrect. The internals package is not intended to be public. So, I think > > the enum is org.apache.kafka.clients.consumer.ShareAcquireMode. > > > > AS8: In the ShareFetchRequest, I would make the AcquireMode field have > > type int8. There's no need to make it string. The values should match the > > equivalent enum values. > > > > AS9: In the proposed changes, I suggest an example such as this. > > > > Let's say that the records produced onto a topic-partition each > > contain 3 records. Then the batches would have offsets 0-2 inclusive, > > 3-5 inclusive, 6-8 inclusive, and so on. > > > > Consider a share consumer with `max.poll.records=5` fetching records > > from the topic. For batch-optimised mode, it will receive two whole > > batches of records, from offsets 0 to 5 inclusive, which is 6 records > > in total. > > > > For record-limit mode, it will receive no more than 5 records. The broker > > could choose to: > > > > * Return just one entire batch 0-2 inclusive, which is 3 records in total. > > * Return two entire batches 0-5 inclusive, but only acquire records 0-4 > > inclusive. If a batch is "split" like this, the broker has returned record > > data which has not been acquired by this consumer. > > * Adjust the record batching of the records returned so that a single > > batch 0-4 inclusive is returned. > > * Or any other strategy which acquires no more than 5 records. > > > > > > I hope we can get this into Apache Kafka 4.2. I expect we will make it. > > > > Thanks, > > Andrew > > ________________________________ > > From: Wang Jimmy > > Sent: 10 September 2025 19:16 > > To: [email protected] > > Subject: RE: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch > > > > Hi Andrew, > > Thanks for your comments and sorry for the delayed response. > > > > AS1: > > I think your intention is to control the fetched records solely by > > share.max.fetch.records rather than introducing a new acquireMode parameter. > > However, the concept of acquireMode is intended to achieve the following > > things: > > Disable batches (only one batch will be returned to the consumer in one > > fetch in strict mode) > > Make a distinction from the current broker behavior where the maximum > > number of records is set as a soft limit. > > > > As for the first point, it would be better if we wanted to extend the lock > > timeout on a ”record“ basis rather than on a ”record batches” basis. And as > > Mittal suggested, we assume that the client application cares more about > > the precise count of messages rather than the throughput, so I think it > > makes sense that batching is not allowed in this mode. What do you think? > > > > AS2: I agree with your idea and have changed the client configuration to > > share.max.fetch.records. Thanks for your advice. > > > > Best, > > Jimmy Wang > > > > On 2025/09/03 15:46:06 Andrew Schofield wrote: > > > Hi Jimmy, > > > Thank you for the KIP. I'm sure I'll have more comments yet as > > > I think through how it will work in practice, and also the work that > > > we are looking to do in the consumer as part of Kafka 4.2 around > > > flow control and memory usage. > > > > > > > > > The behaviour in KIP-932 is expecting that the consuming application > > > will be able to consume the fetched records in a timely fashion so > > > that it does not inadvertently breach the acquisition lock time-out. > > > It lets the application limit the amount of memory used for buffered > > > records and also limit the number of fetched records. The limit of > > > the number of records is applied as a soft limit, meaning that > > > complete record batches (as written to the log) will be acquired. > > > Providing a way to control the number of records more strictly > > > will be useful for some situations, at the expense of throughput. > > > > > > AS1: I suggest using `share.fetch.max.records` as the way to control > > > the maximum number of records. If not specified, you would get what > > > you get today, which is a soft limit based on `max.poll.records`. > > > If specified, the number of acquired records would not exceed this > > > number. The broker would return complete record batches to the > > > consumer application (to prevent decompression in the broker to > > > split batches), but the number of acquired records would not > > > exceed the limit specified. > > > > > > I suggest `share.fetch.max.records` with the "share." at the start. > > > KIP-1199 is looking to introduce a maximum number of records for > > > regular fetches. Because the behaviour would be quite different, > > > I think it's preferable to have a different configuration > > > property. > > > > > > > > > Thanks, > > > Andrew > > > ________________________________________ > > > From: Wang Jimmy > > > Sent: 31 August 2025 17:54 > > > To: [email protected] > > > Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share > > fetch > > > > > > Hi Mittal, > > > Thanks for your thoughtful feedback on the KIP! > > > > > > AM1: > > > I agree with your point. I have updated the KIP to explain the pros > > and cons of the “strict” mode. > > > > > > AM2: > > > Surely. After implementing strict mode, share-consumer can leverage > > max.poll.records ( or fetch.max.records, as mentioned in AM4) to control > > the fetch rate of shareFetchManager. This prevents scenarios where one > > consumer fetches too many records while others suffer from starvation, > > thereby ensure balanced throughput among different consumers. > > > > > > AM3: > > > Thanks for pointing this out, I'll update the document. But I think > > this KIP won't change behavior of acquisition lock timeout or session > > timeout, which will stay the same as stated in KIP-932. > > > > > > AM4a: > > > I overlooked this point and I think you are right. In “strict” mode, > > the share fetch response will contain only one batch, with maximum records > > upper bounded by max(BatchSize, MaxRecords). > > > > > > AM4b: > > > From my point of view, it would be better to introduce a new > > max.fetch.records configuration since it has different meaning compared to > > max.poll.records. Regarding the pre-fetch behavior, regardless of the > > current implementation for implicit or explicit mode, all records should be > > acknowledged before sending the next fetch request. To achieve "pre-fetch”, > > my initial thought is that broker needs to allow the same member in share > > group to send multiple shareFetch requests, but with an upper bound on the > > total number of delivered records set to max.fetch.records. I am not quite > > sure, but I think I could also finish it in this KIP. What do you think? > > > > > > AM5: > > > Since “AcquireMode” is needed for both the share-consumer(as client > > configuration) and broker(determine the mode used), it should ideally > > placed in two separate class under core and client module. > > > > > > Best, > > > Jimmy Wang > > > > > > > > > 2025年8月27日 04:01,Apoorv Mittal 写道: > > > > > > Hi Jimmy, > > > Thanks for the KIP. Please find some comments below: > > > > > > AM1: The KIP mentions the current behaviour of soft limit but it would > > be > > > helpful to explain the reasoning as well in KIP. Else it seems like the > > > "strict" should always be the preferred fetch behaviour. However, > > that's > > > not true. The broker never reads the actual data records, rather sends > > back > > > the batch of records as produced. Hence, say in strict mode the > > MaxRecords > > > is set to 1 but the producer generates a single batch of 5 records on > > log > > > then only 1 record will be acquired but the whole batch of 5 records > > will > > > be sent to the client. This will have higher egress from the broker and > > > wasted memory on the client. The strict behaviour is helpful in some > > > scenarios but not always. > > > > > > AM2: When we say "Strict max fetch records enables clients to achieve > > > predictable > > > throughput", can you please help explain what is meant by it? An > > example > > > could help here. > > > > > > AM3: As mentioned in the KIP "In scenarios where record processing is > > > time-consuming" hence strict mode is advisable. The client connection > > shall > > > be disconnected post session timeout configuration. Hence it means > > that if > > > processing is taking longer than the session timeout then client > > sessions > > > will be dropped and held records will be released. Shall we propose to > > > handle the behaviour for such scenarios in the KIP as well? > > > > > > AM4: Currently, other than max and min bytes, there are 2 other > > parameters > > > in ShareFetch request 1) MaxRecords 2) BatchSize. Both of these share > > fetch > > > params currently use max.poll.records client configuration. Which means > > > that a single batch of records will be fetched as per max.poll.records > > > client configuration. But the MaxRecords and BatchSize were added > > because > > > of following reasons a) Have some predictable number of records > > returned > > > from broker as the records are backed by acquisition lock timeout, in > > case > > > client takes more time in processing higher number of records b) > > Generate > > > batches so client can "pre-fetch" record batches which can be > > > acknowledged individually (batch) rather waiting for all records to be > > > processed by client. Pre-fetch needs additional handling in client and > > > broker to renew the lock timeout for acquired-waiting record batches in > > > client, which currently does not exist. Questions: > > > > > > AM4-a: What would be the suggested behaviour with "strict" mode and > > > BatchSize i.e. shall always only a single batch be allowed to fetch in > > > "strict" mode? Or there could be any reason to fetch multiple batches > > even > > > in strict mode? I am assuming, and as KIP mentions as well, > > applications > > > will generally use strict mode when the processing time is higher on > > > clients for records, then does it make sense to allow multiple batches? > > > > > > AM4-b: As defined in the KIP-1199 > > > >, > > > there might be a separate config fetch.max.message.count (preferably > > > fetch.max.records) which will be used for MaxRecords. Hence, should we > > > introduce the fetch.max.records configuration in this KIP for > > ShareFetch > > > and think about how prefetching will work? Or if we want to leave this > > for > > > a separate KIP then do we want to define behaviour for MaxRecords in > > strict > > > mode i.e. should MaxRecords be same as max.poll.records and > > pre-fetching > > > should not be supported? > > > > > > AM5: AcquireMode is also used by clients so should the enum > > AcquireMode reside > > > in the server module or it should be in the clients module? > > > > > > Regards, > > > Apoorv Mittal > > > > > > > > > On Thu, Aug 21, 2025 at 6:55 PM Wang Jimmy wrote: > > > > > > Hello all, > > > I would like to start a discussion on KIP-1206: Strict max fetch > > records > > > in share fetch. > > > This KIP introduces the AcquireMode in ShareFetchRequest, which > > provides > > > two options: Strict or loose. When strict mode is selected, we > > should only > > > acquire records till maxFetchRecords. > > > > > > KIP: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch > > > > > > Thanks, > > > Jimmy Wang > > > > > > >
