Thank you, Artem!

@devs, welcome to vote for this KIP.
Key proposal:
1. allocate multiple smaller initial batch size buffer in producer, and
list them together when expansion for better memory usage
2. add a max batch size config in producer, so when producer rate is
suddenly high, we can still have high throughput with batch size larger
than "batch.size" (and less than "batch.max.size", where "batch.size" is
soft limit and "batch.max.size" is hard limit)
Here's the updated KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

And, any comments and feedback are welcome.

Thank you.
Luke

On Tue, Oct 26, 2021 at 6:35 AM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> Hi Luke,
>
> I've looked at the updated KIP-782, it looks good to me.
>
> -Artem
>
> On Sun, Oct 24, 2021 at 1:46 AM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Artem,
> > Thanks for your good suggestion again.
> > I've combined your idea into this KIP, and updated it.
> > Note, in the end, I still keep the "batch.initial.size" config (default
> is
> > 0, which means "batch.size" will be initial batch size) for better memory
> > conservation.
> >
> > Detailed description can be found here:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> >
> > Let me know if you have other suggestions.
> >
> > Thank you.
> > Luke
> >
> > On Sat, Oct 23, 2021 at 10:50 AM Luke Chen <show...@gmail.com> wrote:
> >
> >> Hi Artem,
> >> Thanks for the suggestion. Let me confirm my understanding is correct.
> >> So, what you suggest is that the "batch.size" is more like a "soft
> limit"
> >> batch size, and the "hard limit" is "batch.max.size". When reaching the
> >> batch.size of the buffer, it means the buffer is "ready" to be be sent.
> But
> >> before the linger.ms reached, if there are more data coming, we can
> >> still accumulate it into the same buffer, until it reached the
> >> "batch.max.size". After it reached the "batch.max.size", we'll create
> >> another batch for it.
> >>
> >> So after your suggestion, we won't need the "batch.initial.size", and we
> >> can use "batch.size" as the initial batch size. We list each
> "batch.size"
> >> together, until it reached "batch.max.size". Something like this:
> >>
> >> [image: image.png]
> >> Is my understanding correct?
> >> If so, that sounds good to me.
> >> If not, please kindly explain more to me.
> >>
> >> Thank you.
> >> Luke
> >>
> >>
> >>
> >>
> >> On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits
> >> <alivsh...@confluent.io.invalid> wrote:
> >>
> >>> Hi Luke,
> >>>
> >>> Nice suggestion.  It should optimize how memory is used with different
> >>> production rates, but I wonder if we can take this idea further and
> >>> improve
> >>> batching in general.
> >>>
> >>> Currently batch.size is used in two conditions:
> >>>
> >>> 1. When we append records to a batch in the accumulator, we create a
> new
> >>> batch if the current batch would exceed the batch.size.
> >>> 2. When we drain the batch from the accumulator, a batch becomes
> 'ready'
> >>> when it reaches batch.size.
> >>>
> >>> The second condition is good with the current batch size, because if
> >>> linger.ms is greater than 0, the send can be triggered by
> accomplishing
> >>> the
> >>> batching goal.
> >>>
> >>> The first condition, though, leads to creating many batches if the
> >>> network
> >>> latency or production rate (or both) is high, and with 5 in-flight and
> >>> 16KB
> >>> batches we can only have 80KB of data in-flight per partition.  Which
> >>> means
> >>> that with 50ms latency, we can only push ~1.6MB/sec per partition (this
> >>> goes down if we consider higher latencies, e.g. with 100ms we can only
> >>> push
> >>> ~0.8MB/sec).
> >>>
> >>> I think it would be great to separate the two sizes:
> >>>
> >>> 1. When appending records to a batch, create a new batch if the current
> >>> exceeds a larger size (we can call it batch.max.size), say 256KB by
> >>> default.
> >>> 2. When we drain, consider batch 'ready' if it exceeds batch.size,
> which
> >>> is
> >>> 16KB by default.
> >>>
> >>> For memory conservation we may introduce batch.initial.size if we want
> to
> >>> have a flexibility to make it even smaller than batch.size, or we can
> >>> just
> >>> always use batch.size as the initial size (in which case we don't
> >>> need batch.initial.size config).
> >>>
> >>> -Artem
> >>>
> >>> On Fri, Oct 22, 2021 at 1:52 AM Luke Chen <show...@gmail.com> wrote:
> >>>
> >>> > Hi Kafka dev,
> >>> > I'd like to start a vote for the proposal: KIP-782: Expandable batch
> >>> size
> >>> > in producer.
> >>> >
> >>> > The main purpose for this KIP is to have better memory usage in
> >>> producer,
> >>> > and also save users from the dilemma while setting the batch size
> >>> > configuration. After this KIP, users can set a higher batch.size
> >>> without
> >>> > worries, and of course, with an appropriate "batch.initial.size".
> >>> >
> >>> > Derailed description can be found here:
> >>> >
> >>> >
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> >>> >
> >>> > Any comments and feedback are welcome.
> >>> >
> >>> > Thank you.
> >>> > Luke
> >>> >
> >>>
> >>
>

Reply via email to