Hi Jun, thanks for the feedback! (Sorry for the delay, Current/travelling.)

JR1: Agreed, I updated the example (aligned with the same mixed workload
case mentioned at the beginning of the motivation)
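
To put rough numbers on that mixed-workload case, here is a
back-of-the-envelope sketch. It is a simplification, not the KIP's actual
accounting: it ignores chunk granularity, the function names are made up, and
the workload figures (1000 keyed partitions at ~10 KiB/s each, one hot unkeyed
partition at ~100 MiB/s, batch.size = 4 MiB, linger.ms = 100) are assumptions
that reuse the numbers from the earlier example.

```python
KIB = 1024
MIB = 1024 * KIB

def static_reservation(active_partitions, batch_size):
    # Static strategy: every open batch reserves a full batch.size up front.
    return active_partitions * batch_size

def dynamic_reservation(bytes_per_sec, linger_ms, batch_size):
    # Simplified dynamic strategy: an open batch holds only what arrived
    # within linger.ms, capped at batch.size (chunk rounding is ignored).
    accumulated = bytes_per_sec * linger_ms // 1000
    return min(batch_size, accumulated)

# Assumed workload (illustrative values, not measurements).
batch_size = 4 * MIB
linger_ms = 100
keyed_partitions = 1000       # low-volume topic with keys
keyed_rate = 10 * KIB         # bytes/s per keyed partition
hot_rate = 100 * MIB          # bytes/s on the single active unkeyed partition

static_total = static_reservation(keyed_partitions + 1, batch_size)
dynamic_total = (
    dynamic_reservation(hot_rate, linger_ms, batch_size)
    + keyed_partitions * dynamic_reservation(keyed_rate, linger_ms, batch_size)
)

print(f"static:  {static_total / MIB:.1f} MiB")
print(f"dynamic: {dynamic_total / MIB:.1f} MiB")
```

In this model the hot partition still fills a whole batch (so it keeps the
batching benefit), while the keyed partitions hold about 1 KiB each instead of
4 MiB each.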

JR3: The "scale down" referred to the reservation only (the memory held by
open batches, which is lower during low-traffic periods than during
high-traffic ones). I clarified in the KIP that it's not about pool memory
scaling down, just about the reservation in open batches (the rest of the
pool stays free for other partitions if needed).
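
As a toy illustration of that distinction (pool size stays fixed, only the
reservation held by open batches moves), a minimal sketch follows. PoolModel
is a hypothetical class written for this email, not the producer's BufferPool,
and the 512 MiB pool, 100 partitions and 64 KiB low-traffic batches are
assumed values; only the 400 MiB figure comes from the quoted KIP text.

```python
KIB = 1024
MIB = 1024 * KIB

class PoolModel:
    """Toy fixed-size pool: open batches reserve bytes and return them on close."""

    def __init__(self, total_bytes):
        self.total_bytes = total_bytes
        self.reserved_bytes = 0

    @property
    def available_bytes(self):
        return self.total_bytes - self.reserved_bytes

    def reserve(self, n):
        if n > self.available_bytes:
            raise MemoryError("pool exhausted")
        self.reserved_bytes += n

    def release(self, n):
        self.reserved_bytes -= n

pool = PoolModel(512 * MIB)
partitions = 100

# High-traffic period: every open batch grows to 4 MiB.
for _ in range(partitions):
    pool.reserve(4 * MIB)
high_reserved = pool.reserved_bytes

# Batches are sent and closed; their memory goes back to the pool.
for _ in range(partitions):
    pool.release(4 * MIB)

# Low-traffic period: open batches only hold 64 KiB each.
for _ in range(partitions):
    pool.reserve(64 * KIB)
low_reserved = pool.reserved_bytes

print(high_reserved // MIB, "MiB reserved at high traffic")
print(low_reserved // KIB, "KiB reserved at low traffic")
print(pool.total_bytes // MIB, "MiB pool size, unchanged")
```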

JR4: Yes, it was confusing indeed. The intention was just to refer to the
producer thread marking the batch for closing (not the actual close). This
will all be the same as today when the batch fills up, as you described
(producer just "marks for close", sender does the actual close and frees
memory up). I clarified it all in the KIP to be accurate.
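
The split of responsibilities can be sketched roughly as below. This is only
an illustration: Batch, Pool, producer_append and sender_drain are
hypothetical names rather than the producer's real classes, and freeing memory
right at close time is a simplification.

```python
from dataclasses import dataclass

@dataclass
class Batch:
    reserved_bytes: int = 0
    marked_for_close: bool = False
    closed: bool = False

class Pool:
    def __init__(self, total_bytes):
        self.available = total_bytes

    def try_acquire(self, n):
        # Non-blocking acquire: fail immediately if the pool cannot serve n bytes.
        if n > self.available:
            return False
        self.available -= n
        return True

    def release(self, n):
        self.available += n

def producer_append(pool, batch, record_bytes):
    """Producer thread: append if memory is available, else only mark the batch."""
    if pool.try_acquire(record_bytes):
        batch.reserved_bytes += record_bytes
        return True
    # Pool exhausted: flag the batch so it becomes eligible to drain.
    # The caller would then block on the pool (up to max.block.ms).
    batch.marked_for_close = True
    return False

def sender_drain(pool, batches):
    """Sender thread: performs the actual close and frees the memory."""
    for batch in batches:
        if batch.marked_for_close and not batch.closed:
            batch.closed = True
            pool.release(batch.reserved_bytes)
            batch.reserved_bytes = 0

pool = Pool(100)
batch = Batch()
first = producer_append(pool, batch, 80)    # fits in the pool
second = producer_append(pool, batch, 50)   # pool exhausted, batch only marked
state_before_drain = (batch.marked_for_close, batch.closed, pool.available)
sender_drain(pool, [batch])
state_after_drain = (batch.marked_for_close, batch.closed, pool.available)
```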

Thanks!
Lianet


On Fri, May 15, 2026 at 9:39 PM Jun Rao via dev <[email protected]>
wrote:

> Hi, Lianet,
>
> Thanks for the reply.
>
> JR1. "Memory usage: under the current static strategy, a producer writing
> 10 MiB/s of aggregate throughput to a 1000-partition topic with
> RoundRobinPartitioner struggles to achieve a meaningful fraction of that at
> the default 16384 bytes "batch.size". Each partition only sends 16384 bytes
> at a time over a high-latency link, so per-partition throughput is bounded
> by "16384 bytes / RTT". Increasing "batch.size" to 4 MiB unblocks
> throughput but the producer would need 4 MiB × 1000 partitions = 4 GiB of
> pool memory (regardless of actual volume of data flowing per partition).
> Under the dynamic strategy and the same batch.size = 4 MiB, target
> throughput of ~10 KiB/s per partition and linger.ms = 100 ms, per-partition
> memory becomes ≈ 1 KiB (10 KiB/s throughput × 100 ms linger), so total
> memory ≈ 1000 × 1 KiB ≈ 1 MiB (orders of magnitude less than the 4 GiB used
> under the static
> allocation). Similar savings apply to any workload where per-batch data
> falls short of batch.size: hot-cold partition distributions (skewed key
> traffic), bursty workloads with quiet periods, and over-provisioned
> batch.size settings."
> This example is still not very convincing. It's true that one can set
> batch.size=4MB without running out of memory, but it doesn't achieve the
> batching benefit. So, why will a user bother setting a high batch size? One
> possible example is a client that publishes to a high volume topic without
> keys, and to a low-volume topic with keys, using the default partitioning
> strategy. When a high batch size is set, the static approach may exhaust
> the buffer pool, whereas the dynamic approach avoids exhausting the pool
> and still achieves the batching benefit for the high volume topic.
>
>
> JR3. "Dynamic uses aggregate_throughput × linger.ms, which operators
> control. During lower-traffic periods, static still reserves 400 MiB until
> batches close; dynamic scales down proportionally."
> Hmm, if the dynamic approach ever allocates 400MB worth of chunks, it never
> deallocates them right? Then, how will dynamic scale down?
>
>
> JR4. "If the non-blocking acquire fails (pool exhausted), the producer will
> close the current batch (making it eligible to drain), and blocks on the
> pool to allocate the chunks for the new record (up to max.block.ms)."
> To be precise, currently, when the buffer pool is exhausted, the producer
> doesn't close the batch directly. The background sender thread drains and
> closes the batch.
>
> Jun
>
> On Thu, May 14, 2026 at 2:30 PM Lianet Magrans <[email protected]> wrote:
>
> > Hi Jun,
> >
> > JR1: The example's point was about the case where flow remains under the
> > batch limit (those are the cases where we would get significant memory
> > improvement/differences). But I do get your point and agree: in scenarios
> > where the full batch is used, the dynamic strategy would end up using the
> > same amount of memory. Still, in those cases the value comes from the
> > predictability/tuning of the buffer.memory (memory consumption depends on
> > known factors, not workload-dependent ones). I clarified the first
> > example, and added a second one to showcase the case where it's not about
> > memory gains but about predictability.
> >
> > JR2: The main concern with keeping the same close-and-block as trunk in
> > this case was the change it would bring into the send() blocking pattern.
> > On trunk, send only blocks for memory for the first record of a batch,
> > but never mid-batch. Applying this close-and-block to the dynamic strategy
> > would change this (send() could block on any record regardless of an open
> > batch). I leaned initially toward avoiding changing the blocking
> > behaviour (and paying for the extra direct allocation with visibility),
> > but on second thought I agree it's cleaner to surface the situation to the
> > API (blocking on send, aligned with what trunk does on new batch only, and
> > dynamic would do at the record level). It's no change to the send or
> > max.block.ms contract really, just a different pattern that seems sensible
> > given the "on-demand" allocation. I updated the KIP with this, and left a
> > rejected alternative for the record. Also, with this I opted for dropping
> > the new metric I had (which was mainly to have visibility over this new
> > direct-allocation path, now removed).
> >
> > Hi TengYao:
> >
> > TYC1: interesting point, agree that your suggested metric would give
> > visibility on what's actually allocated from the pool (which is dynamic
> > now, didn't make too much sense before because it was "static",
> > ~batch.size). I believe that for some of the scenarios you shared, we
> > would be covered with the metrics that already exist in trunk (e.g.,
> > bufferpool-wait-*, buffer-available/total-bytes, batch-size-avg). Still,
> > it's a fact that the new strategy allocates differently from the pool,
> > dynamically, and only a metric like you suggest would let us see how that
> > goes (batch-size-avg is the closest but is post-compression so not the
> > same). I just wonder if it would make more sense to represent it in
> > bytes, rather than in chunks (e.g., "batch-pool-bytes-avg")? It would
> > align better with existing metrics in this space, all in bytes. Also I
> > expect operators probably think in bytes (not a new "chunk" concept, which
> > is just an internal implementation grouping bytes), and it's maybe better
> > not to expose chunk as a unit of measure, to make sure the metric ages
> > well even if the internal chunk details move. What do you think? Will wait
> > to hear back and align before updating the KIP.
> >
> > Thanks both!
> > Cheers,
> > Lianet
> >
> >
> > On Wed, May 13, 2026 at 4:40 PM Jun Rao via dev <[email protected]>
> > wrote:
> >
> >> Hi, Lianet,
> >>
> >> Thanks for the reply.
> >>
> >> JR1. "As an example: a producer writing 10 MiB/s of aggregate throughput
> >> to a 1000-partition topic with RoundRobinPartitioner struggles to achieve
> >> a meaningful fraction of that at the default 16384 bytes "batch.size".
> >> Each partition only sends 16384 bytes at a time over a high-latency link,
> >> so per-partition throughput is bounded by "16384 bytes / RTT". Increasing
> >> "batch.size" to 4 MiB unblocks throughput but the producer would need
> >> 4 MiB × 1000 partitions = 4 GiB of pool memory to accommodate all
> >> partitions simultaneously (regardless of actual volume of data flowing
> >> per partition)."
> >> This example does not seem strong. In this case, the producer still
> >> requires 4GB of memory even with the proposed KIP to achieve high
> >> throughput because all 1000 partitions are active.
> >>
> >> JR2. "When a new record arrives mid-batch and the pool is exhausted, it
> >> will perform direct heap allocation to allocate all the chunks estimated
> >> needed for the record uncompressed size."
> >> Why do we need to introduce this new case for direct allocation? This
> >> case exists in the static allocation approach. If the buffer pool is
> >> exhausted, the send() call blocks but all pending batches become
> >> drainable to prevent deadlock. Is there any issue with using the same
> >> mechanism for dynamic allocation?
> >>
> >> Jun
> >>
> >>
> >> On Wed, May 13, 2026 at 8:53 AM Lianet Magrans <[email protected]>
> >> wrote:
> >>
> >> > Hi Jun,
> >> >
> >> > JR1: Agreed, I updated the motivation section to clarify the different
> >> > scenarios based on keys and partitioner, and under which situations it
> >> > becomes problematic.
> >> >
> >> > JR2: The KIP preserves the 2 existing direct allocation triggers you
> >> > mentioned (compressed data exceeding allocation and batch split), and
> >> > also introduces a new one (on new record mid-batch when pool exhausted,
> >> > basically due to the per-record reservation approach). To mitigate,
> >> > direct allocation is limited to one record's worth of growth per batch
> >> > (batch closed right after it), and we're also introducing the new
> >> > metric to have visibility and allow tuning buffer.memory. Under normal
> >> > pool conditions, direct allocations with the new strategy should happen
> >> > less often than with the current behaviour, mainly because of the
> >> > proposed improvement to try the pool first, non-blocking, before
> >> > falling back to heap allocation. I clarified it all in the Internal
> >> > allocation strategy section (extending on new sections "Blocking
> >> > behaviour" and "Direct heap allocation"). Please take a look and let me
> >> > know.
> >> >
> >> > Thanks for the review!
> >> > Lianet
> >> >
> >> > PS: addressing TengYao's feedback shortly, thanks!
> >> >
> >> > On Tue, May 12, 2026 at 11:31 AM TengYao Chi <[email protected]>
> >> > wrote:
> >> >
> >> > > Hi Lianet,
> >> > >
> >> > > Thanks for this great KIP.
> >> > >
> >> > > TYC1. I have one consideration regarding observability: Do we need a
> >> > > new metric for average-chunks-per-batch? With the introduction of the
> >> > > chunked-buffer strategy, memory usage per partition is no longer a
> >> > > fixed batch.size. While this significantly improves memory
> >> > > efficiency, it might be beneficial for operators to understand the
> >> > > actual "chunk utilization" or fragmentation under different
> >> > > workloads. Specifically, I think this metric would be valuable when
> >> > > combined with the proposed bufferpool-overflow metrics: it would help
> >> > > operators distinguish whether memory pressure is being driven by a
> >> > > large number of active partitions (many small batches) or by
> >> > > individual batches becoming unexpectedly large (many chunks per
> >> > > batch, perhaps due to large records or low compression ratios). What
> >> > > do you think?
> >> > >
> >> > > Best,
> >> > > TengYao Chi
> >> > >
> >> > > On 2026/05/11 23:03:53 Jun Rao via dev wrote:
> >> > > > Hi, Lianet,
> >> > > >
> >> > > > Thanks for the KIP.
> >> > > >
> >> > > > JR1. It would be useful to provide a bit more motivation for the
> >> > > > KIP. The batches allocated from the buffer pool are proportional to
> >> > > > the number of active partitions. For publishing records without
> >> > > > keys, the active partition is 1 by default, independent of the
> >> > > > number of partitions in a topic. It's only when publishing records
> >> > > > with keys that the active partition can be the total number of
> >> > > > partitions in a topic. So, a possible scenario is that a client
> >> > > > publishes records without keys to one topic while publishing
> >> > > > records with keys to another.
> >> > > >
> >> > > > JR2. "Following records appended to the batch do not block or
> >> > > > throw. They attempt non-blocking pool allocation and fall back to
> >> > > > direct heap if the pool is exhausted.
> >> > > > Ensures not blocking on pool memory while already holding some for
> >> > > > a batch".
> >> > > >
> >> > > > Currently, the producer only allocates memory exceeding the
> >> > > > configured buffer pool size in two cases.
> >> > > > (1) Compressed data exceeding the estimated size
> >> > > > (2) When a batch is too large for the broker's max.message.bytes
> >> > > > and gets split, each sub-batch is allocated via
> >> > > > ByteBuffer.allocate(initialSize) directly.
> >> > > >
> >> > > > With the KIP, are we introducing new cases in addition to the above
> >> > > > two?
> >> > > >
> >> > > > Jun
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Fri, May 1, 2026 at 6:03 AM Lianet Magrans <[email protected]>
> >> > > > wrote:
> >> > > >
> >> > > > > Thanks for the feedback Jaisen! I like your proposed "static" for
> >> > > > > the current behaviour, it aligns nicely. All updated.
> >> > > > >
> >> > > > > Best!
> >> > > > > Lianet
> >> > > > >
> >> > > > > On Thu, Apr 30, 2026 at 4:27 PM Jaisen Mathai via dev
> >> > > > > <[email protected]> wrote:
> >> > > > >
> >> > > > > > Thanks Lianet.
> >> > > > > >
> >> > > > > > I like the proposal.
> >> > > > > >
> >> > > > > > I suggest a descriptive name such as static or fixed instead of
> >> > > > > > legacy for the default configuration value. I think these will
> >> > > > > > age better while still communicating that users should strongly
> >> > > > > > consider using the non-default value of dynamic.
> >> > > > > >
> >> > > > > > Jaisen
> >> > > > > >
> >> > > > > > On Thu, Apr 30, 2026 at 8:02 AM Lianet Magrans
> >> > > > > > <[email protected]> wrote:
> >> > > > > >
> >> > > > > > > Hi all,
> >> > > > > > >
> >> > > > > > > I would like to start a discussion on KIP-1332 that proposes
> >> > > > > > > a dynamic memory allocation strategy for the Kafka producer,
> >> > > > > > > to unlock high-latency scenarios increasingly common as Kafka
> >> > > > > > > moves toward object storage.
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1332%3A+Dynamic+memory+allocation+for+the+Kafka+producer
> >> > > > > > >
> >> > > > > > > Thanks!
> >> > > > > > > Lianet
