Hi Artem,

Thanks again for your good suggestion. I've incorporated your idea into this KIP and updated it. Note that, in the end, I still keep the "batch.initial.size" config (default is 0, which means "batch.size" will be used as the initial batch size) for better memory conservation.
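For illustration, here is a rough sketch of how a producer might be configured once this KIP lands. The "batch.max.size" and "batch.initial.size" keys are only proposed in the KIP and are not in the released ProducerConfig, so they are set as plain string keys here; the broker address and serializers are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class Kip782ConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Existing configs: a batch is "ready" to drain once it reaches batch.size,
        // or once linger.ms expires.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

        // Proposed by KIP-782 (plain string keys; not part of ProducerConfig yet):
        props.put("batch.max.size", 262144);  // hard limit before a new batch is created (256KB)
        props.put("batch.initial.size", 0);   // 0 => use batch.size as the initial buffer size

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce records as usual; batching behavior is governed by the configs above
        }
    }
}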
Detailed description can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer

Let me know if you have any other suggestions.

Thank you.
Luke

On Sat, Oct 23, 2021 at 10:50 AM Luke Chen <show...@gmail.com> wrote:

> Hi Artem,
> Thanks for the suggestion. Let me confirm my understanding is correct.
> So, what you suggest is that "batch.size" is more like a "soft limit" batch size, and the "hard limit" is "batch.max.size". When the buffer reaches batch.size, it means the buffer is "ready" to be sent. But before linger.ms is reached, if more data comes in, we can still accumulate it into the same buffer, until it reaches "batch.max.size". Once it reaches "batch.max.size", we'll create another batch for it.
>
> So after your suggestion, we won't need "batch.initial.size", and we can use "batch.size" as the initial batch size. We link "batch.size" buffers together, until the batch reaches "batch.max.size". Something like this:
>
> [image: image.png]
>
> Is my understanding correct?
> If so, that sounds good to me.
> If not, please kindly explain more to me.
>
> Thank you.
> Luke
>
> On Sat, Oct 23, 2021 at 2:13 AM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
>> Hi Luke,
>>
>> Nice suggestion. It should optimize how memory is used with different production rates, but I wonder if we can take this idea further and improve batching in general.
>>
>> Currently, batch.size is used in two conditions:
>>
>> 1. When we append records to a batch in the accumulator, we create a new batch if the current batch would exceed batch.size.
>> 2. When we drain the batch from the accumulator, a batch becomes 'ready' when it reaches batch.size.
>>
>> The second condition works well with the current batch size, because if linger.ms is greater than 0, the send can be triggered by accomplishing the batching goal.
>>
>> The first condition, though, leads to creating many batches if the network latency or production rate (or both) is high, and with 5 in-flight requests and 16KB batches we can only have 80KB of data in flight per partition. That means that with 50ms latency, we can only push ~1.6MB/sec per partition (this goes down with higher latencies, e.g. with 100ms we can only push ~0.8MB/sec).
>>
>> I think it would be great to separate the two sizes:
>>
>> 1. When appending records to a batch, create a new batch if the current batch exceeds a larger size (we can call it batch.max.size), say 256KB by default.
>> 2. When we drain, consider a batch 'ready' if it exceeds batch.size, which is 16KB by default.
>>
>> For memory conservation we may introduce batch.initial.size if we want the flexibility to make it even smaller than batch.size, or we can just always use batch.size as the initial size (in which case we don't need a batch.initial.size config).
>>
>> -Artem
>>
>> On Fri, Oct 22, 2021 at 1:52 AM Luke Chen <show...@gmail.com> wrote:
>>
>> > Hi Kafka dev,
>> > I'd like to start a vote for the proposal: KIP-782: Expandable batch size in producer.
>> >
>> > The main purpose of this KIP is to achieve better memory usage in the producer, and also to save users from the dilemma of setting the batch size configuration. After this KIP, users can set a higher batch.size without worries, and of course, with an appropriate "batch.initial.size".
>> >
>> > Detailed description can be found here:
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
>> >
>> > Any comments and feedback are welcome.
>> >
>> > Thank you.
>> > Luke
>> >
>> >
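(Following up on Artem's back-of-the-envelope numbers quoted above: a small, purely illustrative calculation of the per-partition throughput ceiling with 5 in-flight requests and 16KB batches. The class name and output format are made up for this sketch.)

public class InFlightThroughputSketch {
    public static void main(String[] args) {
        // With max.in.flight.requests.per.connection = 5 and batch.size = 16KB,
        // at most 5 * 16KB = 80KB can be in flight per partition at once.
        long inFlightBytes = 5L * 16 * 1024;
        for (double rttSeconds : new double[] {0.050, 0.100}) {
            double mbPerSec = inFlightBytes / rttSeconds / (1024 * 1024);
            // 50ms round trip  -> ~1.6 MB/sec per partition
            // 100ms round trip -> ~0.8 MB/sec per partition
            System.out.printf("RTT %.0f ms -> ~%.1f MB/sec per partition%n",
                    rttSeconds * 1000, mbPerSec);
        }
    }
}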