> I think this KIP would change the behaviour of producers when there are
multiple partitions ready to be sent

This is correct, the pattern changes and becomes more coarse-grained.  But
I don't think it changes fairness over the long run.  I think it's a good
idea to change drainIndex to be random rather than round robin to avoid
forming patterns where some partitions would consistently get higher
latencies than others because they wait longer for their turn.

If we really wanted to preserve the exact patterns, we could either try to
support multiple 16KB batches from one partition per request (probably
would require protocol change to change logic on the broker for duplicate
detection) or try to re-batch 16KB batches from accumulator into larger
batches during send (additional computations) or try to consider all
partitions assigned to a broker to check if a new batch needs to be created
(i.e. compare cumulative batch size from all partitions assigned to a
broker and create new batch when cumulative size is 1MB, more complex).

Overall, it seems like just increasing the max batch size is a simpler
solution and it does favor larger batch sizes, which is beneficial not just
for production.

> ready batches well past when they ought to be sent (according to their
linger.ms)

The trigger for marking batches ready to be sent isn't changed - a batch is
ready to be sent once it reaches 16KB, so by the time larger batches start
forming, linger.ms wouldn't matter much because the batching goal is met
and the batch can be sent immediately.  Larger batches start forming once
the client starts waiting for the server, in which case some data will wait
its turn to be sent.  This will happen for some data regardless of how we
pick data to send, the question is just whether we'd have some scenarios
where some partitions would consistently experience higher latency than
others.  I think picking drainIndex randomly would prevent such scenarios.

-Artem

On Mon, Nov 22, 2021 at 2:28 AM Tom Bentley <tbent...@redhat.com> wrote:

> Hi Luke,
>
> Thanks for the KIP!
>
> Currently because buffers are allocated using batch.size it means we can
> handle records that are that large (e.g. one big record per batch). Doesn't
> the introduction of smaller buffer sizes (batch.initial.size) mean a
> corresponding decrease in the maximum record size that the producer can
> handle? That might not be a problem if the user knows their maximum record
> size and has tuned batch.initial.size accordingly, but if the default for
> batch.initial.size < batch.size it could cause regressions for existing
> users with a large record size, I think. It should be enough for
> batch.initial.size to default to batch.size, allowing users who care about
> the memory saving in the off-peak throughput case to do the tuning, but not
> causing a regression for existing users.
>
> I think this KIP would change the behaviour of producers when there are
> multiple partitions ready to be sent: By sending all the ready buffers
> (which may now be > batch.size) for the first partition, we could end up
> excluding ready buffers for other partitions from the current send. In
> other words, as I understand the KIP currently, there's a change in
> fairness. I think the code in RecordAccumulator#drainBatchesForOneNode will
> ensure fairness in the long run, because the drainIndex will ensure that
> those other partitions each get their turn at being the first. But isn't
> there the risk that drainBatchesForOneNode would end up not sending ready
> batches well past when they ought to be sent (according to their linger.ms
> ),
> because it's sending buffers for earlier partitions too aggressively? Or,
> to put it another way, perhaps the RecordAccumulator should round-robin the
> ready buffers for _all_ the partitions before trying to fill the remaining
> space with the extra buffers (beyond the batch.size limit) for the first
> partitions?
>
> Kind regards,
>
> Tom
>
> On Wed, Oct 20, 2021 at 1:35 PM Luke Chen <show...@gmail.com> wrote:
>
> > Hi Ismael and all devs,
> > Is there any comments/suggestions to this KIP?
> > If no, I'm going to update the KIP based on my previous mail, and start a
> > vote tomorrow or next week.
> >
> > Thank you.
> > Luke
> >
> > On Mon, Oct 18, 2021 at 2:40 PM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Ismael,
> > > Thanks for your comments.
> > >
> > > 1. Why do we have to reallocate the buffer? We can keep a list of
> buffers
> > > instead and avoid reallocation.
> > > -> Do you mean we allocate multiple buffers with "buffer.initial.size",
> > > and link them together (with linked list)?
> > > ex:
> > > a. We allocate 4KB initial buffer
> > > | 4KB |
> > >
> > > b. when new records reached and the remaining buffer is not enough for
> > the
> > > records, we create another batch with "batch.initial.size" buffer
> > > ex: we already have 3KB of data in the 1st buffer, and here comes the
> 2KB
> > > record
> > >
> > > | 4KB (1KB remaining) |
> > > now, record: 2KB coming
> > > We fill the 1st 1KB into 1st buffer, and create new buffer, and linked
> > > together, and fill the rest of data into it
> > > | 4KB (full) | ---> | 4KB (3KB remaining) |
> > >
> > > Is that what you mean?
> > > If so, I think I like this idea!
> > > If not, please explain more detail about it.
> > > Thank you.
> > >
> > > 2. I think we should also consider tweaking the semantics of batch.size
> > so
> > > that the sent batches can be larger if the batch is not ready to be
> sent
> > > (while still respecting max.request.size and perhaps a new
> > max.batch.size).
> > >
> > > --> In the KIP, I was trying to make the "batch.size" as the upper
> bound
> > > of the batch size, and introduce a "batch.initial.size" as initial
> batch
> > > size.
> > > So are you saying that we can let "batch.size" as initial batch size
> and
> > > introduce a "max.batch.size" as upper bound value?
> > > That's a good suggestion, but that would change the semantics of
> > > "batch.size", which might surprise some users. I think my original
> > proposal
> > > ("batch.initial.size") is safer for users. What do you think?
> > >
> > > Thank you.
> > > Luke
> > >
> > >
> > > On Mon, Oct 18, 2021 at 3:12 AM Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > >> I think we should also consider tweaking the semantics of batch.size
> so
> > >> that the sent batches can be larger if the batch is not ready to be
> sent
> > >> (while still respecting max.request.size and perhaps a new
> > >> max.batch.size).
> > >>
> > >> Ismael
> > >>
> > >> On Sun, Oct 17, 2021, 12:08 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > >>
> > >> > Hi Luke,
> > >> >
> > >> > Thanks for the KIP. Why do we have to reallocate the buffer? We can
> > >> keep a
> > >> > list of buffers instead and avoid reallocation.
> > >> >
> > >> > Ismael
> > >> >
> > >> > On Sun, Oct 17, 2021, 2:02 AM Luke Chen <show...@gmail.com> wrote:
> > >> >
> > >> >> Hi Kafka dev,
> > >> >> I'd like to start the discussion for the proposal: KIP-782:
> > Expandable
> > >> >> batch size in producer.
> > >> >>
> > >> >> The main purpose for this KIP is to have better memory usage in
> > >> producer,
> > >> >> and also save users from the dilemma while setting the batch size
> > >> >> configuration. After this KIP, users can set a higher batch.size
> > >> without
> > >> >> worries, and of course, with an appropriate "batch.initial.size"
> and
> > >> >> "batch.reallocation.factor".
> > >> >>
> > >> >> Derailed description can be found here:
> > >> >>
> > >> >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-782%3A+Expandable+batch+size+in+producer
> > >> >>
> > >> >> Any comments and feedback are welcome.
> > >> >>
> > >> >> Thank you.
> > >> >> Luke
> > >> >>
> > >> >
> > >>
> > >
> >
>

Reply via email to