Zakelly Lan created FLINK-29437:
-----------------------------------

             Summary: The partition of data before and after the Kafka Shuffle are not aligned
                 Key: FLINK-29437
                 URL: https://issues.apache.org/jira/browse/FLINK-29437
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.15.2
            Reporter: Zakelly Lan
             Fix For: 1.16.0
          Attachments: image-2022-09-28-14-32-28-116.png, image-2022-09-28-14-35-47-954.png

I noticed that the key group range on the consumer side of the Kafka Shuffle is not aligned with the producer side. There are two problems:
 # The sink (producer) partitions data exactly as a keyed stream whose maximum parallelism equals the number of Kafka partitions would, but on the consumer side the number of partitions and the number of key groups are not the same.
 # Kafka partitions are distributed among consumer subtasks by a specific assignment (see KafkaTopicPartitionAssigner#assign), but the Kafka Shuffle producer simply assumes that the partition index equals the subtask index. For example:
!image-2022-09-28-14-32-28-116.png|width=1133,height=274!
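
To make problem 2 concrete, here is a minimal, self-contained Java sketch. It does not call Flink: it re-implements locally the assignment that, as we understand it, KafkaTopicPartitionAssigner#assign performs (a start index derived from the topic name's hash, then round-robin by partition index). The class name, helper names, and topic names are hypothetical. When there are as many partitions as subtasks, every partition is read by a different subtask than the producer expects, unless the topic's start index happens to be 0.

```java
/** Hypothetical demo of problem 2: partition index vs. the subtask that reads it. */
public class PartitionMismatchDemo {

    // Illustrative re-implementation of the hash-offset round-robin scheme we
    // understand KafkaTopicPartitionAssigner#assign to use; not a call into Flink.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        // Start index depends on the topic name; the mask keeps it non-negative.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    /** Counts partitions whose index differs from the subtask that actually reads them. */
    static int countMisrouted(String topic, int numPartitions, int parallelism) {
        int misrouted = 0;
        for (int p = 0; p < numPartitions; p++) {
            // The shuffle producer assumes partition p is consumed by subtask p.
            if (assign(topic, p, parallelism) != p) {
                misrouted++;
            }
        }
        return misrouted;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String topic : new String[] {"topic-a", "topic-b", "topic-c"}) {
            int start = assign(topic, 0, parallelism);
            System.out.printf("%s: start index %d, %d of %d partitions misrouted%n",
                    topic, start, countMisrouted(topic, parallelism, parallelism), parallelism);
        }
    }
}
```

Because the start index is derived from the topic name, whether (and by how much) the data is shifted varies from topic to topic, which makes the misalignment easy to miss in tests that use a single topic.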

My proposed changes:
 # Set the max parallelism of the keyed stream on the consumer side to the number of Kafka partitions.
 # Use the same method that assigns Kafka partitions to consumer subtasks to maintain a map from subtasks to Kafka partitions; the producer uses this map to write the data for each subtask into the right partition, as illustrated here:
!image-2022-09-28-14-35-47-954.png|width=1030,height=283!
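
The two changes together can be sketched as follows. This is a hypothetical illustration, not the actual patch: the producer first computes the target subtask from the record's key group, using the same formula as Flink's KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup with max parallelism equal to the number of partitions (change 1), and then writes to a partition that the subtask reads according to the subtask-to-partitions map (change 2). The partition assignment is again a local re-implementation of what we understand KafkaTopicPartitionAssigner#assign to do. The class and method names, the modulo choice among a subtask's partitions, and the assumption that parallelism does not exceed the number of partitions are all our own.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the proposal: route records via a subtask -> partitions map. */
public class SubtaskPartitionMap {

    // Illustrative copy of the hash-offset round-robin scheme we believe
    // KafkaTopicPartitionAssigner#assign uses; not a call into Flink.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Same formula as KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    /** Builds subtask index -> Kafka partitions that subtask consumes. */
    static List<List<Integer>> buildMap(String topic, int numPartitions, int parallelism) {
        List<List<Integer>> map = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            map.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            map.get(assign(topic, p, parallelism)).add(p);
        }
        return map;
    }

    /** Picks a partition read by targetSubtask (assumes every subtask owns at least one). */
    static int partitionFor(List<List<Integer>> map, int targetSubtask, int keyGroup) {
        List<Integer> owned = map.get(targetSubtask);
        // Assumption: spread key groups over the subtask's partitions by modulo.
        return owned.get(keyGroup % owned.size());
    }

    public static void main(String[] args) {
        String topic = "shuffle-topic"; // hypothetical topic name
        int numPartitions = 6;
        int parallelism = 4;
        List<List<Integer>> map = buildMap(topic, numPartitions, parallelism);
        // Change 1: max parallelism == numPartitions, so key groups are 0..numPartitions-1.
        for (int keyGroup = 0; keyGroup < numPartitions; keyGroup++) {
            int subtask = operatorIndexForKeyGroup(numPartitions, parallelism, keyGroup);
            int partition = partitionFor(map, subtask, keyGroup);
            System.out.printf("key group %d -> subtask %d -> partition %d%n",
                    keyGroup, subtask, partition);
        }
    }
}
```

The invariant this sketch aims for is that every record lands in a partition consumed by the same subtask that owns the record's key group on the consumer side, regardless of the topic's start index.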



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
