Hi Artem,
Thanks again for your good suggestion.
I've incorporated your idea into this KIP and updated it accordingly.
Note that, in the end, I still keep the "batch.initial.size" config (default
is 0, which means "batch.size" will be used as the initial batch size) for
better memory conservation.
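
For reference, a minimal sketch of how the settings discussed in this thread
could be configured on a producer. Only "batch.size" and "linger.ms" exist
today; "batch.initial.size" and "batch.max.size" are the names proposed in
the KIP and may still change:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("linger.ms", 10);
    // Existing config: the batching target and drain-readiness threshold.
    props.put("batch.size", 16384);          // 16KB
    // Proposed in KIP-782: initial allocation for a batch; 0 means
    // "start at batch.size".
    props.put("batch.initial.size", 0);
    // Proposed in KIP-782: hard upper bound a single batch may grow to.
    props.put("batch.max.size", 262144);     // 256KB
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);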

A detailed description can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

Let me know if you have other suggestions.

Thank you.
Luke

On Sat, Oct 23, 2021 at 10:50 AM Luke Chen <show...@gmail.com> wrote:

> Hi Artem,
> Thanks for the suggestion. Let me confirm that my understanding is correct.
> So, what you suggest is that "batch.size" becomes more of a "soft limit"
> batch size, and the "hard limit" is "batch.max.size". When the buffer
> reaches batch.size, it means the buffer is "ready" to be sent. But before
> linger.ms is reached, if more data comes in, we can still accumulate it
> into the same buffer, until it reaches "batch.max.size". After it reaches
> "batch.max.size", we'll create another batch for it.
>
> So with your suggestion, we won't need "batch.initial.size", and we can
> use "batch.size" as the initial batch size. We chain "batch.size"-sized
> buffers together until the batch reaches "batch.max.size". Something like
> the sketch and image below:
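>
> In rough code form (a sketch only, with made-up names, not the real
> accumulator implementation; sizes are the defaults mentioned in this
> thread):
>
>     import java.nio.ByteBuffer;
>     import java.util.ArrayList;
>     import java.util.List;
>
>     // A batch starts as one batch.size buffer and, instead of being
>     // closed, appends further batch.size buffers as records arrive,
>     // until the total reaches batch.max.size.
>     class ExpandableBatchSketch {
>         static final int BATCH_SIZE = 16 * 1024;       // soft limit
>         static final int BATCH_MAX_SIZE = 256 * 1024;  // hard limit
>
>         final List<ByteBuffer> segments = new ArrayList<>();
>
>         ExpandableBatchSketch() {
>             segments.add(ByteBuffer.allocate(BATCH_SIZE));
>         }
>
>         // Returns false when the batch has hit batch.max.size and the
>         // caller should start a new batch. Assumes a record fits into
>         // a single segment.
>         boolean tryAppend(byte[] record) {
>             ByteBuffer current = segments.get(segments.size() - 1);
>             if (current.remaining() < record.length) {
>                 if (segments.size() * BATCH_SIZE >= BATCH_MAX_SIZE) {
>                     return false;  // hard limit reached
>                 }
>                 current = ByteBuffer.allocate(BATCH_SIZE);
>                 segments.add(current);
>             }
>             current.put(record);
>             return true;
>         }
>     }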
>
> [image: image.png]
> Is my understanding correct?
> If so, that sounds good to me.
> If not, please kindly explain more to me.
>
> Thank you.
> Luke
>
>
>
>
> On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
>> Hi Luke,
>>
>> Nice suggestion.  It should optimize how memory is used with different
>> production rates, but I wonder if we can take this idea further and
>> improve batching in general.
>>
>> Currently batch.size is used in two conditions:
>>
>> 1. When we append records to a batch in the accumulator, we create a new
>> batch if the current batch would exceed the batch.size.
>> 2. When we drain the batch from the accumulator, a batch becomes 'ready'
>> when it reaches batch.size.
>>
>> The second condition is good with the current batch size, because if
>> linger.ms is greater than 0, the send can be triggered by accomplishing
>> the batching goal.
>>
>> The first condition, though, leads to creating many batches if the network
>> latency or production rate (or both) is high, and with 5 in-flight requests
>> and 16KB batches we can only have 80KB of data in-flight per partition,
>> which means that with 50ms latency we can only push ~1.6MB/sec per
>> partition (this goes down with higher latencies, e.g. with 100ms we can
>> only push ~0.8MB/sec).
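>>
>> As a quick back-of-the-envelope check on those numbers (a jshell-style
>> snippet; 5 in-flight requests carrying one 16KB batch each is the
>> assumption from the example above):
>>
>>     // Max per-partition throughput when limited by in-flight batches
>>     // rather than by bandwidth.
>>     int inFlightRequests = 5;
>>     int batchSizeBytes = 16 * 1024;
>>     double rttSeconds = 0.050;  // 50ms network round trip
>>     double maxBytesPerSec = inFlightRequests * batchSizeBytes / rttSeconds;
>>     System.out.printf("~%.1f MB/sec%n", maxBytesPerSec / (1024 * 1024));
>>     // Prints ~1.6 MB/sec; with rttSeconds = 0.100 this halves to ~0.8.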
>>
>> I think it would be great to separate the two sizes:
>>
>> 1. When appending records to a batch, create a new batch if the current
>> batch would exceed a larger size (we can call it batch.max.size), say
>> 256KB by default.
>> 2. When we drain, consider a batch 'ready' if it exceeds batch.size,
>> which is 16KB by default.
>>
>> For memory conservation we may introduce batch.initial.size if we want the
>> flexibility to make it even smaller than batch.size, or we can just always
>> use batch.size as the initial size (in which case we don't need the
>> batch.initial.size config).
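>>
>> For illustration, the split could look roughly like this (a sketch of the
>> decision logic only, not the actual RecordAccumulator code; the names
>> follow the proposal above):
>>
>>     // Append side: only start a new batch once the current one would
>>     // exceed batch.max.size.
>>     boolean needNewBatch(int currentBatchBytes, int recordBytes,
>>                          int batchMaxSize) {
>>         return currentBatchBytes + recordBytes > batchMaxSize;
>>     }
>>
>>     // Drain side: a batch is "ready" once it reaches batch.size or the
>>     // linger.ms timeout has expired.
>>     boolean readyToDrain(int currentBatchBytes, long waitedMs,
>>                          int batchSize, long lingerMs) {
>>         return currentBatchBytes >= batchSize || waitedMs >= lingerMs;
>>     }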
>>
>> -Artem
>>
>> On Fri, Oct 22, 2021 at 1:52 AM Luke Chen <show...@gmail.com> wrote:
>>
>> > Hi Kafka dev,
>> > I'd like to start a vote for the proposal: KIP-782: Expandable batch size
>> > in producer.
>> >
>> > The main purpose of this KIP is to achieve better memory usage in the
>> > producer, and also to save users from the dilemma of setting the batch
>> > size configuration. After this KIP, users can set a higher batch.size
>> > without worries, and of course, with an appropriate "batch.initial.size".
>> >
>> > A detailed description can be found here:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
>> >
>> > Any comments and feedback are welcome.
>> >
>> > Thank you.
>> > Luke
>> >
>>
>
