Zhilong Hong created FLINK-21331:
------------------------------------
Summary: Optimize calculating tasks to restart in
RestartPipelinedRegionFailoverStrategy
Key: FLINK-21331
URL: https://issues.apache.org/jira/browse/FLINK-21331
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Reporter: Zhilong Hong
Fix For: 1.13.0
RestartPipelinedRegionFailoverStrategy is used to calculate the tasks to
restart when a task failure occurs. The calculation consists of two parts:
first, calculate the regions to restart; then, add all the tasks in these
regions to the restart queue.
The bottleneck lies mainly in the first part, which traverses all the upstream
and downstream regions of the failed region to determine whether each of them
should restart.
For a failed region, if a result partition it consumes is not available, the
owner of that partition, i.e., the upstream region, should restart. Also, since
the failed region needs to restart, its produced result partitions won't be
available, so all the downstream regions need to restart, too.
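The two restart rules, together with the second part (collecting the tasks of the selected regions), can be sketched as a breadth-first traversal over regions. This is a minimal sketch under stated assumptions: `Region`, `Region.Consumed`, `connect` and `RegionFailoverSketch` are hypothetical, simplified types, not Flink's SchedulingTopology API, and partition availability is a fixed flag per consumed partition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified pipelined region (not Flink's SchedulingPipelinedRegion). */
final class Region {
    final List<String> tasks;
    final List<Consumed> consumed = new ArrayList<>();  // partitions this region reads
    final List<Region> consumers = new ArrayList<>();   // regions reading our partitions

    Region(String... tasks) {
        this.tasks = Arrays.asList(tasks);
    }

    /** A consumed result partition: its producer region and whether it is available. */
    static final class Consumed {
        final Region producer;
        final boolean available;

        Consumed(Region producer, boolean available) {
            this.producer = producer;
            this.available = available;
        }
    }

    static void connect(Region producer, Region consumer, boolean available) {
        producer.consumers.add(consumer);
        consumer.consumed.add(new Consumed(producer, available));
    }
}

final class RegionFailoverSketch {
    /** Part 1: breadth-first traversal from the failed region applying both rules. */
    static Set<Region> regionsToRestart(Region failed) {
        Set<Region> visited = new LinkedHashSet<>();
        Deque<Region> queue = new ArrayDeque<>();
        visited.add(failed);
        queue.add(failed);
        while (!queue.isEmpty()) {
            Region current = queue.poll();
            // Upstream: restart the producer only if a consumed partition is unavailable.
            for (Region.Consumed c : current.consumed) {
                if (!c.available && visited.add(c.producer)) {
                    queue.add(c.producer);
                }
            }
            // Downstream: every consumer restarts, since our partitions won't be available.
            for (Region consumer : current.consumers) {
                if (visited.add(consumer)) {
                    queue.add(consumer);
                }
            }
        }
        return visited;
    }

    /** Part 2: collect all tasks of the regions to restart. */
    static Set<String> tasksToRestart(Region failed) {
        Set<String> tasks = new LinkedHashSet<>();
        for (Region region : regionsToRestart(failed)) {
            tasks.addAll(region.tasks);
        }
        return tasks;
    }
}
```

For a chain A -> B -> C where A's partition is still available, a failure in B restarts B and C but not A; if the partition consumed by B were unavailable, A would be added too.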
1. Calculating the upstream regions that should restart
The current implementation is:
{code:java}
for each SchedulingExecutionVertex in current visited SchedulingPipelinedRegion:
    for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
        if the result partition is not available:
            add the producer region to the restart queue
{code}
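To make the cost of the current procedure concrete, it can be modelled with a counter of availability checks. This is a sketch under assumptions: `Partition`, `Vertex` and `UpstreamScanBaseline` are hypothetical, simplified types (not Flink's classes), regions are plain integer ids, and `allToAll` builds N consumer vertices that each list the same N partitions individually.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified result partition: its producer's region and availability. */
final class Partition {
    final int producerRegion;
    final boolean available;

    Partition(int producerRegion, boolean available) {
        this.producerRegion = producerRegion;
        this.available = available;
    }
}

/** Hypothetical vertex that lists every consumed partition individually (no grouping). */
final class Vertex {
    final List<Partition> consumed = new ArrayList<>();
}

final class UpstreamScanBaseline {
    int checks;                                        // availability checks performed
    final Set<Integer> regionsToRestart = new HashSet<>();

    /** Inner loop of the pseudocode for one visited vertex. */
    void visit(Vertex vertex) {
        for (Partition p : vertex.consumed) {
            checks++;
            if (!p.available) {
                regionsToRestart.add(p.producerRegion); // producer region must restart
            }
        }
    }

    /** n consumer vertices, each consuming the same n partitions (all-to-all edge). */
    static List<Vertex> allToAll(int n, boolean available) {
        List<Partition> partitions = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            partitions.add(new Partition(i, available));
        }
        List<Vertex> consumers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Vertex v = new Vertex();
            v.consumed.addAll(partitions);
            consumers.add(v);
        }
        return consumers;
    }
}
```

Visiting every vertex returned by `allToAll(4, false)` performs 16 checks, i.e., N * N.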
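Based on FLINK-21328, the consumed result partitions of a vertex are already
grouped. Here we can use a HashSet to record the visited result partition
groups. Vertices connected with all-to-all edges share the same group, so the
group only needs to be traversed once rather than once per vertex. For N
consumer vertices that each consume the same N partitions, the number of checks
decreases from N * N to N, i.e., the time complexity decreases from O(N^2) to
O(N).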
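A minimal sketch of the grouped variant, assuming hypothetical types: `PartitionGroup` stands in for the grouping introduced by FLINK-21328 and is not Flink's actual class, and the visited set lives across all vertices of the traversal.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified result partition: its producer's region and availability. */
final class Partition {
    final int producerRegion;
    final boolean available;

    Partition(int producerRegion, boolean available) {
        this.producerRegion = producerRegion;
        this.available = available;
    }
}

/** Hypothetical group of partitions consumed together (stand-in for FLINK-21328 grouping). */
final class PartitionGroup {
    final List<Partition> partitions = new ArrayList<>();
}

/** Hypothetical vertex that references shared groups instead of individual partitions. */
final class Vertex {
    final List<PartitionGroup> consumedGroups = new ArrayList<>();
}

final class UpstreamScanGrouped {
    int checks;                                        // availability checks performed
    final Set<Integer> regionsToRestart = new HashSet<>();
    // Identity-based: PartitionGroup does not override equals/hashCode.
    private final Set<PartitionGroup> visitedGroups = new HashSet<>();

    void visit(Vertex vertex) {
        for (PartitionGroup group : vertex.consumedGroups) {
            if (!visitedGroups.add(group)) {
                continue; // already scanned via another consumer vertex
            }
            for (Partition p : group.partitions) {
                checks++;
                if (!p.available) {
                    regionsToRestart.add(p.producerRegion);
                }
            }
        }
    }

    /** n consumer vertices sharing one group of n partitions (all-to-all edge). */
    static List<Vertex> allToAll(int n, boolean available) {
        PartitionGroup group = new PartitionGroup();
        for (int i = 0; i < n; i++) {
            group.partitions.add(new Partition(i, available));
        }
        List<Vertex> consumers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Vertex v = new Vertex();
            v.consumedGroups.add(group);
            consumers.add(v);
        }
        return consumers;
    }
}
```

For `allToAll(4, false)` this performs 4 checks instead of 16. Skipping a visited group is safe because whether a producer region must restart depends only on the partition's availability, not on which consumer vertex reaches it.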
2. Calculating the downstream regions that should restart
The current implementation is:
{code:java}
for each SchedulingExecutionVertex in current visited SchedulingPipelinedRegion:
    for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
        for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
            if the region containing the consumer SchedulingExecutionVertex is not visited:
                add the region to the restart queue
{code}
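The count of the produced result partitions of a vertex equals the count of its
output JobEdges, which does not depend on the parallelism. However, for an
all-to-all edge each produced result partition has N consumer vertices, so N
producer vertices inspect N * N consumers in total, and the time complexity of
this procedure is actually O(N^2).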
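The current downstream procedure can be modelled the same way, counting inspected consumer vertices. This is a sketch under assumptions: `ConsumerVertex`, `ProducedPartition`, `ProducerVertex` and `DownstreamScanBaseline` are hypothetical, simplified types (not Flink's classes), and each producer has a single all-to-all output edge, hence one produced partition.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical consumer vertex, identified only by the region it belongs to. */
final class ConsumerVertex {
    final int regionId;

    ConsumerVertex(int regionId) {
        this.regionId = regionId;
    }
}

/** Hypothetical produced partition listing each of its consumers individually. */
final class ProducedPartition {
    final List<ConsumerVertex> consumers = new ArrayList<>();
}

/** Hypothetical producer vertex: one produced partition per output JobEdge. */
final class ProducerVertex {
    final List<ProducedPartition> produced = new ArrayList<>();
}

final class DownstreamScanBaseline {
    int checks;                                        // consumer vertices inspected
    final Set<Integer> regionsToRestart = new HashSet<>();

    /** Two inner loops of the pseudocode; the caller loops over the vertices. */
    void visit(ProducerVertex vertex) {
        for (ProducedPartition partition : vertex.produced) {
            for (ConsumerVertex consumer : partition.consumers) {
                checks++;
                // Set.add is a no-op when the region was already visited.
                regionsToRestart.add(consumer.regionId);
            }
        }
    }

    /** n producers with one all-to-all output edge to the same n consumers. */
    static List<ProducerVertex> allToAll(int n) {
        List<ConsumerVertex> consumers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            consumers.add(new ConsumerVertex(i));
        }
        List<ProducerVertex> producers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ProducedPartition partition = new ProducedPartition();
            partition.consumers.addAll(consumers);
            ProducerVertex producer = new ProducerVertex();
            producer.produced.add(partition);
            producers.add(producer);
        }
        return producers;
    }
}
```

Visiting every producer returned by `allToAll(4)` inspects 16 consumers even though only 4 distinct regions are found.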
As the consumer vertices of a result partition are already grouped, we can use
a HashSet to record the visited ConsumerVertexGroups, so that each group is
traversed only once. The time complexity decreases from O(N^2) to O(N).
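A minimal sketch of the grouped downstream traversal, assuming hypothetical types: `ConsumerGroup` is a simplified stand-in for the ticket's ConsumerVertexGroup, not Flink's class, and each produced partition references exactly one group.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical consumer vertex, identified only by the region it belongs to. */
final class ConsumerVertex {
    final int regionId;

    ConsumerVertex(int regionId) {
        this.regionId = regionId;
    }
}

/** Hypothetical stand-in for the ticket's ConsumerVertexGroup (not Flink's class). */
final class ConsumerGroup {
    final List<ConsumerVertex> vertices = new ArrayList<>();
}

/** Hypothetical produced partition that references a shared consumer group. */
final class ProducedPartition {
    final ConsumerGroup consumers;

    ProducedPartition(ConsumerGroup consumers) {
        this.consumers = consumers;
    }
}

/** Hypothetical producer vertex: one produced partition per output JobEdge. */
final class ProducerVertex {
    final List<ProducedPartition> produced = new ArrayList<>();
}

final class DownstreamScanGrouped {
    int checks;                                        // consumer vertices inspected
    final Set<Integer> regionsToRestart = new HashSet<>();
    // Identity-based: ConsumerGroup does not override equals/hashCode.
    private final Set<ConsumerGroup> visitedGroups = new HashSet<>();

    void visit(ProducerVertex vertex) {
        for (ProducedPartition partition : vertex.produced) {
            if (!visitedGroups.add(partition.consumers)) {
                continue; // this group's regions were already queued
            }
            for (ConsumerVertex consumer : partition.consumers.vertices) {
                checks++;
                regionsToRestart.add(consumer.regionId);
            }
        }
    }

    /** n producers whose partitions all share one group of n consumers (all-to-all). */
    static List<ProducerVertex> allToAll(int n) {
        ConsumerGroup group = new ConsumerGroup();
        for (int i = 0; i < n; i++) {
            group.vertices.add(new ConsumerVertex(i));
        }
        List<ProducerVertex> producers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            ProducerVertex producer = new ProducerVertex();
            producer.produced.add(new ProducedPartition(group));
            producers.add(producer);
        }
        return producers;
    }
}
```

For `allToAll(4)` this inspects 4 consumers instead of 16. Skipping a visited group is safe here because downstream regions restart unconditionally, so a second pass over the same group cannot add any new region.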