Hi Jun,

1. Updated the KIP to add a couple paragraphs about implementation
necessities in the Proposed Changes section.

2. Sorry if my reply was confusing, what I meant to say (and I elaborated
on that in point #3) is that there could be patterns for which 16KB
wouldn't be the most effective setting, thus it would be good to make it
configurable.

4. We could use broker readiness timeout.  But I'm not sure it would
correctly model the broker load.  The problem is that latency is not an
accurate measure of throughput, we could have 2 brokers that have equal
throughput but one has higher latency (so it takes larger batches less
frequently, but still takes the same load).  Latency-based logic is likely
to send less data to the broker with higher latency.  Using the queue size
would adapt to throughput, regardless of latency (which could be just a
result of deployment topology), so that's the model chosen in the
proposal.  The partition.availability.timeout.ms logic approaches the model
from a slightly different angle, say we have a requirement to deliver
messages via brokers that have a certain latency, then
partition.availability.timeout.ms could be used to tune that.  Latency is a
much more volatile metric than throughput (latency depends on external
load, on capacity, on deployment topology, on jitter in network, on jitter
in disk, etc.) and I think it would be best to leave latency-based
thresholds configurable to tune for the environment.

-Artem

On Wed, Feb 23, 2022 at 11:14 AM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply. A few more comments.
>
> 1. Perhaps you could elaborate a bit more on how the producer determines
> the partition if the partitioner returns -1. This will help understand why
> encapsulating that logic as a partitioner is not clean.
>
> 2. I am not sure that I understand this part. If 15.5KB is more efficient,
> could we just set batch.size to 15.5KB?
>
> 4. Yes, we could add a switch (or a variant of the partitioner) for
> enabling this behavior. Also, choosing partitions based on broker readiness
> can be made in a smoother way. For example, we could track the last time a
> broker has drained any batches from the accumulator. We can then select
> partitions from brokers proportionally to the inverse of that time. This
> seems smoother than a cutoff based on a partition.availability.timeout.ms
>  threshold.
>
> Thanks,
>
> Jun
>
> On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hello Luke, Jun,
> >
> > Thank you for your feedback.  I've added the Rejected Alternative section
> > that may clarify some of the questions w.r.t. returning -1.
> >
> > 1. I've elaborated on the -1 in the KIP.  The problem is that a
> significant
> > part of the logic needs to be in the producer (because it now uses
> > information about brokers that only the producer knows), so encapsulation
> > of the logic within the default partitioner isn't as clean.   I've added
> > the Rejected Alternative section that documents an attempt to keep the
> > encapsulation by providing new callbacks to the partitioner.
> >
> > 2. The meaning of the partitioner.sticky.batch.size is explained in the
> > Uniform Sticky Batch Size section.  Basically, we track the amount of
> bytes
> > produced to the partition and if it exceeds partitioner.sticky.batch.size
> > then we switch to the next partition.  As far as the reason to make it
> > different from batch.size, I think Luke answered this with the question
> #3
> > -- what if the load pattern is such that 15.5KB would be more efficient
> > than 16KB?
> >
> > 3. I think it's hard to have one size that would fit all patterns.  E.g.
> if
> > the load pattern is such that there is linger and the app fills the batch
> > before linger expires, then having 16KB would most likely synchronize
> > batching and partition switching, so each partition would get a full
> > batch.  If load pattern is such that there are a few non-complete batches
> > go out before a larger batch starts to fill, then it may actually be
> > beneficial to make slightly larger (e.g. linger=0, first few records go
> in
> > the first batch, then next few records go to second batch, and so on,
> until
> > 5 in-flight, then larger batch would form while waiting for broker to
> > respond, but the partition switch would happen before the larger batch is
> > full).
> >
> > 4. There are a couple of reasons for introducing
> > partition.availability.timeout.ms.  Luke's an Jun's questions are
> slightly
> > different, so I'm going to separate replies.
> > (Luke) Is the queue size a good enough signal?  I think it's a good
> default
> > signal as it tries to preserve general fairness and not overreact on the
> > broker's state at each moment in time.  But because it's smooth, it may
> not
> > be reactive enough to instantaneous latency jumps.  For latency-sensitive
> > workloads, it may be desirable to react faster when a broker becomes
> > unresponsive (but that may make the distribution really choppy), so
> > partition.availability.timeout.ms provides an opportunity to tune
> > adaptiveness.
> >
> > (Jun) Can we just not assign partitions to brokers that are not ready?
> > Switching partitions purely based on current broker readiness information
> > can really skew workload I think (or at least I couldn't build a model
> that
> > proves that over time it's going to be generally fair), I feel that the
> > algorithm should try to be fair in general and use smoother signals by
> > default (e.g. a broker with choppier latency may get much less load even
> > though it can handle throughput, it then may potentially skew
> consumption),
> > note that the queue-size-based logic uses probabilities (so we don't
> fully
> > remove brokers, just make it less likely) and relative info rather than a
> > threshold (so if all brokers are heavily, but equally loaded, they will
> get
> > equal distribution, rather than get removed because they exceed some
> > threshold).  So at the very least, I would like this logic to be turned
> off
> > by default as it's hard to predict what it could do with different
> patterns
> > (which means that there would need to be some configuration).  We could
> > just not use brokers that are not ready, but again, I think that it's
> good
> > to try to be fair under normal circumstances, so if normally brokers can
> > respond under some partition.availability.timeout.ms threshold and the
> > application works well with those latencies, then we could distribute
> data
> > equally between brokers that don't exceed the latencies.  The value, of
> > course, would depend on the environment and app requirements, hence it's
> > configurable.
> >
> > 10. Added a statement at the beginning of the proposed changes.
> >
> > -Artem
> >
> >
> > On Thu, Feb 17, 2022 at 3:46 PM Jun Rao <j...@confluent.io.invalid>
> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the KIP. A few comments below.
> > >
> > > 1. I agree with Luke that having the partitioner returning -1 is kind
> of
> > > weird. Could we just change the implementation of DefaultPartitioner to
> > the
> > > new behavior?
> > >
> > > 2. partitioner.sticky.batch.size: Similar question to Luke. I am not
> sure
> > > why we want to introduce this new configuration. Could we just use the
> > > existing batch.size?
> > >
> > > 4. I also agree with Luke that it's not clear why we need
> > > partition.availability.timeout.ms. The KIP says the broker "would not
> be
> > > chosen until the broker is able to accept the next ready batch from the
> > > partition". If we are keeping track of this, could we just avoid
> > assigning
> > > records to partitions whose leader is not able to accept the next
> batch?
> > If
> > > we do that, perhaps we don't need partition.availability.timeout.ms.
> > >
> > > 10. Currently, partitioner.class defaults to DefaultPartitioner, which
> > uses
> > > StickyPartitioner when the key is specified. Since this KIP improves
> upon
> > > StickyPartitioner, it would be useful to make the new behavior the
> > default
> > > and document that in the KIP.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Feb 16, 2022 at 7:30 PM Luke Chen <show...@gmail.com> wrote:
> > >
> > > > Hi Artem,
> > > >
> > > > Also, one more thing I think you need to know.
> > > > As this bug KAFKA-7572 <
> > https://issues.apache.org/jira/browse/KAFKA-7572
> > > >
> > > > mentioned, sometimes the custom partitioner would return negative
> > > partition
> > > > id accidentally.
> > > > If it returned -1, how could you know if it is expected or not
> > expected?
> > > >
> > > > Thanks.
> > > > Luke
> > > >
> > > > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen <show...@gmail.com> wrote:
> > > >
> > > > > Hi Artem,
> > > > >
> > > > > Thanks for the update. I have some questions about it:
> > > > >
> > > > > 1. Could you explain why you need the `partitioner` return -1? In
> > which
> > > > > case we need it? And how it is used in your KIP?
> > > > > 2. What does the "partitioner.sticky.batch.size" mean? In the
> > > > > "Configuration" part, you didn't explain it. And default to 0, I
> > guess
> > > > it's
> > > > > the same as current behavior for backward compatibility, right? You
> > > > should
> > > > > mention it.
> > > > > 3. I'm thinking we can have a threshold to the
> > > > > "partitioner.sticky.batch.size". Let's say, we already accumulate
> > > 15.5KB
> > > > in
> > > > > partition1, and sent. So when next batch created, in your current
> > > design,
> > > > > we still stick to partition1, until 16KB reached, and then we
> create
> > a
> > > > new
> > > > > batch to change to next partition, ex: partition2. But I think if
> we
> > > set
> > > > a
> > > > > threshold to 95% (for example), we can know previous 15.5KB already
> > > > exceeds
> > > > > the threshold so that we can directly create new batch for next
> > > records.
> > > > > This way should be able to make it more efficient. WDYT?
> > > > > 4. I think the improved queuing logic should be good enough. I
> can't
> > > get
> > > > > the benefit of having `partition.availability.timeout.ms` config.
> In
> > > > > short, you want to make the partitioner take the broker load into
> > > > > consideration. We can just improve that in the queuing logic (and
> you
> > > > > already did it). Why should we add the config? Could you use some
> > > > examples
> > > > > to explain why we need it.
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > > On Wed, Feb 16, 2022 at 8:57 AM Artem Livshits
> > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > >
> > > > >> Hello,
> > > > >>
> > > > >> Please add your comments about the KIP.  If there are no
> > > considerations,
> > > > >> I'll put it up for vote in the next few days.
> > > > >>
> > > > >> -Artem
> > > > >>
> > > > >> On Mon, Feb 7, 2022 at 6:01 PM Artem Livshits <
> > alivsh...@confluent.io
> > > >
> > > > >> wrote:
> > > > >>
> > > > >> > Hello,
> > > > >> >
> > > > >> > After trying a few prototypes, I've made some changes to the
> > public
> > > > >> > interface.  Please see the updated document
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > >> > .
> > > > >> >
> > > > >> > -Artem
> > > > >> >
> > > > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem Livshits <
> > > > alivsh...@confluent.io>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> Hello,
> > > > >> >>
> > > > >> >> This is the discussion thread for
> > > > >> >>
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > >> >> .
> > > > >> >>
> > > > >> >> The proposal is a bug fix for
> > > > >> >> https://issues.apache.org/jira/browse/KAFKA-10888, but it does
> > > > >> include a
> > > > >> >> client config change, therefore we have a KIP to discuss.
> > > > >> >>
> > > > >> >> -Artem
> > > > >> >>
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>

Reply via email to