[ 
https://issues.apache.org/jira/browse/KAFKA-4117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4117:
---------------------------------
    Description: 
I went through the whole assignment logic once again and I feel it has become a bit 
messy. I want to clean it up, probably in another PR, but let me dump my thoughts 
here on what the appropriate logic should be:

Some background:

1. Each {{KafkaStreams}} instance contains a clientId; if not specified, it defaults 
to applicationId-1/2/etc. when there are multiple instances inside the same JVM. One 
instance can contain multiple threads, where the thread-clientId is constructed as 
clientId-StreamThread-1/2/etc., and the thread-clientId is used as the embedded 
consumer clientId as well as the metrics tag.

2. However, since one instance can contain multiple threads, and hence multiple 
consumers, the streams library needs to take capacity into consideration at the 
granularity of instances, not threads, when doing partition assignment. Therefore we 
create a {{UUID.randomUUID()}} as the processId and encode it in the subscription 
metadata bytes, so that the leader knows whether multiple consumer members actually 
belong to the same instance (i.e. to threads of that instance) and can balance across 
instances when assigning partitions. NOTE that in production we recommend one thread 
per instance, so consumersByClient will only have one consumer per client (i.e. 
instance).
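
For illustration, here is a minimal sketch of how a per-instance processId could be 
encoded into the subscription's userData bytes so the leader can group members by 
instance; the layout is purely illustrative and not the actual SubscriptionInfo wire 
format:

{code:java}
import java.nio.ByteBuffer;
import java.util.UUID;

// Sketch only: encode/decode a processId UUID as 16 bytes of subscription userData.
// The layout here is illustrative, not the real SubscriptionInfo format.
public class ProcessIdEncodingSketch {

    static ByteBuffer encode(UUID processId) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(processId.getMostSignificantBits());
        buf.putLong(processId.getLeastSignificantBits());
        buf.flip();
        return buf;
    }

    static UUID decode(ByteBuffer buf) {
        return new UUID(buf.getLong(), buf.getLong());
    }

    public static void main(String[] args) {
        UUID processId = UUID.randomUUID();
        System.out.println(decode(encode(processId)).equals(processId)); // prints: true
    }
}
{code}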

3. In addition, historically we hard-coded the partition grouper logic so that each 
task is assigned only one partition from each of its subscribed topics. For example, 
if we have topicA with 5 partitions and topicB with 10 partitions, we will create 10 
tasks: the first five tasks contain one partition from each topic, while the last 
five tasks contain only one partition from topicB. Therefore the TaskId class 
contains the groupId of the sub-topology and the partition, so that taskId(group, 1) 
gets partition1 of topicA and partition1 of topicB. We later exposed this to users 
for customization, so that more than one partition of a topic can be assigned to the 
same task. As a result, the partition field in the TaskId no longer indicates which 
partitions are assigned, and we added {{AssignedPartitions}} to capture which 
partitions are assigned to which tasks.
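
A rough sketch of the historical default grouping described above (not the real 
PartitionGrouper / TaskId classes; task ids and partitions are plain strings here):

{code:java}
import java.util.*;

// Sketch: within one sub-topology (topic group) the number of tasks equals the
// maximum partition count of its source topics, and task i gets partition i of
// every topic that has such a partition.
public class DefaultGroupingSketch {

    static Map<String, Set<String>> partitionGroups(int topicGroupId, Map<String, Integer> partitionCounts) {
        int numTasks = Collections.max(partitionCounts.values());
        Map<String, Set<String>> groups = new LinkedHashMap<>();
        for (int i = 0; i < numTasks; i++) {
            Set<String> partitions = new HashSet<>();
            for (Map.Entry<String, Integer> entry : partitionCounts.entrySet()) {
                if (i < entry.getValue()) {
                    partitions.add(entry.getKey() + "-" + i);   // e.g. "topicA-3"
                }
            }
            groups.put(topicGroupId + "_" + i, partitions);     // task id like "0_3"
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("topicA", 5);
        counts.put("topicB", 10);
        // 10 tasks: tasks 0-4 get one partition from each topic, tasks 5-9 only from topicB
        partitionGroups(0, counts).forEach((task, partitions) -> System.out.println(task + " -> " + partitions));
    }
}
{code}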

4. While doing the assignment, the leader is also responsible for creating the 
changelog / repartition topics, and the number of partitions of each such topic 
equals the number of tasks that need to write to it; these are tracked in 
{{stateChangelogTopicToTaskIds}} and {{internalSourceTopicToTaskIds}} respectively. 
After such topics are created, the leader also needs to "augment" the received 
cluster metadata with these topics in order to 1) check for copartitioning, and 2) 
maintain it for QueryableState's discovery function.
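
A small sketch of that sizing rule, using plain maps instead of the real 
{{stateChangelogTopicToTaskIds}} / {{internalSourceTopicToTaskIds}} structures:

{code:java}
import java.util.*;

// Sketch: derive the partition count of each internal (changelog / repartition)
// topic from the set of tasks that write to it. Task ids are plain strings here.
public class InternalTopicSizingSketch {

    static Map<String, Integer> numPartitionsPerTopic(Map<String, Set<String>> topicToTaskIds) {
        Map<String, Integer> numPartitions = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : topicToTaskIds.entrySet()) {
            // one partition per task that writes to this topic
            numPartitions.put(entry.getKey(), entry.getValue().size());
        }
        return numPartitions;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> changelogToTasks = new HashMap<>();
        changelogToTasks.put("my-app-store1-changelog", new HashSet<>(Arrays.asList("0_0", "0_1", "0_2")));
        System.out.println(numPartitionsPerTopic(changelogToTasks)); // {my-app-store1-changelog=3}
    }
}
{code}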

The current implementation is mixed up with all this legacy logic and has become 
quite messy, so I'm thinking of making a pass over the StreamPartitionAssignor and 
cleaning it up a bit. More precisely:

1. Read and parse the subscription information to construct the clientMetadata map, 
where each metadata entry contains the {{Set<String> consumerMemberIds}}, 
{{ClientState<TaskId> state}}, and {{HostInfo hostInfo}}.
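
Roughly, the per-client metadata could look like the following sketch; ClientState 
and HostInfo are simplified stand-ins for the real Streams classes:

{code:java}
import java.util.*;

// Sketch of the per-client (per-instance) metadata built in step 1.
public class ClientMetadataSketch {

    static class HostInfo {
        final String host;
        final int port;
        HostInfo(String host, int port) { this.host = host; this.port = port; }
    }

    static class ClientMetadata {
        final Set<String> consumerMemberIds = new HashSet<>(); // all threads of this instance
        final Set<String> assignedTasks = new HashSet<>();     // stand-in for ClientState<TaskId>
        HostInfo hostInfo;                                      // for QueryableState discovery
    }

    // keyed by processId so that all consumers of one instance share one entry
    final Map<UUID, ClientMetadata> clientMetadataMap = new HashMap<>();

    void onSubscription(UUID processId, String memberId, HostInfo hostInfo) {
        ClientMetadata metadata = clientMetadataMap.computeIfAbsent(processId, id -> new ClientMetadata());
        metadata.consumerMemberIds.add(memberId);
        metadata.hostInfo = hostInfo;
    }
}
{code}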

2. Access the (sub-)topology to create the corresponding changelog / 
repartition topics and construct the {{stateChangelogTopicToTaskIds}} and 
{{internalSourceTopicToTaskIds}}.

3. Call {{streamThread.partitionGrouper.partitionGroups}} to get the map from 
created tasks to their assigned partitions.

4. Call {{TaskAssignor.assign}} (which now takes the whole clientMetadata map) to 
assign tasks to clients, and hence derive which partitions are assigned to each client.
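
An illustrative capacity-aware assignment in the spirit of this step (only a sketch, 
not the real TaskAssignor): each task goes to the client with the lowest load 
relative to its capacity, i.e. its number of consumer threads.

{code:java}
import java.util.*;

// Sketch: spread tasks across clients, weighting each client by its capacity.
public class TaskAssignSketch {

    static Map<String, List<String>> assign(List<String> taskIds, Map<String, Integer> clientCapacity) {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String client : clientCapacity.keySet()) {
            assignment.put(client, new ArrayList<>());
        }
        for (String task : taskIds) {
            String leastLoaded = null;
            double bestLoad = Double.MAX_VALUE;
            for (String client : assignment.keySet()) {
                double load = assignment.get(client).size() / (double) clientCapacity.get(client);
                if (load < bestLoad) {
                    bestLoad = load;
                    leastLoaded = client;
                }
            }
            assignment.get(leastLoaded).add(task);
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> capacity = new HashMap<>();
        capacity.put("clientA", 1); // one thread
        capacity.put("clientB", 2); // two threads, so it should receive roughly twice the tasks
        System.out.println(assign(Arrays.asList("0_0", "0_1", "0_2", "0_3", "0_4", "0_5"), capacity));
    }
}
{code}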

5. For each client, use a round-robin scheme (as we do now) to assign its tasks to 
its hosted consumers via the {{clientMetadata.consumerMemberIds}} map.
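
A minimal sketch of that round-robin distribution (ids are plain strings here):

{code:java}
import java.util.*;

// Sketch: distribute one client's assigned tasks over that client's consumer
// member ids in a round-robin manner.
public class RoundRobinSketch {

    static Map<String, List<String>> distribute(List<String> taskIds, List<String> consumerMemberIds) {
        Map<String, List<String>> tasksByConsumer = new HashMap<>();
        for (String memberId : consumerMemberIds) {
            tasksByConsumer.put(memberId, new ArrayList<>());
        }
        for (int i = 0; i < taskIds.size(); i++) {
            String memberId = consumerMemberIds.get(i % consumerMemberIds.size());
            tasksByConsumer.get(memberId).add(taskIds.get(i));
        }
        return tasksByConsumer;
    }

    public static void main(String[] args) {
        System.out.println(distribute(
            Arrays.asList("0_0", "0_1", "0_2", "1_0", "1_1"),
            Arrays.asList("consumer-1", "consumer-2")));
        // consumer-1 -> [0_0, 0_2, 1_1], consumer-2 -> [0_1, 1_0]
    }
}
{code}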

6. Check co-partitioning of assigned partitions, and maintain the {{Cluster}} 
metadata locally on the leader.
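
The co-partitioning check itself is simple; a sketch under the assumption that we 
have the partition counts of the augmented metadata at hand:

{code:java}
import java.util.*;

// Sketch: all topics in one co-partition group must have the same number of partitions.
public class CopartitionCheckSketch {

    static void ensureCopartitioning(Set<String> copartitionGroup, Map<String, Integer> partitionCounts) {
        Set<Integer> distinctCounts = new HashSet<>();
        for (String topic : copartitionGroup) {
            distinctCounts.add(partitionCounts.get(topic));
        }
        if (distinctCounts.size() > 1) {
            throw new IllegalStateException("Topics are not co-partitioned: " + copartitionGroup
                + " with partition counts " + distinctCounts);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("topicA", 5);
        counts.put("topicB", 10);
        ensureCopartitioning(new HashSet<>(Arrays.asList("topicA", "topicB")), counts); // throws
    }
}
{code}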

7. Construct the assignment info, where activeTasks is a map from {{TaskId}} to a 
list of {{TopicPartitions}}, since otherwise we would not know which partitions are 
assigned to which tasks.
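
The intended shape is roughly the following (simplified stand-ins, not the real 
AssignmentInfo / TaskId / TopicPartition classes):

{code:java}
import java.util.*;

// Sketch: activeTasks maps each task to its assigned partitions instead of being a
// flat task list, so the receiving member knows which partitions belong to which task.
public class AssignmentInfoSketch {

    static class TaskId {
        final int topicGroupId;
        final int sequence; // see the rename suggested at the end of this ticket
        TaskId(int topicGroupId, int sequence) { this.topicGroupId = topicGroupId; this.sequence = sequence; }
        // equals/hashCode omitted for brevity in this sketch
    }

    // taskId -> partitions assigned to that task (partitions as "topic-partition" strings)
    final Map<TaskId, List<String>> activeTasks = new HashMap<>();
}
{code}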

8. For non-leaders, upon receiving the assignment, also construct the {{Cluster}} 
metadata from the decoded assignment information, and keep the AssignmentInfo 
locally for constructing the tasks.
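
Assuming, for the purposes of this sketch, that the decoded assignment carries a 
host-to-partitions mapping, a non-leader could keep it locally like this for 
QueryableState discovery; the real code would build a {{Cluster}} object instead:

{code:java}
import java.util.*;

// Sketch: keep the decoded host -> partitions mapping locally so discovery can
// answer "which host owns this partition". Plain maps stand in for Cluster / HostInfo.
public class HostMetadataSketch {

    private final Map<String, Set<String>> partitionsByHost = new HashMap<>();

    void update(Map<String, Set<String>> decodedHostToPartitions) {
        partitionsByHost.clear();
        decodedHostToPartitions.forEach((host, partitions) ->
            partitionsByHost.put(host, new HashSet<>(partitions)));
    }

    // e.g. hostFor("someTopic-3") -> Optional of "host1:8080"
    Optional<String> hostFor(String topicPartition) {
        return partitionsByHost.entrySet().stream()
            .filter(entry -> entry.getValue().contains(topicPartition))
            .map(Map.Entry::getKey)
            .findFirst();
    }
}
{code}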

And some minor improvements:

1. The default thread-clientIds {{applicationId-x-StreamThread-y}} may still conflict 
with each other across multiple JVMs / machines, which is bad for metrics collection 
/ debugging across hosts. We can change the default clientId to 
{{applicationId-processId}}, where processId is the generated UUID, so that the 
default thread-clientId becomes {{applicationId-UUID-StreamThread-y}}.
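
A tiny sketch of the proposed naming (the applicationId and thread index are just 
example values):

{code:java}
import java.util.UUID;

// Sketch: embed the per-instance processId in the clientId so thread-clientIds
// are unique across JVMs and machines.
public class ClientIdSketch {
    public static void main(String[] args) {
        String applicationId = "my-app";   // illustrative
        UUID processId = UUID.randomUUID();

        String clientId = applicationId + "-" + processId;        // proposed default clientId
        String threadClientId = clientId + "-StreamThread-" + 1;  // e.g. my-app-<uuid>-StreamThread-1
        System.out.println(threadClientId);
    }
}
{code}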

2. The {{TaskId.partition}} field no longer indicates which partitions are actually 
assigned to the task, but we still need to keep its topicGroupId field since it 
indicates which sub-topology the task belongs to, which is helpful for debugging. So 
maybe we can rename the partition field to something like sequence?

> Cleanup StreamPartitionAssignor behavior
> ----------------------------------------
>
>                 Key: KAFKA-4117
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4117
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>              Labels: architecture
>


