Hi Jun,

2. Removed the option from the KIP.  Now the sticky partitioning threshold
is hardcoded to batch.size.

20. Added the corresponding wording to the KIP.
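
For reference, here is a sketch of how these settings would look on the
producer side (config names as discussed below; values and usage are
illustrative only until the KIP is finalized):

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;

  public class Kip794ConfigExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("key.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer",
              "org.apache.kafka.common.serialization.StringSerializer");
          // The partition switch threshold is now tied to batch.size
          // (no separate partitioner.sticky.batch.size config).
          props.put("batch.size", "16384");
          // New configs; they only apply to the built-in partitioners
          // (see point 20 above).  Values here are illustrative.
          props.put("enable.adaptive.partitioning", "true");
          props.put("partition.availability.timeout.ms", "0");  // 0 = off
          try (Producer<String, String> producer = new KafkaProducer<>(props)) {
              // ... send records as usual ...
          }
      }
  }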

-Artem

On Thu, Mar 3, 2022 at 10:52 AM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply.
>
> 1. Sounds good.
>
> 2. If we don't expect users to change it, we probably could just leave out
> the new config. In general, it's easy to add a new config, but hard to
> remove an existing config.
>
> 20. The two new configs enable.adaptive.partitioning and
> partition.availability.timeout.ms only apply to the two built-in
> partitioners DefaultPartitioner and UniformStickyPartitioner, right? It
> would be useful to document that in the KIP.
>
> Thanks,
>
> Jun
>
> On Thu, Mar 3, 2022 at 9:47 AM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > Thank you for the suggestions.
> >
> > 1. As we discussed offline, we can hardcode the logic for
> > DefaultPartitioner and UniformStickyPartitioner in the KafkaProducer
> > (i.e. the DefaultPartitioner.partition won't get called; instead,
> > KafkaProducer would check if the partitioner is an instance of
> > DefaultPartitioner and then run the actual partitioning logic
> > itself).  Then the change to the Partitioner wouldn't be required.
> > I'll update the KIP to reflect that.
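> >
> > To illustrate, a rough sketch (builtInPartitioner and nextPartition
> > are hypothetical internal names made up for the example, not the
> > actual patch):
> >
> >   // Inside KafkaProducer: bypass the Partitioner interface for the
> >   // built-in partitioner so the producer can use internal state.
> >   private int partition(ProducerRecord<K, V> record, byte[] keyBytes,
> >                         byte[] valueBytes, Cluster cluster) {
> >       if (record.partition() != null)
> >           return record.partition();
> >       if (partitioner instanceof DefaultPartitioner) {
> >           // DefaultPartitioner.partition() is never called; the
> >           // producer runs the sticky logic itself.
> >           return builtInPartitioner.nextPartition(record.topic(), cluster);
> >       }
> >       return partitioner.partition(record.topic(), record.key(), keyBytes,
> >                                    record.value(), valueBytes, cluster);
> >   }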
> >
> > 2. I don't expect users to change this too often, as changing it
> > would require a bit of study of the production patterns.  As a
> > general principle, if I can think of a model that requires a
> > deviation from the default, I tend to add a configuration option.
> > It could be that it'll never get used in practice, but I cannot
> > prove that.  I'm ok with removing the option, let me know what you
> > think.
> >
> > -Artem
> >
> > On Mon, Feb 28, 2022 at 2:06 PM Jun Rao <j...@confluent.io.invalid>
> > wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply. A few more comments.
> > >
> > > 1. Since we control the implementation and the usage of
> > > DefaultPartitioner, another way is to instantiate the
> > > DefaultPartitioner with a special constructor, which allows it to
> > > have more access to internal information. Then we could just
> > > change the behavior of DefaultPartitioner such that it can use the
> > > internal information when choosing the partition. This seems more
> > > intuitive than having DefaultPartitioner return partition -1.
> > >
> > > 2. I guess partitioner.sticky.batch.size is introduced because the
> > > effective batch size could be less than batch.size, and we want to
> > > align partition switching with the effective batch size. How would
> > > a user know the effective batch size to set
> > > partitioner.sticky.batch.size properly? If the user somehow knows
> > > the effective batch size, does setting batch.size to the effective
> > > batch size achieve the same result?
> > >
> > > 4. Thanks for the explanation. Makes sense to me.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Feb 25, 2022 at 8:26 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > 1. Updated the KIP to add a couple paragraphs about implementation
> > > > necessities in the Proposed Changes section.
> > > >
> > > > 2. Sorry if my reply was confusing; what I meant to say (and I
> > > > elaborated on that in point #3) is that there could be patterns
> > > > for which 16KB wouldn't be the most effective setting, thus it
> > > > would be good to make it configurable.
> > > >
> > > > 4. We could use broker readiness timeout.  But I'm not sure it
> > > > would correctly model the broker load.  The problem is that
> > > > latency is not an accurate measure of throughput: we could have
> > > > 2 brokers that have equal throughput, but one has higher latency
> > > > (so it takes larger batches less frequently, but still takes the
> > > > same load).  Latency-based logic is likely to send less data to
> > > > the broker with higher latency.  Using the queue size would
> > > > adapt to throughput, regardless of latency (which could be just
> > > > a result of deployment topology), so that's the model chosen in
> > > > the proposal.  The partition.availability.timeout.ms logic
> > > > approaches the model from a slightly different angle: say we
> > > > have a requirement to deliver messages via brokers that have a
> > > > certain latency; then partition.availability.timeout.ms could be
> > > > used to tune that.  Latency is a much more volatile metric than
> > > > throughput (latency depends on external load, on capacity, on
> > > > deployment topology, on jitter in network, on jitter in disk,
> > > > etc.), and I think it would be best to leave latency-based
> > > > thresholds configurable to tune for the environment.
> > > >
> > > > -Artem
> > > >
> > > > On Wed, Feb 23, 2022 at 11:14 AM Jun Rao <j...@confluent.io.invalid>
> > > > wrote:
> > > >
> > > > > Hi, Artem,
> > > > >
> > > > > Thanks for the reply. A few more comments.
> > > > >
> > > > > 1. Perhaps you could elaborate a bit more on how the producer
> > > > > determines the partition if the partitioner returns -1. This
> > > > > will help understand why encapsulating that logic as a
> > > > > partitioner is not clean.
> > > > >
> > > > > 2. I am not sure that I understand this part. If 15.5KB is
> > > > > more efficient, could we just set batch.size to 15.5KB?
> > > > >
> > > > > 4. Yes, we could add a switch (or a variant of the
> > > > > partitioner) for enabling this behavior. Also, choosing
> > > > > partitions based on broker readiness can be made in a smoother
> > > > > way. For example, we could track the last time a broker has
> > > > > drained any batches from the accumulator. We can then select
> > > > > partitions from brokers proportionally to the inverse of that
> > > > > time. This seems smoother than a cutoff based on a
> > > > > partition.availability.timeout.ms threshold.
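> > > > >
> > > > > Something like this sketch is what I have in mind (helper
> > > > > names are made up, just for illustration):
> > > > >
> > > > >   import java.util.concurrent.ThreadLocalRandom;
> > > > >
> > > > >   // Pick broker i with probability proportional to the inverse
> > > > >   // of the time since it last drained batches.
> > > > >   static int pickBroker(long[] lastDrainMs, long nowMs) {
> > > > >       double[] w = new double[lastDrainMs.length];
> > > > >       double total = 0;
> > > > >       for (int i = 0; i < w.length; i++) {
> > > > >           w[i] = 1.0 / Math.max(1, nowMs - lastDrainMs[i]);
> > > > >           total += w[i];
> > > > >       }
> > > > >       double r = ThreadLocalRandom.current().nextDouble() * total;
> > > > >       for (int i = 0; i < w.length; i++) {
> > > > >           r -= w[i];
> > > > >           if (r <= 0)
> > > > >               return i;
> > > > >       }
> > > > >       return w.length - 1;  // guard against rounding
> > > > >   }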
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits
> > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hello Luke, Jun,
> > > > > >
> > > > > > Thank you for your feedback.  I've added the Rejected
> > > > > > Alternatives section that may clarify some of the questions
> > > > > > w.r.t. returning -1.
> > > > > >
> > > > > > 1. I've elaborated on the -1 in the KIP.  The problem is that
> > > > > > a significant part of the logic needs to be in the producer
> > > > > > (because it now uses information about brokers that only the
> > > > > > producer knows), so encapsulation of the logic within the
> > > > > > default partitioner isn't as clean.  I've added the Rejected
> > > > > > Alternatives section that documents an attempt to keep the
> > > > > > encapsulation by providing new callbacks to the partitioner.
> > > > > >
> > > > > > 2. The meaning of partitioner.sticky.batch.size is explained
> > > > > > in the Uniform Sticky Batch Size section.  Basically, we
> > > > > > track the amount of bytes produced to the partition, and if
> > > > > > it exceeds partitioner.sticky.batch.size then we switch to
> > > > > > the next partition.  As far as the reason to make it
> > > > > > different from batch.size, I think Luke answered this with
> > > > > > question #3 -- what if the load pattern is such that 15.5KB
> > > > > > would be more efficient than 16KB?
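> > > > > >
> > > > > > Roughly, the tracking works like this (a simplified sketch;
> > > > > > the actual implementation may choose the next partition
> > > > > > adaptively rather than uniformly at random):
> > > > > >
> > > > > >   import java.util.concurrent.ThreadLocalRandom;
> > > > > >
> > > > > >   // Stay on one partition until partitioner.sticky.batch.size
> > > > > >   // bytes have been produced to it, then switch.
> > > > > >   class StickyState {
> > > > > >       private int partition = -1;
> > > > > >       private int producedBytes = 0;
> > > > > >
> > > > > >       int partitionFor(int recordSize, int numPartitions,
> > > > > >                        int stickyBatchSize) {
> > > > > >           if (partition < 0 || producedBytes >= stickyBatchSize) {
> > > > > >               partition = ThreadLocalRandom.current().nextInt(numPartitions);
> > > > > >               producedBytes = 0;  // reset for the new partition
> > > > > >           }
> > > > > >           producedBytes += recordSize;
> > > > > >           return partition;
> > > > > >       }
> > > > > >   }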
> > > > > >
> > > > > > 3. I think it's hard to have one size that would fit all
> > > > > > patterns.  E.g. if the load pattern is such that there is
> > > > > > linger and the app fills the batch before linger expires,
> > > > > > then having 16KB would most likely synchronize batching and
> > > > > > partition switching, so each partition would get a full
> > > > > > batch.  If the load pattern is such that a few non-complete
> > > > > > batches go out before a larger batch starts to fill, then it
> > > > > > may actually be beneficial to make it slightly larger (e.g.
> > > > > > with linger=0, the first few records go in the first batch,
> > > > > > the next few records go in the second batch, and so on, until
> > > > > > 5 batches are in flight; then a larger batch would form while
> > > > > > waiting for the broker to respond, but the partition switch
> > > > > > would happen before the larger batch is full).
> > > > > >
> > > > > > 4. There are a couple of reasons for introducing
> > > > > > partition.availability.timeout.ms.  Luke's and Jun's
> > > > > > questions are slightly different, so I'm going to separate
> > > > > > the replies.
> > > > > >
> > > > > > (Luke) Is the queue size a good enough signal?  I think it's
> > > > > > a good default signal as it tries to preserve general
> > > > > > fairness and not overreact to the broker's state at each
> > > > > > moment in time.  But because it's smooth, it may not be
> > > > > > reactive enough to instantaneous latency jumps.  For
> > > > > > latency-sensitive workloads, it may be desirable to react
> > > > > > faster when a broker becomes unresponsive (but that may make
> > > > > > the distribution really choppy), so
> > > > > > partition.availability.timeout.ms provides an opportunity to
> > > > > > tune adaptiveness.
> > > > > >
> > > > > > (Jun) Can we just not assign partitions to brokers that are
> > > > > > not ready?  Switching partitions purely based on current
> > > > > > broker readiness information can really skew the workload, I
> > > > > > think (or at least I couldn't build a model that proves that
> > > > > > over time it's going to be generally fair).  I feel that the
> > > > > > algorithm should try to be fair in general and use smoother
> > > > > > signals by default (e.g. a broker with choppier latency may
> > > > > > get much less load even though it can handle the throughput,
> > > > > > which may then skew consumption).  Note that the
> > > > > > queue-size-based logic uses probabilities (so we don't fully
> > > > > > remove brokers, just make it less likely) and relative info
> > > > > > rather than a threshold (so if all brokers are heavily, but
> > > > > > equally, loaded, they will get equal distribution, rather
> > > > > > than get removed because they exceed some threshold).  So at
> > > > > > the very least, I would like this logic to be turned off by
> > > > > > default, as it's hard to predict what it could do with
> > > > > > different patterns (which means that there would need to be
> > > > > > some configuration).  We could just not use brokers that are
> > > > > > not ready, but again, I think that it's good to try to be
> > > > > > fair under normal circumstances.  So if normally brokers can
> > > > > > respond under some partition.availability.timeout.ms
> > > > > > threshold and the application works well with those
> > > > > > latencies, then we could distribute data equally between the
> > > > > > brokers that don't exceed the latencies.  The value, of
> > > > > > course, would depend on the environment and app requirements,
> > > > > > hence it's configurable.
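> > > > > >
> > > > > > To make the probability idea concrete, a simplified sketch
> > > > > > (the actual weighting in the implementation may differ):
> > > > > >
> > > > > >   // Convert per-broker queue sizes into selection
> > > > > >   // probabilities.  The weights are relative, not
> > > > > >   // threshold-based: equal queue sizes yield an equal
> > > > > >   // distribution, and no broker's probability drops to zero.
> > > > > >   static double[] brokerProbabilities(int[] queueSizes) {
> > > > > >       double[] p = new double[queueSizes.length];
> > > > > >       double total = 0;
> > > > > >       for (int i = 0; i < p.length; i++) {
> > > > > >           p[i] = 1.0 / (queueSizes[i] + 1);  // +1 avoids division by zero
> > > > > >           total += p[i];
> > > > > >       }
> > > > > >       for (int i = 0; i < p.length; i++)
> > > > > >           p[i] /= total;
> > > > > >       return p;
> > > > > >   }
> > > > > >
> > > > > > E.g. queue sizes {5, 5, 5} give each broker 1/3, while
> > > > > > {1, 5, 20} give roughly 70% / 23% / 7% -- skewed, but no
> > > > > > broker is ever fully excluded.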
> > > > > >
> > > > > > 10. Added a statement at the beginning of the proposed changes.
> > > > > >
> > > > > > -Artem
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 17, 2022 at 3:46 PM Jun Rao <j...@confluent.io.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi, Artem,
> > > > > > >
> > > > > > > Thanks for the KIP. A few comments below.
> > > > > > >
> > > > > > > 1. I agree with Luke that having the partitioner return -1
> > > > > > > is kind of weird. Could we just change the implementation
> > > > > > > of DefaultPartitioner to the new behavior?
> > > > > > >
> > > > > > > 2. partitioner.sticky.batch.size: Similar question to
> > > > > > > Luke's. I am not sure why we want to introduce this new
> > > > > > > configuration. Could we just use the existing batch.size?
> > > > > > >
> > > > > > > 4. I also agree with Luke that it's not clear why we need
> > > > > > > partition.availability.timeout.ms. The KIP says the broker
> > > > > > > "would not be chosen until the broker is able to accept the
> > > > > > > next ready batch from the partition". If we are keeping
> > > > > > > track of this, could we just avoid assigning records to
> > > > > > > partitions whose leader is not able to accept the next
> > > > > > > batch? If we do that, perhaps we don't need
> > > > > > > partition.availability.timeout.ms.
> > > > > > >
> > > > > > > 10. Currently, partitioner.class defaults to
> > > > > > > DefaultPartitioner, which uses StickyPartitioner when no
> > > > > > > key is specified. Since this KIP improves upon
> > > > > > > StickyPartitioner, it would be useful to make the new
> > > > > > > behavior the default and document that in the KIP.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Feb 16, 2022 at 7:30 PM Luke Chen <show...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Artem,
> > > > > > > >
> > > > > > > > Also, one more thing I think you need to know.
> > > > > > > > As this bug KAFKA-7572
> > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-7572> mentioned,
> > > > > > > > sometimes a custom partitioner may accidentally return a
> > > > > > > > negative partition id.
> > > > > > > > If it returned -1, how could you know whether it is expected
> > > > > > > > or not?
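> > > > > > > >
> > > > > > > > For example, the producer couldn't tell these two cases
> > > > > > > > apart:
> > > > > > > >
> > > > > > > >   // The KAFKA-7572 class of bug: Java's % keeps the sign
> > > > > > > >   // of the dividend, so a careless custom partitioner
> > > > > > > >   // can return a negative id.
> > > > > > > >   static int buggyPartition(Object key, int numPartitions) {
> > > > > > > >       return key.hashCode() % numPartitions;  // negative if hashCode() < 0
> > > > > > > >   }
> > > > > > > >
> > > > > > > >   // The proposed intentional signal looks the same to
> > > > > > > >   // the producer:
> > > > > > > >   static int letProducerChoose() {
> > > > > > > >       return -1;
> > > > > > > >   }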
> > > > > > > >
> > > > > > > > Thanks.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen <show...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Artem,
> > > > > > > > >
> > > > > > > > > Thanks for the update. I have some questions about it:
> > > > > > > > >
> > > > > > > > > 1. Could you explain why you need the `partitioner` to
> > > > > > > > > return -1? In which case do we need it? And how is it
> > > > > > > > > used in your KIP?
> > > > > > > > > 2. What does "partitioner.sticky.batch.size" mean? In
> > > > > > > > > the "Configuration" part, you didn't explain it. And it
> > > > > > > > > defaults to 0; I guess that's the same as the current
> > > > > > > > > behavior, for backward compatibility, right? You should
> > > > > > > > > mention it.
> > > > > > > > > 3. I'm thinking we can have a threshold for the
> > > > > > > > > "partitioner.sticky.batch.size". Let's say we have
> > > > > > > > > already accumulated 15.5KB in partition1, and sent it.
> > > > > > > > > So when the next batch is created, in your current
> > > > > > > > > design, we still stick to partition1 until 16KB is
> > > > > > > > > reached, and then we create a new batch to change to
> > > > > > > > > the next partition, ex: partition2. But I think if we
> > > > > > > > > set a threshold to 95% (for example), we can know the
> > > > > > > > > previous 15.5KB already exceeds the threshold, so we
> > > > > > > > > can directly create a new batch for the next records
> > > > > > > > > (see the sketch below). This should make it more
> > > > > > > > > efficient. WDYT?
> > > > > > > > > 4. I think the improved queuing logic should be good
> > > > > > > > > enough. I can't see the benefit of having the
> > > > > > > > > `partition.availability.timeout.ms` config. In short,
> > > > > > > > > you want to make the partitioner take the broker load
> > > > > > > > > into consideration. We can just improve that in the
> > > > > > > > > queuing logic (and you already did it). Why should we
> > > > > > > > > add the config? Could you use some examples to explain
> > > > > > > > > why we need it?
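> > > > > > > > >
> > > > > > > > > (Re question 3, a sketch of the threshold idea: 15.5KB
> > > > > > > > > of a 16KB batch is ~97%, above a 95% threshold, so we
> > > > > > > > > would switch right away.)
> > > > > > > > >
> > > > > > > > >   // Switch to the next partition early once the
> > > > > > > > >   // accumulated bytes pass a fraction of the sticky
> > > > > > > > >   // batch size (the 0.95 threshold is illustrative).
> > > > > > > > >   static boolean shouldSwitch(int accumulatedBytes,
> > > > > > > > >                               int stickyBatchSize) {
> > > > > > > > >       return accumulatedBytes >= 0.95 * stickyBatchSize;
> > > > > > > > >   }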
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > > Luke
> > > > > > > > >
> > > > > > > > > On Wed, Feb 16, 2022 at 8:57 AM Artem Livshits
> > > > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > > > >
> > > > > > > > >> Hello,
> > > > > > > > >>
> > > > > > > > >> Please add your comments about the KIP.  If there are
> > > > > > > > >> no concerns, I'll put it up for vote in the next few
> > > > > > > > >> days.
> > > > > > > > >>
> > > > > > > > >> -Artem
> > > > > > > > >>
> > > > > > > > >> On Mon, Feb 7, 2022 at 6:01 PM Artem Livshits
> > > > > > > > >> <alivsh...@confluent.io> wrote:
> > > > > > > > >>
> > > > > > > > >> > Hello,
> > > > > > > > >> >
> > > > > > > > >> > After trying a few prototypes, I've made some
> > > > > > > > >> > changes to the public interface.  Please see the
> > > > > > > > >> > updated document:
> > > > > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > > >> >
> > > > > > > > >> > -Artem
> > > > > > > > >> >
> > > > > > > > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem Livshits
> > > > > > > > >> > <alivsh...@confluent.io> wrote:
> > > > > > > > >> >
> > > > > > > > >> >> Hello,
> > > > > > > > >> >>
> > > > > > > > >> >> This is the discussion thread for
> > > > > > > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > > >> >>
> > > > > > > > >> >> The proposal is a bug fix for
> > > > > > > > >> >> https://issues.apache.org/jira/browse/KAFKA-10888,
> > > > > > > > >> >> but it does include a client config change;
> > > > > > > > >> >> therefore, we have a KIP to discuss.
> > > > > > > > >> >>
> > > > > > > > >> >> -Artem
> > > > > > > > >> >>
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
