Hi Jun,

Thank you for the suggestions.

1. As we discussed offline, we can hardcode the logic for
DefaultPartitioner and UniformStickyPartitioner in the KafkaProducer (i.e.
DefaultPartitioner.partition won't get called; instead, KafkaProducer
would check whether the partitioner is an instance of DefaultPartitioner
and then run the actual partitioning logic itself).  Then the change to
the Partitioner wouldn't be required.  I'll update the KIP to reflect that.
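
To illustrate point 1, here is a rough sketch of the instance check, not the
actual KafkaProducer code.  The Sketch* names and the simplified partition
signature are made up for illustration, the key hash is a stand-in for the
real key hashing, and the keyless branch is only a placeholder for the
broker-aware choice the KIP describes.

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical, simplified stand-in for the Partitioner plugin interface. */
interface SketchPartitioner {
    int partition(String topic, byte[] keyBytes, int numPartitions);
}

/** Hypothetical stand-in for the built-in default partitioner. */
final class SketchDefaultPartitioner implements SketchPartitioner {
    @Override
    public int partition(String topic, byte[] keyBytes, int numPartitions) {
        // In this sketch the producer never calls the built-in partitioner.
        throw new IllegalStateException("built-in logic runs inside the producer");
    }
}

/** Hypothetical stand-in for the producer-side partition choice. */
final class SketchProducer {
    private final SketchPartitioner partitioner;

    SketchProducer(SketchPartitioner partitioner) {
        this.partitioner = partitioner;
    }

    int choosePartition(String topic, byte[] keyBytes, int numPartitions) {
        if (partitioner instanceof SketchDefaultPartitioner) {
            // Built-in partitioner: the producer runs the logic itself, so it
            // can use internal state the plugin interface does not expose.
            return builtInPartition(keyBytes, numPartitions);
        }
        // Custom partitioner: call the plugin exactly as before.
        return partitioner.partition(topic, keyBytes, numPartitions);
    }

    private int builtInPartition(byte[] keyBytes, int numPartitions) {
        if (keyBytes != null) {
            // Keyed record: stable hash of the key (stand-in hash function).
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }
        // Keyless record: placeholder for the broker-aware sticky choice.
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }
}
```

With this shape, custom partitioners keep the existing contract and nothing
has to return -1.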

2. I don't expect users to change this too often, as changing it would
require a bit of studying of the production patterns.  As a general
principle, if I can think of a model that requires a deviation from the
default, I tend to add a configuration option.  It could be that it'll
never get used in practice, but I cannot prove that.  I'm OK with removing
the option; let me know what you think.
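
For reference, the switching rule that this option controls (track the bytes
produced to the current partition and switch once they exceed the limit) can
be sketched roughly as below.  This is a simplified illustration under
assumptions: the class and field names are made up, and "next partition" is
plain round-robin here rather than the queue-size-based choice in the KIP.

```java
/** Hypothetical sketch of byte-based sticky partition switching. */
final class StickyByteTracker {
    // Stand-in for the partitioner.sticky.batch.size setting.
    private final long stickyBatchSize;
    private final int numPartitions;
    private int currentPartition = 0;
    private long producedBytes = 0;

    StickyByteTracker(long stickyBatchSize, int numPartitions) {
        this.stickyBatchSize = stickyBatchSize;
        this.numPartitions = numPartitions;
    }

    /** Returns the partition for this record, then accounts for its bytes. */
    int nextPartition(int recordBytes) {
        int chosen = currentPartition;
        producedBytes += recordBytes;
        if (producedBytes > stickyBatchSize) {
            // Limit exceeded: later records go to the next partition.
            // Round-robin is an assumption made to keep the sketch short.
            currentPartition = (currentPartition + 1) % numPartitions;
            producedBytes = 0;
        }
        return chosen;
    }
}
```

Because the switch depends only on bytes produced and not on when a batch
happens to be sent, the limit can be tuned independently of batch.size.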

-Artem

On Mon, Feb 28, 2022 at 2:06 PM Jun Rao <j...@confluent.io.invalid> wrote:

> Hi, Artem,
>
> Thanks for the reply. A few more comments.
>
> 1. Since we control the implementation and the usage of DefaultPartitioner,
> another way is to instantiate the DefaultPartitioner with a special
> constructor, which allows it to have more access to internal information.
> Then we could just change the behavior of DefaultPartitioner such that it
> can use the internal information when choosing the partition. This seems
> more intuitive than having DefaultPartitioner return -1 partition.
>
> 2. I guess partitioner.sticky.batch.size is introduced because the
> effective batch size could be less than batch.size and we want to align
> partition switching with the effective batch size. How would a user know
> the effective batch size to set partitioner.sticky.batch.size properly? If
> the user somehow knows the effective batch size, does setting batch.size to
> the effective batch size achieve the same result?
>
> 4. Thanks for the explanation. Makes sense to me.
>
> Thanks,
>
> Jun
>
> On Fri, Feb 25, 2022 at 8:26 PM Artem Livshits
> <alivsh...@confluent.io.invalid> wrote:
>
> > Hi Jun,
> >
> > 1. Updated the KIP to add a couple paragraphs about implementation
> > necessities in the Proposed Changes section.
> >
> > 2. Sorry if my reply was confusing; what I meant to say (and I elaborated
> > on that in point #3) is that there could be patterns for which 16KB
> > wouldn't be the most effective setting, thus it would be good to make it
> > configurable.
> >
> > 4. We could use broker readiness timeout.  But I'm not sure it would
> > correctly model the broker load.  The problem is that latency is not an
> > accurate measure of throughput: we could have 2 brokers that have equal
> > throughput but one has higher latency (so it takes larger batches less
> > frequently, but still takes the same load).  Latency-based logic is
> > likely to send less data to the broker with higher latency.  Using the
> > queue size would adapt to throughput, regardless of latency (which could
> > be just a result of deployment topology), so that's the model chosen in
> > the proposal.  The partition.availability.timeout.ms logic approaches
> > the model from a slightly different angle: say we have a requirement to
> > deliver messages via brokers that have a certain latency, then
> > partition.availability.timeout.ms could be used to tune that.  Latency
> > is a much more volatile metric than throughput (latency depends on
> > external load, on capacity, on deployment topology, on jitter in
> > network, on jitter in disk, etc.) and I think it would be best to leave
> > latency-based thresholds configurable to tune for the environment.
> >
> > -Artem
> >
> > On Wed, Feb 23, 2022 at 11:14 AM Jun Rao <j...@confluent.io.invalid> wrote:
> >
> > > Hi, Artem,
> > >
> > > Thanks for the reply. A few more comments.
> > >
> > > 1. Perhaps you could elaborate a bit more on how the producer
> > > determines the partition if the partitioner returns -1. This will help
> > > us understand why encapsulating that logic as a partitioner is not
> > > clean.
> > >
> > > 2. I am not sure that I understand this part. If 15.5KB is more
> > > efficient, could we just set batch.size to 15.5KB?
> > >
> > > 4. Yes, we could add a switch (or a variant of the partitioner) for
> > > enabling this behavior. Also, choosing partitions based on broker
> > > readiness can be done in a smoother way. For example, we could track
> > > the last time a broker has drained any batches from the accumulator. We
> > > can then select partitions from brokers proportionally to the inverse
> > > of that time. This seems smoother than a cutoff based on a
> > > partition.availability.timeout.ms threshold.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits
> > > <alivsh...@confluent.io.invalid> wrote:
> > >
> > > > Hello Luke, Jun,
> > > >
> > > > Thank you for your feedback.  I've added the Rejected Alternative
> > > > section that may clarify some of the questions w.r.t. returning -1.
> > > >
> > > > 1. I've elaborated on the -1 in the KIP.  The problem is that a
> > > > significant part of the logic needs to be in the producer (because it
> > > > now uses information about brokers that only the producer knows), so
> > > > encapsulation of the logic within the default partitioner isn't as
> > > > clean.  I've added the Rejected Alternative section that documents an
> > > > attempt to keep the encapsulation by providing new callbacks to the
> > > > partitioner.
> > > >
> > > > 2. The meaning of the partitioner.sticky.batch.size is explained in
> > > > the Uniform Sticky Batch Size section.  Basically, we track the
> > > > amount of bytes produced to the partition and if it exceeds
> > > > partitioner.sticky.batch.size then we switch to the next partition.
> > > > As for the reason to make it different from batch.size, I think Luke
> > > > answered this with question #3 -- what if the load pattern is such
> > > > that 15.5KB would be more efficient than 16KB?
> > > >
> > > > 3. I think it's hard to have one size that would fit all patterns.
> > > > E.g. if the load pattern is such that there is linger and the app
> > > > fills the batch before linger expires, then having 16KB would most
> > > > likely synchronize batching and partition switching, so each
> > > > partition would get a full batch.  If the load pattern is such that a
> > > > few incomplete batches go out before a larger batch starts to fill,
> > > > then it may actually be beneficial to make it slightly larger (e.g.
> > > > linger=0, first few records go in the first batch, then next few
> > > > records go to the second batch, and so on, until there are 5
> > > > in-flight, then a larger batch would form while waiting for the
> > > > broker to respond, but the partition switch would happen before the
> > > > larger batch is full).
> > > >
> > > > 4. There are a couple of reasons for introducing
> > > > partition.availability.timeout.ms.  Luke's and Jun's questions are
> > > > slightly different, so I'm going to separate the replies.
> > > > (Luke) Is the queue size a good enough signal?  I think it's a good
> > > > default signal as it tries to preserve general fairness and not
> > > > overreact to the broker's state at each moment in time.  But because
> > > > it's smooth, it may not be reactive enough to instantaneous latency
> > > > jumps.  For latency-sensitive workloads, it may be desirable to react
> > > > faster when a broker becomes unresponsive (but that may make the
> > > > distribution really choppy), so partition.availability.timeout.ms
> > > > provides an opportunity to tune adaptiveness.
> > > >
> > > > (Jun) Can we just not assign partitions to brokers that are not
> > > > ready?  Switching partitions purely based on current broker readiness
> > > > information can really skew the workload, I think (or at least I
> > > > couldn't build a model that proves that over time it's going to be
> > > > generally fair).  I feel that the algorithm should try to be fair in
> > > > general and use smoother signals by default (e.g. a broker with
> > > > choppier latency may get much less load even though it can handle the
> > > > throughput, which may then potentially skew consumption).  Note that
> > > > the queue-size-based logic uses probabilities (so we don't fully
> > > > remove brokers, just make them less likely to be chosen) and relative
> > > > info rather than a threshold (so if all brokers are heavily, but
> > > > equally, loaded, they will get equal distribution, rather than get
> > > > removed because they exceed some threshold).  So at the very least, I
> > > > would like this logic to be turned off by default, as it's hard to
> > > > predict what it could do with different patterns (which means that
> > > > there would need to be some configuration).  We could just not use
> > > > brokers that are not ready, but again, I think that it's good to try
> > > > to be fair under normal circumstances, so if normally brokers can
> > > > respond under some partition.availability.timeout.ms threshold and
> > > > the application works well with those latencies, then we could
> > > > distribute data equally between brokers that don't exceed the
> > > > latencies.  The value, of course, would depend on the environment and
> > > > app requirements, hence it's configurable.
> > > >
> > > > 10. Added a statement at the beginning of the proposed changes.
> > > >
> > > > -Artem
> > > >
> > > >
> > > > On Thu, Feb 17, 2022 at 3:46 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi, Artem,
> > > > >
> > > > > Thanks for the KIP. A few comments below.
> > > > >
> > > > > 1. I agree with Luke that having the partitioner return -1 is kind
> > > > > of weird. Could we just change the implementation of
> > > > > DefaultPartitioner to the new behavior?
> > > > >
> > > > > 2. partitioner.sticky.batch.size: Similar question to Luke. I am
> > > > > not sure why we want to introduce this new configuration. Could we
> > > > > just use the existing batch.size?
> > > > >
> > > > > 4. I also agree with Luke that it's not clear why we need
> > > > > partition.availability.timeout.ms. The KIP says the broker "would
> > > > > not be chosen until the broker is able to accept the next ready
> > > > > batch from the partition". If we are keeping track of this, could
> > > > > we just avoid assigning records to partitions whose leader is not
> > > > > able to accept the next batch? If we do that, perhaps we don't need
> > > > > partition.availability.timeout.ms.
> > > > >
> > > > > 10. Currently, partitioner.class defaults to DefaultPartitioner,
> > > > > which uses StickyPartitioner when the key is not specified. Since
> > > > > this KIP improves upon StickyPartitioner, it would be useful to
> > > > > make the new behavior the default and document that in the KIP.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Wed, Feb 16, 2022 at 7:30 PM Luke Chen <show...@gmail.com> wrote:
> > > > >
> > > > > > Hi Artem,
> > > > > >
> > > > > > Also, one more thing I think you need to know.
> > > > > > As the bug KAFKA-7572
> > > > > > <https://issues.apache.org/jira/browse/KAFKA-7572> mentions,
> > > > > > sometimes a custom partitioner returns a negative partition id
> > > > > > accidentally.
> > > > > > If it returned -1, how could you know whether that was expected
> > > > > > or not?
> > > > > >
> > > > > > Thanks.
> > > > > > Luke
> > > > > >
> > > > > > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen <show...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Artem,
> > > > > > >
> > > > > > > Thanks for the update. I have some questions about it:
> > > > > > >
> > > > > > > 1. Could you explain why you need the `partitioner` to return
> > > > > > > -1? In which case do we need it? And how is it used in your
> > > > > > > KIP?
> > > > > > > 2. What does "partitioner.sticky.batch.size" mean? In the
> > > > > > > "Configuration" part, you didn't explain it. And it defaults
> > > > > > > to 0; I guess that's the same as the current behavior, for
> > > > > > > backward compatibility, right? You should mention it.
> > > > > > > 3. I'm thinking we could have a threshold for
> > > > > > > "partitioner.sticky.batch.size". Let's say we have already
> > > > > > > accumulated 15.5KB in partition1 and sent it. When the next
> > > > > > > batch is created, in your current design, we still stick to
> > > > > > > partition1 until 16KB is reached, and then we create a new
> > > > > > > batch and change to the next partition, e.g. partition2. But I
> > > > > > > think if we set a threshold of 95% (for example), we would know
> > > > > > > that the previous 15.5KB already exceeds the threshold, so we
> > > > > > > could directly create a new batch for the next records. This
> > > > > > > should make it more efficient. WDYT?
> > > > > > > 4. I think the improved queuing logic should be good enough. I
> > > > > > > don't see the benefit of having the
> > > > > > > `partition.availability.timeout.ms` config. In short, you want
> > > > > > > to make the partitioner take the broker load into
> > > > > > > consideration. We can just improve that in the queuing logic
> > > > > > > (and you already did). Why should we add the config? Could you
> > > > > > > use some examples to explain why we need it?
> > > > > > >
> > > > > > > Thank you.
> > > > > > > Luke
> > > > > > >
> > > > > > > On Wed, Feb 16, 2022 at 8:57 AM Artem Livshits
> > > > > > > <alivsh...@confluent.io.invalid> wrote:
> > > > > > >
> > > > > > >> Hello,
> > > > > > >>
> > > > > > > >> Please add your comments about the KIP.  If there are no
> > > > > > > >> considerations, I'll put it up for vote in the next few days.
> > > > > > >>
> > > > > > >> -Artem
> > > > > > >>
> > > > > > > >> On Mon, Feb 7, 2022 at 6:01 PM Artem Livshits
> > > > > > > >> <alivsh...@confluent.io> wrote:
> > > > > > >>
> > > > > > >> > Hello,
> > > > > > >> >
> > > > > > > >> > After trying a few prototypes, I've made some changes to the
> > > > > > > >> > public interface.  Please see the updated document
> > > > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > >> > .
> > > > > > >> >
> > > > > > >> > -Artem
> > > > > > >> >
> > > > > > > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem Livshits
> > > > > > > >> > <alivsh...@confluent.io> wrote:
> > > > > > >> >
> > > > > > >> >> Hello,
> > > > > > >> >>
> > > > > > > >> >> This is the discussion thread for
> > > > > > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > > > > > >> >> .
> > > > > > >> >>
> > > > > > > >> >> The proposal is a bug fix for
> > > > > > > >> >> https://issues.apache.org/jira/browse/KAFKA-10888, but it
> > > > > > > >> >> does include a client config change, therefore we have a KIP
> > > > > > > >> >> to discuss.
> > > > > > >> >>
> > > > > > >> >> -Artem
> > > > > > >> >>
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
