[ https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856514#comment-17856514 ]
Lorenzo Affetti edited comment on FLINK-31762 at 6/20/24 2:44 PM:
------------------------------------------------------------------

Thank you [~jingge]! This would be my first contribution to the Kafka connector, and I am currently digging into the codebase to frame the problem, so bear with me :)

I confirm that the logic above applies to the V2 APIs in the `getSplitOwner` method of the `KafkaSourceEnumerator`. For example, with two topics with 2 partitions each and parallelism 4, I get:

{code:java}
16:33:44,934 INFO  org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers
{
  0=[[Partition: test-topic-1-0, StartingOffset:-2, StoppingOffset:-9223372036854775808],
     [Partition: test-topic-2-1, StartingOffset:-2, StoppingOffset:-9223372036854775808]],
  1=[[Partition: test-topic-1-1, StartingOffset:-2, StoppingOffset:-9223372036854775808]],
  3=[[Partition: test-topic-2-0, StartingOffset:-2, StoppingOffset:-9223372036854775808]]
}
{code}

As stated, the assignment is suboptimal, as task 2 would be idle.

I still need to verify where the `KafkaTopicPartitionAssigner` code gets used, to avoid inconsistencies. Or, at least, is that part of this issue, or does only the KafkaSource matter?

Also, [~tzulitai], changing the logic to evenly distribute the partitions would probably lead to a non-deterministic assignment (especially in conjunction with partition discovery); would this cause any issue in your opinion? I see you mentioned state restores above; can we expand on that? For example, in `KafkaTopicPartitionAssigner` the javadoc of the `assign` method reads:

{code:java}
/**
 * Returns the index of the target subtask that a specific Kafka partition should be assigned
 * to.
 *
 * <p>The resulting distribution of partitions of a single topic has the following contract:
 *
 * <ul>
 *   <li>1. Uniformly distributed across subtasks
 *   <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending subtask
 *       indices) by using the partition id as the offset from a starting index (i.e., the index
 *       of the subtask which partition 0 of the topic will be assigned to, determined using the
 *       topic name).
 * </ul>
 *
 * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
 * contract to locally filter out partitions that it should not subscribe to, guaranteeing that
 * all partitions of a single topic will always be assigned to some subtask in a uniformly
 * distributed manner.
 *
 * @param partition the Kafka partition
 * @param numParallelSubtasks total number of parallel subtasks
 * @return index of the target subtask that the Kafka partition should be assigned to.
 */
{code}

Or is this nothing to be concerned about, since this logic is not used in the KafkaSource?

Thank you!
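As a side note, here is a minimal, self-contained sketch of the formula discussed above (the class name and driver are hypothetical; only the startIndex/offset computation mirrors the snippet quoted in the issue description below) that shows how the per-topic start index can leave readers idle:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignmentSkewSketch {

    // Same per-topic formula as KafkaSourceEnumerator#getSplitOwner (quoted below).
    static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        int numReaders = 4;
        Map<Integer, List<String>> readerToSplits = new TreeMap<>();
        for (String topic : List.of("test-topic-1", "test-topic-2")) {
            for (int partition = 0; partition < 2; partition++) {
                int owner = getSplitOwner(topic, partition, numReaders);
                readerToSplits.computeIfAbsent(owner, k -> new ArrayList<>())
                        .add(topic + "-" + partition);
            }
        }
        // The concrete owners depend on String#hashCode of the topic names; in the run
        // logged above they were readers 0 (two splits), 1 and 3, leaving reader 2 idle.
        System.out.println(readerToSplits);
    }
}
{code}

Because each topic picks its own hash-based start index, two topics can land on overlapping readers, which is exactly the skew visible in the enumerator log above.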
> Subscribe to multiple Kafka topics may cause partition assignment skew
> ----------------------------------------------------------------------
>
>                 Key: FLINK-31762
>                 URL: https://issues.apache.org/jira/browse/FLINK-31762
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0, 1.18.0
>            Reporter: Liam
>            Assignee: Lorenzo Affetti
>            Priority: Major
>         Attachments: image-2023-04-11-08-00-16-054.png, image-2023-04-11-08-12-24-115.png
>
>
> To simplify the demonstration, let us assume that there are two topics, and each topic has four partitions. We have set the parallelism to eight to consume these two topics. However, the current partition assignment method may lead to some subtasks being assigned two partitions while others are left with none.
> !image-2023-04-11-08-00-16-054.png|width=500,height=143!
> In my case, the situation is even worse as I have ten topics, each with 100 partitions. If I set the parallelism to 1000, some slots may be assigned seven partitions while others remain unassigned.
> To address this issue, I propose a new partition assignment solution. In this approach, round-robin assignment takes place between all topics, not just one. For example, the ideal assignment for the case mentioned above is presented below:
>
> !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!
> This new solution can also handle cases where each topic has more partitions.
> !image-2023-04-11-08-12-24-115.png|width=444,height=127!
> Let us work together to reach a consensus on this proposal. Thank you!
>
> FYI: how the partitions are assigned currently:
> {code:java}
> public class KafkaTopicPartitionAssigner {
>
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks);
>     }
>
>     public static int assign(String topic, int partition, int numParallelSubtasks) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition) % numParallelSubtasks;
>     }
> }
> {code}
> For the Kafka Source, it's implemented in the KafkaSourceEnumerator as below:
> {code:java}
> static int getSplitOwner(TopicPartition tp, int numReaders) {
>     int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>     // here, the assumption is that the id of Kafka partitions are always ascending
>     // starting from 0, and therefore can be used directly as the offset clockwise from the
>     // start index
>     return (startIndex + tp.partition()) % numReaders;
> }
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
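For illustration of the proposal quoted above, here is one possible sketch of a round-robin assignment across all topics (all names are hypothetical; this is not the connector's implementation). Unlike the current per-partition formula, the owner here depends on the full set of partitions known at assignment time, which relates to the determinism and partition-discovery concern raised in the comment above.

{code:java}
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;

/** Illustrative sketch only: global round-robin assignment across all topics. */
public class GlobalRoundRobinAssignerSketch {

    /**
     * Sorts all known partitions deterministically (topic name, then partition id) and deals
     * them out round-robin, so no two readers differ by more than one split. The owner of a
     * partition depends on the whole partition set, not on the single partition alone.
     */
    static Map<TopicPartition, Integer> assign(List<TopicPartition> partitions, int numReaders) {
        List<TopicPartition> sorted = partitions.stream()
                .sorted(Comparator.comparing(TopicPartition::topic)
                        .thenComparingInt(TopicPartition::partition))
                .collect(Collectors.toList());
        Map<TopicPartition, Integer> owners = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            owners.put(sorted.get(i), i % numReaders);
        }
        return owners;
    }

    public static void main(String[] args) {
        List<TopicPartition> partitions = List.of(
                new TopicPartition("test-topic-1", 0),
                new TopicPartition("test-topic-1", 1),
                new TopicPartition("test-topic-2", 0),
                new TopicPartition("test-topic-2", 1));
        // With 4 readers every reader gets exactly one partition; with the current
        // per-topic formula one reader could get two and another none.
        System.out.println(assign(partitions, 4));
    }
}
{code}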