Guozhang Wang created KAFKA-4117:
------------------------------------
Summary: Cleanup StreamPartitionAssignor behavior
Key: KAFKA-4117
URL: https://issues.apache.org/jira/browse/KAFKA-4117
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Guozhang Wang
I went through the whole assignment logic once again and I feel the logic has
now become a bit messy. I want to clean it up, probably in another PR, but I am
dumping my thoughts on the appropriate logic here for now (also cc @enothereska
@mjsax):
Some background:
1. Each KafkaStreams instance has a clientId; if it is not specified, the default
value is applicationId-1/2/etc. when there are multiple instances inside the same
JVM. One instance contains multiple threads, where each thread-clientId is
constructed as clientId-StreamThread-1/2/etc.; the thread-clientId is used both
as the embedded consumer's clientId and as a metrics tag.
2. However, one instance can contain multiple threads, and hence multiple
consumers, so when computing the partition assignment the Streams library needs
to consider capacity at the granularity of instances, not threads. Therefore we
create a random 16-byte (128-bit) UUID via UUID.randomUUID() as the processId
and encode it in the subscription metadata bytes; the leader then knows whether
multiple consumer members actually belong to the same instance (i.e. are threads
of that instance), so that it can balance partitions among instances when
assigning them. NOTE that in production we recommend one thread per instance, so
consumersByClient will only have one consumer per client (i.e. instance).
3. In addition, historically the partition grouper logic was hard-coded so that
each task is assigned only one partition of each of its subscribed topics. For
example, if we have topicA with 5 partitions and topicB with 10 partitions, we
create 10 tasks: the first five tasks each contain one partition of topicA and
one partition of topicB, while the last five tasks each contain only one
partition of topicB. Therefore the TaskId class contains the groupId of the
sub-topology and the partition, so that taskId(group, 1) gets partition1 of
topicA and partition1 of topicB. We later exposed this to users for
customization, so that more than one partition of a topic can be assigned to the
same task. As a result, the partition field in TaskId no longer indicates
anything about which partitions are assigned, and we added AssignedPartitions to
capture which partitions are assigned to which tasks.
4. While doing the assignment, the leader is also responsible for creating the
changelog / repartition topics. The number of partitions of each such topic is
equal to the number of tasks that need to write to it; these mappings are kept in
stateChangelogTopicToTaskIds and internalSourceTopicToTaskIds respectively.
After these topics are created, the leader also needs to "augment" the received
cluster metadata with them in order to 1) check co-partitioning, and 2) maintain
it for QueryableState's discovery function.
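The legacy grouping in background item 3 can be illustrated with a small standalone sketch. It assumes one sub-topology whose tasks are numbered by partition, creates as many tasks as the largest subscribed topic has partitions, and gives task p partition p of every topic that has it. SimpleTaskId and the "topic-partition" string labels are hypothetical stand-ins, not the real TaskId / TopicPartition / PartitionGrouper classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "one partition per subscribed topic per task" grouping.
class DefaultGroupingSketch {

    // Hypothetical stand-in for TaskId: (sub-topology group id, partition number).
    record SimpleTaskId(int topicGroupId, int partition) {}

    static Map<SimpleTaskId, List<String>> group(int topicGroupId, Map<String, Integer> partitionCounts) {
        // The number of tasks equals the largest partition count among the topics.
        int maxPartitions = 0;
        for (int count : partitionCounts.values()) {
            maxPartitions = Math.max(maxPartitions, count);
        }
        Map<SimpleTaskId, List<String>> tasks = new LinkedHashMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            List<String> assigned = new ArrayList<>();
            for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
                // Task p only gets partition p of topics that actually have it.
                if (p < e.getValue()) {
                    assigned.add(e.getKey() + "-" + p);
                }
            }
            tasks.put(new SimpleTaskId(topicGroupId, p), assigned);
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("topicA", 5);
        counts.put("topicB", 10);
        group(0, counts).forEach((task, parts) -> System.out.println(task + " -> " + parts));
    }
}
```

With topicA at 5 partitions and topicB at 10, this yields 10 tasks, where tasks 0-4 hold two partitions each and tasks 5-9 hold only a topicB partition, matching the example in item 3.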
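The processId mechanism in background item 2 can be sketched as follows, under a loud assumption: the subscription user data here holds only the 16-byte processId (the real subscription metadata format has more fields). The leader decodes it for every member and groups consumer member ids by instance, which is the input needed to balance per instance rather than per thread. All names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical sketch: encode a processId into subscription bytes and group members by it.
class ProcessIdGroupingSketch {

    // Assumed layout: just the UUID, most significant 8 bytes first (16 bytes total).
    static byte[] encodeProcessId(UUID processId) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(processId.getMostSignificantBits());
        buf.putLong(processId.getLeastSignificantBits());
        return buf.array();
    }

    static UUID decodeProcessId(byte[] userData) {
        ByteBuffer buf = ByteBuffer.wrap(userData);
        // Arguments are evaluated left to right: high bits, then low bits.
        return new UUID(buf.getLong(), buf.getLong());
    }

    // Leader side: consumer member ids grouped by the instance (processId) that owns them.
    static Map<UUID, Set<String>> consumersByProcess(Map<String, byte[]> subscriptions) {
        Map<UUID, Set<String>> result = new TreeMap<>();
        subscriptions.forEach((memberId, userData) ->
                result.computeIfAbsent(decodeProcessId(userData), k -> new TreeSet<>()).add(memberId));
        return result;
    }

    public static void main(String[] args) {
        UUID instance1 = UUID.randomUUID();
        UUID instance2 = UUID.randomUUID();
        Map<String, byte[]> subscriptions = new TreeMap<>();
        subscriptions.put("consumer-1", encodeProcessId(instance1));
        subscriptions.put("consumer-2", encodeProcessId(instance1)); // second thread of instance1
        subscriptions.put("consumer-3", encodeProcessId(instance2));
        System.out.println(consumersByProcess(subscriptions));
    }
}
```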
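Background item 4 (sizing internal topics by their task sets, then augmenting the cluster metadata) reduces to a small map computation if the cluster metadata is simplified to a topic-to-partition-count map. In this sketch, task ids are plain "group_seq" strings and the topic names are made up; the helper itself is hypothetical, while the two parameter names mirror the maps named in item 4.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: size internal topics by task count and merge them into the metadata view.
class InternalTopicSizingSketch {

    static Map<String, Integer> augmentedPartitionCounts(
            Map<String, Integer> clusterPartitionCounts,
            Map<String, Set<String>> stateChangelogTopicToTaskIds,
            Map<String, Set<String>> internalSourceTopicToTaskIds) {
        // Start from what the received cluster metadata already knows about.
        Map<String, Integer> augmented = new TreeMap<>(clusterPartitionCounts);
        // One partition per task that needs the changelog / repartition topic.
        stateChangelogTopicToTaskIds.forEach((topic, taskIds) -> augmented.put(topic, taskIds.size()));
        internalSourceTopicToTaskIds.forEach((topic, taskIds) -> augmented.put(topic, taskIds.size()));
        return augmented;
    }

    public static void main(String[] args) {
        Map<String, Integer> cluster = Map.of("input", 4);
        Map<String, Set<String>> changelogs = Map.of("app-store-changelog", Set.of("0_0", "0_1", "0_2", "0_3"));
        Map<String, Set<String>> repartitions = Map.of("app-repartition", Set.of("1_0", "1_1"));
        System.out.println(augmentedPartitionCounts(cluster, changelogs, repartitions));
        // prints {app-repartition=2, app-store-changelog=4, input=4}
    }
}
```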
The current implementation mixes in all of this legacy logic and has become
quite messy, so I'm thinking of making a pass over StreamPartitionAssignor to
clean it up a bit. More precisely:
1. Read and parse the subscription information to construct the clientMetadata
map, where each entry contains a Set<String> consumerMemberIds, a
ClientState<TaskId> state, and a HostInfo hostInfo.
2. Access the (sub-)topology to create the corresponding changelog /
repartition topics and construct the stateChangelogTopicToTaskIds and
internalSourceTopicToTaskIds.
Then call streamThread.partitionGrouper.partitionGroups to get the map from the
created tasks to their assigned partitions.
3. Call TaskAssignor.assign (which would now take the whole clientMetadata map)
to assign tasks to clients, which in turn gives the partitions assigned to each
client.
4. For each client, assign its tasks to its hosted consumers (taken from
clientMetadata.consumerMemberIds) in a round-robin manner, as we do now.
5. Check co-partitioning of assigned partitions, and maintain the Cluster
metadata locally on the leader.
6. Construct the assignment info, where activeTasks is also a map from TaskId
to a list of TopicPartitions, since otherwise we would not know which partitions
are assigned to which tasks.
7. For non-leaders, when getting the assignment, also construct the Cluster
metadata from the decoded assignment information; and also maintain the
AssignmentInfo locally for constructing the tasks.
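Step 4 of the proposal (round-robin over one client's consumers) might look like the sketch below. It assumes task ids and member ids are plain strings and sorts the member ids so the result is deterministic; the sorting is a choice made for this sketch, not necessarily what the current assignor does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: spread one client's tasks over its hosted consumers round-robin.
class RoundRobinSketch {

    static Map<String, List<String>> assignToConsumers(List<String> tasks, List<String> consumerMemberIds) {
        if (consumerMemberIds.isEmpty()) {
            throw new IllegalArgumentException("client has no consumers");
        }
        // Sort member ids so the same input always yields the same assignment.
        List<String> consumers = new ArrayList<>(consumerMemberIds);
        Collections.sort(consumers);
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Task i goes to consumer (i mod number-of-consumers).
        for (int i = 0; i < tasks.size(); i++) {
            assignment.get(consumers.get(i % consumers.size())).add(tasks.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("0_0", "0_1", "0_2", "0_3", "0_4");
        System.out.println(assignToConsumers(tasks, List.of("consumer-2", "consumer-1")));
        // prints {consumer-1=[0_0, 0_2, 0_4], consumer-2=[0_1, 0_3]}
    }
}
```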
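For step 5, one possible check, assuming "co-partitioned" means every topic in a co-partition group has the same number of partitions in the (augmented) metadata. This sketch only validates and rejects; it does not attempt to fix a mismatch. The helper name and the topic-to-count map are hypothetical.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: reject a co-partition group whose topics differ in partition count.
class CopartitionCheckSketch {

    static void ensureCopartitioned(Set<String> copartitionGroup, Map<String, Integer> partitionCounts) {
        Integer expected = null;
        for (String topic : copartitionGroup) {
            Integer count = partitionCounts.get(topic);
            if (count == null) {
                throw new IllegalStateException("no metadata for topic " + topic);
            }
            if (expected == null) {
                expected = count; // first topic sets the required partition count
            } else if (!expected.equals(count)) {
                throw new IllegalStateException("topics " + copartitionGroup + " are not co-partitioned");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("left", 4, "right", 4, "other", 3);
        ensureCopartitioned(Set.of("left", "right"), counts); // passes silently
        try {
            ensureCopartitioned(Set.of("left", "other"), counts);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```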
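Steps 6 and 7 hinge on activeTasks carrying TaskId -> partitions: a member receiving that map can rebuild which task owns each partition, which a plain list of TaskIds cannot express once the grouper is customizable. A minimal sketch, using "topic-partition" strings and "group_seq" task-id strings instead of the real Kafka types; the helper is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: invert activeTasks (task -> partitions) into partition -> task.
class AssignmentLookupSketch {

    static Map<String, String> partitionToTask(Map<String, List<String>> activeTasks) {
        Map<String, String> lookup = new TreeMap<>();
        activeTasks.forEach((taskId, partitions) -> {
            for (String partition : partitions) {
                String previous = lookup.put(partition, taskId);
                // Each partition should be owned by exactly one active task.
                if (previous != null) {
                    throw new IllegalStateException(partition + " assigned to both " + previous + " and " + taskId);
                }
            }
        });
        return lookup;
    }

    public static void main(String[] args) {
        Map<String, List<String>> activeTasks = new LinkedHashMap<>();
        activeTasks.put("0_0", List.of("topicA-0", "topicB-0"));
        activeTasks.put("0_7", List.of("topicB-7"));
        System.out.println(partitionToTask(activeTasks));
    }
}
```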
And some minor improvements:
1. The default thread-clientIds `applicationId-x-StreamThread-y` may still
conflict with each other across multiple JVMs / machines, which is bad for
metrics collection / debugging across hosts. We can change the default clientId
to `applicationId-processId`, where `processId` is the UUID, so the default
thread-clientId becomes `applicationId-UUID-StreamThread-y`.
2. The TaskId.partition field no longer indicates which partitions are actually
assigned to the task, but we still need to keep its topicGroupId field, since it
indicates which sub-topology the task belongs to and hence is helpful for
debugging. So maybe we can rename the partition field to something like
`sequence`?
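The proposed default naming from minor improvement 1 is simple string composition. This sketch assumes the processId is the same UUID that is encoded in the subscription metadata; the helper names are hypothetical.

```java
import java.util.UUID;

// Hypothetical sketch of the proposed default clientId / thread-clientId naming.
class DefaultClientIdSketch {

    // Proposed default clientId: applicationId-processId.
    static String defaultClientId(String applicationId, UUID processId) {
        return applicationId + "-" + processId;
    }

    // Thread-clientId keeps the existing "-StreamThread-<n>" suffix.
    static String threadClientId(String clientId, int threadIndex) {
        return clientId + "-StreamThread-" + threadIndex;
    }

    public static void main(String[] args) {
        UUID processId = UUID.randomUUID();
        String clientId = defaultClientId("my-app", processId);
        for (int i = 1; i <= 2; i++) {
            System.out.println(threadClientId(clientId, i));
        }
    }
}
```

Because the UUID is random per instance, two instances of the same application on different hosts no longer produce identical thread-clientIds, which is the metrics / debugging concern raised above.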
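A rough sketch of minor improvement 2, assuming the rename is purely cosmetic: the id keeps topicGroupId for debugging and carries an opaque sequence number instead of a partition, while the actual partitions would come from a separate task-to-partitions map. SequencedTaskId is a hypothetical name, not the real TaskId class.

```java
import java.util.TreeSet;

// Hypothetical sketch: a task id whose second field is a sequence, not a partition.
class SequencedTaskIdSketch {

    record SequencedTaskId(int topicGroupId, int sequence) implements Comparable<SequencedTaskId> {
        // Order by sub-topology first, then by sequence within it.
        @Override
        public int compareTo(SequencedTaskId other) {
            int byGroup = Integer.compare(topicGroupId, other.topicGroupId);
            return byGroup != 0 ? byGroup : Integer.compare(sequence, other.sequence);
        }

        // Printed as topicGroupId_sequence for logs and debugging.
        @Override
        public String toString() {
            return topicGroupId + "_" + sequence;
        }
    }

    public static void main(String[] args) {
        TreeSet<SequencedTaskId> ids = new TreeSet<>();
        ids.add(new SequencedTaskId(1, 0));
        ids.add(new SequencedTaskId(0, 1));
        ids.add(new SequencedTaskId(0, 0));
        System.out.println(ids); // prints [0_0, 0_1, 1_0]
    }
}
```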