[ 
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhilong Hong updated FLINK-23218:
---------------------------------
    Description: 
_This is part 2 of the optimization of task deployments. For the overall 
description and part 1, please see FLINK-23005._

For vertices with a parallelism of 8k, the serialized ShuffleDescriptors are 
larger than 700 Kilobytes, or about 200 Kilobytes after compression. The 8k 
TaskDeploymentDescriptors together take up more than 1.6 Gigabytes (roughly 
8k copies of about 200 Kilobytes each). Since Akka cannot send the messages as 
fast as the TaskDeploymentDescriptors are created, they pile up on the heap and 
become a heavy burden for the garbage collector.

In TaskDeploymentDescriptor, JobInformation and TaskInformation are distributed 
via the blob server if their sizes exceed a certain threshold (defined by 
{{blob.offload.minsize}}). TaskExecutors request the information from the 
blob server once they begin to process the TaskDeploymentDescriptor. This makes 
sure that the JobManager doesn't need to keep all the copies in heap memory 
until all the TaskDeploymentDescriptors are sent. There is only one copy, and 
it lives in the blob server. As with JobInformation, we can distribute the 
cached ShuffleDescriptors via the blob server if their overall size exceeds the 
threshold.
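
The offloading decision described above can be sketched roughly as follows. 
This is a minimal illustration, not Flink's actual implementation: 
{{SketchBlobStore}}, {{MaybeOffloadedSketch}} and the {{offloadMinSize}} 
parameter are hypothetical names, and whether the comparison against the 
threshold is inclusive is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in for the blob server: keeps one copy per key. */
class SketchBlobStore {
    private final Map<String, byte[]> blobs = new HashMap<>();

    /** Stores the bytes once and returns the key that identifies them. */
    String put(byte[] data) {
        String key = UUID.randomUUID().toString();
        blobs.put(key, data);
        return key;
    }

    byte[] get(String key) {
        return blobs.get(key);
    }

    int size() {
        return blobs.size();
    }
}

/** Holds either the serialized bytes themselves or a key into the blob store. */
final class MaybeOffloadedSketch {
    final byte[] inlineValue; // non-null when the bytes travel inside the descriptor
    final String blobKey;     // non-null when the bytes were offloaded

    private MaybeOffloadedSketch(byte[] inlineValue, String blobKey) {
        this.inlineValue = inlineValue;
        this.blobKey = blobKey;
    }

    /** Offloads only if the serialized size exceeds the (assumed exclusive) threshold. */
    static MaybeOffloadedSketch of(byte[] serialized, long offloadMinSize, SketchBlobStore store) {
        if (serialized.length > offloadMinSize) {
            return new MaybeOffloadedSketch(null, store.put(serialized));
        }
        return new MaybeOffloadedSketch(serialized, null);
    }

    boolean isOffloaded() {
        return blobKey != null;
    }

    /** What a receiver would do: fetch from the store if offloaded, else use the inline bytes. */
    byte[] resolve(SketchBlobStore store) {
        return isOffloaded() ? store.get(blobKey) : inlineValue;
    }
}
```

If the result of {{of(...)}} is computed once and the same instance is reused 
for every descriptor, an offloaded value costs each descriptor only a short 
key, while the payload exists once in the store.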

This improvement helps to avoid long-running garbage collection during task 
deployment.

The cached ShuffleDescriptors in the blob server will be removed once the 
partitions related to them are no longer valid. This makes sure the blob server 
won't fill up with cached ShuffleDescriptors, even if there is a long-running 
session on the cluster.
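
The cleanup rule above could look something like the sketch below. It only 
illustrates the bookkeeping and is not how Flink implements it: 
{{OffloadedDescriptorTracker}}, its methods, and the idea of grouping blob keys 
under a partition group id are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical tracker tying offloaded ShuffleDescriptor blobs to their partitions. */
class OffloadedDescriptorTracker {
    // blob key -> serialized descriptors (stand-in for the blob server's storage)
    private final Map<String, byte[]> blobStore = new HashMap<>();
    // partition group id -> keys of the blobs cached for that group
    private final Map<String, Set<String>> keysByPartitionGroup = new HashMap<>();

    /** Records that a blob was stored for the given partition group. */
    void register(String partitionGroupId, String blobKey, byte[] serialized) {
        blobStore.put(blobKey, serialized);
        keysByPartitionGroup
                .computeIfAbsent(partitionGroupId, k -> new HashSet<>())
                .add(blobKey);
    }

    /** Removes every blob of a group whose partitions are no longer valid. */
    int onPartitionsInvalid(String partitionGroupId) {
        Set<String> keys = keysByPartitionGroup.remove(partitionGroupId);
        if (keys == null) {
            return 0; // nothing was cached for this group
        }
        int removed = 0;
        for (String key : keys) {
            if (blobStore.remove(key) != null) {
                removed++;
            }
        }
        return removed;
    }

    int storedBlobCount() {
        return blobStore.size();
    }
}
```

Because removal is driven by partition validity rather than by job completion, 
a session that runs many jobs keeps only the blobs whose partitions are still 
in use.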

In part 3 we will limit the size of ShuffleDescriptors in 
PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor won't 
run out of space because of cached ShuffleDescriptors. For more details 
please see FLINK-23354.


> Distribute the ShuffleDescriptors via blob server
> -------------------------------------------------
>
>                 Key: FLINK-23218
>                 URL: https://issues.apache.org/jira/browse/FLINK-23218
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Zhilong Hong
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
