Hi Jun,

33.  That's an interesting point.  Technically, onNewBatch is just a way to
pass a signal to the partitioner.  The sticky partitioner's use of this
signal is suboptimal, but in theory someone could use it for something else.

-Artem

On Mon, Mar 14, 2022 at 9:11 AM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply. A couple of more comments.
>
> 32. We could defer the metrics until we know what to add.
>
> 33. Since we are deprecating DefaultPartitioner and
> UniformStickyPartitioner, should we deprecate Partitioner.onNewBatch() too
> given its unexpected side effect?
>
> Thanks,
>
> Jun
>
> On Thu, Mar 10, 2022 at 5:20 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > 32. Good point.  Do you think it's ok to defer the metrics until we run
> > some benchmarks so that we get a better idea of what metrics we need?
> >
> > -Artem
> >
> > On Thu, Mar 10, 2022 at 3:12 PM Jun Rao <j...@confluent.io.invalid>
> wrote:
> >
> > > Hi, Artem.
> > >
> > > Thanks for the reply. One more comment.
> > >
> > > 32. Do we need to add any new metric on the producer? For example, if
> > > partitioner.availability.timeout.ms is > 0, it might be useful to know
> > the
> > > number of unavailable partitions.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Mar 10, 2022 at 12:46 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 30.  Clarified.
> > > >
> > > > 31. I plan to do some benchmarking once implementation is finished,
> > I'll
> > > > update the KIP with the results once I have them.  The reason to make
> > it
> > > > default is that it won't be used otherwise and we won't know if it's
> > good
> > > > or not in practical workloads.
> > > >
> > > > -Artem
> > > >
> > > > On Thu, Mar 10, 2022 at 11:42 AM Jun Rao <j...@confluent.io.invalid>
> > > wrote:
> > > >
> > > > > Hi, Artem,
> > > > >
> > > > > Thanks for the updated KIP. A couple of more comments.
> > > > >
> > > > > 30. For the 3 new configs, it would be useful to make it clear that
> > > they
> > > > > are only relevant when the partitioner class is null.
> > > > >
> > > > > 31. partitioner.adaptive.partitioning.enable : I am wondering
> whether
> > > it
> > > > > should default to true. This is a more complex behavior than
> "uniform
> > > > > sticky" and may take some time to get right. If we do want to
> enable
> > it
> > > > by
> > > > > default, it would be useful to validate it with some test results.
> > > > >
> > > > > Jun
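A minimal sketch of what points 30 and 31 imply on the client side, assuming the config names as they appear in this thread. It uses plain java.util.Properties so it runs without Kafka on the classpath. The bootstrap address and the chosen values are placeholders, and reading a timeout of 0 as "availability logic off" is an assumption inferred from comment 32.

```java
import java.util.Properties;

final class ProducerConfigSketch {
    // Builds producer properties that leave partitioner.class unset, so the
    // built-in partitioning logic discussed in this thread would apply.
    static Properties builtInPartitioningProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        // partitioner.class is deliberately not set (null => built-in logic).
        props.setProperty("partitioner.adaptive.partitioning.enable", "true");
        // Assumed value: 0 is taken here to mean the availability logic is off.
        props.setProperty("partitioner.availability.timeout.ms", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties props = builtInPartitioningProps();
        System.out.println("partitioner.class = " + props.getProperty("partitioner.class"));
    }
}
```

If a custom partitioner class were set, the two partitioner.* settings above would not be relevant, per point 30.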
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Mar 9, 2022 at 10:05 PM Artem Livshits
> > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Thank you for feedback, I've discussed this offline with some of
> > the
> > > > > folks
> > > > > > and updated the KIP.  The main change is that now instead of
> using
> > > > > > DefaultPartitioner and UniformStickyPartitioners as flags, in the
> > new
> > > > > > proposal the default partitioner is null, so if no custom
> > partitioner
> > > > is
> > > > > > specified then the partitioning logic is implemented in
> > > KafkaProducer.
> > > > > > Compatibility section is updated as well.  Also the configuration
> > > > options
> > > > > > are renamed to be more consistent.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > > On Fri, Mar 4, 2022 at 10:38 PM Luke Chen <show...@gmail.com>
> > wrote:
> > > > > >
> > > > > > > Hi Artem,
> > > > > > >
> > > > > > > Thanks for your explanation and update to the KIP.
> > > > > > > Some comments:
> > > > > > >
> > > > > > > 5. In the description for `enable.adaptive.partitioning`, the
> > > `false`
> > > > > > case,
> > > > > > > you said:
> > > > > > > > the producer will try to distribute messages uniformly.
> > > > > > > I think we should describe the possible skew in the distribution.
> > > > > > > Otherwise, users might be confused about why adaptive partitioning is
> > > > > > > important.
> > > > > > >
> > > > > > > 6. In the description for `partition.availability.timeout.ms`, I think
> > > > > > > the last sentence should mention that if `enable.adaptive.partitioning`
> > > > > > > is disabled, this logic is also disabled.
> > > > > > >
> > > > > > > 7. Similar thoughts as Ismael, I think we should have a POC and
> > > test
> > > > to
> > > > > > > prove that this adaptive partitioning algorithm can have better
> > > > uniform
> > > > > > > partitioning, compared with original sticky one.
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Fri, Mar 4, 2022 at 9:22 PM Ismael Juma <ism...@juma.me.uk>
> > > > wrote:
> > > > > > >
> > > > > > > > Regarding `3`, we should only deprecate it if we're sure the
> > new
> > > > > > approach
> > > > > > > > handles all cases better. Are we confident about that for
> both
> > of
> > > > the
> > > > > > > > previous partitioners?
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Fri, Mar 4, 2022 at 1:37 AM David Jacot
> > > > > <dja...@confluent.io.invalid
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Artem,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP! I have a few comments:
> > > > > > > > >
> > > > > > > > > 1. In the preamble of the proposed change section, there is
> > > > still a
> > > > > > > > > mention of the
> > > > > > > > > -1 approach. My understanding is that we have moved away
> from
> > > it
> > > > > now.
> > > > > > > > >
> > > > > > > > > 2. I am a bit concerned by the trick suggested about the
> > > > > > > > > DefaultPartitioner and
> > > > > > > > > the UniformStickyPartitioner. I do agree that implementing
> > the
> > > > > logic
> > > > > > in
> > > > > > > > the
> > > > > > > > > producer itself is a good thing. However, it is weird from
> a
> > > user
> > > > > > > > > perspective
> > > > > > > > > that he can set a class as partitioner that is not used in
> > the
> > > > > end. I
> > > > > > > > > think that
> > > > > > > > > this will be confusing for our users. Have we considered
> > > changing
> > > > > the
> > > > > > > > > default
> > > > > > > > > value of partitioner.class to null to indicate that the new
> > > > > built-in
> > > > > > > > > partitioner
> > > > > > > > > must be used? By default, the built-in partitioner would be
> > > used
> > > > > > unless
> > > > > > > > the
> > > > > > > > > the user explicitly specifies one. The downside is that the new
> > > default
> > > > > > > > behavior
> > > > > > > > > would not work if the user explicitly specifies the
> partitioner
> > > but
> > > > > we
> > > > > > > > could
> > > > > > > > > mitigate this with my next point.
> > > > > > > > >
> > > > > > > > > 3. Related to the previous point, I think that we could
> > > deprecate
> > > > > > both
> > > > > > > > the
> > > > > > > > > DefaultPartitioner and the UniformStickyPartitioner. I
> would
> > > also
> > > > > > add a
> > > > > > > > > warning if one of them is explicitly provided by the user
> to
> > > > inform
> > > > > > > them
> > > > > > > > > that they should switch to the new built-in one. I am
> pretty
> > > sure
> > > > > > that
> > > > > > > > most
> > > > > > > > > of the folks use the default configuration anyway.
> > > > > > > > >
> > > > > > > > > 4. It would be great if we could explain why the -1 way was
> > > > > rejected.
> > > > > > > At
> > > > > > > > > the moment, the rejected alternative only explains the idea
> > but
> > > > does
> > > > > > not
> > > > > > > > > say why we rejected it.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > David
> > > > > > > > >
> > > > > > > > > On Fri, Mar 4, 2022 at 6:03 AM Artem Livshits
> > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > 2. Removed the option from the KIP.  Now the sticky
> > > > partitioning
> > > > > > > > > threshold
> > > > > > > > > > is hardcoded to batch.size.
> > > > > > > > > >
> > > > > > > > > > 20. Added the corresponding wording to the KIP.
> > > > > > > > > >
> > > > > > > > > > -Artem
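A minimal sketch of the sticky threshold from point 2, assuming a single-threaded caller. The class name is hypothetical, and moving to the "next" partition round-robin is a simplification for illustration; the KIP's actual choice of the next partition may differ. The thresholdBytes parameter stands in for batch.size.

```java
final class StickyByteCounter {
    private final int partitionCount;
    private final int thresholdBytes; // stands in for batch.size
    private int currentPartition = 0;
    private int bytesOnCurrent = 0;

    StickyByteCounter(int partitionCount, int thresholdBytes) {
        this.partitionCount = partitionCount;
        this.thresholdBytes = thresholdBytes;
    }

    // Returns the partition for a record of the given size. Once the bytes
    // produced to the current partition reach the threshold, the next record
    // goes to the following partition and the counter restarts.
    int partitionFor(int recordBytes) {
        if (bytesOnCurrent >= thresholdBytes) {
            currentPartition = (currentPartition + 1) % partitionCount;
            bytesOnCurrent = 0;
        }
        bytesOnCurrent += recordBytes;
        return currentPartition;
    }

    public static void main(String[] args) {
        StickyByteCounter counter = new StickyByteCounter(3, 100);
        for (int i = 0; i < 5; i++) {
            System.out.println("record " + i + " -> partition " + counter.partitionFor(60));
        }
    }
}
```

Note that the switch is driven by bytes produced, not by when a batch is actually sent, which is the distinction the batch.size discussion in this thread turns on.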
> > > > > > > > > >
> > > > > > > > > > On Thu, Mar 3, 2022 at 10:52 AM Jun Rao
> > > > <j...@confluent.io.invalid
> > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Artem,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the reply.
> > > > > > > > > > >
> > > > > > > > > > > 1. Sounds good.
> > > > > > > > > > >
> > > > > > > > > > > 2. If we don't expect users to change it, we probably
> > could
> > > > > just
> > > > > > > > leave
> > > > > > > > > out
> > > > > > > > > > > the new config. In general, it's easy to add a new
> > config,
> > > > but
> > > > > > hard
> > > > > > > > to
> > > > > > > > > > > remove an existing config.
> > > > > > > > > > >
> > > > > > > > > > > 20. The two new configs enable.adaptive.partitioning
> and
> > > > > > > > > > > partition.availability.timeout.ms only apply to the
> two
> > > > > built-in
> > > > > > > > > > > partitioners DefaultPartitioner and
> > > UniformStickyPartitioner,
> > > > > > > right?
> > > > > > > > It
> > > > > > > > > > > would be useful to document that in the KIP.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Mar 3, 2022 at 9:47 AM Artem Livshits
> > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > >
> > > > > > > > > > > > Thank you for the suggestions.
> > > > > > > > > > > >
> > > > > > > > > > > > 1. As we discussed offline, we can hardcode the logic
> > for
> > > > > > > > > > > > DefaultPartitioner and UniformStickyPartitioner in
> the
> > > > > > > > KafkaProducer
> > > > > > > > > > > (i.e.
> > > > > > > > > > > > the DefaultPartitioner.partition won't get called,
> > > instead
> > > > > > > > > KafkaProducer
> > > > > > > > > > > > would check if the partitioner is an instance of
> > > > > > > DefaultPartitioner
> > > > > > > > > and
> > > > > > > > > > > > then run the actual partitioning logic itself).  Then
> > the
> > > > > > change
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > > Partitioner wouldn't be required.  I'll update the
> KIP
> > to
> > > > > > reflect
> > > > > > > > > that.
> > > > > > > > > > > >
> > > > > > > > > > > > 2. I don't expect users to change this too often, as
> > > > changing
> > > > > > it
> > > > > > > > > would
> > > > > > > > > > > > require a bit of studying of the production patterns.
> > > As a
> > > > > > > general
> > > > > > > > > > > > principle, if I can think of a model that requires a
> > > > > deviation
> > > > > > > from
> > > > > > > > > > > > default, I tend to add a configuration option.  It
> > could
> > > be
> > > > > > that
> > > > > > > > > it'll
> > > > > > > > > > > > never get used in practice, but I cannot prove that.
> > I'm
> > > > ok
> > > > > > with
> > > > > > > > > > > removing
> > > > > > > > > > > > the option, let me know what you think.
> > > > > > > > > > > >
> > > > > > > > > > > > -Artem
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Feb 28, 2022 at 2:06 PM Jun Rao
> > > > > > <j...@confluent.io.invalid
> > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi, Artem,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the reply. A few more comments.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. Since we control the implementation and the
> usage
> > of
> > > > > > > > > > > > DefaultPartitioner,
> > > > > > > > > > > > > another way is to instantiate the
> DefaultPartitioner
> > > > with a
> > > > > > > > special
> > > > > > > > > > > > > constructor, which allows it to have more access to
> > > > > internal
> > > > > > > > > > > information.
> > > > > > > > > > > > > Then we could just change the behavior of
> > > > > DefaultPartitioner
> > > > > > > > such
> > > > > > > > > that
> > > > > > > > > > > > it
> > > > > > > > > > > > > can use the internal information when choosing the
> > > > > partition.
> > > > > > > This
> > > > > > > > > seems
> > > > > > > > > > > > > more intuitive than having DefaultPartitioner
> return
> > -1
> > > > > > > > partition.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2. I guess partitioner.sticky.batch.size is
> > introduced
> > > > > > because
> > > > > > > > the
> > > > > > > > > > > > > effective batch size could be less than batch.size
> > and
> > > we
> > > > > > want
> > > > > > > to
> > > > > > > > > align
> > > > > > > > > > > > > partition switching with the effective batch size.
> > How
> > > > > would
> > > > > > a
> > > > > > > > user
> > > > > > > > > > > know
> > > > > > > > > > > > > the effective batch size to set
> > > > > partitioner.sticky.batch.size
> > > > > > > > > properly?
> > > > > > > > > > > > If
> > > > > > > > > > > > > the user somehow knows the effective batch size,
> does
> > > > > setting
> > > > > > > > > > > batch.size
> > > > > > > > > > > > to
> > > > > > > > > > > > > the effective batch size achieve the same result?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 4. Thanks for the explanation. Makes sense to me.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jun
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Feb 25, 2022 at 8:26 PM Artem Livshits
> > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. Updated the KIP to add a couple paragraphs
> about
> > > > > > > > > implementation
> > > > > > > > > > > > > > necessities in the Proposed Changes section.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. Sorry if my reply was confusing, what I meant
> to
> > > say
> > > > > > (and
> > > > > > > I
> > > > > > > > > > > > elaborated
> > > > > > > > > > > > > > on that in point #3) is that there could be
> > patterns
> > > > for
> > > > > > > which
> > > > > > > > > 16KB
> > > > > > > > > > > > > > wouldn't be the most effective setting, thus it
> > would
> > > > be
> > > > > > good
> > > > > > > > to
> > > > > > > > > make
> > > > > > > > > > > > it
> > > > > > > > > > > > > > configurable.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 4. We could use broker readiness timeout.  But
> I'm
> > > not
> > > > > sure
> > > > > > > it
> > > > > > > > > would
> > > > > > > > > > > > > > correctly model the broker load.  The problem is
> > that
> > > > > > latency
> > > > > > > > is
> > > > > > > > > not
> > > > > > > > > > > an
> > > > > > > > > > > > > > accurate measure of throughput, we could have 2
> > > brokers
> > > > > > that
> > > > > > > > have
> > > > > > > > > > > equal
> > > > > > > > > > > > > > throughput but one has higher latency (so it
> takes
> > > > larger
> > > > > > > > batches
> > > > > > > > > > > less
> > > > > > > > > > > > > > frequently, but still takes the same load).
> > > > > Latency-based
> > > > > > > > logic
> > > > > > > > > is
> > > > > > > > > > > > > likely
> > > > > > > > > > > > > > to send less data to the broker with higher
> > latency.
> > > > > Using
> > > > > > > the
> > > > > > > > > queue
> > > > > > > > > > > > > size
> > > > > > > > > > > > > > would adapt to throughput, regardless of latency
> > > (which
> > > > > > could
> > > > > > > > be
> > > > > > > > > > > just a
> > > > > > > > > > > > > > result of deployment topology), so that's the
> model
> > > > > chosen
> > > > > > in
> > > > > > > > the
> > > > > > > > > > > > > > proposal.  The partition.availability.timeout.ms
> > > logic
> > > > > > > > > approaches
> > > > > > > > > > > the
> > > > > > > > > > > > > > model
> > > > > > > > > > > > > > from a slightly different angle, say we have a
> > > > > requirement
> > > > > > to
> > > > > > > > > deliver
> > > > > > > > > > > > > > messages via brokers that have a certain latency,
> > > then
> > > > > > > > > > > > > > partition.availability.timeout.ms could be used
> to
> > > > tune
> > > > > > > that.
> > > > > > > > > > > Latency
> > > > > > > > > > > > > is
> > > > > > > > > > > > > > a
> > > > > > > > > > > > > > much more volatile metric than throughput
> (latency
> > > > > depends
> > > > > > on
> > > > > > > > > > > external
> > > > > > > > > > > > > > load, on capacity, on deployment topology, on
> > jitter
> > > in
> > > > > > > > network,
> > > > > > > > > on
> > > > > > > > > > > > > jitter
> > > > > > > > > > > > > > in disk, etc.) and I think it would be best to
> > leave
> > > > > > > > > latency-based
> > > > > > > > > > > > > > thresholds configurable to tune for the
> > environment.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > -Artem
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Feb 23, 2022 at 11:14 AM Jun Rao
> > > > > > > > > <j...@confluent.io.invalid>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi, Artem,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the reply. A few more comments.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. Perhaps you could elaborate a bit more on
> how
> > > the
> > > > > > > producer
> > > > > > > > > > > > > determines
> > > > > > > > > > > > > > > the partition if the partitioner returns -1.
> This
> > > > will
> > > > > > help
> > > > > > > > > > > > understand
> > > > > > > > > > > > > > why
> > > > > > > > > > > > > > > encapsulating that logic as a partitioner is
> not
> > > > clean.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2. I am not sure that I understand this part.
> If
> > > > 15.5KB
> > > > > > is
> > > > > > > > more
> > > > > > > > > > > > > > efficient,
> > > > > > > > > > > > > > > could we just set batch.size to 15.5KB?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 4. Yes, we could add a switch (or a variant of
> > the
> > > > > > > > > partitioner) for
> > > > > > > > > > > > > > > enabling this behavior. Also, choosing
> partitions
> > > > based
> > > > > > on
> > > > > > > > > broker
> > > > > > > > > > > > > > readiness
> > > > > > > > > > > > > > > can be made in a smoother way. For example, we
> > > could
> > > > > > track
> > > > > > > > the
> > > > > > > > > last
> > > > > > > > > > > > > time
> > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > broker has drained any batches from the
> > > accumulator.
> > > > We
> > > > > > can
> > > > > > > > > then
> > > > > > > > > > > > select
> > > > > > > > > > > > > > > partitions from brokers proportionally to the
> > > inverse
> > > > > of
> > > > > > > that
> > > > > > > > > time.
> > > > > > > > > > > > > This
> > > > > > > > > > > > > > > seems smoother than a cutoff based on a
> > > > > > > > > > > > > > partition.availability.timeout.ms
> > > > > > > > > > > > > > >  threshold.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Jun
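A rough sketch of the alternative in point 4 above: weight each broker by the inverse of the time since it last drained a batch, then normalize to probabilities. The class and method names are hypothetical, and clamping the elapsed time to at least 1 ms is an assumption added to avoid division by zero.

```java
final class DrainRecencyWeights {
    // Returns selection probabilities proportional to
    // 1 / (time since the broker last drained a batch).
    static double[] probabilities(long nowMs, long[] lastDrainMs) {
        double[] weights = new double[lastDrainMs.length];
        double total = 0.0;
        for (int i = 0; i < lastDrainMs.length; i++) {
            // Clamp to at least 1 ms so a broker that just drained
            // does not cause a division by zero.
            long elapsed = Math.max(1L, nowMs - lastDrainMs[i]);
            weights[i] = 1.0 / elapsed;
            total += weights[i];
        }
        for (int i = 0; i < weights.length; i++) {
            weights[i] /= total;
        }
        return weights;
    }

    public static void main(String[] args) {
        // Broker 0 drained 10 ms ago, broker 1 drained 100 ms ago.
        double[] p = probabilities(1000L, new long[] {990L, 900L});
        System.out.println(p[0] + " " + p[1]);
    }
}
```

Unlike a hard partition.availability.timeout.ms cutoff, a slow broker here keeps a small but non-zero share of the traffic.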
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits
> > > > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hello Luke, Jun,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thank you for your feedback.  I've added the
> > > > Rejected
> > > > > > > > > Alternative
> > > > > > > > > > > > > > section
> > > > > > > > > > > > > > > > that may clarify some of the questions w.r.t.
> > > > > returning
> > > > > > > -1.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 1. I've elaborated on the -1 in the KIP.  The
> > > > problem
> > > > > > is
> > > > > > > > > that a
> > > > > > > > > > > > > > > significant
> > > > > > > > > > > > > > > > part of the logic needs to be in the producer
> > > > > (because
> > > > > > it
> > > > > > > > now
> > > > > > > > > > > uses
> > > > > > > > > > > > > > > > information about brokers that only the
> > producer
> > > > > > knows),
> > > > > > > so
> > > > > > > > > > > > > > encapsulation
> > > > > > > > > > > > > > > > of the logic within the default partitioner
> > isn't
> > > > as
> > > > > > > clean.
> > > > > > > > > > >  I've
> > > > > > > > > > > > > > added
> > > > > > > > > > > > > > > > the Rejected Alternative section that
> documents
> > > an
> > > > > > > attempt
> > > > > > > > to
> > > > > > > > > > > keep
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > encapsulation by providing new callbacks to
> the
> > > > > > > > partitioner.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 2. The meaning of the
> > > partitioner.sticky.batch.size
> > > > > is
> > > > > > > > > explained
> > > > > > > > > > > in
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > Uniform Sticky Batch Size section.
> Basically,
> > we
> > > > > track
> > > > > > > the
> > > > > > > > > > > amount
> > > > > > > > > > > > of
> > > > > > > > > > > > > > > bytes
> > > > > > > > > > > > > > > > produced to the partition and if it exceeds
> > > > > > > > > > > > > > partitioner.sticky.batch.size
> > > > > > > > > > > > > > > > then we switch to the next partition.  As far
> > as
> > > > the
> > > > > > > reason
> > > > > > > > > to
> > > > > > > > > > > make
> > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > different from batch.size, I think Luke
> > answered
> > > > this
> > > > > > > with
> > > > > > > > > the
> > > > > > > > > > > > > question
> > > > > > > > > > > > > > > #3
> > > > > > > > > > > > > > > > -- what if the load pattern is such that
> 15.5KB
> > > > would
> > > > > > be
> > > > > > > > more
> > > > > > > > > > > > > efficient
> > > > > > > > > > > > > > > > than 16KB?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 3. I think it's hard to have one size that
> > would
> > > > fit
> > > > > > all
> > > > > > > > > > > patterns.
> > > > > > > > > > > > > > E.g.
> > > > > > > > > > > > > > > if
> > > > > > > > > > > > > > > > the load pattern is such that there is linger
> > and
> > > > the
> > > > > > app
> > > > > > > > > fills
> > > > > > > > > > > the
> > > > > > > > > > > > > > batch
> > > > > > > > > > > > > > > > before linger expires, then having 16KB would
> > > most
> > > > > > likely
> > > > > > > > > > > > synchronize
> > > > > > > > > > > > > > > > batching and partition switching, so each
> > > partition
> > > > > > would
> > > > > > > > > get a
> > > > > > > > > > > > full
> > > > > > > > > > > > > > > > batch.  If the load pattern is such that a few
> > > > > > > > > > > > > > > > incomplete batches go out before a larger batch
> > > > > > > > > > > > > > > > starts to fill, then it may actually be beneficial
> > > > > > > > > > > > > > > > to make the threshold slightly larger (e.g.
> > > > > > > > > > > > > > > > linger=0, the first few records go in the first
> > > > > > > > > > > > > > > > batch, the next few records go to the second
> > > > > > > > > > > > > > > > batch, and so on, until 5 are in flight; then a
> > > > > > > > > > > > > > > > larger batch would form while waiting for the
> > > > > > > > > > > > > > > > broker to respond, but the partition switch would
> > > > > > > > > > > > > > > > happen before the larger batch is full).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 4. There are a couple of reasons for
> > introducing
> > > > > > > > > > > > > > > > partition.availability.timeout.ms.  Luke's and Jun's
> > > > > > > > > questions
> > > > > > > > > > > are
> > > > > > > > > > > > > > > slightly
> > > > > > > > > > > > > > > > different, so I'm going to separate replies.
> > > > > > > > > > > > > > > > (Luke) Is the queue size a good enough
> > signal?  I
> > > > > think
> > > > > > > > it's
> > > > > > > > > a
> > > > > > > > > > > good
> > > > > > > > > > > > > > > default
> > > > > > > > > > > > > > > > signal as it tries to preserve general
> fairness
> > > and
> > > > > not
> > > > > > > > > overreact
> > > > > > > > > > > > on
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > broker's state at each moment in time.  But
> > > because
> > > > > > it's
> > > > > > > > > smooth,
> > > > > > > > > > > it
> > > > > > > > > > > > > may
> > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > be reactive enough to instantaneous latency
> > > jumps.
> > > > > For
> > > > > > > > > > > > > > latency-sensitive
> > > > > > > > > > > > > > > > workloads, it may be desirable to react
> faster
> > > > when a
> > > > > > > > broker
> > > > > > > > > > > > becomes
> > > > > > > > > > > > > > > > unresponsive (but that may make the
> > distribution
> > > > > really
> > > > > > > > > choppy),
> > > > > > > > > > > so
> > > > > > > > > > > > > > > > partition.availability.timeout.ms provides
> an
> > > > > > > opportunity
> > > > > > > > to
> > > > > > > > > > > tune
> > > > > > > > > > > > > > > > adaptiveness.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > (Jun) Can we just not assign partitions to
> > > brokers
> > > > > that
> > > > > > > are
> > > > > > > > > not
> > > > > > > > > > > > > ready?
> > > > > > > > > > > > > > > > Switching partitions purely based on current
> > > broker
> > > > > > > > readiness
> > > > > > > > > > > > > > information
> > > > > > > > > > > > > > > > can really skew workload I think (or at
> least I
> > > > > > couldn't
> > > > > > > > > build a
> > > > > > > > > > > > > model
> > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > proves that over time it's going to be
> > generally
> > > > > > fair), I
> > > > > > > > > feel
> > > > > > > > > > > that
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > algorithm should try to be fair in general
> and
> > > use
> > > > > > > smoother
> > > > > > > > > > > signals
> > > > > > > > > > > > > by
> > > > > > > > > > > > > > > > default (e.g. a broker with choppier latency
> > may
> > > > get
> > > > > > much
> > > > > > > > > less
> > > > > > > > > > > load
> > > > > > > > > > > > > > even
> > > > > > > > > > > > > > > > though it can handle throughput, it then may
> > > > > > potentially
> > > > > > > > skew
> > > > > > > > > > > > > > > consumption),
> > > > > > > > > > > > > > > > note that the queue-size-based logic uses
> > > > > probabilities
> > > > > > > (so
> > > > > > > > > we
> > > > > > > > > > > > don't
> > > > > > > > > > > > > > > fully
> > > > > > > > > > > > > > > > remove brokers, just make it less likely) and
> > > > > relative
> > > > > > > info
> > > > > > > > > > > rather
> > > > > > > > > > > > > > than a
> > > > > > > > > > > > > > > > threshold (so if all brokers are heavily, but
> > > > equally
> > > > > > > > loaded,
> > > > > > > > > > > they
> > > > > > > > > > > > > will
> > > > > > > > > > > > > > > get
> > > > > > > > > > > > > > > > equal distribution, rather than get removed
> > > because
> > > > > > they
> > > > > > > > > exceed
> > > > > > > > > > > > some
> > > > > > > > > > > > > > > > threshold).  So at the very least, I would
> like
> > > > this
> > > > > > > logic
> > > > > > > > > to be
> > > > > > > > > > > > > turned
> > > > > > > > > > > > > > > off
> > > > > > > > > > > > > > > > by default as it's hard to predict what it
> > could
> > > do
> > > > > > with
> > > > > > > > > > > different
> > > > > > > > > > > > > > > patterns
> > > > > > > > > > > > > > > > (which means that there would need to be some
> > > > > > > > > configuration).  We
> > > > > > > > > > > > > could
> > > > > > > > > > > > > > > > just not use brokers that are not ready, but
> > > > again, I
> > > > > > > think
> > > > > > > > > that
> > > > > > > > > > > > it's
> > > > > > > > > > > > > > > good
> > > > > > > > > > > > > > > > to try to be fair under normal circumstances,
> > so
> > > if
> > > > > > > > normally
> > > > > > > > > > > > brokers
> > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > respond under some
> > > > partition.availability.timeout.ms
> > > > > > > > > threshold
> > > > > > > > > > > and
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > application works well with those latencies,
> > then
> > > > we
> > > > > > > could
> > > > > > > > > > > > distribute
> > > > > > > > > > > > > > > data
> > > > > > > > > > > > > > > > equally between brokers that don't exceed the
> > > > > > latencies.
> > > > > > > > The
> > > > > > > > > > > > value,
> > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > course, would depend on the environment and
> app
> > > > > > > > requirements,
> > > > > > > > > > > hence
> > > > > > > > > > > > > > it's
> > > > > > > > > > > > > > > > configurable.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 10. Added a statement at the beginning of the
> > > > > proposed
> > > > > > > > > changes.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > -Artem
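A minimal sketch of the queue-size-based idea from the (Jun) reply in point 4: weights are relative rather than threshold-based, and no partition is ever fully excluded. The inversion formula (max + 1 - queueSize) and all names are assumptions for illustration, not necessarily what the KIP specifies. Queue sizes are assumed non-negative and the array non-empty.

```java
import java.util.Random;

final class QueueSizeWeightedChooser {
    // Builds a cumulative weight table; a shorter queue gets a larger weight.
    static int[] cumulativeWeights(int[] queueSizes) {
        int max = 0;
        for (int q : queueSizes) {
            max = Math.max(max, q);
        }
        int[] cumulative = new int[queueSizes.length];
        int running = 0;
        for (int i = 0; i < queueSizes.length; i++) {
            // Always >= 1, so a loaded partition becomes less likely, never excluded.
            running += max + 1 - queueSizes[i];
            cumulative[i] = running;
        }
        return cumulative;
    }

    // Maps a point in [0, total weight) onto a partition index.
    static int pick(int[] cumulative, int point) {
        for (int i = 0; i < cumulative.length; i++) {
            if (point < cumulative[i]) {
                return i;
            }
        }
        throw new IllegalArgumentException("point out of range: " + point);
    }

    static int choose(int[] queueSizes, Random random) {
        int[] cumulative = cumulativeWeights(queueSizes);
        return pick(cumulative, random.nextInt(cumulative[cumulative.length - 1]));
    }

    public static void main(String[] args) {
        int[] queueSizes = {0, 3, 3};
        System.out.println("chosen partition: " + choose(queueSizes, new Random()));
    }
}
```

With equal queue sizes every partition gets weight 1, so heavily but equally loaded brokers still receive an equal share, which matches the fairness argument above.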
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Thu, Feb 17, 2022 at 3:46 PM Jun Rao
> > > > > > > > > <j...@confluent.io.invalid
> > > > > > > > > > > >
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi, Artem,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks for the KIP. A few comments below.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 1. I agree with Luke that having the partitioner
> > > > > > > > > > > > > > > > > > returning -1 is kind of weird. Could we just change
> > > > > > > > > > > > > > > > > > the implementation of DefaultPartitioner to the new
> > > > > > > > > > > > > > > > > > behavior?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 2. partitioner.sticky.batch.size: Similar question to
> > > > > > > > > > > > > > > > > > Luke. I am not sure why we want to introduce this new
> > > > > > > > > > > > > > > > > > configuration. Could we just use the existing
> > > > > > > > > > > > > > > > > > batch.size?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 4. I also agree with Luke that it's not clear why we
> > > > > > > > > > > > > > > > > > need partition.availability.timeout.ms. The KIP says
> > > > > > > > > > > > > > > > > > the broker "would not be chosen until the broker is
> > > > > > > > > > > > > > > > > > able to accept the next ready batch from the
> > > > > > > > > > > > > > > > > > partition". If we are keeping track of this, could we
> > > > > > > > > > > > > > > > > > just avoid assigning records to partitions whose
> > > > > > > > > > > > > > > > > > leader is not able to accept the next batch? If we do
> > > > > > > > > > > > > > > > > > that, perhaps we don't need
> > > > > > > > > > > > > > > > > > partition.availability.timeout.ms.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 10. Currently, partitioner.class defaults to
> > > > > > > > > > > > > > > > > > DefaultPartitioner, which uses StickyPartitioner when
> > > > > > > > > > > > > > > > > > the key is specified. Since this KIP improves upon
> > > > > > > > > > > > > > > > > > StickyPartitioner, it would be useful to make the new
> > > > > > > > > > > > > > > > > > behavior the default and document that in the KIP.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Wed, Feb 16, 2022 at 7:30 PM Luke Chen
> > > > > > > > > > > > > > > > > > <show...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Artem,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Also, one more thing I think you need to know.
> > > > > > > > > > > > > > > > > > > As this bug KAFKA-7572
> > > > > > > > > > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-7572>
> > > > > > > > > > > > > > > > > > > mentioned, sometimes the custom partitioner would
> > > > > > > > > > > > > > > > > > > return negative partition id accidentally.
> > > > > > > > > > > > > > > > > > > If it returned -1, how could you know if it is
> > > > > > > > > > > > > > > > > > > expected or not expected?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks.
> > > > > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen
> > > > > > > > > > > > > > > > > > > <show...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi Artem,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks for the update. I have some questions about
> > > > > > > > > > > > > > > > > > > > it:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > 1. Could you explain why you need the `partitioner`
> > > > > > > > > > > > > > > > > > > > return -1? In which case we need it? And how it is
> > > > > > > > > > > > > > > > > > > > used in your KIP?
> > > > > > > > > > > > > > > > > > > > 2. What does the "partitioner.sticky.batch.size"
> > > > > > > > > > > > > > > > > > > > mean? In the "Configuration" part, you didn't
> > > > > > > > > > > > > > > > > > > > explain it. And default to 0, I guess it's the same
> > > > > > > > > > > > > > > > > > > > as current behavior for backward compatibility,
> > > > > > > > > > > > > > > > > > > > right? You should mention it.
> > > > > > > > > > > > > > > > > > > > 3. I'm thinking we can have a threshold to the
> > > > > > > > > > > > > > > > > > > > "partitioner.sticky.batch.size". Let's say, we
> > > > > > > > > > > > > > > > > > > > already accumulate 15.5KB in partition1, and sent.
> > > > > > > > > > > > > > > > > > > > So when next batch created, in your current design,
> > > > > > > > > > > > > > > > > > > > we still stick to partition1, until 16KB reached,
> > > > > > > > > > > > > > > > > > > > and then we create a new batch to change to next
> > > > > > > > > > > > > > > > > > > > partition, ex: partition2. But I think if we set a
> > > > > > > > > > > > > > > > > > > > threshold to 95% (for example), we can know previous
> > > > > > > > > > > > > > > > > > > > 15.5KB already exceeds the threshold so that we can
> > > > > > > > > > > > > > > > > > > > directly create new batch for next records. This way
> > > > > > > > > > > > > > > > > > > > should be able to make it more efficient. WDYT?
> > > > > > > > > > > > > > > > > > > > 4. I think the improved queuing logic should be good
> > > > > > > > > > > > > > > > > > > > enough. I can't get the benefit of having
> > > > > > > > > > > > > > > > > > > > `partition.availability.timeout.ms` config. In
> > > > > > > > > > > > > > > > > > > > short, you want to make the partitioner take the
> > > > > > > > > > > > > > > > > > > > broker load into consideration. We can just improve
> > > > > > > > > > > > > > > > > > > > that in the queuing logic (and you already did it).
> > > > > > > > > > > > > > > > > > > > Why should we add the config? Could you use some
> > > > > > > > > > > > > > > > > > > > examples to explain why we need it.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thank you.
> > > > > > > > > > > > > > > > > > > Luke
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > On Wed, Feb 16, 2022 at 8:57 AM Artem Livshits
> > > > > > > > > > > > > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >> Hello,
> > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > >> Please add your comments about the KIP.  If there
> > > > > > > > > > > > > > > > > > > >> are no considerations, I'll put it up for vote in
> > > > > > > > > > > > > > > > > > > >> the next few days.
> > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > >> -Artem
> > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > > >> On Mon, Feb 7, 2022 at 6:01 PM Artem Livshits
> > > > > > > > > > > > > > > > > > > >> <alivsh...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > > >> > Hello,
> > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > >> > After trying a few prototypes, I've made some
> > > > > > > > > > > > > > > > > > > >> > changes to the public interface.  Please see the
> > > > > > > > > > > > > > > > > > > >> > updated document
> > > > > > > > > > > > > > > > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > > > > > > > > > > > > >> > .
> > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > >> > -Artem
> > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem Livshits
> > > > > > > > > > > > > > > > > > > >> > <alivsh...@confluent.io> wrote:
> > > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > > >> >> Hello,
> > > > > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > > > > > >> >> This is the discussion thread for
> > > > > > > > > > > > > > > > > > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > > > > > > > > > > > > >> >> .
> > > > > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > > > > > >> >> The proposal is a bug fix for
> > > > > > > > > > > > > > > > > > > >> >> https://issues.apache.org/jira/browse/KAFKA-10888,
> > > > > > > > > > > > > > > > > > > >> >> but it does include a client config change,
> > > > > > > > > > > > > > > > > > > >> >> therefore we have a KIP to discuss.
> > > > > > > > > > > > > > > > > > >> >>
> > > > > > > > > > > > > > > > > > >> >> -Artem
> > > > > > > > > > > > > > > > > > >> >>
>
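
Point 3 in Luke's first message above (a threshold on
partitioner.sticky.batch.size) can be illustrated with a small sketch.
This is not code from the KIP or from the Kafka client: the function
name should_switch_partition, its parameters, and the 0.95 value are
hypothetical, and the numbers (15.5KB already sent, a 16KB limit) are
taken only from the example in that message. It compares switching at
the full limit with switching once a fraction of the limit is reached.

```python
def should_switch_partition(bytes_sent: int,
                            sticky_batch_size: int,
                            threshold: float = 1.0) -> bool:
    """Return True when enough bytes have gone to the current partition.

    Hypothetical helper, not a Kafka API: threshold=1.0 models switching
    only at the full limit; threshold=0.95 models the 95% suggestion.
    """
    return bytes_sent >= sticky_batch_size * threshold


KB = 1024
sent = int(15.5 * KB)   # 15.5KB already produced to partition1
limit = 16 * KB         # 16KB sticky limit from the example

# Full-limit rule versus the 95% threshold rule.
print(should_switch_partition(sent, limit))
print(should_switch_partition(sent, limit, 0.95))
```

With these numbers the full-limit rule keeps producing to partition1,
while the 95% rule treats 15.5KB as close enough and moves to the next
partition.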
