[ 
https://issues.apache.org/jira/browse/FLINK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhilong Hong updated FLINK-21110:
---------------------------------
    Description: 
According to the results of the scheduler benchmarks implemented in 
FLINK-20612, the bottlenecks of deploying and running a large-scale job in 
Flink mainly lie in the following procedures:
||Procedure||Time complexity||
|Initializing ExecutionGraph|O(N^2)|
|Building DefaultExecutionTopology|O(N^2)|
|Initializing PipelinedRegionSchedulingStrategy|O(N^2)|
|Scheduling downstream tasks when a task finishes|O(N^2)|
|Calculating tasks to restart when a failover occurs|O(N^2)|
|Releasing result partitions|O(N^3)|

These procedures are all related to the complexity of the topology in the 
ExecutionGraph. Between two vertices connected with all-to-all edges, all the 
upstream IntermediateResultPartitions are connected to all the downstream 
ExecutionVertices. The computational complexity of building and traversing all 
these edges is O(N^2).

As for memory usage, we currently use ExecutionEdges to store the connection 
information. For the all-to-all distribution type, there are O(N^2) 
ExecutionEdges. We tested a simple job with only two vertices, each with a 
parallelism of 10k, connected with all-to-all edges. Since 10k x 10k = 100 
million ExecutionEdges have to be created, it takes 4.175 GiB (estimated via 
MXBean) just to store them.
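
To make the quadratic blow-up concrete, here is a minimal sketch using 
simplified placeholder types (Partition, Vertex, Edge) instead of the actual 
IntermediateResultPartition, ExecutionVertex and ExecutionEdge classes; it only 
illustrates the shape of the per-edge representation, not the real code:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Simplified placeholder types; not the actual Flink classes.
public class AllToAllEdgeSketch {

    record Partition(int id) {}               // stands in for IntermediateResultPartition
    record Vertex(int id) {}                  // stands in for ExecutionVertex
    record Edge(Partition from, Vertex to) {} // stands in for ExecutionEdge

    // For an all-to-all JobEdge, every upstream partition is wired to every
    // downstream vertex, so N partitions x N vertices = N^2 Edge objects.
    static List<Edge> connectAllToAll(List<Partition> partitions, List<Vertex> vertices) {
        List<Edge> edges = new ArrayList<>();
        for (Partition p : partitions) {      // N iterations
            for (Vertex v : vertices) {       // N iterations each
                edges.add(new Edge(p, v));
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        int n = 1_000; // with n = 10_000 this becomes the 100M-edge case above
        List<Partition> partitions = new ArrayList<>();
        List<Vertex> vertices = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            partitions.add(new Partition(i));
            vertices.add(new Vertex(i));
        }
        System.out.println(connectAllToAll(partitions, vertices).size()); // 1000000
    }
}
{code}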

Most large-scale jobs contain more than two vertices with large parallelisms, 
so deploying such jobs costs a lot of time and memory.

As we can see, for two JobVertices connected with the all-to-all distribution 
type, all IntermediateResultPartitions produced by the upstream 
ExecutionVertices are isomorphic: the downstream ExecutionVertices they connect 
to are exactly the same. The downstream ExecutionVertices belonging to the same 
JobVertex are also isomorphic, since the upstream ResultPartitions they connect 
to are the same as well.

Since every JobEdge has exactly one distribution type, we can divide the 
vertices and result partitions into groups according to the distribution type 
of the JobEdge. 

For the all-to-all distribution type, since all downstream vertices are 
isomorphic, they belong to a single group, and all the upstream result 
partitions are connected to this group. Vice versa, all the upstream result 
partitions also belong to a single group, and all the downstream vertices are 
connected to that group. Previously, when we wanted to iterate over all the 
downstream vertices of every upstream result partition, we had to loop over 
the N downstream vertices N times, which leads to O(N^2) complexity. Now, 
since all the upstream result partitions are connected to one downstream 
group, we only need to loop over that group once, which is O(N).
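
A minimal sketch of this idea, again with simplified placeholder types; the 
group name used here (ConsumerVertexGroup) is illustrative and not necessarily 
the name used in the final implementation. The key point is that all N upstream 
partitions share a single group object, so collecting all downstream consumers 
is one O(N) pass:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative sketch of the grouping idea; not the actual Flink implementation.
public class AllToAllGroupSketch {

    record Vertex(int id) {}
    record ConsumerVertexGroup(List<Vertex> vertices) {}       // the single downstream group
    record Partition(int id, ConsumerVertexGroup consumers) {} // every partition references it

    // Building the connections: O(N) shared references instead of O(N^2) edges.
    static List<Partition> connectAllToAll(int parallelism, ConsumerVertexGroup downstreamGroup) {
        List<Partition> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new Partition(i, downstreamGroup)); // shared group, no per-edge object
        }
        return partitions;
    }

    // Iterating over all consumers of all partitions: since every partition shares
    // the same group, one pass over that group is enough, i.e. O(N) instead of O(N^2).
    static void forEachConsumer(List<Partition> partitions, Consumer<Vertex> action) {
        if (partitions.isEmpty()) {
            return;
        }
        partitions.get(0).consumers().vertices().forEach(action);
    }

    public static void main(String[] args) {
        List<Vertex> downstream = List.of(new Vertex(0), new Vertex(1), new Vertex(2));
        List<Partition> upstream = connectAllToAll(3, new ConsumerVertexGroup(downstream));
        forEachConsumer(upstream, v -> System.out.println("schedule vertex " + v.id()));
    }
}
{code}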

For the pointwise distribution type, since each result partition is connected 
to different downstream vertices, the result partitions belong to different 
groups, and likewise the downstream vertices belong to different groups. Since 
each result partition group is connected to exactly one vertex group in a 
pointwise manner, the computational complexity of looping over them is still 
O(N).
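
For comparison, a sketch of the pointwise case, reusing the placeholder types 
from the sketch above and assuming equal upstream and downstream parallelism so 
that partitions map one-to-one to vertices (real pointwise edges may map small 
ranges instead): each partition gets its own single-element group, so the total 
number of group entries stays O(N).

{code:java}
// Sketch only, reusing Vertex / ConsumerVertexGroup / Partition from the
// previous example and assuming a 1:1 pointwise mapping.
static List<Partition> connectPointwise(List<Vertex> downstreamVertices) {
    List<Partition> partitions = new ArrayList<>();
    for (int i = 0; i < downstreamVertices.size(); i++) {
        // Each partition is consumed by exactly one vertex, so it gets its own
        // single-element group; N partitions -> N groups -> O(N) overall.
        ConsumerVertexGroup group = new ConsumerVertexGroup(List.of(downstreamVertices.get(i)));
        partitions.add(new Partition(i, group));
    }
    return partitions;
}
{code}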

!Illustration of Group.jpg|height=249!

After we group the result partitions and vertices, ExecutionEdge is no longer 
needed. For the test job mentioned above, this optimization reduces the memory 
usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in our POC, and the 
time cost from 62.090 seconds to 8.551 seconds (with 10k parallelism).

 

The detailed design doc: 
https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing

 

> Optimize Scheduler Performance for Large-Scale Jobs
> ---------------------------------------------------
>
>                 Key: FLINK-21110
>                 URL: https://issues.apache.org/jira/browse/FLINK-21110
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Zhilong Hong
>            Priority: Major
>             Fix For: 1.13.0
>
>         Attachments: Illustration of Group.jpg
>
>


