Hi Jimmy,
Thanks for the updates. Just a few minor comments remain, but it's nearly
ready.

AS10: There is no max.fetch.records parameter. I think you mean
max.poll.records. There might be share.max.fetch.records in the future
but not yet.

AS11: I don't understand the point about poison records. The fix to
that issue will likely be for the broker independently to decide to
acquire only a subset of the records in a batch to ensure that any
bad records failing delivery do not impact the delivery count of
neighbouring records. This KIP is not required for that.

AS12: On line 30 of the ShareFetchRequest schema, please put the
values for the AcquireMode in the about string such as
"The acquire mode to control the fetch behavior: 0 - batch-optimized, 1 - 
record-limit"
This will end up in the protocol documentation automatically so
having the values makes the documentation more complete.

AS13: You should not include the details of the SharePartitionManager
or ShareFetch classes in the KIP. The KIP is a specification of the
protocol and public programming interfaces only.

AS14: Thanks for adding the examples. These are just illustrations
of some permitted behaviour, but the broker is only required to
keep within the limits specified in the ShareFetch request. As a result,
your rejected alternative of "Only one entire batch...." is actually
incorrect. The broker could do that and the KIP is not prescriptive about
the details of broker behaviour. So, please remove this rejected alternative.

AS15: The standard for string config values in Kafka is snake_case.
You are adding a new enum called ShareAcquireMode (in some places you've
called it AcquireMode). The values should be snake_case, such as
"batch_optimized", although the enum names would be capitalised like
BATCH_OPTIMIZED. Then the description of the values for the config
would also be snake_case.

If you look at auto.offset.reset consumer config and the
related AutoOffsetResetStrategy class you'll see what I mean.


Thanks,
Andrew

________________________________________
From: Wang Jimmy
Sent: 09 October 2025 12:25
To: [email protected]
Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share 
fetch
 
Hi Andrew,
Thanks for reading and responding to the KIP!

AS3 - AS8:
 All these comments make sense to me, I have updated the KIP according to 
your suggestions.

AS9:
> Return two entire batches 0-5 inclusive, but only acquire records 0-4
> inclusive. If a batch is "split" like this, the broker has returned record
> data which has not been acquired by this consumer.

From my point of view, the method mentioned above is the best way to implement 
the record-limit mode in Kafka. Although this will introduce overhead on either 
the client or server side, I believe it represents a necessary trade-off 
between the current Kafka log segment structure (which relies on producer 
batches) and the need for strict record control. I have moved the rest of the 
strategies you mentioned into the 'Rejected Alternatives' section, and I’ve 
added two examples for reference.

I will continue refining the KIP and do my best to ensure this change is merged 
into Kafka 4.2. Please take another look before starting the vote thread.

Best,
Jimmy Wang

From: Andrew Schofield
Date: Monday, October 6, 2025 at 23:30
To: [email protected]
Subject: Re: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch

Hi Jimmy,
Sorry for the long delay responding to the KIP. I think it's important to be
careful with the refinements to KIP-932, so I'm taking my time to make sure
whatever we do is good.

There are good reasons why KIP-932 optimised for batch delivery, but I know
from talking to early users of the share consumer, that it's not always ideal.
I'm strongly in favour of providing a way of limiting the number of records
regardless of the batching, for applications which need it.

There is another improvement that I foresee here and that is for pre-fetching
of records. For example, an application could ask for 500 records, but actually
want to fetch 1000 so that it can overlap the request-response of acknowledging
the first 500 records with the processing of the next 500 already fetched.
However, getting pre-fetching to work in all situations while the acquisition
locks are ticking down is more work. I don't want to introduce pre-fetching in
this KIP.

As a result, I would prefer that this KIP introduces just one new config,
share.acquire.mode. I think we will use share.fetch.max.records or something
like it for prefetching when it's time. I know this contradicts an comment
of mine, but I've changed my mind :)

I would also like to rename "strict" to "record limit" in the config. That
is a better description of the effect.

I hope this makes sense.

Here are my specific comments.

AS3: I would remove "strict" and "loose" from the description. Having
"Strict" in the KIP title is fine, but the modes should be "batch-optimised"
and "record-limit".

AS4: I suggest removing `share.max.fetch.records` from this KIP. It moves us
in the direction of having MaxRecords and BatchSize differing in the ShareFetch
request, and that's intended for pre-fetching when we get around to it.

AS5: I suggest changing "strict" to "record_limit" in the config for
`share.acquire.mode`. Of course, this affects the `ShareAcquireMode` enum too.

AS6: Since "BATCH_OPTIMIZED" is the default for the config, I would prefer
this enum value to be the first in the ShareAcquireMode enum.

AS7: The package and naming of the AcquireMode enum is inconsistent and
incorrect. The internals package is not intended to be public. So, I think
the enum is org.apache.kafka.clients.consumer.ShareAcquireMode.

AS8: In the ShareFetchRequest, I would make the AcquireMode field have
type int8. There's no need to make it string. The values should match the
equivalent enum values.

AS9: In the proposed changes, I suggest an example such as this.

Let's say that the records produced onto a topic-partition each
contain 3 records. Then the batches would have offsets 0-2 inclusive,
3-5 inclusive, 6-8 inclusive, and so on.

Consider a share consumer with `max.poll.records=5` fetching records
from the topic. For batch-optimised mode, it will receive two whole
batches of records, from offsets 0 to 5 inclusive, which is 6 records
in total.

For record-limit mode, it will receive no more than 5 records. The broker
could choose to:

* Return just one entire batch 0-2 inclusive, which is 3 records in total.
* Return two entire batches 0-5 inclusive, but only acquire records 0-4
inclusive. If a batch is "split" like this, the broker has returned record
data which has not been acquired by this consumer.
* Adjust the record batching of the records returned so that a single
batch 0-4 inclusive is returned.
* Or any other strategy which acquires no more than 5 records.


I hope we can get this into Apache Kafka 4.2. I expect we will make it.

Thanks,
Andrew
________________________________
From: Wang Jimmy
Sent: 10 September 2025 19:16
To: [email protected]
Subject: RE: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch

Hi Andrew,
Thanks for your comments and sorry for the delayed response.

AS1:
I think your intention is to control the fetched records solely by 
share.max.fetch.records rather than introducing a new acquireMode parameter.
However, the concept of acquireMode is intended to achieve the following things:
Disable batches (only one batch will be returned to the consumer in one fetch 
in strict mode)
Make a distinction from the current broker behavior where the maximum number of 
records is set as a soft limit.

As for the first point, it would be better if we wanted to extend the lock 
timeout on a ”record“ basis rather than on a ”record batches” basis. And as 
Mittal suggested, we assume that the client application cares more about the 
precise count of messages rather than the throughput, so I think it makes sense 
that batching is not allowed in this mode. What do you think?

AS2: I agree with your idea and have changed the client configuration to 
share.max.fetch.records. Thanks for your advice.

Best,
Jimmy Wang

On 2025/09/03 15:46:06 Andrew Schofield wrote:
> Hi Jimmy,
> Thank you for the KIP. I'm sure I'll have more comments yet as
> I think through how it will work in practice, and also the work that
> we are looking to do in the consumer as part of Kafka 4.2 around
> flow control and memory usage.
>
>
> The behaviour in KIP-932 is expecting that the consuming application
> will be able to consume the fetched records in a timely fashion so
> that it does not inadvertently breach the acquisition lock time-out.
> It lets the application limit the amount of memory used for buffered
> records and also limit the number of fetched records. The limit of
> the number of records is applied as a soft limit, meaning that
> complete record batches (as written to the log) will be acquired.
> Providing a way to control the number of records more strictly
> will be useful for some situations, at the expense of throughput.
>
> AS1: I suggest using `share.fetch.max.records` as the way to control
> the maximum number of records. If not specified, you would get what
> you get today, which is a soft limit based on `max.poll.records`.
> If specified, the number of acquired records would not exceed this
> number. The broker would return complete record batches to the
> consumer application (to prevent decompression in the broker to
> split batches), but the number of acquired records would not
> exceed the limit specified.
>
> I suggest `share.fetch.max.records` with the "share." at the start.
> KIP-1199 is looking to introduce a maximum number of records for
> regular fetches. Because the behaviour would be quite different,
> I think it's preferable to have a different configuration
> property.
>
>
> Thanks,
> Andrew
> ________________________________________
> From: Wang Jimmy
> Sent: 31 August 2025 17:54
> To: [email protected]
> Subject: Re: [DISCUSS] KIP-1206: Strict max fetch records in share fetch
>
> Hi Mittal,
> Thanks for your thoughtful feedback on the KIP!
>
> AM1:
> I agree with your point. I have updated the KIP to explain the pros and 
cons of the “strict” mode.
>
> AM2:
> Surely. After implementing strict mode, share-consumer can leverage 
max.poll.records ( or fetch.max.records, as mentioned in AM4) to control the 
fetch rate of shareFetchManager. This prevents scenarios where one consumer 
fetches too many records while others suffer from starvation, thereby ensure 
balanced throughput among different consumers.
>
> AM3:
> Thanks for pointing this out, I'll update the document. But I think this 
KIP won't change behavior of acquisition lock timeout or session timeout, which 
will stay the same as stated in KIP-932.
>
> AM4a:
> I overlooked this point and I think you are right. In “strict” mode, the 
share fetch response will contain only one batch, with maximum records upper 
bounded by max(BatchSize, MaxRecords).
>
> AM4b:
> From my point of view, it would be better to introduce a new 
max.fetch.records configuration since it has different meaning compared to 
max.poll.records. Regarding the pre-fetch behavior, regardless of the current 
implementation for implicit or explicit mode, all records should be 
acknowledged before sending the next fetch request. To achieve "pre-fetch”, my 
initial thought is that broker needs to allow the same member in share group to 
send multiple shareFetch requests, but with an upper bound on the total number 
of delivered records set to max.fetch.records. I am not quite sure, but I think 
I could also finish it in this KIP. What do you think?
>
> AM5:
> Since “AcquireMode” is needed for both the share-consumer(as client 
configuration) and broker(determine the mode used), it should ideally placed in 
two separate class under core and client module.
>
> Best,
> Jimmy Wang
>
>
> 2025年8月27日 04:01,Apoorv Mittal 写道:
>
> Hi Jimmy,
> Thanks for the KIP. Please find some comments below:
>
> AM1: The KIP mentions the current behaviour of soft limit but it would be
> helpful to explain the reasoning as well in KIP. Else it seems like the
> "strict" should always be the preferred fetch behaviour. However, that's
> not true. The broker never reads the actual data records, rather sends back
> the batch of records as produced. Hence, say in strict mode the MaxRecords
> is set to 1 but the producer generates a single batch of 5 records on log
> then only 1 record will be acquired but the whole batch of 5 records will
> be sent to the client. This will have higher egress from the broker and
> wasted memory on the client. The strict behaviour is helpful in some
> scenarios but not always.
>
> AM2: When we say "Strict max fetch records enables clients to achieve
> predictable
> throughput", can you please help explain what is meant by it? An example
> could help here.
>
> AM3: As mentioned in the KIP "In scenarios where record processing is
> time-consuming" hence strict mode is advisable. The client connection shall
> be disconnected post session timeout configuration. Hence it means that if
> processing is taking longer than the session timeout then client sessions
> will be dropped and held records will be released. Shall we propose to
> handle the behaviour for such scenarios in the KIP as well?
>
> AM4: Currently, other than max and min bytes, there are 2 other parameters
> in ShareFetch request 1) MaxRecords 2) BatchSize. Both of these share fetch
> params currently use max.poll.records client configuration. Which means
> that a single batch of records will be fetched as per max.poll.records
> client configuration. But the MaxRecords and BatchSize were added because
> of following reasons a) Have some predictable number of records returned
> from broker as the records are backed by acquisition lock timeout, in case
> client takes more time in processing higher number of records b) Generate
> batches so client can "pre-fetch" record batches which can be
> acknowledged individually (batch) rather waiting for all records to be
> processed by client. Pre-fetch needs additional handling in client and
> broker to renew the lock timeout for acquired-waiting record batches in
> client, which currently does not exist. Questions:
>
> AM4-a: What would be the suggested behaviour with "strict" mode and
> BatchSize i.e. shall always only a single batch be allowed to fetch in
> "strict" mode? Or there could be any reason to fetch multiple batches even
> in strict mode? I am assuming, and as KIP mentions as well, applications
> will generally use strict mode when the processing time is higher on
> clients for records, then does it make sense to allow multiple batches?
>
> AM4-b: As defined in the KIP-1199
> >,
> there might be a separate config fetch.max.message.count (preferably
> fetch.max.records) which will be used for MaxRecords. Hence, should we
> introduce the fetch.max.records configuration in this KIP for ShareFetch
> and think about how prefetching will work? Or if we want to leave this for
> a separate KIP then do we want to define behaviour for MaxRecords in strict
> mode i.e. should MaxRecords be same as max.poll.records and pre-fetching
> should not be supported?
>
> AM5: AcquireMode is also used by clients so should the enum AcquireMode 
reside
> in the server module or it should be in the clients module?
>
> Regards,
> Apoorv Mittal
>
>
> On Thu, Aug 21, 2025 at 6:55 PM Wang Jimmy wrote:
>
> Hello all,
> I would like to start a discussion on KIP-1206: Strict max fetch records
> in share fetch.
> This KIP introduces the AcquireMode in ShareFetchRequest, which provides
> two options: Strict or loose.  When strict mode is selected, we 
should only
> acquire records till maxFetchRecords.
>
> KIP:
> 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1206:+Strict+max+fetch+records+in+share+fetch
>
> Thanks,
> Jimmy Wang
>
>

Reply via email to