Hi, Artem,

Thanks for the reply. A few more comments.

1. Perhaps you could elaborate a bit more on how the producer determines
the partition if the partitioner returns -1. This will help understand why
encapsulating that logic as a partitioner is not clean.

2. I am not sure that I understand this part. If 15.5KB is more efficient,
could we just set batch.size to 15.5KB?

4. Yes, we could add a switch (or a variant of the partitioner) for
enabling this behavior. Also, choosing partitions based on broker readiness
can be done in a smoother way. For example, we could track the last time a
broker drained any batches from the accumulator. We can then select
partitions from brokers proportionally to the inverse of the time elapsed
since then. This seems smoother than a cutoff based on a
partition.availability.timeout.ms threshold.
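
To make the idea in point 4 concrete, here is a rough sketch in Java. It is
not code from the KIP or from the producer. The class DrainRecencyWeights,
its methods, and the minElapsedMs floor are all hypothetical names chosen
for illustration. It assumes we already have, per broker, the timestamp of
the last drain from the accumulator.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical helper (not a Kafka API): weights brokers by drain recency. */
class DrainRecencyWeights {

    /**
     * Returns a selection probability per broker id, proportional to
     * 1 / (nowMs - lastDrainMs). The elapsed time is floored at
     * minElapsedMs (assumed positive) to avoid division by zero.
     */
    static Map<Integer, Double> weights(Map<Integer, Long> lastDrainMs,
                                        long nowMs, long minElapsedMs) {
        Map<Integer, Double> result = new LinkedHashMap<>();
        double total = 0.0;
        for (Map.Entry<Integer, Long> e : lastDrainMs.entrySet()) {
            long elapsed = Math.max(minElapsedMs, nowMs - e.getValue());
            double w = 1.0 / elapsed;
            result.put(e.getKey(), w);
            total += w;
        }
        final double sum = total;
        // Normalize so the weights add up to 1.
        result.replaceAll((broker, w) -> w / sum);
        return result;
    }

    /**
     * Picks a broker id by walking the cumulative distribution.
     * u is expected to be in [0, 1). Returns -1 for an empty map.
     */
    static int pick(Map<Integer, Double> probabilities, double u) {
        double cumulative = 0.0;
        int last = -1;
        for (Map.Entry<Integer, Double> e : probabilities.entrySet()) {
            cumulative += e.getValue();
            last = e.getKey();
            if (u < cumulative) {
                return last;
            }
        }
        return last; // guards against floating-point rounding
    }

    public static void main(String[] args) {
        Map<Integer, Long> lastDrain = new LinkedHashMap<>();
        lastDrain.put(1, 990L); // drained 10 ms before nowMs = 1000
        lastDrain.put(2, 900L); // drained 100 ms before nowMs = 1000
        Map<Integer, Double> p = weights(lastDrain, 1000L, 1L);
        System.out.println(p);
        System.out.println(pick(p, new Random().nextDouble()));
    }
}
```

With this weighting, a broker that has not drained for a while becomes
gradually less likely to be picked instead of being removed outright, and
there is no threshold to configure.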

Thanks,

Jun

On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> Hello Luke, Jun,
>
> Thank you for your feedback.  I've added the Rejected Alternative section
> that may clarify some of the questions w.r.t. returning -1.
>
> 1. I've elaborated on the -1 in the KIP.  The problem is that a significant
> part of the logic needs to be in the producer (because it now uses
> information about brokers that only the producer knows), so encapsulation
> of the logic within the default partitioner isn't as clean.   I've added
> the Rejected Alternative section that documents an attempt to keep the
> encapsulation by providing new callbacks to the partitioner.
>
> 2. The meaning of the partitioner.sticky.batch.size is explained in the
> Uniform Sticky Batch Size section.  Basically, we track the amount of bytes
> produced to the partition and if it exceeds partitioner.sticky.batch.size
> then we switch to the next partition.  As far as the reason to make it
> different from batch.size, I think Luke answered this with the question #3
> -- what if the load pattern is such that 15.5KB would be more efficient
> than 16KB?
>
> 3. I think it's hard to have one size that would fit all patterns.  E.g. if
> the load pattern is such that there is linger and the app fills the batch
> before linger expires, then having 16KB would most likely synchronize
> batching and partition switching, so each partition would get a full
> batch.  If the load pattern is such that a few incomplete batches go out
> before a larger batch starts to fill, then it may actually be beneficial
> to make it slightly larger (e.g. linger=0, first few records go in
> the first batch, then next few records go to second batch, and so on, until
> 5 in-flight, then larger batch would form while waiting for broker to
> respond, but the partition switch would happen before the larger batch is
> full).
>
> 4. There are a couple of reasons for introducing
> partition.availability.timeout.ms.  Luke's and Jun's questions are slightly
> different, so I'm going to reply to them separately.
> (Luke) Is the queue size a good enough signal?  I think it's a good default
> signal as it tries to preserve general fairness and not overreact on the
> broker's state at each moment in time.  But because it's smooth, it may not
> be reactive enough to instantaneous latency jumps.  For latency-sensitive
> workloads, it may be desirable to react faster when a broker becomes
> unresponsive (but that may make the distribution really choppy), so
> partition.availability.timeout.ms provides an opportunity to tune
> adaptiveness.
>
> (Jun) Can we just not assign partitions to brokers that are not ready?
> Switching partitions purely based on current broker readiness information
> can really skew workload, I think (or at least I couldn't build a model that
> proves that over time it's going to be generally fair).  I feel that the
> algorithm should try to be fair in general and use smoother signals by
> default (e.g. a broker with choppier latency may get much less load even
> though it can handle the throughput, which may then skew consumption).
> Note that the queue-size-based logic uses probabilities (so we don't fully
> remove brokers, just make them less likely to be chosen) and relative info
> rather than a threshold (so if all brokers are heavily but equally loaded,
> they will get equal distribution rather than being removed because they
> exceed some threshold).  So at the very least, I would like this logic to be
> turned off
> by default as it's hard to predict what it could do with different patterns
> (which means that there would need to be some configuration).  We could
> just not use brokers that are not ready, but again, I think that it's good
> to try to be fair under normal circumstances, so if normally brokers can
> respond under some partition.availability.timeout.ms threshold and the
> application works well with those latencies, then we could distribute data
> equally between brokers that don't exceed the latencies.  The value, of
> course, would depend on the environment and app requirements, hence it's
> configurable.
>
> 10. Added a statement at the beginning of the proposed changes.
>
> -Artem
>
>
> On Thu, Feb 17, 2022 at 3:46 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Artem,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 1. I agree with Luke that having the partitioner return -1 is kind of
> > weird. Could we just change the implementation of DefaultPartitioner to
> > the new behavior?
> >
> > 2. partitioner.sticky.batch.size: Similar question to Luke. I am not sure
> > why we want to introduce this new configuration. Could we just use the
> > existing batch.size?
> >
> > 4. I also agree with Luke that it's not clear why we need
> > partition.availability.timeout.ms. The KIP says the broker "would not be
> > chosen until the broker is able to accept the next ready batch from the
> > partition". If we are keeping track of this, could we just avoid
> > assigning records to partitions whose leader is not able to accept the
> > next batch? If we do that, perhaps we don't need
> > partition.availability.timeout.ms.
> >
> > 10. Currently, partitioner.class defaults to DefaultPartitioner, which
> > uses StickyPartitioner when the key is not specified. Since this KIP
> > improves upon StickyPartitioner, it would be useful to make the new
> > behavior the default and document that in the KIP.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Feb 16, 2022 at 7:30 PM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Artem,
> > >
> > > Also, one more thing I think you need to know.
> > > As bug KAFKA-7572 <https://issues.apache.org/jira/browse/KAFKA-7572>
> > > mentions, a custom partitioner can sometimes return a negative
> > > partition id accidentally.
> > > If it returns -1, how would you know whether that is expected or not?
> > >
> > > Thanks.
> > > Luke
> > >
> > > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen <show...@gmail.com> wrote:
> > >
> > > > Hi Artem,
> > > >
> > > > Thanks for the update. I have some questions about it:
> > > >
> > > > 1. Could you explain why you need the `partitioner` to return -1? In
> > > > which case do we need it? And how is it used in your KIP?
> > > > 2. What does "partitioner.sticky.batch.size" mean? In the
> > > > "Configuration" part, you didn't explain it. It defaults to 0; I guess
> > > > that is the same as the current behavior, for backward compatibility,
> > > > right? You should mention it.
> > > > 3. I'm thinking we could have a threshold for
> > > > "partitioner.sticky.batch.size". Let's say we have already accumulated
> > > > 15.5KB in partition1 and sent it. When the next batch is created, in
> > > > your current design we still stick to partition1 until 16KB is reached,
> > > > and then we create a new batch and switch to the next partition, e.g.
> > > > partition2. But if we set a threshold of 95% (for example), we would
> > > > know that the previous 15.5KB already exceeds the threshold, so we
> > > > could directly create a new batch for the next records. This should
> > > > make it more efficient. WDYT?
> > > > 4. I think the improved queuing logic should be good enough. I don't
> > > > see the benefit of having the `partition.availability.timeout.ms`
> > > > config. In short, you want to make the partitioner take the broker load
> > > > into consideration. We can just improve that in the queuing logic (and
> > > > you already did it). Why should we add the config? Could you use some
> > > > examples to explain why we need it?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Wed, Feb 16, 2022 at 8:57 AM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> Please add your comments about the KIP.  If there are no
> > > >> considerations, I'll put it up for vote in the next few days.
> > > >>
> > > >> -Artem
> > > >>
> > > >> On Mon, Feb 7, 2022 at 6:01 PM Artem Livshits
> > > >> <alivsh...@confluent.io> wrote:
> > > >>
> > > >> > Hello,
> > > >> >
> > > >> > After trying a few prototypes, I've made some changes to the
> > > >> > public interface.  Please see the updated document
> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > >> > .
> > > >> >
> > > >> > -Artem
> > > >> >
> > > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem Livshits
> > > >> > <alivsh...@confluent.io> wrote:
> > > >> >
> > > >> >> Hello,
> > > >> >>
> > > >> >> This is the discussion thread for
> > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > >> >> .
> > > >> >>
> > > >> >> The proposal is a bug fix for
> > > >> >> https://issues.apache.org/jira/browse/KAFKA-10888, but it does
> > > >> >> include a client config change, therefore we have a KIP to discuss.
> > > >> >>
> > > >> >> -Artem
> > > >> >>
> > > >> >
> > > >>
> > > >
> > >
> >
>
