[ 
https://issues.apache.org/jira/browse/KAFKA-10121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17478283#comment-17478283
 ] 

Tim Patterson commented on KAFKA-10121:
---------------------------------------

Created https://issues.apache.org/jira/browse/KAFKA-13600 which is more about 
the rebalancing/warmup algo, but I believe taking into account current 
ownership as per this ticket might also fix this.

> Streams Task Assignment optimization design
> -------------------------------------------
>
>                 Key: KAFKA-10121
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10121
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: John Roesler
>            Priority: Minor
>
> Beginning in Kafka 2.6.0, Streams has a new task assignment algorithm that 
> reacts to cluster membership changes by starting out 100% sticky and warming 
> up tasks in the background to eventually migrate to a 100% balanced 
> assignment. See KIP-441 for the details.
> However, in computing the final, 100% balanced, assignment, the assignor 
> doesn't take into account the current ownership of the tasks. Thus, when 
> instances are added or removed, the assignor is likely to migrate large 
> numbers of tasks. This is mitigated by the fact that the migrations happen at 
> a trickle over time in the background, but it's still better to avoid 
> unnecessary migrations if possible. See the example below for details.
> The solution seems to be to use some kind of optimization algorithm to find a 
> 100% balanced assignment that also has maximum overlap with the current 
> assignment.
> I'd formally state the optimization problem as:
> > Generate a 100% balanced assignment that has the maximum overlap with the 
> > current assignment.
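To make the problem statement concrete, here is a minimal brute-force sketch, not anything from the Streams code base. It assumes "balanced" means only that active task counts differ by at most one, ignores stateful and per-subtopology balance, and uses a hypothetical helper name and a hypothetical two-member starting assignment. It is only practical for tiny inputs, since it enumerates every possible assignment.

```python
from itertools import product


def best_balanced_assignment(current, members):
    """Exhaustively search for a count-balanced assignment with maximum overlap.

    `current` maps member -> list of task IDs it owns today.
    `members` is the new member list (may include members not in `current`).
    Hypothetical helper for illustration only.
    """
    tasks = sorted(t for owned in current.values() for t in owned)
    owner = {t: m for m, owned in current.items() for t in owned}
    best, best_overlap = None, -1
    for choice in product(members, repeat=len(tasks)):
        counts = [choice.count(m) for m in members]
        # Assumed balance rule: task counts may differ by at most one.
        if max(counts) - min(counts) > 1:
            continue
        # Overlap = number of tasks that stay with their current owner.
        overlap = sum(1 for t, m in zip(tasks, choice) if owner.get(t) == m)
        if overlap > best_overlap:
            best, best_overlap = dict(zip(tasks, choice)), overlap
    return best, best_overlap


current = {"m1": ["0_0", "0_2", "0_4"], "m2": ["0_1", "0_3", "0_5"]}
assignment, overlap = best_balanced_assignment(current, ["m1", "m2", "m3"])
```

With six tasks and a third member joining, the new member must receive two tasks, so at most four tasks can stay where they are; the search confirms that bound is reachable.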
>  
> Example, with additional detail:
> The main focus of the KIP-441 work was the migration mechanism that allows 
> Streams to warm up state for new instances in the background while continuing 
> to process tasks on the instances that previously owned them. Accordingly, 
> the assignment algorithm itself focuses on simplicity and guaranteed 
> balance, not optimality.
> There are three kinds of balance that all have to be met for Streams to be 
> 100% balanced:
>  # Active task balance: no member should have more active processing workload 
> than any other
>  # Stateful task balance: no member should have more stateful tasks (either 
> active and stateful or standby) than any other
>  # Task parallel balance: no member should have more tasks (partitions) for a 
> single subtopology than another
> (Note: in all these cases, an instance may actually have one more task than 
> another, if the number of members doesn't evenly divide the number of tasks. 
> For a simple case, consider if you have two members and only one task. It can 
> only be assigned to one of the members, and the assignment is still as 
> balanced as it could be.)
> The current algorithm ensures all three kinds of balance thusly:
>  # sort all members by name (to ensure assignment stability)
>  # sort all tasks by subtopology first, then by partition. E.g., sorted like 
> this: 0_0, 0_1, 0_2, 1_0, 1_1
>  # for all tasks that are stateful, iterate over both tasks and members in 
> sorted order, assigning each task t[i] to the member m[i % num_members]
>  # for each standby replica we need to assign, continue looping over the 
> sorted members, assigning each replica to the next member (assuming the 
> member doesn't already have a replica of the task)
>  # for each stateless task, assign an active replica to the member with the 
> least number of tasks. Since the active assignment of the member with the 
> least number of tasks should have at most 1 task less than any other member 
> after step 3, the assignment after step 5 is still balanced.
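Steps 1 to 3 above can be sketched as follows. This is an illustrative sketch only, not the actual Streams assignor: the function name is hypothetical, every task is treated as stateful, and standby replicas and stateless tasks (steps 4 and 5) are left out.

```python
def assign_round_robin(members, tasks):
    """Sort members and tasks, then hand out tasks by cycling through members.

    Task IDs are assumed to look like "<subtopology>_<partition>", e.g. "0_1".
    Hypothetical helper for illustration only.
    """
    sorted_members = sorted(members)
    # Sort by subtopology first, then by partition, comparing numerically.
    sorted_tasks = sorted(
        tasks, key=lambda t: tuple(int(part) for part in t.split("_"))
    )
    assignment = {m: [] for m in sorted_members}
    for i, task in enumerate(sorted_tasks):
        # Task i goes to the member at index i modulo the number of members.
        assignment[sorted_members[i % len(sorted_members)]].append(task)
    return assignment


tasks = ["0_0", "0_1", "0_2", "0_3", "0_4", "0_5"]
two_members = assign_round_robin(["m1", "m2"], tasks)
three_members = assign_round_robin(["m1", "m2", "m3"], tasks)
```

Because the result depends only on the sorted member and task lists, the same inputs always produce the same assignment, but adding one member shifts the index of most tasks.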
> To demonstrate how a more sophisticated algorithm could minimize migrations, 
> consider the following simple assignment with two instances and six tasks:
> m1: [0_0, 0_2, 0_4]
> m2: [0_1, 0_3, 0_5]
> Adding a new member causes four of the tasks to migrate:
> m1: [0_0, 0_3]
> m2: [0_1, 0_4]
> m3: [0_2, 0_5]
> However, the following assignment is equally balanced, and only two of the 
> tasks need to migrate:
> m1: [0_0, 0_2]
> m2: [0_1, 0_3]
> m3: [0_4, 0_5]
>  
> Of course, the full problem, including all three kinds of balance, is much 
> more complex to optimize.
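The migration counts in the example above can be checked with a short sketch. The helper names are hypothetical, and a "migration" is assumed to mean any task whose owning member differs between the two assignments.

```python
def owner_of(assignment):
    """Invert member -> tasks into task -> member."""
    return {task: member for member, tasks in assignment.items() for task in tasks}


def migrated_tasks(before, after):
    """Return the sorted task IDs whose owner changed between two assignments."""
    old, new = owner_of(before), owner_of(after)
    return sorted(task for task in new if old.get(task) != new[task])


before = {"m1": ["0_0", "0_2", "0_4"], "m2": ["0_1", "0_3", "0_5"]}
current_algorithm = {"m1": ["0_0", "0_3"], "m2": ["0_1", "0_4"], "m3": ["0_2", "0_5"]}
overlap_aware = {"m1": ["0_0", "0_2"], "m2": ["0_1", "0_3"], "m3": ["0_4", "0_5"]}

moved_current = migrated_tasks(before, current_algorithm)
moved_overlap_aware = migrated_tasks(before, overlap_aware)
```

Here `moved_current` contains 0_2, 0_3, 0_4 and 0_5, while `moved_overlap_aware` contains only 0_4 and 0_5, matching the four-versus-two comparison in the description.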



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
