[ https://issues.apache.org/jira/browse/FLINK-31762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856514#comment-17856514 ]

Lorenzo Affetti edited comment on FLINK-31762 at 6/20/24 2:44 PM:
------------------------------------------------------------------

Thank you, [~jingge]!

 

This would be my first contribution to the Kafka connector and I am currently 
digging into the codebase to frame the problem, so bear with me :)

I confirm that the logic above applies to the V2 APIs in the `getSplitOwner` 
method of `KafkaSourceEnumerator`.

 

For example, with two topics of two partitions each and parallelism 4, I get:

 
{code:java}
16:33:44,934 INFO org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator [] - Assigning splits to readers
{
  0=[[Partition: test-topic-1-0, StartingOffset:-2, StoppingOffset:-9223372036854775808], [Partition: test-topic-2-1, StartingOffset:-2, StoppingOffset:-9223372036854775808]],
  1=[[Partition: test-topic-1-1, StartingOffset:-2, StoppingOffset:-9223372036854775808]],
  3=[[Partition: test-topic-2-0, StartingOffset:-2, StoppingOffset:-9223372036854775808]]
}
{code}
 
As stated in the issue, the assignment is suboptimal: task 2 is left idle.
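To make the skew easy to reproduce, here is a minimal, self-contained sketch that replays the `getSplitOwner` formula (quoted in full in the issue description below); the class name and topic names are mine, and the concrete owner of each partition depends on the topics' hash codes:
{code:java}
import java.util.List;

public class SkewDemo {

    // Same formula as KafkaSourceEnumerator#getSplitOwner (see the issue description).
    static int getSplitOwner(String topic, int partition, int numReaders) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + partition) % numReaders;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Topic names are placeholders; the resulting owners depend on String#hashCode.
        for (String topic : List.of("test-topic-1", "test-topic-2")) {
            for (int partition = 0; partition < 2; partition++) {
                System.out.printf("%s-%d -> reader %d%n",
                        topic, partition, getSplitOwner(topic, partition, parallelism));
            }
        }
    }
}
{code}
Each topic independently round-robins from its own hash-derived start index, so nothing prevents two topics from landing their partitions on the same readers while other readers stay idle.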
 
I still need to verify where the `KafkaTopicPartitionAssigner` code is used, to 
avoid introducing inconsistencies.
Or is that out of scope here, i.e., does only the KafkaSource matter for this issue?
 
Also, [~tzulitai]: changing the logic to distribute partitions evenly across all 
topics would probably lead to a non-deterministic assignment (especially in 
conjunction with partition discovery). Would this cause any issues in your 
opinion? I see you mentioned state restores above; can we expand on that?
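For reference, a naive "even" assignment might look like the sketch below. This is my own illustration of the proposal from the issue description, not existing connector code; the class and method names are hypothetical:
{code:java}
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

public class EvenAssignmentSketch {

    // Hypothetical alternative: round-robin across the globally sorted partition
    // list, so every reader index is used once before any index repeats.
    static Map<TopicPartition, Integer> assignEvenly(
            List<TopicPartition> partitions, int numReaders) {
        List<TopicPartition> sorted = partitions.stream()
                .sorted(Comparator.comparing(TopicPartition::topic)
                        .thenComparingInt(TopicPartition::partition))
                .collect(Collectors.toList());
        Map<TopicPartition, Integer> owners = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            owners.put(sorted.get(i), i % numReaders);
        }
        return owners;
    }
}
{code}
Note the sketch is deterministic only because it sorts the full partition set up front; with incremental partition discovery the enumerator sees partitions in discovery order, so either the assignment becomes order-dependent or already-assigned splits would have to move, which I assume is where the state-restore concern comes in.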
 
For example, the Javadoc of the `assign` method in `KafkaTopicPartitionAssigner` reads:
{code:java}
/**
 * Returns the index of the target subtask that a specific Kafka partition should be
 * assigned to.
 *
 * <p>The resulting distribution of partitions of a single topic has the following contract:
 *
 * <ul>
 *   <li>1. Uniformly distributed across subtasks
 *   <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. ascending
 *       subtask indices) by using the partition id as the offset from a starting index
 *       (i.e., the index of the subtask which partition 0 of the topic will be assigned
 *       to, determined using the topic name).
 * </ul>
 *
 * <p>The above contract is crucial and cannot be broken. Consumer subtasks rely on this
 * contract to locally filter out partitions that it should not subscribe to, guaranteeing
 * that all partitions of a single topic will always be assigned to some subtask in a
 * uniformly distributed manner.
 *
 * @param partition the Kafka partition
 * @param numParallelSubtasks total number of parallel subtasks
 * @return index of the target subtask that the Kafka partition should be assigned to.
 */
{code}
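My reading of that contract is that each legacy consumer subtask filters the full partition list locally, along these lines. This is my own illustrative sketch, not actual connector code; it assumes the classes from `org.apache.flink.streaming.connectors.kafka.internals` are on the classpath, and the method and variable names are mine:
{code:java}
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;

public class LocalFilterSketch {

    // Sketch of the local-filtering pattern the contract enables: every subtask
    // evaluates the same formula, so if the assignment logic changed on one side
    // only, partitions could be consumed twice or not at all.
    static List<KafkaTopicPartition> filterForSubtask(
            List<KafkaTopicPartition> allPartitions, int numParallelSubtasks, int mySubtaskIndex) {
        return allPartitions.stream()
                .filter(tp -> KafkaTopicPartitionAssigner.assign(tp, numParallelSubtasks)
                        == mySubtaskIndex)
                .collect(Collectors.toList());
    }
}
{code}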
Or is this nothing to be concerned about, given that this logic is not used by 
the KafkaSource?

Thank you!



> Subscribe to multiple Kafka topics may cause partition assignment skew
> ----------------------------------------------------------------------
>
>                 Key: FLINK-31762
>                 URL: https://issues.apache.org/jira/browse/FLINK-31762
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0, 1.18.0
>            Reporter: Liam
>            Assignee: Lorenzo Affetti
>            Priority: Major
>         Attachments: image-2023-04-11-08-00-16-054.png, 
> image-2023-04-11-08-12-24-115.png
>
>
> To simplify the demonstration, let us assume that there are two topics, and 
> each topic has four partitions. We have set the parallelism to eight to 
> consume these two topics. However, the current partition assignment method 
> may lead to some subtasks being assigned two partitions while others are left 
> with none.
> !image-2023-04-11-08-00-16-054.png|width=500,height=143!
> In my case, the situation is even worse as I have ten topics, each with 100 
> partitions. If I set the parallelism to 1000, some slots may be assigned 
> seven partitions while others remain unassigned.
> To address this issue, I propose a new partition assignment solution. In this 
> approach, round-robin assignment takes place across all topics, not within each 
> topic individually.
> For example, the ideal assignment for the case mentioned above is presented 
> below:
>  
> !https://imgr.whimsical.com/object/A4jSJwgQNrc5mgpGddhghq|width=513,height=134!
> This new solution can also handle cases where each topic has more partitions.
> !image-2023-04-11-08-12-24-115.png|width=444,height=127!
> Let us work together to reach a consensus on this proposal. Thank you!
>  
> FYI: how partitions are currently assigned:
> {code:java}
> public class KafkaTopicPartitionAssigner {
>
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         return assign(partition.getTopic(), partition.getPartition(), numParallelSubtasks);
>     }
>
>     public static int assign(String topic, int partition, int numParallelSubtasks) {
>         int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise
>         // from the start index
>         return (startIndex + partition) % numParallelSubtasks;
>     }
> }
> {code}
> For the Kafka Source, the same logic is implemented in the KafkaSourceEnumerator as below:
> {code:java}
>     static int getSplitOwner(TopicPartition tp, int numReaders) {
>         int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise
>         // from the start index
>         return (startIndex + tp.partition()) % numReaders;
>     }
> {code}
>  
>  


