Hi, Artem,

Thanks for the reply. A few more comments.
1. Perhaps you could elaborate a bit more on how the producer determines
the partition if the partitioner returns -1. This will help us understand
why encapsulating that logic in a partitioner is not clean.

2. I am not sure that I understand this part. If 15.5KB is more efficient,
could we just set batch.size to 15.5KB?

4. Yes, we could add a switch (or a variant of the partitioner) for
enabling this behavior. Also, choosing partitions based on broker
readiness could be done in a smoother way. For example, we could track the
last time a broker drained any batches from the accumulator, and then
select partitions from brokers proportionally to the inverse of that time.
This seems smoother than a cutoff based on a
partition.availability.timeout.ms threshold. A rough sketch of the
weighting I have in mind is below.
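To be concrete, something like the following (all names here are made up
for illustration; lastDrainTimeMs would be new bookkeeping in the
accumulator, not an existing API):

  import java.util.List;
  import java.util.concurrent.ThreadLocalRandom;

  public class DrainTimeWeighting {
      interface DrainClock {
          // Assumed new bookkeeping: when this broker last drained
          // any batches from the accumulator.
          long lastDrainTimeMs(int brokerId);
      }

      // Choose a broker with probability proportional to the inverse of
      // the time since it last drained a batch: recently-drained brokers
      // are preferred, but no broker is ever excluded outright.
      static int chooseBroker(List<Integer> brokerIds, DrainClock clock, long nowMs) {
          double[] weights = new double[brokerIds.size()];
          double total = 0;
          for (int i = 0; i < brokerIds.size(); i++) {
              long idleMs = Math.max(1, nowMs - clock.lastDrainTimeMs(brokerIds.get(i)));
              weights[i] = 1.0 / idleMs;
              total += weights[i];
          }
          // Weighted random pick over the cumulative weights.
          double r = ThreadLocalRandom.current().nextDouble(total);
          double acc = 0;
          for (int i = 0; i < weights.length; i++) {
              acc += weights[i];
              if (r < acc)
                  return brokerIds.get(i);
          }
          return brokerIds.get(brokerIds.size() - 1); // floating-point guard
      }
  }

The producer would then stick to a partition led by the chosen broker. A
broker that stops draining sees its share decay gradually rather than
being cut off at a fixed timeout.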
Thanks,

Jun

On Fri, Feb 18, 2022 at 5:14 PM Artem Livshits
<alivsh...@confluent.io.invalid> wrote:

> Hello Luke, Jun,
>
> Thank you for your feedback. I've added the Rejected Alternatives
> section, which may clarify some of the questions w.r.t. returning -1.
>
> 1. I've elaborated on the -1 in the KIP. The problem is that a
> significant part of the logic needs to be in the producer (because it
> now uses information about brokers that only the producer knows), so
> encapsulating the logic within the default partitioner isn't as clean.
> The Rejected Alternatives section also documents an attempt to keep the
> encapsulation by providing new callbacks to the partitioner.
>
> 2. The meaning of partitioner.sticky.batch.size is explained in the
> Uniform Sticky Batch Size section. Basically, we track the number of
> bytes produced to the partition, and once it exceeds
> partitioner.sticky.batch.size we switch to the next partition. As for
> the reason to make it different from batch.size, I think Luke answered
> this with question #3 -- what if the load pattern is such that 15.5KB
> would be more efficient than 16KB?
>
> 3. I think it's hard to have one size that fits all patterns. E.g. if
> the load pattern is such that there is linger and the app fills the
> batch before linger expires, then 16KB would most likely synchronize
> batching and partition switching, so each partition would get a full
> batch. If the load pattern is such that a few incomplete batches go out
> before a larger batch starts to fill, then it may actually be beneficial
> to make it slightly larger (e.g. with linger=0, the first few records go
> into the first batch, the next few into the second, and so on until 5
> requests are in flight; then a larger batch forms while waiting for the
> broker to respond, but the partition switch would happen before that
> larger batch is full).
>
> 4. There are a couple of reasons for introducing
> partition.availability.timeout.ms. Luke's and Jun's questions are
> slightly different, so I'm going to separate the replies.
>
> (Luke) Is the queue size a good enough signal? I think it's a good
> default signal, as it tries to preserve general fairness and not
> overreact to the broker's state at each moment in time. But because
> it's smooth, it may not be reactive enough to instantaneous latency
> jumps. For latency-sensitive workloads, it may be desirable to react
> faster when a broker becomes unresponsive (though that may make the
> distribution really choppy), so partition.availability.timeout.ms
> provides an opportunity to tune adaptiveness.
>
> (Jun) Can we just not assign partitions to brokers that are not ready?
> Switching partitions purely based on current broker readiness
> information can really skew the workload, I think (or at least I
> couldn't build a model that proves it would be generally fair over
> time). I feel the algorithm should try to be fair in general and use
> smoother signals by default; e.g. a broker with choppier latency may get
> much less load even though it can handle the throughput, which may in
> turn skew consumption. Note that the queue-size-based logic uses
> probabilities (we don't fully remove brokers, just make them less
> likely) and relative info rather than a threshold (if all brokers are
> heavily but equally loaded, they get an equal distribution rather than
> all being removed for exceeding some threshold). So at the very least,
> I would like this logic to be turned off by default, as it's hard to
> predict what it could do with different patterns (which means there
> would need to be some configuration). We could just not use brokers
> that are not ready, but again, I think it's good to try to be fair under
> normal circumstances: if brokers normally respond within some
> partition.availability.timeout.ms threshold and the application works
> well with those latencies, then we could distribute data equally between
> the brokers that don't exceed those latencies. The value, of course,
> would depend on the environment and app requirements, hence it's
> configurable.
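> To illustrate what I mean by probabilities and relative info, here is a
> toy example (illustrative only -- this is not the exact formula in the
> proposal, just the shape of it):
>
>   // The probability of picking a broker is inversely related to its
>   // queue size and normalized across brokers: never zero, and uniform
>   // when all queues are equally loaded.
>   static double[] pickProbabilities(int[] queueSizes) {
>       double[] p = new double[queueSizes.length];
>       double total = 0;
>       for (int i = 0; i < queueSizes.length; i++) {
>           p[i] = 1.0 / (queueSizes[i] + 1); // +1 handles empty queues
>           total += p[i];
>       }
>       for (int i = 0; i < p.length; i++)
>           p[i] /= total;
>       return p;
>   }
>
> With queue sizes {1, 1, 1} each broker gets 1/3 of the load; with
> {1, 1, 10} the loaded broker still gets ~8% rather than being dropped
> for crossing some threshold.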
>
> 10. Added a statement at the beginning of the proposed changes.
>
> -Artem
>
> On Thu, Feb 17, 2022 at 3:46 PM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Artem,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 1. I agree with Luke that having the partitioner return -1 is kind of
> > weird. Could we just change the implementation of DefaultPartitioner
> > to the new behavior?
> >
> > 2. partitioner.sticky.batch.size: Similar question to Luke. I am not
> > sure why we want to introduce this new configuration. Could we just
> > use the existing batch.size?
> >
> > 4. I also agree with Luke that it's not clear why we need
> > partition.availability.timeout.ms. The KIP says the broker "would not
> > be chosen until the broker is able to accept the next ready batch from
> > the partition". If we are keeping track of this, could we just avoid
> > assigning records to partitions whose leader is not able to accept the
> > next batch? If we do that, perhaps we don't need
> > partition.availability.timeout.ms.
> >
> > 10. Currently, partitioner.class defaults to DefaultPartitioner, which
> > uses StickyPartitioner when the key is not specified. Since this KIP
> > improves upon StickyPartitioner, it would be useful to make the new
> > behavior the default and document that in the KIP.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Feb 16, 2022 at 7:30 PM Luke Chen <show...@gmail.com> wrote:
> >
> > > Hi Artem,
> > >
> > > Also, one more thing I think you need to know.
> > > As the bug KAFKA-7572
> > > <https://issues.apache.org/jira/browse/KAFKA-7572> mentions,
> > > sometimes a custom partitioner accidentally returns a negative
> > > partition id. If it returned -1, how could you know whether that is
> > > expected or not?
> > >
> > > Thanks.
> > > Luke
> > >
> > > On Wed, Feb 16, 2022 at 3:28 PM Luke Chen <show...@gmail.com> wrote:
> > >
> > > > Hi Artem,
> > > >
> > > > Thanks for the update. I have some questions about it:
> > > >
> > > > 1. Could you explain why you need the `partitioner` to return -1?
> > > > In which case do we need it? And how is it used in your KIP?
> > > > 2. What does "partitioner.sticky.batch.size" mean? You didn't
> > > > explain it in the "Configuration" part. And it defaults to 0; I
> > > > guess that keeps the current behavior for backward compatibility,
> > > > right? You should mention that.
> > > > 3. I'm thinking we could add a threshold to
> > > > "partitioner.sticky.batch.size". Let's say we have already
> > > > accumulated 15.5KB in partition1 and sent it. In your current
> > > > design, when the next batch is created, we still stick to
> > > > partition1 until 16KB is reached, and only then create a new batch
> > > > on the next partition, e.g. partition2. But if we set a threshold
> > > > of, say, 95%, we would know the previous 15.5KB already exceeds the
> > > > threshold, and could directly create the new batch for the next
> > > > records. This should make it more efficient. WDYT?
> > > > 4. I think the improved queuing logic should be good enough. I
> > > > can't see the benefit of having the
> > > > `partition.availability.timeout.ms` config. In short, you want to
> > > > make the partitioner take the broker load into consideration. We
> > > > can just improve that in the queuing logic (and you already did).
> > > > Why should we add the config? Could you use some examples to
> > > > explain why we need it?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Wed, Feb 16, 2022 at 8:57 AM Artem Livshits
> > > > <alivsh...@confluent.io.invalid> wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> Please add your comments about the KIP. If there are no
> > > >> considerations, I'll put it up for vote in the next few days.
> > > >>
> > > >> -Artem
> > > >>
> > > >> On Mon, Feb 7, 2022 at 6:01 PM Artem Livshits
> > > >> <alivsh...@confluent.io> wrote:
> > > >>
> > > >> > Hello,
> > > >> >
> > > >> > After trying a few prototypes, I've made some changes to the
> > > >> > public interface. Please see the updated document
> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > >> > .
> > > >> >
> > > >> > -Artem
> > > >> >
> > > >> > On Thu, Nov 4, 2021 at 10:37 AM Artem Livshits
> > > >> > <alivsh...@confluent.io> wrote:
> > > >> >
> > > >> >> Hello,
> > > >> >>
> > > >> >> This is the discussion thread for
> > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
> > > >> >> .
> > > >> >>
> > > >> >> The proposal is a bug fix for
> > > >> >> https://issues.apache.org/jira/browse/KAFKA-10888, but it does
> > > >> >> include a client config change, therefore we have a KIP to
> > > >> >> discuss.
> > > >> >>
> > > >> >> -Artem