[
https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18050030#comment-18050030
]
RocMarshal commented on FLINK-33123:
------------------------------------
Happy New Year!
[~dmvk] [~mxm] [~Zhanghao Chen]
Based on the existing analysis and discussion in this JIRA (thanks!),
I have drafted an alternative fix proposal and hope it offers a few useful insights.
Could you please take a look when you have some free time?
Thank you~
CC [~fanrui] [~gyfora]
> Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for
> autoscaler and adaptive scheduler
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33123
> URL: https://issues.apache.org/jira/browse/FLINK-33123
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Runtime / Coordination
> Affects Versions: 1.17.0, 1.18.0
> Reporter: Zhanghao Chen
> Assignee: David Morávek
> Priority: Critical
> Attachments: image-2023-09-20-15-09-22-733.png,
> image-2023-09-20-15-14-04-679.png
>
>
> *Background*
> https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is
> wrong when the parallelism is changed for a vertex with a FORWARD edge, a
> scenario that arises with both the autoscaler and the adaptive scheduler,
> where the vertex parallelism can be changed dynamically. A fix was applied
> that dynamically replaces the partitioner from FORWARD to REBALANCE on task
> deployment in {{StreamTask}}:
>
> !image-2023-09-20-15-09-22-733.png|width=560,height=221!
> *Problem*
> Unfortunately, the fix is still buggy in two respects:
> # The connections between upstream and downstream tasks are determined by
> the distribution type of the partitioner when the execution graph is
> generated on the JM side. When the edge is FORWARD, the distribution type is
> POINTWISE, and Flink tries to distribute subpartitions evenly across all
> downstream tasks. To change the edge to REBALANCE, the distribution type has
> to be changed to ALL_TO_ALL so that all-to-all connections are made between
> upstream and downstream tasks. However, the fix did not change the
> distribution type, so the network connections are set up incorrectly.
> # The FORWARD partitioner is replaced only if
> environment.getWriter(outputIndex).getNumberOfSubpartitions() differs from
> the task parallelism. However, the number of subpartitions here equals the
> number of downstream tasks of this particular task, which is also determined
> by the distribution type of the partitioner when the execution graph is
> generated on the JM side. When floor or ceil of (downstream task parallelism
> / upstream task parallelism) equals the upstream task parallelism, some task
> will have number of subpartitions = task parallelism. For example, for a
> topology A (parallelism 2) -> B (parallelism 5), one A task has 2
> subpartitions and the other A task has 3 subpartitions, so one task has a
> number of subpartitions equal to the task parallelism 2 and skips partitioner
> replacement. As a result, that task sends data to only one downstream task,
> because the FORWARD partitioner always sends data to the first subpartition.
> In fact, for a normal job with a FORWARD edge and no autoscaling action, the
> partitioner is changed to REBALANCE internally, because the number of
> subpartitions always equals 1 in this case.
> !image-2023-09-20-15-14-04-679.png|width=892,height=301!
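The arithmetic in the second point can be checked with a small standalone sketch. It is not Flink code: the class and method names are hypothetical, the POINTWISE split is modelled as a simple even assignment (downstream subtask i reads from upstream subtask i * upstream / downstream), and the replacement check is read as "replace FORWARD only when the subpartition count differs from the task parallelism", which is what the A -> B example implies.

```java
import java.util.Arrays;

public class ForwardReplacementSketch {

    // Assumed model of a POINTWISE edge: downstream subtask i is connected to
    // upstream subtask (i * upstream / downstream). The returned array holds
    // the number of subpartitions (downstream consumers) per upstream subtask.
    public static int[] subpartitionCounts(int upstream, int downstream) {
        int[] counts = new int[upstream];
        for (int i = 0; i < downstream; i++) {
            counts[i * upstream / downstream]++;
        }
        return counts;
    }

    // Assumed form of the check described in the issue: the FORWARD
    // partitioner is swapped for REBALANCE only when the subpartition count
    // differs from the task parallelism.
    public static boolean replacesForward(int numSubpartitions, int taskParallelism) {
        return numSubpartitions != taskParallelism;
    }

    public static void main(String[] args) {
        int upstream = 2;   // parallelism of A
        int downstream = 5; // parallelism of B
        int[] counts = subpartitionCounts(upstream, downstream);
        System.out.println(Arrays.toString(counts)); // prints [3, 2]
        for (int task = 0; task < upstream; task++) {
            System.out.println("A[" + task + "] subpartitions=" + counts[task]
                    + " replaced=" + replacesForward(counts[task], upstream));
        }
    }
}
```

Under these assumptions, the A subtask with 2 subpartitions is not replaced and keeps the FORWARD partitioner, while the one with 3 subpartitions is switched to REBALANCE. That is the inconsistent behaviour the issue describes.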