Zakelly Lan created FLINK-29437:
-----------------------------------

             Summary: The partition of data before and after the Kafka Shuffle are not aligned
                 Key: FLINK-29437
                 URL: https://issues.apache.org/jira/browse/FLINK-29437
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.15.2
            Reporter: Zakelly Lan
             Fix For: 1.16.0
         Attachments: image-2022-09-28-14-32-28-116.png, image-2022-09-28-14-35-47-954.png

I noticed that the key group ranges on the consumer side of Kafka Shuffle are not aligned with those on the producer side. There are two problems:

# The sink (producer) partitions data exactly as a keyed stream would if its maximum parallelism equaled the number of Kafka partitions, but on the consumer side the number of key groups is not the same as the number of partitions.
# Kafka partitions are distributed across consumer subtasks by an assignment function (see KafkaTopicPartitionAssigner#assign), but the producer of Kafka Shuffle simply assumes that the partition index equals the subtask index.

e.g.

!image-2022-09-28-14-32-28-116.png|width=1133,height=274!

My proposed change:

# Set the max parallelism of the keyed stream on the consumer side to the number of Kafka partitions.
# Use the same method that assigns Kafka partitions to consumer subtasks to maintain a map from subtasks to Kafka partitions. The producer uses this map to write each record into a partition that is read by the subtask the record belongs to.

i.e.

!image-2022-09-28-14-35-47-954.png|width=1030,height=283!
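
The proposed change could be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual patch: the class {{ShuffleAlignmentSketch}} and all of its method names are hypothetical. The round-robin formula is modelled on my reading of KafkaTopicPartitionAssigner#assign, and the key-group-to-subtask formula on Flink's usual {{keyGroup * parallelism / maxParallelism}} rule, so both should be checked against the real code. How to pick among several partitions owned by one subtask is not specified above; the offset-based choice here is only an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ShuffleAlignmentSketch {

    // Assumed to mirror KafkaTopicPartitionAssigner#assign: partitions are spread
    // round-robin over subtasks, starting at an index derived from the topic name.
    static int assignPartitionToSubtask(String topic, int partition, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    // Assumed to mirror Flink's key-group-to-operator-index rule.
    static int ownerSubtaskOfKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Proposed change 2: build the subtask -> Kafka partitions map with the same
    // assignment function the consumer uses.
    static Map<Integer, List<Integer>> buildSubtaskToPartitions(
            String topic, int numPartitions, int parallelism) {
        Map<Integer, List<Integer>> result = new HashMap<>();
        for (int partition = 0; partition < numPartitions; partition++) {
            int subtask = assignPartitionToSubtask(topic, partition, parallelism);
            result.computeIfAbsent(subtask, k -> new ArrayList<>()).add(partition);
        }
        return result;
    }

    // Producer side: with max parallelism == numPartitions (proposed change 1),
    // find the subtask that owns the key group and write to one of its partitions.
    static int targetPartition(
            int keyGroup,
            int numPartitions,
            int parallelism,
            Map<Integer, List<Integer>> subtaskToPartitions) {
        int subtask = ownerSubtaskOfKeyGroup(keyGroup, numPartitions, parallelism);
        List<Integer> candidates = subtaskToPartitions.get(subtask);
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalStateException("Subtask " + subtask + " reads no partition");
        }
        // First key group owned by this subtask; used only to spread key groups
        // over the subtask's partitions (an assumption, not part of the proposal).
        int firstKeyGroup = (subtask * numPartitions + parallelism - 1) / parallelism;
        return candidates.get((keyGroup - firstKeyGroup) % candidates.size());
    }

    public static void main(String[] args) {
        String topic = "shuffle-topic"; // hypothetical topic name
        int numPartitions = 8;
        int parallelism = 3;
        Map<Integer, List<Integer>> map =
                buildSubtaskToPartitions(topic, numPartitions, parallelism);
        for (int keyGroup = 0; keyGroup < numPartitions; keyGroup++) {
            int partition = targetPartition(keyGroup, numPartitions, parallelism, map);
            System.out.println("key group " + keyGroup
                    + " -> subtask " + ownerSubtaskOfKeyGroup(keyGroup, numPartitions, parallelism)
                    + " -> partition " + partition);
        }
    }
}
```

With this mapping, every record is written to a partition that the assigner hands to the same subtask that owns the record's key group, which is the alignment the second screenshot illustrates. The sketch assumes the number of partitions is at least the consumer parallelism, so that every subtask reads at least one partition.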