Thanks Sophie. Much appreciated! It's good to know this will be improved in a 
later release. 

On 2020/03/23 22:52:27, Sophie Blee-Goldman <sop...@confluent.io> wrote: 
> I don't think it has anything to do with your specific topology, but it
> might be
> that the "stickiness" is overriding the "data parallelism balance" in the
> current
> assignment algorithm. There are a lot of different factors to optimize for,
> so we
> end up making tradeoffs with a rough hierarchy of these factors:
> 
> 1) task balance (number of tasks)
> 2) stickiness (instance had this task before, helps with state restoration)
> 3) data parallelism (distributing tasks of the same subtopology across
> different instances)
> 
> In other words, within the constraint that instances should have an equal
> number of
> total tasks (proportionate to their number of stream threads, differing by
> at most 1), the
> next highest criteria is "which instance had this task last". So, if you
> bring up your
> instances one at a time, the first one will end up with all the tasks
> initially. When you
> bring up the second, we will go through the tasks and assign them to their
> previous
> owner if possible, ie, until that previous owner is full (the balance
> constraint). Since
> we assign tasks in subtopology order, eg t1p0, t1p1, t2p0, t2p1, t3 ... the
> "first" tasks
> will presumably all end up on your first instance. It sounds like in your
> case the "first"
> tasks are the ones for the input topic.
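> 
> In pseudo-code, that priority order looks roughly like this (a simplified
> sketch with plain maps and made-up names, not the actual StickyTaskAssignor
> code):
> 
>     import java.util.*;
> 
>     class AssignmentSketch {
>         // Assigns tasks (in subtopology order), preferring each task's previous
>         // owner and falling back to the least-loaded instance once that owner
>         // has reached its capacity (the balance constraint).
>         static Map<String, List<String>> assign(List<String> tasks,
>                                                 Map<String, String> previousOwner,
>                                                 Map<String, Integer> capacity) {
>             Map<String, List<String>> assignment = new HashMap<>();
>             capacity.keySet().forEach(i -> assignment.put(i, new ArrayList<>()));
>             for (String task : tasks) {
>                 String target = previousOwner.get(task);
>                 boolean sticky = target != null
>                     && assignment.containsKey(target)
>                     && assignment.get(target).size() < capacity.get(target);
>                 if (!sticky) {
>                     // balance constraint: pick the instance with the fewest tasks so far
>                     target = null;
>                     for (String instance : assignment.keySet()) {
>                         if (target == null
>                                 || assignment.get(instance).size() < assignment.get(target).size()) {
>                             target = instance;
>                         }
>                     }
>                 }
>                 assignment.get(target).add(task);
>             }
>             return assignment;
>         }
>     }
> 
> Run that with the tasks in subtopology order (t1p0, t1p1, t2p0, ...) and one
> instance that previously owned everything, and you can see why the "first"
> tasks -- the ones for your input topic -- tend to pile onto that instance.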
> 
> I hope that made sense and didn't just confuse things further -- if you're
> interested in
> the assignment code I'm referring to, take a look at StickyTaskAssignor. The
> good news
> is that KIP-441 will definitely address this limitation of the current
> assignment algorithm.
> 
> On Mon, Mar 23, 2020 at 7:57 AM Stephen Young
> <stephendeyo...@yahoo.co.uk.invalid> wrote:
> 
> > Thanks for your help Sophie and Matthias.
> >
> > In my cloud environment I'm using kafka version 2.2.1. I've tested this
> > locally with 2.4.1 and I can see the same issue with 3 local instances. As
> > I add more local instances I start to see better balancing.
> >
> > I was wondering if the issue could be because my kafka streams app reads
> > the input topic as a KTable. I want this so it is simple for producers to
> > send deletes and updates to records in the topic (by sending null values etc)
> > and also so my streams app automatically recalculates various aggregations
> > in all of the sub-topologies. Could this be the cause of the problem?
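> >
> > In case it helps to be concrete, this is roughly what the app does (a
> > minimal sketch; the topic name and key/value types are just examples):
> >
> >     import org.apache.kafka.streams.StreamsBuilder;
> >     import org.apache.kafka.streams.kstream.KTable;
> >
> >     StreamsBuilder builder = new StreamsBuilder();
> >     // Reading the input topic as a table: a record with a null value deletes
> >     // that key, and every update flows into the downstream aggregations.
> >     KTable<String, String> input = builder.table("input-topic");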
> >
> > Stephen
> >
> > On Fri, 20 Mar 2020 at 17:33, Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> > > Although it's not the main objective, one side effect of KIP-441 should
> > be
> > > improved balance of the final stable assignment. By warming up standbys
> > > before switching them over to active tasks we can achieve stickiness
> > > without
> > > sacrificing balance in the follow-up rebalance.
> > >
> > > This work is targeted for the next release, so if you do still observe
> > > issues in
> > > newer versions I'd recommend trying out 2.6 when it comes out.
> > >
> > > You can read up on the details and track the progress of this KIP in the
> > > KIP document:
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > > JIRA: https://issues.apache.org/jira/browse/KAFKA-6145?src=confmacro
> > >
> > > Cheers,
> > > Sophie
> > >
> > > On Fri, Mar 20, 2020 at 10:20 AM Matthias J. Sax <mj...@apache.org>
> > wrote:
> > >
> > > > Partition assignment, or more specifically "task placement" for Kafka
> > > > Streams, is a hard-coded algorithm (cf.
> > > >
> > > >
> > >
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
> > > > ).
> > > > The algorithm actually tries to assign different tasks from the same
> > > > sub-topology to different instances and thus, your 6 input topic
> > > > partitions should ideally get balanced over your 3 instances (ie, 2
> > each,
> > > > one for each thread).
> > > >
> > > > However, the algorithm needs to trade off load balancing and stickiness
> > > > (to avoid unnecessary, expensive state migration) and thus, the
> > > > placement strategy is best effort only. Also, in older versions there
> > > > was some issue that got fixed in newer versions (ie, 2.0.x and newer).
> > > > Not sure what version you are on (as you linked to 1.0 docs, maybe an
> > > > upgrade resolves your issue?).
> > > >
> > > > Compare:
> > > >
> > > >  - https://issues.apache.org/jira/browse/KAFKA-6039
> > > >  - https://issues.apache.org/jira/browse/KAFKA-7144
> > > >
> > > > If you still observe issues in newer versions, please comment on the
> > > > tickets or create a new ticket describing the problem. Or even better,
> > > > do a PR to help improve the "task placement" algorithm. :)
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 3/20/20 6:47 AM, Stephen Young wrote:
> > > > > Thanks Guozhang. That's really helpful!
> > > > >
> > > > > Are you able to explain a bit more about how it would work for my use
> > > > case? As I understand it this 'repartition' method enables us to
> > > > materialize a stream to a new topic with a custom partitioning
> > strategy.
> > > > >
> > > > > But my problem is not how the topic is partitioned. My issue is that
> > > the
> > > > partitions of the source topic need to be spread equally amongst all
> > the
> > > > available threads. How could 'repartition' help with this?
> > > > >
> > > > > Stephen
> > > > >
> > > > > On 2020/03/19 23:20:54, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >> Hi Stephen,
> > > > >>
> > > > >> We've deprecated the partition-grouper API due to its drawbacks regarding
> > > > >> upgrade compatibility (consider if you want to change the
> > > > num.partitions
> > > > >> while evolving your application), and instead we're working on
> > KIP-221
> > > > for
> > > > >> the same purpose as your use case:
> > > > >>
> > > > >>
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
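> > > > >>
> > > > >> Once that lands, the idea is roughly this (a sketch only; the topic
> > > > >> name, types, and partition count are just examples, and the final API
> > > > >> shape may still change until KIP-221 is released):
> > > > >>
> > > > >>     import org.apache.kafka.streams.StreamsBuilder;
> > > > >>     import org.apache.kafka.streams.kstream.KStream;
> > > > >>     import org.apache.kafka.streams.kstream.Repartitioned;
> > > > >>
> > > > >>     StreamsBuilder builder = new StreamsBuilder();
> > > > >>     KStream<String, String> input = builder.stream("input-topic");
> > > > >>     // Explicitly repartition with a chosen partition count before the
> > > > >>     // heavy processing, instead of using a custom partition grouper.
> > > > >>     KStream<String, String> repartitioned =
> > > > >>         input.repartition(Repartitioned.numberOfPartitions(6));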
> > > > >>
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >> On Wed, Mar 18, 2020 at 7:48 AM Stephen Young
> > > > >> <wintersg...@googlemail.com.invalid> wrote:
> > > > >>
> > > > >>> I have a question about partition assignment for a kafka streams
> > app.
> > > > As I
> > > > >>> understand it the more complex your topology is the greater the
> > > number
> > > > of
> > > > >>> internal topics kafka streams will create. In my case the app has 8
> > > > graphs
> > > > >>> in the topology. There are 6 partitions for each graph (this
> > matches
> > > > the
> > > > >>> number of partitions of the input topic). So there are 48
> > partitions
> > > > that
> > > > >>> the app needs to handle. These get balanced equally across all 3
> > > > servers
> > > > >>> where the app is running (each server also has 2 threads so there
> > > are 6
> > > > >>> available instances of the app).
> > > > >>>
> > > > >>> The problem for me is that the partitions of the input topic have
> > the
> > > > >>> heaviest workload. But these 6 partitions are not distributed
> > evenly
> > > > >>> amongst the instances. They are just considered 6 partitions
> > amongst
> > > > the 48
> > > > >>> the app needs to balance. But this means if a server gets most or
> > all
> > > > of
> > > > >>> these 6 partitions, it ends up exhausting all of the resources on
> > > that
> > > > >>> server.
> > > > >>>
> > > > >>> Is there a way of equally balancing these 6 specific partitions
> > > > amongst the
> > > > >>> available instances? I thought writing a custom partition grouper
> > > might
> > > > >>> help here:
> > > > >>>
> > > > >>>
> > > > >>>
> > > >
> > >
> > https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#partition-grouper
> > > > >>>
> > > > >>> But the advice seems to be not to do this, otherwise you risk
> > breaking
> > > > the
> > > > >>> app.
> > > > >>>
> > > > >>> Thanks!
> > > > >>>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > > >>
> > > >
> > > >
> > >
> >
> 
