Thanks Sophie. Much appreciated! It's good to know this will be improved in a later release.
On 2020/03/23 22:52:27, Sophie Blee-Goldman <sop...@confluent.io> wrote:
> I don't think it has anything to do with your specific topology, but it might be that the "stickiness" is overriding the "data parallelism balance" in the current assignment algorithm. There are a lot of different factors to optimize for, so we end up making tradeoffs with a rough hierarchy of these factors:
>
> 1) task balance (number of tasks)
> 2) stickiness (instance had this task before, helps with state restoration)
> 3) data parallelism (distributing tasks of the same subtopology across different instances)
>
> In other words, within the constraint that instances should have an equal number of total tasks (proportionate to their number of stream threads, differing by at most 1), the next highest criterion is "which instance had this task last". So, if you bring up your instances one at a time, the first one will end up with all the tasks initially. When you bring up the second, we will go through the tasks and assign them to their previous owner if possible, ie, until that previous owner is full (the balance constraint). Since we assign tasks in subtopology order, eg t1p0, t1p1, t2p0, t2p1, t3 ..., the "first" tasks will presumably all end up on your first instance. It sounds like in your case the "first" tasks are the ones for the input topic.
>
> I hope that made sense and didn't just confuse things further -- if you're interested in the assignment code I'm referring to, take a look at StickyTaskAssignor. The good news is that KIP-441 will definitely address this limitation of the current assignment algorithm.
>
> On Mon, Mar 23, 2020 at 7:57 AM Stephen Young <stephendeyo...@yahoo.co.uk.invalid> wrote:
>
> > Thanks for your help Sophie and Matthias.
> >
> > In my cloud environment I'm using kafka version 2.2.1. I've tested this locally with 2.4.1 and I can see the same issue with 3 local instances. As I add more local instances I start to see better balancing.
> >
> > I was wondering if the issue could be because my kafka streams app reads the input topic as a ktable. I want this so it is simple for producers to send deletes and updates to records in the topic (by nullifying values etc) and also so my streams app automatically recalculates various aggregations in all of the sub-topologies. Could this be the cause of the problem?
> >
> > Stephen
> >
> > On Fri, 20 Mar 2020 at 17:33, Sophie Blee-Goldman <sop...@confluent.io> wrote:
> >
> > > Although it's not the main objective, one side effect of KIP-441 should be improved balance of the final stable assignment. By warming up standbys before switching them over to active tasks we can achieve stickiness without sacrificing balance in the follow-up rebalance.
> > >
> > > This work is targeted for the next release, so if you do still observe issues in newer versions I'd recommend trying out 2.6 when it comes out.
> > >
> > > You can read up on the details and track the progress of this KIP in the KIP document:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-6145?src=confmacro
> > >
> > > Cheers,
> > > Sophie
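To make the assignment hierarchy Sophie describes above concrete, here is a toy simulation of the ordering effect (this is not the real StickyTaskAssignor, just a minimal sketch of "tasks are considered in subtopology order, and the previous owner wins until it is full"; the class name, task ids, and counts are all invented for illustration):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy model of the ordering effect described above; NOT the real assignor.
    public class StickyOrderSketch {
        public static void main(String[] args) {
            int subtopologies = 3, partitionsPerSubtopology = 2, instances = 2;

            // Build task ids in subtopology order: 0_0, 0_1, 1_0, 1_1, 2_0, 2_1.
            List<String> tasks = new ArrayList<>();
            for (int s = 0; s < subtopologies; s++) {
                for (int p = 0; p < partitionsPerSubtopology; p++) {
                    tasks.add(s + "_" + p);
                }
            }

            // Before the rebalance, instance 0 owned every task (it started first).
            Map<String, Integer> previousOwner = new HashMap<>();
            for (String t : tasks) previousOwner.put(t, 0);

            // Balance constraint: no instance may exceed its fair share of tasks.
            int capacity = (tasks.size() + instances - 1) / instances;

            Map<Integer, List<String>> assignment = new HashMap<>();
            for (int i = 0; i < instances; i++) assignment.put(i, new ArrayList<>());

            // Stickiness wins until the previous owner is full, then spill over.
            for (String task : tasks) {
                int preferred = previousOwner.get(task);
                int target = assignment.get(preferred).size() < capacity
                        ? preferred
                        : leastLoaded(assignment);
                assignment.get(target).add(task);
            }

            System.out.println(assignment);
        }

        private static int leastLoaded(Map<Integer, List<String>> assignment) {
            return assignment.entrySet().stream()
                    .min(Comparator.comparingInt(e -> e.getValue().size()))
                    .get().getKey();
        }
    }

It prints {0=[0_0, 0_1, 1_0], 1=[1_1, 2_0, 2_1]}: balanced by task count, yet both tasks of subtopology 0 (think: the input topic) land on instance 0, which is exactly the clustering described in this thread.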
> > > On Fri, Mar 20, 2020 at 10:20 AM Matthias J. Sax <mj...@apache.org> wrote:
> > >
> > > > Partition assignment, or more specifically "task placement" for Kafka Streams, is a hard-coded algorithm (cf. https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java). The algorithm actually tries to assign different tasks from the same sub-topology to different instances and thus, your 6 input topic partitions should ideally get balanced over your 3 instances (ie, 2 each, one for each thread).
> > > >
> > > > However, the algorithm needs to trade off load balancing and stickiness (to avoid unnecessary, expensive state migration) and thus, the placement strategy is best effort only. Also, in older versions there was some issue that got fixed in newer versions (ie, 2.0.x and newer). Not sure what version you are on (as you linked to the 1.0 docs, maybe an upgrade resolves your issue?).
> > > >
> > > > Compare:
> > > >
> > > > - https://issues.apache.org/jira/browse/KAFKA-6039
> > > > - https://issues.apache.org/jira/browse/KAFKA-7144
> > > >
> > > > If you still observe issues in newer versions, please comment on the tickets or create a new ticket describing the problem. Or even better, do a PR to help improve the "task placement" algorithm. :)
> > > >
> > > > -Matthias
> > > >
> > > > On 3/20/20 6:47 AM, Stephen Young wrote:
> > > > > Thanks Guozhang. That's really helpful!
> > > > >
> > > > > Are you able to explain a bit more about how it would work for my use case? As I understand it, this 'repartition' method enables us to materialize a stream to a new topic with a custom partitioning strategy.
> > > > >
> > > > > But my problem is not how the topic is partitioned. My issue is that the partitions of the source topic need to be spread equally amongst all the available threads. How could 'repartition' help with this?
> > > > >
> > > > > Stephen
> > > > >
> > > > > On 2020/03/19 23:20:54, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >> Hi Stephen,
> > > > >>
> > > > >> We've deprecated the partition-grouper API due to its drawbacks in upgrade compatibility (consider if you want to change the num.partitions while evolving your application), and instead we're working on KIP-221 for the same purpose as your use case:
> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
> > > > >>
> > > > >> Guozhang
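For readers of the archive, here is a minimal sketch of how the repartition hint from KIP-221 that Guozhang refers to is expected to look, based on the KIP document (assuming a release in which it has shipped; the class name, topic name, types, and partition count are made up for illustration):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Repartitioned;

    public class RepartitionSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> input = builder.stream("input-topic");

            // Route the heavy input through a Streams-managed repartition topic
            // with an explicit partition count before the expensive downstream work.
            KStream<String, String> fannedOut = input.repartition(
                    Repartitioned.<String, String>as("heavy-input-fanout")
                            .withNumberOfPartitions(12));

            fannedOut.foreach((key, value) -> { /* expensive per-record work */ });
        }
    }

Presumably the idea is that fanning the heavy input out over more partitions splits the downstream work into more, smaller tasks, giving the assignor more room to spread the load, though as the thread notes it does not change which instance each task lands on.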
> > > > >>
> > > > >> On Wed, Mar 18, 2020 at 7:48 AM Stephen Young <wintersg...@googlemail.com.invalid> wrote:
> > > > >>
> > > > >>> I have a question about partition assignment for a kafka streams app. As I understand it, the more complex your topology is, the greater the number of internal topics kafka streams will create. In my case the app has 8 graphs in the topology. There are 6 partitions for each graph (this matches the number of partitions of the input topic). So there are 48 partitions that the app needs to handle. These get balanced equally across all 3 servers where the app is running (each server also has 2 threads, so there are 6 available instances of the app).
> > > > >>>
> > > > >>> The problem for me is that the partitions of the input topic have the heaviest workload. But these 6 partitions are not distributed evenly amongst the instances. They are just considered 6 partitions amongst the 48 the app needs to balance. But this means if a server gets most or all of these 6 partitions, it ends up exhausting all of the resources on that server.
> > > > >>>
> > > > >>> Is there a way of equally balancing these 6 specific partitions amongst the available instances? I thought writing a custom partition grouper might help here:
> > > > >>> https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#partition-grouper
> > > > >>>
> > > > >>> But the advice seems to be not to do this, otherwise you risk breaking the app.
> > > > >>>
> > > > >>> Thanks!
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
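For reference, the partition-grouper hook linked above is a single-method interface, roughly as follows in the 2.x line. This is an illustrative stub only, not a working strategy; as Guozhang notes it has been deprecated, and a custom grouper changes task ids, which risks breaking state store assignment on upgrade:

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.PartitionGrouper;
    import org.apache.kafka.streams.processor.TaskId;

    // Illustrative stub of the deprecated hook; not a working strategy.
    public class CustomGrouper implements PartitionGrouper {
        @Override
        public Map<TaskId, Set<TopicPartition>> partitionGroups(
                final Map<Integer, Set<String>> topicGroups, final Cluster metadata) {
            // topicGroups maps each sub-topology id to its source topic names;
            // the default grouper emits one task per (sub-topology, partition) pair.
            throw new UnsupportedOperationException("illustrative stub only");
        }
    }

It would be wired in via the partition.grouper config (StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG). Note that this hook only controls how partitions are grouped into tasks, not which instance each task is placed on, so per the discussion above it would not address the imbalance described in this thread anyway.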