[ 
https://issues.apache.org/jira/browse/KAFKA-4117?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4117:
---------------------------------
       Resolution: Fixed
    Fix Version/s: 0.10.2.0
           Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2012
[https://github.com/apache/kafka/pull/2012]

> Cleanup StreamPartitionAssignor behavior
> ----------------------------------------
>
>                 Key: KAFKA-4117
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4117
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>              Labels: architecture
>             Fix For: 0.10.2.0
>
>
> I went through the whole assignment logic once again, and I feel the logic 
> has now become a bit messy; I want to clean it up, probably in another PR, 
> but will dump my thoughts here on the appropriate logic:
> Some background:
> 1. Each {{KafkaStreams}} instance contains a clientId; if not specified, the 
> default value is applicationId-1/2/etc. when there are multiple instances 
> inside the same JVM. One instance contains multiple threads, where the 
> thread clientId is constructed as clientId-StreamThread-1/2/etc., and the 
> thread clientId is used as the embedded consumer's clientId as well as the 
> metrics tag.
> 2. However, since one instance can contain multiple threads, and hence 
> multiple consumers, the streams library needs to take capacity into 
> consideration at the granularity of instances, not threads, when assigning 
> partitions. Therefore we create a 16-byte {{UUID.randomUUID()}} as the 
> processId and encode it in the subscription metadata bytes, so the leader 
> knows whether multiple consumer members actually belong to the same 
> instance (i.e. are threads of that instance) and can balance among 
> instances when assigning partitions. NOTE that in production we recommend 
> one thread per instance, so consumersByClient will only have one consumer 
> per client (i.e. instance).
> 3. In addition, historically we hard-coded the partition grouper logic so 
> that each task is assigned only one partition of each of its subscribed 
> topics. For example, if we have topicA with 5 partitions and topicB with 10 
> partitions, we will create 10 tasks, with the first five tasks each 
> containing one partition of both topics, while the last five tasks each 
> contain only one partition from topicB (see the sketch after this list). 
> Therefore the TaskId class contains the groupId of the sub-topology and the 
> partition, so that taskId(group, 1) gets partition1 of topicA and 
> partition1 of topicB. We later exposed this to users for customization, so 
> that more than one partition of a topic can be assigned to the same task; 
> as a result the partition field in the TaskId no longer indicates which 
> partitions are assigned, and we add {{AssignedPartitions}} to capture which 
> partitions are assigned to which tasks.
> 4. While doing the assignment, the leader is also responsible for creating 
> these changelog / repartition topics, where the number of partitions of 
> each topic equals the number of tasks that need to write to it; these are 
> wrapped in {{stateChangelogTopicToTaskIds}} and 
> {{internalSourceTopicToTaskIds}} respectively. After such topics are 
> created, the leader also needs to "augment" the received cluster metadata 
> with these topics to 1) check for copartitioning, and 2) maintain it for 
> QueryableState's discovery function.
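>
> To make point 3 concrete, here is a minimal sketch of the legacy grouping 
> behavior, using simplified hypothetical types rather than the actual 
> streams classes:
> {code:java}
> import java.util.*;
>
> public class PartitionGroupsSketch {
>     // Hypothetical simplified stand-ins for the real TaskId / TopicPartition.
>     record TaskId(int topicGroupId, int partition) {}
>     record TopicPartition(String topic, int partition) {}
>
>     // For one sub-topology (topic group): create as many tasks as the maximum
>     // partition count among its subscribed topics; taskId(group, p) gets
>     // partition p of every subscribed topic that has such a partition.
>     static Map<TaskId, Set<TopicPartition>> partitionGroups(
>             int topicGroupId, Map<String, Integer> partitionsPerTopic) {
>         int maxPartitions = Collections.max(partitionsPerTopic.values());
>         Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
>         for (int p = 0; p < maxPartitions; p++) {
>             Set<TopicPartition> partitions = new HashSet<>();
>             for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet())
>                 if (p < e.getValue())
>                     partitions.add(new TopicPartition(e.getKey(), p));
>             groups.put(new TaskId(topicGroupId, p), partitions);
>         }
>         return groups;
>     }
>
>     public static void main(String[] args) {
>         // topicA=5, topicB=10 partitions: 10 tasks; tasks 0-4 hold one
>         // partition of each topic, tasks 5-9 hold only a topicB partition.
>         partitionGroups(0, Map.of("topicA", 5, "topicB", 10))
>                 .forEach((task, parts) -> System.out.println(task + " -> " + parts));
>     }
> }
> {code}
>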
> The current implementation mixes all this legacy logic together and gets 
> quite messy, so I'm thinking of making a pass over the 
> StreamPartitionAssignor and cleaning it up a bit. More precisely:
> 1. Read and parse the subscription information to construct the 
> clientMetadata map, where each client's metadata contains the {{Set<String> 
> consumerMemberIds}}, {{ClientState<TaskId> state}}, and {{HostInfo 
> hostInfo}} (a sketch of this metadata follows the list).
> 2. Access the (sub-)topology to create the corresponding changelog / 
> repartition topics and construct the {{stateChangelogTopicToTaskIds}} and 
> {{internalSourceTopicToTaskIds}}.
> 3. Call {{streamThread.partitionGrouper.partitionGroups}} to get the map from 
> created tasks to their assigned partitions.
> 4. Call {{TaskAssignor.assign}} (which now takes the whole clientMetadata 
> map) to assign tasks to clients, which also gives us the partitions 
> assigned to each client.
> 5. For each client, use some round-robin manner (as we do now) to assign 
> tasks to its hosted consumers via the {{clientMetadata.consumerMemberIds}} 
> map.
> 6. Check co-partitioning of assigned partitions, and maintain the {{Cluster}} 
> metadata locally on the leader.
> 7. Construct the assignment info, where activeTasks is also a map from 
> {{TaskId}} to a list of {{TopicPartition}}s, since otherwise we would not 
> know which partitions are assigned to which tasks.
> 8. For non-leaders, upon receiving the assignment, also construct the 
> Cluster metadata from the decoded assignment information, and maintain the 
> AssignmentInfo locally for constructing the tasks.
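>
> As a rough sketch of the per-client metadata from step 1 (field names as 
> above; {{ClientState}}, {{HostInfo}}, and {{TaskId}} stubbed out here as 
> hypothetical placeholders, not the real streams classes):
> {code:java}
> import java.util.HashSet;
> import java.util.Set;
>
> public class ClientMetadataSketch {
>     // Hypothetical stand-ins for the real streams classes.
>     static class ClientState<T> { final Set<T> assignedTasks = new HashSet<>(); }
>     record HostInfo(String host, int port) {}
>     record TaskId(int topicGroupId, int partition) {}
>
>     // One entry per client (i.e. per processId / instance), built by the
>     // leader while parsing subscriptions in step 1; steps 4-5 assign tasks
>     // per client and then round-robin them over consumerMemberIds.
>     static class ClientMetadata {
>         final Set<String> consumerMemberIds = new HashSet<>(); // all consumers of this instance
>         final ClientState<TaskId> state = new ClientState<>(); // capacity and assigned tasks
>         final HostInfo hostInfo;                               // for QueryableState discovery
>
>         ClientMetadata(HostInfo hostInfo) { this.hostInfo = hostInfo; }
>     }
>
>     public static void main(String[] args) {
>         ClientMetadata client = new ClientMetadata(new HostInfo("localhost", 8080));
>         client.consumerMemberIds.add("my-app-1-StreamThread-1-consumer"); // hypothetical member id
>         System.out.println(client.consumerMemberIds + " @ " + client.hostInfo);
>     }
> }
> {code}
>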
> And some minor improvements:
> 1. The default thread clientIds {{applicationId-x-StreamThread-y}} may 
> still conflict with each other across multiple JVMs / machines, which is 
> bad for metrics collection / debugging across hosts. We can modify the 
> default clientId to {{applicationId-processId}}, where processId is the 
> generated UUID, hence the default thread clientId becomes 
> {{applicationId-UUID-StreamThread-y}} (see the sketch after this list).
> 2. The {{TaskId.partition}} field no longer indicates which partitions are 
> actually assigned to this task, but we still need to keep its topicGroupId 
> field, as it indicates which sub-topology this task belongs to and is hence 
> helpful for debugging. So maybe we can rename the partition field to 
> something like sequence?
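>
> A minimal sketch of the naming proposed in improvement 1 (hypothetical 
> helper methods, not an existing API):
> {code:java}
> import java.util.UUID;
>
> // Proposed defaults: one processId UUID per KafkaStreams instance, so that
> // thread clientIds no longer collide across JVMs / machines.
> public class ClientIdsSketch {
>     static String defaultClientId(String applicationId, UUID processId) {
>         return applicationId + "-" + processId;           // applicationId-UUID
>     }
>
>     static String threadClientId(String clientId, int threadIndex) {
>         return clientId + "-StreamThread-" + threadIndex; // applicationId-UUID-StreamThread-y
>     }
>
>     public static void main(String[] args) {
>         String clientId = defaultClientId("my-app", UUID.randomUUID());
>         System.out.println(threadClientId(clientId, 1));
>     }
> }
> {code}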



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
