Zhilong Hong created FLINK-23005:
------------------------------------

             Summary: Optimize the deployment of tasks
                 Key: FLINK-23005
                 URL: https://issues.apache.org/jira/browse/FLINK-23005
             Project: Flink
          Issue Type: Sub-task
          Components: Runtime / Coordination
            Reporter: Zhilong Hong
             Fix For: 1.14.0


h3. Introduction

The optimizations introduced in FLINK-21110 have improved the performance of 
job initialization, failover, and partition releasing. However, task deployment 
is still slow. For a job with two vertices, each with a parallelism of 8k and 
connected by an all-to-all edge, it takes 32.611s to deploy all the tasks and 
transition them to running. If the parallelism is 16k, it may take more than 2 
minutes.

Since the creation of TaskDeploymentDescriptors runs in the main thread of the 
JobManager, the JobManager cannot handle other Akka messages, such as 
heartbeats and task status updates, for more than two minutes.

 

All in all, there are currently two issues in the deployment of tasks for 
large-scale jobs:
 # It takes a long time to deploy tasks, especially for all-to-all edges.
 # Heartbeat timeouts may happen during or after the task deployment procedure. 
For a streaming job, this causes a failover of the entire region. The job may 
never transition to running, since another heartbeat timeout may occur while 
the new tasks are being deployed.

h3. Proposal

Task deployment involves the following procedures:
 # The JobManager creates a TaskDeploymentDescriptor for each task in the main 
thread
 # The TaskDeploymentDescriptor is serialized in the future executor
 # Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC 
calls
 # The TaskExecutors create a new task thread and execute it
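The second step above can be sketched as follows. This is a hypothetical, 
simplified illustration (the class and variable names are not Flink's actual 
API): serialization is handed off to a separate "future executor" so that the 
JobManager main thread stays free for other messages while descriptors are 
serialized.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: offload descriptor serialization to a dedicated
// executor so the main thread is not blocked while N descriptors are
// serialized. Names here are illustrative, not Flink's real classes.
public class AsyncSerializationSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService futureExecutor = Executors.newFixedThreadPool(4);

        // The main thread only schedules the work; the serialization
        // itself (stand-in: returning a byte array) runs on the pool.
        CompletableFuture<byte[]> serializedDescriptor =
            CompletableFuture.supplyAsync(
                () -> new byte[] {42}, // stand-in for serializing a TDD
                futureExecutor);

        System.out.println(serializedDescriptor.get().length); // 1
        futureExecutor.shutdown();
    }
}
```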

The optimization contains two parts:

*1. Cache the compressed serialized value of ShuffleDescriptors*

ShuffleDescriptors in the TaskDeploymentDescriptor describe the 
IntermediateResultPartitions that a task consumes. For downstream vertices 
connected by an all-to-all edge with parallelism _N_, _N_ ShuffleDescriptors 
are currently calculated _N_ times, once per downstream vertex. However, these 
vertices all consume the same IntermediateResultPartitions and therefore share 
the same ShuffleDescriptors, so there is no need to calculate them for each 
downstream vertex individually; we can simply cache them. This decreases the 
overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to 
O(N).
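A minimal sketch of the caching idea, with illustrative names (this is not 
Flink's actual cache implementation): the descriptor set is computed once per 
consumed intermediate result and reused by every downstream subtask.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: one cached (serialized) descriptor set per
// IntermediateResult, so N all-to-all consumers trigger one computation
// instead of N, reducing the overall cost from O(N^2) to O(N).
public class ShuffleDescriptorCacheSketch {
    private final Map<String, byte[]> cache = new HashMap<>();

    // resultId identifies the consumed IntermediateResult; compute runs
    // only on the first request for that result.
    public byte[] getOrCompute(String resultId, Function<String, byte[]> compute) {
        return cache.computeIfAbsent(resultId, compute);
    }

    public static void main(String[] args) {
        ShuffleDescriptorCacheSketch cache = new ShuffleDescriptorCacheSketch();
        final int[] computations = {0};
        for (int subtask = 0; subtask < 8000; subtask++) {
            cache.getOrCompute("result-1", id -> {
                computations[0]++;
                return new byte[] {1, 2, 3}; // stand-in for serialized descriptors
            });
        }
        System.out.println(computations[0]); // computed once, not 8000 times
    }
}
```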

Furthermore, since we don't need to serialize the same ShuffleDescriptors _N_ 
times, we can cache the serialized value of the ShuffleDescriptors instead of 
the original objects. To decrease the size of Akka messages and reduce 
replicated data over the network, these serialized values can be compressed.
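Compressing the cached serialized value can be sketched with the JDK's 
standard zlib streams; this is only an illustration of the idea, not the 
compression codec Flink actually uses:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterOutputStream;

// Hypothetical sketch: compress the serialized descriptor bytes once and
// ship the compressed form in every deployment message; the repetitive
// serialized data typically compresses well.
public class CompressedValueSketch {
    static byte[] compress(byte[] raw) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out)) {
            dos.write(raw);
        }
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InflaterOutputStream ios = new InflaterOutputStream(out)) {
            ios.write(compressed);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = new byte[700_000]; // stand-in for serialized ShuffleDescriptors
        byte[] packed = compress(raw);
        System.out.println(packed.length < raw.length);            // true
        System.out.println(Arrays.equals(decompress(packed), raw)); // true
    }
}
```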

*2. Distribute the ShuffleDescriptors via blob server*

For the ShuffleDescriptors of vertices with 8k parallelism, the size of the 
serialized value is more than 700 KB. After compression, it is around 200 KB. 
The overall size of 8k TaskDeploymentDescriptors is therefore more than 1.6 
GB. Since Akka cannot send the messages as fast as the 
TaskDeploymentDescriptors are created, these descriptors become a heavy burden 
for the garbage collector.

In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
distributed via the blob server if their sizes exceed a certain threshold 
(defined by {{blob.offload.minsize}}). TaskExecutors request the information 
from the blob server once they begin to process the TaskDeploymentDescriptor. 
This ensures that the JobManager doesn't need to keep all the copies in heap 
memory until all the TaskDeploymentDescriptors are sent; there is only one 
copy, on the blob server. Like JobInformation, the cached ShuffleDescriptors 
can be distributed via the blob server if their overall size exceeds the 
threshold.
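The threshold-based routing can be sketched as follows; the class name and 
the concrete threshold value are illustrative (the real threshold comes from 
the {{blob.offload.minsize}} configuration):

```java
// Hypothetical sketch: decide whether the cached serialized descriptors
// travel inline in the TaskDeploymentDescriptor or are offloaded to the
// blob server, mirroring the existing JobInformation/TaskInformation path.
public class OffloadDecisionSketch {
    // Illustrative stand-in for the configured blob.offload.minsize.
    static final int OFFLOAD_MIN_SIZE = 1_048_576; // 1 MiB

    // Small values stay inline; large values are replaced by a blob key
    // that TaskExecutors use to fetch the single copy from the blob server.
    static String route(int serializedSize) {
        return serializedSize >= OFFLOAD_MIN_SIZE ? "BLOB_SERVER" : "INLINE";
    }

    public static void main(String[] args) {
        System.out.println(route(200_000));         // one compressed value: INLINE
        System.out.println(route(8_000 * 200_000)); // 1.6 GB total: BLOB_SERVER
    }
}
```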
h3. Summary

In summary, the optimization of task deployment is to introduce a cache for 
the TaskDeploymentDescriptor. We cache the compressed serialized value of the 
ShuffleDescriptors; if its size exceeds a certain threshold, the value is 
distributed via the blob server.
h3. Comparison

We implemented a POC and conducted an experiment to evaluate the performance 
of the optimization. We chose a streaming job for the experiment because no 
task will start running until all tasks are deployed, which avoids other 
disturbing factors. The job contains two vertices, a source and a sink, 
connected by an all-to-all edge.

The results below show the time interval between the timestamp at which the 
first task transitions to _deploying_ and the timestamp at which the last task 
transitions to _running_:
||Parallelism||Before||After ||
|8000*8000|32.611s|6.480s|
|16000*16000|128.408s|19.051s|



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
