[ 
https://issues.apache.org/jira/browse/FLINK-21110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu closed FLINK-21110.
---------------------------
    Resolution: Done

> Optimize scheduler performance for large-scale jobs
> ---------------------------------------------------
>
>                 Key: FLINK-21110
>                 URL: https://issues.apache.org/jira/browse/FLINK-21110
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: Zhilong Hong
>            Assignee: Zhilong Hong
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>         Attachments: Illustration of Group.jpg
>
>
> According to the results of the scheduler benchmarks we implemented in 
> FLINK-20612, the bottlenecks of deploying and running a large-scale job in 
> Flink mainly lie in the following procedures:
> ||Procedure||Time complexity||
> |Initializing ExecutionGraph|O(N^2)|
> |Building DefaultExecutionTopology|O(N^2)|
> |Initializing PipelinedRegionSchedulingStrategy|O(N^2)|
> |Scheduling downstream tasks when a task finishes|O(N^2)|
> |Calculating tasks to restart when a failover occurs|O(N^2)|
> |Releasing result partitions|O(N^3)|
> These procedures are all related to the complexity of the topology in the 
> ExecutionGraph. Between two vertices connected by all-to-all edges, all the 
> upstream IntermediateResultPartitions are connected to all the downstream 
> ExecutionVertices. The complexity of building and traversing all these edges 
> is O(N^2).
> As for memory usage, we currently use ExecutionEdges to store the connection 
> information. For the all-to-all distribution type, there are O(N^2) 
> ExecutionEdges. We tested a simple job with only two vertices, each with a 
> parallelism of 10k, connected by all-to-all edges. It takes 4.175 GiB 
> (estimated via MXBean) to store the 100M ExecutionEdges.
> In most large-scale jobs, there are more than two vertices with large 
> parallelisms, so deploying such jobs costs a lot of time and memory.
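> Below is a minimal sketch of the pre-optimization layout described above. The 
> class names (Partition, Vertex, Edge) are hypothetical stand-ins, not Flink's 
> actual ExecutionEdge classes; the point is only that an all-to-all connection 
> materializes one edge object per (partition, vertex) pair:
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> 
> public class ExecutionEdgeSketch {
> 
>     // Hypothetical stand-ins for IntermediateResultPartition / ExecutionVertex.
>     record Partition(int id) {}
>     record Vertex(int id) {}
>     record Edge(Partition source, Vertex target) {}
> 
>     public static void main(String[] args) {
>         // Kept small so the sketch runs quickly; at the 10k parallelism used
>         // in the test above, the count below becomes 10k * 10k = 100M objects.
>         int parallelism = 1_000;
> 
>         List<Edge> edges = new ArrayList<>();
>         for (int p = 0; p < parallelism; p++) {
>             for (int v = 0; v < parallelism; v++) {
>                 // One edge object per pair of upstream partition and
>                 // downstream vertex: O(N^2) objects in total.
>                 edges.add(new Edge(new Partition(p), new Vertex(v)));
>             }
>         }
>         System.out.println("ALL_TO_ALL edge objects: " + edges.size());
>     }
> }
> {code}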
> As we can see, for two JobVertices connected with the all-to-all distribution 
> type, all IntermediateResultPartitions produced by the upstream 
> ExecutionVertices are isomorphic, which means that the downstream 
> ExecutionVertices they connect to are exactly the same. The downstream 
> ExecutionVertices belonging to the same JobVertex are also isomorphic, as the 
> upstream ResultPartitions they connect to are the same, too.
> Since every JobEdge has exactly one distribution type, we can divide the 
> vertices and result partitions into groups according to the distribution type 
> of the JobEdge. 
> For the all-to-all distribution type, since all the downstream vertices are 
> isomorphic, they belong to a single group, and all the upstream result 
> partitions are connected to this group. Likewise, all the upstream result 
> partitions belong to a single group, and all the downstream vertices are 
> connected to this group. Previously, iterating over all the downstream 
> vertices of all the upstream result partitions required looping over the N 
> vertices N times, which leads to O(N^2) complexity. Now, since all the 
> upstream result partitions are connected to one downstream group, we only 
> need to loop over it once, which is O(N).
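> A minimal sketch of this grouping, using hypothetical group classes for 
> illustration (Flink's actual implementation may differ): all upstream 
> partitions share a single reference to one group of downstream vertices, so 
> the number of references is O(N) and one pass over the group visits every 
> consumer.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> 
> public class AllToAllGroupingSketch {
> 
>     record Vertex(int id) {}
> 
>     // One shared group containing all N downstream vertices, created once.
>     record ConsumerVertexGroup(List<Vertex> vertices) {}
> 
>     // Each upstream partition stores a single reference to the shared group.
>     record Partition(int id, ConsumerVertexGroup consumers) {}
> 
>     public static void main(String[] args) {
>         int parallelism = 10_000;
> 
>         List<Vertex> downstream = new ArrayList<>();
>         for (int i = 0; i < parallelism; i++) {
>             downstream.add(new Vertex(i));
>         }
>         ConsumerVertexGroup sharedGroup = new ConsumerVertexGroup(downstream);
> 
>         List<Partition> upstream = new ArrayList<>();
>         for (int i = 0; i < parallelism; i++) {
>             // O(1) per partition: just a reference to the shared group.
>             upstream.add(new Partition(i, sharedGroup));
>         }
> 
>         // Scheduling all consumers of the upstream result: the shared group
>         // is traversed once, so the whole pass is O(N) instead of O(N^2).
>         int scheduled = 0;
>         for (Vertex v : upstream.get(0).consumers().vertices()) {
>             scheduled++;
>         }
>         System.out.println("Vertices visited in one pass: " + scheduled);
>     }
> }
> {code}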
> For the pointwise distribution type, because each result partition is 
> connected to different downstream vertices, the partitions belong to 
> different groups. Likewise, the vertices belong to different groups. Since 
> each result partition group is connected to exactly one vertex group in a 
> pointwise manner, the complexity of looping over them is still O(N).
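> Continuing the sketch above (same hypothetical Vertex, ConsumerVertexGroup, 
> and Partition records), the pointwise case gives every partition its own 
> small group, so the total number of group entries is still O(N):
> {code:java}
> // POINTWISE: each partition is wired to its own (small) consumer group.
> List<Partition> upstream = new ArrayList<>();
> for (int i = 0; i < parallelism; i++) {
>     ConsumerVertexGroup ownGroup =
>             new ConsumerVertexGroup(List.of(new Vertex(i)));
>     upstream.add(new Partition(i, ownGroup));
> }
> // Visiting every partition's group touches O(N) entries in total.
> {code}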
> !Illustration of Group.jpg|height=249!
> After we group the result partitions and vertices, ExecutionEdge is no longer 
> needed. For the test job mentioned above, this optimization reduces the 
> memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in our POC. 
> The time cost is reduced from 62.090 seconds to 8.551 seconds (with 10k 
> parallelism).
>  
> The detailed design doc: 
> https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
