[ https://issues.apache.org/jira/browse/KAFKA-10121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler updated KAFKA-10121:
---------------------------------
    Description: 
Beginning in Kafka 2.6.0, Streams has a new task assignment algorithm that 
reacts to cluster membership changes by starting out 100% sticky and warming up 
tasks in the background to eventually migrate to a 100% balanced assignment. 
See KIP-441 for the details.

However, in computing the final, 100% balanced assignment, the assignor 
doesn't take the current ownership of the tasks into account. Thus, when 
instances are added or removed, the assignor is likely to migrate large numbers 
of tasks. This is mitigated by the fact that the migrations happen at a trickle 
over time in the background, but it's still better to avoid unnecessary 
migrations where possible. See the example below for details.

The solution seems to be to use some kind of optimization algorithm to find a 
100% balanced assignment that also has maximum overlap with the current 
assignment.

I'd formally state the optimization problem as:

> Generate a 100% balanced assignment that has the maximum overlap with the 
> current assignment.
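
To make the objective concrete, here is a minimal Java sketch of an overlap 
score (the class and method names are hypothetical, not actual assignor code). 
Maximizing this score over all 100% balanced candidates is exactly the problem 
stated above:

{code:java}
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the actual assignor: scores a candidate
// assignment by how many tasks stay on the member that currently owns them.
public final class AssignmentOverlap {

    // Both maps go from member name to the set of task ids it owns.
    public static int overlap(final Map<String, Set<String>> current,
                              final Map<String, Set<String>> candidate) {
        int stickyTasks = 0;
        for (final Map.Entry<String, Set<String>> entry : candidate.entrySet()) {
            final Set<String> owned = current.get(entry.getKey());
            if (owned == null) {
                continue; // brand-new member: none of its tasks are sticky
            }
            for (final String task : entry.getValue()) {
                if (owned.contains(task)) {
                    stickyTasks++;
                }
            }
        }
        return stickyTasks;
    }
}
{code}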

 

Example, with additional detail:

The main focus of the KIP-441 work was the migration mechanism that allows 
Streams to warm up state for new instances in the background while continuing 
to process tasks on the instances that previously owned them. Accordingly, the 
assignment algorithm itself focuses on simplicity and guaranteed balance, not 
optimality.

There are three kinds of balance that all have to be met for Streams to be 100% 
balanced (a sketch of the common balance test appears after the note below):
 # Active task balance: no member should have more active processing workload 
than any other
 # Stateful task balance: no member should have more stateful tasks (whether 
stateful actives or standbys) than any other
 # Task parallel balance: no member should have more tasks (partitions) for a 
single subtopology than any other

(Note: in all these cases, an instance may actually have one more task than 
another, if the number of members doesn't evenly divide the number of tasks. 
For a simple case, consider two members and only one task: the task can only be 
assigned to one of the members, and the assignment is still as balanced as it 
could be.)
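
All three criteria reduce to the same numeric test on a per-member count 
(active tasks, stateful tasks, or tasks of a single subtopology). A minimal 
sketch of that test, assuming the counts have already been tallied (names 
hypothetical):

{code:java}
import java.util.Collection;
import java.util.Map;

// Hypothetical check: a load dimension is balanced when the per-member
// counts differ by at most one across all members.
public final class BalanceCheck {

    public static boolean balanced(final Map<String, Integer> countPerMember) {
        final Collection<Integer> counts = countPerMember.values();
        final int max = counts.stream().mapToInt(Integer::intValue).max().orElse(0);
        final int min = counts.stream().mapToInt(Integer::intValue).min().orElse(0);
        return max - min <= 1;
    }
}
{code}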

The current algorithm ensures all three kinds of balance as follows (a sketch 
of steps 1 through 3 appears after the list):
 # sort all members by name (to ensure assignment stability)
 # sort all tasks by subtopology first, then by partition. E.g., sorted like 
this: 0_0, 0_1, 0_2, 1_0, 1_1
 # for all tasks that are stateful, iterate over both tasks and members in 
sorted order, assigning each task t[i] to the member m[i % num_members]
 # for each standby replica we need to assign, continue looping over the sorted 
members, assigning each replica to the next member (skipping any member that 
already has a replica of the task)
 # for each stateless task, assign an active replica to the member with the 
fewest tasks. Since after step 3 the member with the fewest tasks has at most 
one task fewer than any other member, the assignment after step 5 is still 
balanced.
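
For illustration, a minimal sketch of steps 1 through 3 (names are 
hypothetical; the real assignor operates on richer client and task metadata):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;

// Illustrative sketch of steps 1-3 only: round-robin the sorted stateful
// tasks over the sorted members. Not the actual assignor implementation.
public final class RoundRobinSketch {

    public static Map<String, List<String>> assignStateful(
            final SortedSet<String> members,  // step 1: sorted by name
            final SortedSet<String> tasks) {  // step 2: sorted by subtopology, then partition
        final List<String> memberList = new ArrayList<>(members);
        final Map<String, List<String>> assignment = new HashMap<>();
        for (final String member : memberList) {
            assignment.put(member, new ArrayList<>());
        }
        int i = 0;
        for (final String task : tasks) {
            // step 3: task t[i] goes to member m[i % num_members]
            assignment.get(memberList.get(i % memberList.size())).add(task);
            i++;
        }
        return assignment;
    }
}
{code}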

To demonstrate how a more sophisticated algorithm could minimize migrations, 
consider the following simple assignment with two instances and six tasks:

m1: [0_0, 0_2, 0_4]

m2: [0_1, 0_3, 0_5]

Adding a new member causes four of the tasks to migrate:

m1: [0_0, 0_3]

m2: [0_1, 0_4]

m3: [0_2, 0_5]

However, the following assignment is equally balanced, and only two of the 
tasks need to migrate:

m1: [0_0, 0_2]

m2: [0_1, 0_3]

m3: [0_4, 0_5]
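
Plugging both candidates into the overlap sketch from earlier in this 
description makes the difference concrete (hypothetical example code):

{code:java}
import java.util.Map;
import java.util.Set;

// Hypothetical check of the two candidate assignments above, using the
// AssignmentOverlap sketch from earlier. Six tasks total, so the number
// of migrations is 6 minus the overlap score.
public final class MigrationCountExample {

    public static void main(final String[] args) {
        final Map<String, Set<String>> current = Map.of(
            "m1", Set.of("0_0", "0_2", "0_4"),
            "m2", Set.of("0_1", "0_3", "0_5"));
        final Map<String, Set<String>> roundRobin = Map.of(
            "m1", Set.of("0_0", "0_3"),
            "m2", Set.of("0_1", "0_4"),
            "m3", Set.of("0_2", "0_5"));
        final Map<String, Set<String>> sticky = Map.of(
            "m1", Set.of("0_0", "0_2"),
            "m2", Set.of("0_1", "0_3"),
            "m3", Set.of("0_4", "0_5"));
        System.out.println(6 - AssignmentOverlap.overlap(current, roundRobin)); // prints 4
        System.out.println(6 - AssignmentOverlap.overlap(current, sticky));     // prints 2
    }
}
{code}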

 

Of course, the full problem, including all three kinds of balance, is much more 
complex to optimize.



> Streams Task Assignment optimization design
> -------------------------------------------
>
>                 Key: KAFKA-10121
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10121
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: John Roesler
>            Priority: Minor
>


