[ https://issues.apache.org/jira/browse/KAFKA-10121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17478283#comment-17478283 ]
Tim Patterson commented on KAFKA-10121:
---------------------------------------

Created https://issues.apache.org/jira/browse/KAFKA-13600 which is more about the rebalancing/warmup algo, but I believe taking into account current ownership as per this ticket might also fix this.

> Streams Task Assignment optimization design
> -------------------------------------------
>
>                 Key: KAFKA-10121
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10121
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: John Roesler
>            Priority: Minor
>
> Beginning in Kafka 2.6.0, Streams has a new task assignment algorithm that
> reacts to cluster membership changes by starting out 100% sticky and warming
> up tasks in the background to eventually migrate to a 100% balanced
> assignment. See KIP-441 for the details.
> However, in computing the final, 100% balanced, assignment, the assignor
> doesn't take into account the current ownership of the tasks. Thus, when
> instances are added or removed, the assignor is likely to migrate large
> numbers of tasks. This is mitigated by the fact that the migrations happen at
> a trickle over time in the background, but it's still better to avoid
> unnecessary migrations if possible. See the example below for details.
> The solution seems to be to use some kind of optimization algorithm to find a
> 100% balanced assignment that also has maximum overlap with the current
> assignment.
> I'd formally state the optimization problem as:
> > Generate a 100% balanced assignment that has the maximum overlap with the
> > current assignment.
>
> Example, with additional detail:
> The main focus of the KIP-441 work was the migration mechanism that allows
> Streams to warm up state for new instances in the background while continuing
> to process tasks on the instances that previously owned them. Accordingly the
> assignment algorithm itself focuses on simplicity and guaranteed balance, not
> optimality.
> There are three kinds of balance that all have to be met for Streams to be
> 100% balanced:
> # Active task balance: no member should have more active processing workload
> than any other
> # Stateful task balance: no member should have more stateful tasks (either
> active and stateful or standby) than any other
> # Task parallel balance: no member should have more tasks (partitions) for a
> single subtopology than another
> (Note: in all these cases, an instance may actually have one more task than
> another, if the number of members doesn't evenly divide the number of tasks.
> For a simple case, consider if you have two members and only one task. It can
> only be assigned to one of the members, and the assignment is still as
> balanced as it could be.)
> The current algorithm ensures all three kinds of balance thusly:
> # sort all members by name (to ensure assignment stability)
> # sort all tasks by subtopology first, then by partition. E.g., sorted like
> this: 0_0, 0_1, 0_2, 1_0, 1_1
> # for all tasks that are stateful, iterate over both tasks and members in
> sorted order, assigning each task t[i] to the member m[i % num_members]
> # for each standby replica we need to assign, continue looping over the
> sorted members, assigning each replica to the next member (assuming the
> member doesn't already have a replica of the task)
> # for each stateless task, assign an active replica to the member with the
> least number of tasks. Since the active assignment of the member with the
> least number of tasks should have at most 1 task less than any other member
> after step 3, the assignment after step 5 is still balanced.
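
For anyone following along, here is a rough sketch of steps 1-3 and 5 of the quoted algorithm. It is in Python rather than the assignor's Java and is not the actual Streams code: the names `task_key` and `round_robin_assign` are made up for illustration, standby placement (step 4) is left out, and the member index is taken modulo the number of members, which is what a round-robin over members requires.

```python
def task_key(task_id):
    """Sort key for ids like "1_0": subtopology first, then partition."""
    subtopology, partition = task_id.split("_")
    return int(subtopology), int(partition)


def round_robin_assign(members, stateful_tasks, stateless_tasks=()):
    """Illustrative only: active tasks, no standby replicas (step 4 omitted)."""
    members = sorted(members)  # step 1: stable member order
    assignment = {m: [] for m in members}

    # steps 2-3: sorted stateful tasks go to members in round-robin order
    for i, task in enumerate(sorted(stateful_tasks, key=task_key)):
        assignment[members[i % len(members)]].append(task)

    # step 5: each stateless task goes to the member with the fewest tasks
    for task in sorted(stateless_tasks, key=task_key):
        least_loaded = min(members, key=lambda m: len(assignment[m]))
        assignment[least_loaded].append(task)

    return assignment


if __name__ == "__main__":
    tasks = ["0_0", "0_1", "0_2", "0_3", "0_4", "0_5"]
    print(round_robin_assign(["m1", "m2"], tasks))
    print(round_robin_assign(["m1", "m2", "m3"], tasks))
```

Note that nothing in this procedure looks at who owns a task today, so the result depends only on the sorted member and task lists.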
> To demonstrate how a more sophisticated algorithm could minimize migrations,
> consider the following simple assignment with two instances and six tasks:
> m1: [0_0, 0_2, 0_4]
> m2: [0_1, 0_3, 0_5]
> Adding a new member causes four of the tasks to migrate:
> m1: [0_0, 0_3]
> m2: [0_1, 0_4]
> m3: [0_2, 0_5]
> However, the following assignment is equally balanced, and only two of the
> tasks need to migrate:
> m1: [0_0, 0_2]
> m2: [0_1, 0_3]
> m3: [0_4, 0_5]
>
> Of course, the full problem, including all three kinds of balance is much
> more complex to optimize.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
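
The migration counts in the quoted example can be checked with a small sketch like the one below. It is a toy, not a proposal for the real assignor: `count_migrations` and `sticky_balanced_assign` are hypothetical names, and the sticky variant only balances the total task count per member, ignoring the stateful and per-subtopology balance requirements as well as standbys.

```python
def task_key(task_id):
    """Sort key for ids like "1_0": subtopology first, then partition."""
    subtopology, partition = task_id.split("_")
    return int(subtopology), int(partition)


def count_migrations(old, new):
    """Number of tasks whose owner differs between two assignments."""
    old_owner = {t: m for m, tasks in old.items() for t in tasks}
    return sum(1 for m, tasks in new.items() for t in tasks if old_owner.get(t) != m)


def sticky_balanced_assign(current, members):
    """Balance task counts, keeping each task on its current owner when possible."""
    members = sorted(members)
    all_tasks = sorted((t for ts in current.values() for t in ts), key=task_key)
    base, extra = divmod(len(all_tasks), len(members))

    # Members that already own the most tasks get the "one extra" quota.
    by_load = sorted(members, key=lambda m: -len(current.get(m, [])))
    quota = {m: base + (1 if i < extra else 0) for i, m in enumerate(by_load)}

    # Each member keeps as many of its current tasks as its quota allows.
    new = {m: sorted(current.get(m, []), key=task_key)[:quota[m]] for m in members}
    kept = {t for ts in new.values() for t in ts}
    leftovers = [t for t in all_tasks if t not in kept]

    # Everything else fills the members that are still under quota.
    for m in members:
        while len(new[m]) < quota[m]:
            new[m].append(leftovers.pop(0))
    return new


if __name__ == "__main__":
    current = {"m1": ["0_0", "0_2", "0_4"], "m2": ["0_1", "0_3", "0_5"]}
    round_robin = {"m1": ["0_0", "0_3"], "m2": ["0_1", "0_4"], "m3": ["0_2", "0_5"]}
    sticky = sticky_balanced_assign(current, ["m1", "m2", "m3"])
    print(count_migrations(current, round_robin), count_migrations(current, sticky))
```

On the two-member, six-task example this reproduces the second assignment from the ticket. Extending it to all three balance criteria at once is the hard part the ticket is about.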