Hi Artem,
Yes, I agree that if we use random selection instead of round-robin
selection, latency will be distributed more fairly. That is, if there are 10
partitions, the 10th partition is always the last choice in each round
in the current design, but with random selection every partition has a
fairer chance of being selected early.
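
To make the fairness point concrete, here is a toy model in Python. It is
not Kafka's RecordAccumulator code: the function names are made up for
illustration, and it assumes that every partition is ready in every round
and that the fixed order restarts from partition 0 each time (the regular
pattern discussed in this thread).

```python
import random


def fixed_order_positions(num_partitions, rounds):
    """Average drain position per partition when every round uses the same order."""
    totals = [0] * num_partitions
    for _ in range(rounds):
        for position, partition in enumerate(range(num_partitions)):
            totals[partition] += position
    return [total / rounds for total in totals]


def random_order_positions(num_partitions, rounds, seed=0):
    """Average drain position per partition when the order is shuffled each round."""
    rng = random.Random(seed)  # seeded so the sketch is reproducible
    totals = [0] * num_partitions
    order = list(range(num_partitions))
    for _ in range(rounds):
        rng.shuffle(order)
        for position, partition in enumerate(order):
            totals[partition] += position
    return [total / rounds for total in totals]


if __name__ == "__main__":
    print("fixed :", fixed_order_positions(10, 10_000))
    print("random:", random_order_positions(10, 10_000))
```

In the fixed-order case the last partition always sits at position 9, while
with shuffling every partition averages out near the middle of the queue.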

However, I think that is out of scope for this KIP. It is an existing
issue, and it needs further discussion to decide whether the change is
necessary.

I agree that a 32KB default for "batch.max.size" might not be a big
improvement compared with 256KB. How about defaulting "batch.max.size" to
64KB, and making the documentation clear that if "batch.max.size" is
increased, some "ready" partitions may have to wait for the next request
to the broker because of the "max.request.size" limit (default 1MB)?
Users can also consider increasing "max.request.size" to avoid this.
What do you think?
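
As a rough sketch of the trade-off, the Python below reproduces the
arithmetic used in this thread. The helper names are hypothetical,
"batch.max.size" is the setting proposed in this discussion rather than an
existing producer config, and the model ignores request and batch overhead
(so it only gives upper bounds).

```python
def full_batches_per_request(max_request_kb, batch_kb):
    """How many completely full batches fit in one produce request (no overhead)."""
    return max_request_kb // batch_kb


def per_partition_throughput_kb_s(batch_kb, in_flight, latency_ms):
    """Upper bound on KB/s for one partition: one full batch per in-flight request."""
    return batch_kb * in_flight * 1000 / latency_ms


if __name__ == "__main__":
    max_request_kb = 1024  # "max.request.size" default of 1MB
    for batch_kb in (16, 32, 64, 256):
        fit = full_batches_per_request(max_request_kb, batch_kb)
        rate = per_partition_throughput_kb_s(batch_kb, in_flight=5, latency_ms=100)
        print(f"{batch_kb:>3}KB batches: {fit:>2} per request, {rate:.0f} KB/s per partition")
```

With 5 in-flight requests and 100ms latency this gives 1600 KB/s at 32KB and
12800 KB/s at 256KB, matching the 1.6MB/s and 12.8MB/s figures quoted below,
while the number of full batches per 1MB request drops from 64 (16KB) to 16
(64KB) to 4 (256KB).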

Thank you.
Luke

On Wed, Nov 24, 2021 at 2:26 AM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> >  maybe I can firstly decrease the "batch.max.size" to 32KB
>
> I think 32KB is too small.  With 5 in-flight and 100ms latency we can
> produce 1.6MB/s per partition.  With 256KB we can produce 12.8MB/s per
> partition.  We should probably set up some testing and see if 256KB has
> problems.
>
> To illustrate latency dynamics, let's consider a simplified model: 1
> in-flight request per broker, produce latency 125ms, 256KB max request
> size, 16 partitions assigned to the same broker, every second 128KB is
> produced to each partition (total production rate is 2MB/sec).
>
> If the batch size is 16KB, then the pattern would be the following:
>
> 0ms - produce 128KB into each partition
> 0ms - take 16KB from each partition send (total 256KB)
> 125ms - complete first 16KB from each partition, send next 16KB
> 250ms - complete second 16KB, send next 16KB
> ...
> 1000ms - complete 8th 16KB from each partition
>
> from this model it's easy to see that there are 256KB that are sent
> immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> 875ms.
>
> If the batch size is 256KB, then the pattern would be the following:
>
> 0ms - produce 128KB into each partition
> 0ms - take 128KB each from first 2 partitions and send (total 256KB)
> 125ms - complete 2 first partitions, send data from next 2 partitions
> ...
> 1000ms - complete last 2 partitions
>
> even though the pattern is different, there are still 256KB that are sent
> immediately, 256KB that are sent in 125ms, ... 256KB that are sent in
> 875ms.
>
> Now, in this example if we do strictly round-robin (current implementation)
> and we have this exact pattern (not sure how often such regular pattern
> would happen in practice -- I would expect that it would be a bit more
> random), some partitions would experience higher latency than others (not
> sure how much it would matter in practice -- in the end of the day some
> bytes produced to a topic would have higher latency and some bytes would
> have lower latency).  This pattern is easily fixed by choosing the next
> partition randomly instead of using round-robin.
>
> -Artem
>
> On Tue, Nov 23, 2021 at 12:08 AM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Tom,
> > Thanks for your comments. And thanks for Artem's explanation.
> > Below is my response:
> >
> > > Currently because buffers are allocated using batch.size it means we
> > > can handle records that are that large (e.g. one big record per batch).
> > > Doesn't the introduction of smaller buffer sizes (batch.initial.size)
> > > mean a corresponding decrease in the maximum record size that the
> > > producer can handle?
> >
> > Actually, the "batch.size" is only like a threshold to decide if the
> > batch is "ready to be sent". That is, even if you set the
> > "batch.size=16KB" (default value), users can still send one record sized
> > with 20KB, as long as the size is less than "max.request.size" in
> > producer (default 1MB). Therefore, the introduction of
> > "batch.initial.size" won't decrease the maximum record size that the
> > producer can handle.
> >
> > > But isn't there the risk that drainBatchesForOneNode would end up not
> > > sending ready batches well past when they ought to be sent (according
> > > to their linger.ms), because it's sending buffers for earlier
> > > partitions too aggressively?
> >
> > Did you mean that we have a "max.request.size" per request (default is
> > 1MB), and before this KIP, the request can include 64 batches in single
> > request ["batch.size"(16KB) * 64 = 1MB], but now, we might be able to
> > include 32 batches or less, because we aggressively sent more records in
> > one batch, is that what you meant? That's a really good point that I've
> > never thought about. I think your suggestion to go through other
> > partitions that just fit "batch.size", or expire "linger.ms" first,
> > before handling
> > the one that is > "batch.size" limit is not a good way, because it might
> > cause the one with size > "batch.size" always in the lowest priority, and
> > cause starving issue that the batch won't have chance to get sent.
> >
> > I don't have better solution for it, but maybe I can firstly decrease the
> > "batch.max.size" to 32KB, instead of aggressively 256KB in the KIP. That
> > should alleviate the problem. And still improve the throughput. What do
> > you think?
> >
> > Thank you.
> > Luke
> >
> > On Tue, Nov 23, 2021 at 9:04 AM Artem Livshits
> > <alivsh...@confluent.io.invalid> wrote:
> >
> > > > I think this KIP would change the behaviour of producers when there
> > > > are multiple partitions ready to be sent
> > >
> > > This is correct, the pattern changes and becomes more coarse-grained.
> > > But I don't think it changes fairness over the long run.  I think it's
> > > a good idea to change drainIndex to be random rather than round robin
> > > to avoid forming patterns where some partitions would consistently get
> > > higher latencies than others because they wait longer for their turn.
> > >
> > > If we really wanted to preserve the exact patterns, we could either
> > > try to support multiple 16KB batches from one partition per request
> > > (probably would require protocol change to change logic on the broker
> > > for duplicate detection) or try to re-batch 16KB batches from
> > > accumulator into larger batches during send (additional computations)
> > > or try to consider all partitions assigned to a broker to check if a
> > > new batch needs to be created (i.e. compare cumulative batch size from
> > > all partitions assigned to a broker and create new batch when
> > > cumulative size is 1MB, more complex).
> > >
> > > Overall, it seems like just increasing the max batch size is a simpler
> > > solution and it does favor larger batch sizes, which is beneficial not
> > > just for production.
> > >
> > > > ready batches well past when they ought to be sent (according to
> > > > their linger.ms)
> > >
> > > The trigger for marking batches ready to be sent isn't changed - a
> > > batch is ready to be sent once it reaches 16KB, so by the time larger
> > > batches start forming, linger.ms wouldn't matter much because the
> > > batching goal is met and the batch can be sent immediately.  Larger
> > > batches start forming once the client starts waiting for the server,
> > > in which case some data will wait its turn to be sent.  This will
> > > happen for some data regardless of how we pick data to send, the
> > > question is just whether we'd have some scenarios where some
> > > partitions would consistently experience higher latency than others.
> > > I think picking drainIndex randomly would prevent such scenarios.
> > >
> > > -Artem
> > >
> > > On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbent...@redhat.com> wrote:
> > >
> > > > Hi Luke,
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > Currently because buffers are allocated using batch.size it means we
> > > > can handle records that are that large (e.g. one big record per
> > > > batch). Doesn't the introduction of smaller buffer sizes
> > > > (batch.initial.size) mean a corresponding decrease in the maximum
> > > > record size that the producer can handle? That might not be a problem
> > > > if the user knows their maximum record size and has tuned
> > > > batch.initial.size accordingly, but if the default for
> > > > batch.initial.size < batch.size it could cause regressions for
> > > > existing users with a large record size, I think. It should be enough
> > > > for batch.initial.size to default to batch.size, allowing users who
> > > > care about the memory saving in the off-peak throughput case to do the
> > > > tuning, but not causing a regression for existing users.
> > > >
> > > > I think this KIP would change the behaviour of producers when there
> > > > are multiple partitions ready to be sent: By sending all the ready
> > > > buffers (which may now be > batch.size) for the first partition, we
> > > > could end up excluding ready buffers for other partitions from the
> > > > current send. In other words, as I understand the KIP currently,
> > > > there's a change in fairness. I think the code in
> > > > RecordAccumulator#drainBatchesForOneNode will ensure fairness in the
> > > > long run, because the drainIndex will ensure that those other
> > > > partitions each get their turn at being the first. But isn't there the
> > > > risk that drainBatchesForOneNode would end up not sending ready
> > > > batches well past when they ought to be sent (according to their
> > > > linger.ms), because it's sending buffers for earlier partitions too
> > > > aggressively? Or, to put it another way, perhaps the RecordAccumulator
> > > > should round-robin the ready buffers for _all_ the partitions before
> > > > trying to fill the remaining space with the extra buffers (beyond the
> > > > batch.size limit) for the first partitions?
> > > >
> > > > Kind regards,
> > > >
> > > > Tom
> > > >
> > > > On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <show...@gmail.com> wrote:
> > > >
> > > > > Hi Ismael and all devs,
> > > > > Is there any comments/suggestions to this KIP?
> > > > > If no, I'm going to update the KIP based on my previous mail, and
> > > start a
> > > > > vote tomorrow or next week.
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <show...@gmail.com>
> wrote:
> > > > >
> > > > > > Hi Ismael,
> > > > > > Thanks for your comments.
> > > > > >
> > > > > > 1. Why do we have to reallocate the buffer? We can keep a list of
> > > > buffers
> > > > > > instead and avoid reallocation.
> > > > > > -> Do you mean we allocate multiple buffers with
> > > "buffer.initial.size",
> > > > > > and link them together (with linked list)?
> > > > > > ex:
> > > > > > a. We allocate 4KB initial buffer
> > > > > > | 4KB |
> > > > > >
> > > > > > b. when new records reached and the remaining buffer is not
> enough
> > > for
> > > > > the
> > > > > > records, we create another batch with "batch.initial.size" buffer
> > > > > > ex: we already have 3KB of data in the 1st buffer, and here comes
> > the
> > > > 2KB
> > > > > > record
> > > > > >
> > > > > > | 4KB (1KB remaining) |
> > > > > > now, record: 2KB coming
> > > > > > We fill the 1st 1KB into 1st buffer, and create new buffer, and
> > > linked
> > > > > > together, and fill the rest of data into it
> > > > > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > > > > >
> > > > > > Is that what you mean?
> > > > > > If so, I think I like this idea!
> > > > > > If not, please explain more detail about it.
> > > > > > Thank you.
> > > > > >
> > > > > > 2. I think we should also consider tweaking the semantics of
> > > batch.size
> > > > > so
> > > > > > that the sent batches can be larger if the batch is not ready to
> be
> > > > sent
> > > > > > (while still respecting max.request.size and perhaps a new
> > > > > max.batch.size).
> > > > > >
> > > > > > --> In the KIP, I was trying to make the "batch.size" as the
> upper
> > > > bound
> > > > > > of the batch size, and introduce a "batch.initial.size" as
> initial
> > > > batch
> > > > > > size.
> > > > > > So are you saying that we can let "batch.size" as initial batch
> > size
> > > > and
> > > > > > introduce a "max.batch.size" as upper bound value?
> > > > > > That's a good suggestion, but that would change the semantics of
> > > > > > "batch.size", which might surprise some users. I think my
> original
> > > > > proposal
> > > > > > ("batch.initial.size") is safer for users. What do you think?
> > > > > >
> > > > > > Thank you.
> > > > > > Luke
> > > > > >
> > > > > >
> > > > > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <ism...@juma.me.uk>
> > > wrote:
> > > > > >
> > > > > >> I think we should also consider tweaking the semantics of
> > batch.size
> > > > so
> > > > > >> that the sent batches can be larger if the batch is not ready to
> > be
> > > > sent
> > > > > >> (while still respecting max.request.size and perhaps a new
> > > > > >> max.batch.size).
> > > > > >>
> > > > > >> Ismael
> > > > > >>
> > > > > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <ism...@juma.me.uk>
> > > wrote:
> > > > > >>
> > > > > >> > Hi Luke,
> > > > > >> >
> > > > > >> > Thanks for the KIP. Why do we have to reallocate the buffer?
> We
> > > can
> > > > > >> keep a
> > > > > >> > list of buffers instead and avoid reallocation.
> > > > > >> >
> > > > > >> > Ismael
> > > > > >> >
> > > > > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <show...@gmail.com>
> > > wrote:
> > > > > >> >
> > > > > >> >> Hi Kafka dev,
> > > > > >> >> I'd like to start the discussion for the proposal: KIP-782:
> > > > > Expandable
> > > > > >> >> batch size in producer.
> > > > > >> >>
> > > > > >> >> The main purpose for this KIP is to have better memory usage
> in
> > > > > >> producer,
> > > > > >> >> and also save users from the dilemma while setting the batch
> > size
> > > > > >> >> configuration. After this KIP, users can set a higher
> > batch.size
> > > > > >> without
> > > > > >> >> worries, and of course, with an appropriate
> > "batch.initial.size"
> > > > and
> > > > > >> >> "batch.reallocation.factor".
> > > > > >> >>
> > > > > > >> >> Detailed description can be found here:
> > > > > >> >>
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > > > > >> >>
> > > > > >> >> Any comments and feedback are welcome.
> > > > > >> >>
> > > > > >> >> Thank you.
> > > > > >> >> Luke
> > > > > >> >>
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
