Hi PoAn, Thanks for your response. I'm going to try again :) AS2: ProducerStateEntry#NUM_BATCHES_TO_RETAIN is already a partition-level concept. The new max.idempotence.batches.to.retain acts per-partition as far as I understand. The fact that we have connection-level and partition-level concepts colliding is weird to me.
If I created 20 partitions and set max.in.flight.requests.per.connection to 100, and also changed the producer to limit its in-flight requests at the partition level, couldn't I get the best of both worlds? I could use the usual scaling mechanism of adding partitions to get higher throughput and I could continue to use 5 requests per partition. I don't mind adding a config to set the number of batches to retain, but I think that's only half the problem. Thanks, Andrew On 2026/03/08 06:33:35 PoAn Yang wrote: > Hi Andrew, > > Thanks for the review and sorry for the late reply. I took some time to think > through how a partition-level approach could be implemented, what benefits it > might bring, and to run additional experiments in low-latency environments. > > AS1 & AS2: Since both comments converge on the idea of introducing > partition-level configuration, I'll address them together. > > 1. Partition-level configuration does not satisfy all use cases. While a > partition-level in-flight limit offers finer-grained control per partition, > it doesn't cover the case where a user wants to bound the total number of > in-flight requests on a single broker connection. These are two distinct > concerns: per-partition flow control vs. per-connection back-pressure. A > partition-level configuration alone cannot replace the connection-level limit > that max.in.flight.requests.per.connection currently provides. > > 2. Having two levels of in-flight limits increases both user-facing and > implementation complexity. > > 3. A broker-level configuration benefits both high-latency and low-latency > environments. I've added localhost benchmark results to the KIP. Even in a > low-latency environment, setting max.in.flight.requests.per.connection=1 > causes each request to queue at the producer level, resulting in > significantly higher latency compared to allowing more in-flight requests. > > Thank you, > PoAn > > > On Mar 2, 2026, at 7:11 AM, Andrew Schofield <[email protected]> wrote: > > > > Hi PoAn, > > Thanks for your KIP. This is seems like a good area to improve, not just > > for the high-latency connections between clients and brokers that you > > mentioned, but also because diskless is introducing topics which have high > > write latency too. > > > > AS1: In general, I'm nervous of having to set broker configurations based > > on knowledge of the client latency. If you have an asymmetrical > > configuration with a mixture of high and low latency clients, you end up > > having to configure for the worst case. I'd prefer the client to behave > > differently in the event that it is experiencing high latency, and also to > > be responsive to the difference in latency for specific topics which have > > higher latency, rather than to change the broker configuration for all > > clients. wdyt? > > > > AS2: If I understand the code correctly (and that's not guaranteed), > > ProducerStateEntry.NUM_BATCHES_TO_RETAIN (5) is the number of batches per > > producer ID per topic-partition that the broker can retain. The Java > > producer client uses max.in-flight.batches.per.connection (also 5) to limit > > how many requests it is prepared to have in flight, but this is at the > > level of the entire connection. Would an alternative change be to switch > > the producer's limit from a connection-level limit to a partition-level > > limit matching the broker implementation? You could get a lot of in-flight > > requests by using more partitions. The key is the amount of data in flight, > > not really the number of batches. I may have misunderstood how this area > > works, but it doesn't seem optimal. > > > > Thanks, > > Andrew > > > > On 2026/02/28 12:21:10 PoAn Yang wrote: > >> Hi Luke, > >> > >> Thanks for the review. > >> > >> 2 & 4. I add more background to Broker Configuration and > >> Dynamic Capacity Discovery paragraphs. In the initial state, > >> the producer can only send at most min(5, > >> max.in.flight.requests.per.connection) requests, so it doesn’t > >> break old brokers capacity. > >> > >> Thank you, > >> PoAn > >> > >>> On Feb 27, 2026, at 4:27 PM, Luke Chen <[email protected]> wrote: > >>> > >>> Hi PoAn, > >>> > >>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause > >>> OUT_OF_ORDER_SEQUENCE error. This KIP is to remove > >>> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related. > >>> > >>> OK, I see. > >>> > >>>> Yes, if max.in.flight.requests.per.connection is larger than > >>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained. > >>> That is why we have initial state to make sure the producer sends > >>> in flight requests less or equal to NUM_BATCHES_TO_RETAIN. > >>> Only if it finds a broker can retain more batches, it adjusts its > >>> limitation. > >>> > >>> So, currently, when idempotent/transactional producer is enabled, we will > >>> throw exception if the max.in.flight.requests.per.connection > 5. > >>> When we allow users to configure the NUM_BATCHES_TO_RETAIN, the validation > >>> will not be applied before sending the produce request. > >>> And that's why we need the produce response to tell the producer what the > >>> setting in the broker side is. > >>> Could you make it more clear about this in the KIP? > >>> > >>> Also, if the max.in.flight.requests.per.connection is set to 100, > >>> and NUM_BATCHES_TO_RETAIN is 5, then it means it's a little late when the > >>> first producer response is received if we already allow producers to send > >>> 100 requests in flight. If we want to adopt this solution, maybe we need > >>> to > >>> let the producer begins from max.in.flight.requests.per.connection = 1 and > >>> then adjust it to the expected value after the first producer response is > >>> received. Does that make sense? > >>> > >>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However, > >>> if a broker works with old producers, it may waste memory. Old > >>> producers can't send more in flight requests cause of ConfigException. > >>> How about we still use 5 in 4.x and adjust to a larger value in 5.0? > >>> > >>> Sounds good to me. > >>> > >>> Thank you, > >>> Luke > >>> > >>> > >>> > >>> On Thu, Feb 26, 2026 at 9:22 PM PoAn Yang <[email protected]> wrote: > >>> > >>>> Hi Luke, > >>>> > >>>> Thanks for the review and suggestions. > >>>> > >>>> 1. KAFKA-18905 or KAFKA-9199 are about leader changes cause > >>>> OUT_OF_ORDER_SEQUENCE error. This KIP is to remove > >>>> NUM_BATCHES_TO_RETAIN limitation. I think they’re not related. > >>>> > >>>> 2. Agree, transactional producers are based on idempotent producers. > >>>> Updated it. > >>>> > >>>> 3. > >>>>> So, I'd like to know why we have to adjust the > >>>>> `max.in.flight.requests.per.connection` value in the producer side? > >>>> > >>>> > >>>> User doesn’t need to update max.in.flight.requests.per.connection in > >>>> this case. The producer will automatically adjust internal limitation of > >>>> in flight requests. > >>>> > >>>>> Using the example above, after this KIP, > >>>>> the `max.in.flight.requests.per.connection=10` cannot be retained > >>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right? > >>>> > >>>> > >>>> Yes, if max.in.flight.requests.per.connection is larger than > >>>> NUM_BATCHES_TO_RETAIN, the batches cannot be retained. > >>>> That is why we have initial state to make sure the producer sends > >>>> in flight requests less or equal to NUM_BATCHES_TO_RETAIN. > >>>> Only if it finds a broker can retain more batches, it adjusts its > >>>> limitation. > >>>> > >>>> 4. We can adjust the default NUM_BATCHES_TO_RETAIN. However, > >>>> if a broker works with old producers, it may waste memory. Old > >>>> producers can't send more in flight requests cause of ConfigException. > >>>> How about we still use 5 in 4.x and adjust to a larger value in 5.0? > >>>> > >>>> Thank you, > >>>> PoAn > >>>> > >>>>> On Feb 25, 2026, at 9:07 PM, Luke Chen <[email protected]> wrote: > >>>>> > >>>>> Hi PoAn, > >>>>> > >>>>> Thanks for the KIP! > >>>>> I agree the number of batches to retain should be configurable to > >>>>> improve > >>>>> the throughput. > >>>>> > >>>>> Comments: > >>>>> 1. Could you add the issue: KAFKA-18905 > >>>>> <https://issues.apache.org/jira/browse/KAFKA-18905> into the > >>>>> motivation section? I think this is the issue we want to address, right? > >>>>> > >>>>> 2. > Introduce a new config on the broker, as the broker must know how > >>>> much > >>>>> memory to allocate. Operators can set a limitation on the broker side to > >>>>> prevent malicious producers. This configuration only takes effect for > >>>>> idempotent producers. > >>>>> I think not only the idempotent producers, but also the > >>>>> transactional producers, as long as they have the PID. > >>>>> > >>>>> 3. About the producer response update, I'm wondering if it is necessary? > >>>>> Currently, when producer with `max.in.flight.requests.per.connection=10` > >>>>> and NUM_BATCHES_TO_RETAIN=5, we won't adjust the producer config to 5. > >>>>> Of course it is possible to the duplication cannot be detected, but that > >>>>> might be user's choice to improve the throughput (though it might be > >>>> rare). > >>>>> So, I'd like to know why we have to adjust the > >>>>> `max.in.flight.requests.per.connection` value in the producer side? > >>>>> Using the example above, after this KIP, > >>>>> the `max.in.flight.requests.per.connection=10` cannot be retained > >>>>> unless NUM_BATCHES_TO_RETAIN is set to 10, right? > >>>>> > >>>>> 4. The default value of `max.idempotence.batches.to.retain` > >>>>> In the performance test you showed, it obviously shows > >>>>> larger `max.idempotence.batches.to.retain` will get better throughput. > >>>>> Also, the memory usage is small, do we have any reason we keep the > >>>> default > >>>>> value for 5? > >>>>> > >>>>> Thank you, > >>>>> Luke > >>>>> > >>>>> > >>>>> > >>>>> On Sun, Feb 22, 2026 at 9:48 PM PoAn Yang <[email protected]> wrote: > >>>>> > >>>>>> Hi all, > >>>>>> > >>>>>> I would like to start a discussion thread on KIP-1269. In this KIP, we > >>>> aim > >>>>>> to remove limitation of maximal number of batches to retain for a > >>>>>> idempotent producer. In our test, it can improve throughput and reduce > >>>>>> latency. > >>>>>> > >>>>>> https://cwiki.apache.org/confluence/x/loI8G > >>>>>> > >>>>>> Please take a look and feel free to share any thoughts. > >>>>>> > >>>>>> Thanks. > >>>>>> PoAn > >>>> > >>>> > >> > >> > >
