[
https://issues.apache.org/jira/browse/FLINK-21331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-21331:
-----------------------------------
Labels: pull-request-available (was: )
> Optimize calculating tasks to restart in
> RestartPipelinedRegionFailoverStrategy
> -------------------------------------------------------------------------------
>
> Key: FLINK-21331
> URL: https://issues.apache.org/jira/browse/FLINK-21331
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Zhilong Hong
> Assignee: Zhilong Hong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0
>
>
> RestartPipelinedRegionFailoverStrategy calculates the tasks to restart when
> a task failure occurs. It consists of two parts: first, calculating the
> regions to restart; then, adding all the tasks in these regions to the
> restarting queue.
> The bottleneck is mainly in the first part. This part traverses all the
> upstream and downstream regions of the failed region to determine whether
> they should be restarted or not.
> For the current failed region, if a result partition it consumes is not
> available, the owner of that partition, i.e., the upstream region, should
> restart. Also, since the failed region needs to restart, its result
> partitions won't be available, so all the downstream regions need to
> restart, too.
> 1. Calculating the upstream regions that should restart
> The current implementation is:
> {code:java}
> for each SchedulingExecutionVertex in current visited SchedulingPipelinedRegion:
>   for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
>     if the result partition is not available:
>       add the producer region to the restart queue
> {code}
> Based on FLINK-21328, the consumed result partitions of a vertex are already
> grouped. Here we can use a HashSet to record the visited result partition
> groups. Vertices connected with all-to-all edges share the same group, so
> the group only needs to be traversed once. This decreases the time
> complexity from O(N^2) to O(N).
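
To make the idea in part 1 concrete, here is a minimal, self-contained sketch of the visited-group check. It is not the code from the pull request: Partition, PartitionGroup, Vertex and the partitionChecks counter are hypothetical stand-ins for the scheduler types named above, and the sketch assumes that vertices on an all-to-all edge hold the same group instance.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for illustration; not Flink's scheduler classes.
final class UpstreamRestartSketch {

    static final class Partition {
        final String producerRegion;
        final boolean available;
        Partition(String producerRegion, boolean available) {
            this.producerRegion = producerRegion;
            this.available = available;
        }
    }

    // Assumption: one instance is shared by every vertex consuming the same partitions.
    static final class PartitionGroup {
        final List<Partition> partitions;
        PartitionGroup(List<Partition> partitions) { this.partitions = partitions; }
    }

    static final class Vertex {
        final List<PartitionGroup> consumedGroups;
        Vertex(List<PartitionGroup> consumedGroups) { this.consumedGroups = consumedGroups; }
    }

    int partitionChecks; // counts availability checks, only to make the cost visible

    Set<String> upstreamRegionsToRestart(List<Vertex> regionVertices) {
        // PartitionGroup does not override equals/hashCode, so the set dedups by identity.
        Set<PartitionGroup> visitedGroups = new HashSet<>();
        Set<String> regionsToRestart = new LinkedHashSet<>();
        for (Vertex vertex : regionVertices) {
            for (PartitionGroup group : vertex.consumedGroups) {
                if (!visitedGroups.add(group)) {
                    continue; // group was already traversed via another vertex
                }
                for (Partition partition : group.partitions) {
                    partitionChecks++;
                    if (!partition.available) {
                        regionsToRestart.add(partition.producerRegion);
                    }
                }
            }
        }
        return regionsToRestart;
    }

    public static void main(String[] args) {
        List<Partition> partitions = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            partitions.add(new Partition("up-" + i, i != 2)); // only partition 2 is unavailable
        }
        PartitionGroup shared = new PartitionGroup(partitions);
        List<Vertex> consumers = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            consumers.add(new Vertex(Collections.singletonList(shared))); // all-to-all edge
        }
        UpstreamRestartSketch sketch = new UpstreamRestartSketch();
        Set<String> regions = sketch.upstreamRegionsToRestart(consumers);
        System.out.println(regions + " after " + sketch.partitionChecks + " partition checks");
        // prints "[up-2] after 4 partition checks"; without the visited set it would be 16 checks
    }
}
```

With N consumers sharing one group of N partitions, the inner loop runs N times in total instead of N * N, which is the O(N^2) to O(N) reduction described above.
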
> 2. Calculating the downstream regions that should restart
> The current implementation is:
> {code:java}
> for each SchedulingExecutionVertex in current visited SchedulingPipelinedRegion:
>   for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
>     for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
>       if the region containing the consumer SchedulingExecutionVertex is not visited:
>         add the region to the restart queue
> {code}
> Since the count of the produced result partitions of a vertex equals the
> count of its output JobEdges, and with all-to-all edges each result
> partition is consumed by every downstream vertex, the time complexity of
> this procedure is actually O(N^2). As the consumer vertices of a result
> partition are already grouped, we can use a HashSet to record the visited
> ConsumerVertexGroup. The time complexity decreases from O(N^2) to O(N).
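
The downstream case in part 2 can be sketched the same way. Again, this is only an illustration under stated assumptions, not the actual patch: Vertex and ConsumerGroup are hypothetical stand-ins (ConsumerGroup loosely mirrors the ConsumerVertexGroup mentioned above), regions are represented by plain strings, and the consumerVisits counter exists only to show the cost.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for illustration; not Flink's scheduler classes.
final class DownstreamRestartSketch {

    static final class Vertex {
        final String region;
        // One consumer group per produced result partition.
        final List<ConsumerGroup> consumerGroups;
        Vertex(String region, List<ConsumerGroup> consumerGroups) {
            this.region = region;
            this.consumerGroups = consumerGroups;
        }
    }

    // Assumption: producers on an all-to-all edge share the same group instance.
    static final class ConsumerGroup {
        final List<Vertex> consumers;
        ConsumerGroup(List<Vertex> consumers) { this.consumers = consumers; }
    }

    int consumerVisits; // counts consumer lookups, only to make the cost visible

    // Adds newly found regions to visitedRegions and returns them in discovery order.
    Set<String> downstreamRegionsToRestart(List<Vertex> regionVertices, Set<String> visitedRegions) {
        // ConsumerGroup does not override equals/hashCode, so the set dedups by identity.
        Set<ConsumerGroup> visitedGroups = new HashSet<>();
        Set<String> regionsToRestart = new LinkedHashSet<>();
        for (Vertex vertex : regionVertices) {
            for (ConsumerGroup group : vertex.consumerGroups) {
                if (!visitedGroups.add(group)) {
                    continue; // consumers were already handled via another producer
                }
                for (Vertex consumer : group.consumers) {
                    consumerVisits++;
                    if (visitedRegions.add(consumer.region)) {
                        regionsToRestart.add(consumer.region);
                    }
                }
            }
        }
        return regionsToRestart;
    }

    public static void main(String[] args) {
        List<Vertex> consumers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            consumers.add(new Vertex("down-" + i, Collections.emptyList()));
        }
        ConsumerGroup shared = new ConsumerGroup(consumers);
        List<Vertex> failedRegion = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            failedRegion.add(new Vertex("failed", Collections.singletonList(shared)));
        }
        Set<String> visited = new HashSet<>(Collections.singleton("failed"));
        DownstreamRestartSketch sketch = new DownstreamRestartSketch();
        Set<String> regions = sketch.downstreamRegionsToRestart(failedRegion, visited);
        System.out.println(regions + " after " + sketch.consumerVisits + " consumer visits");
        // prints "[down-0, down-1, down-2] after 3 consumer visits"; 9 visits without the visited set
    }
}
```
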