[
https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhilong Hong updated FLINK-23218:
---------------------------------
Description:
_This is part 2 of the optimization of task deployments. For the overall
description and part 1, please see FLINK-23005._
For ShuffleDescriptors of vertices with 8k parallelism, the size of the
serialized value is more than 700 Kilobytes; after compression it is still
about 200 Kilobytes. The overall size of 8k TaskDeploymentDescriptors is then
more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the
TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors become
a heavy burden for the garbage collector.
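The compression step mentioned above can be sketched with the JDK's Deflater/Inflater. This is only an illustration of the idea, assuming a zlib-style codec; the class and method names below are made up and are not Flink's actual serialization utilities:

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical helper illustrating compression of a serialized value
// before shipping it; not Flink's real implementation.
public class CompressSketch {

    // Compresses the serialized bytes with zlib (Deflater).
    static byte[] compress(byte[] serialized) {
        Deflater deflater = new Deflater();
        deflater.setInput(serialized);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    // Restores the original bytes; the caller must know the original size.
    static byte[] decompress(byte[] compressed, int originalSize) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        byte[] out = new byte[originalSize];
        try {
            inflater.inflate(out);
        } catch (DataFormatException e) {
            throw new RuntimeException(e);
        } finally {
            inflater.end();
        }
        return out;
    }
}
```

Highly redundant descriptor payloads compress well, which is why the 700 KB value shrinks to roughly 200 KB in the measurement above.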
In TaskDeploymentDescriptor, JobInformation and TaskInformation are distributed
via the blob server if their sizes exceed a certain threshold (defined by
{{blob.offload.minsize}}). TaskExecutors request the information from the blob
server once they begin to process the TaskDeploymentDescriptor. This makes sure
the JobManager doesn't need to keep all the copies in heap memory until all the
TaskDeploymentDescriptors are sent; there will be only one copy on the blob
server. Like JobInformation, we can distribute the cached ShuffleDescriptors
via the blob server once their overall size exceeds the threshold.
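The threshold decision can be sketched as follows. This is a hedged illustration, not the real JobManager code path: the 1 MiB constant is the documented default of {{blob.offload.minsize}}, and the upload method is a placeholder standing in for the blob server's put operation:

```java
import java.util.Optional;

// Hypothetical sketch of threshold-based offloading, mirroring how
// JobInformation is offloaded when it exceeds blob.offload.minsize.
public class OffloadSketch {
    // blob.offload.minsize: documented default is 1 MiB.
    static final int OFFLOAD_MIN_SIZE = 1024 * 1024;

    // Returns empty if the value is small enough to ship inline in the
    // TaskDeploymentDescriptor; otherwise "uploads" once and returns a key
    // referencing the single copy on the blob server.
    static Optional<String> maybeOffload(byte[] serialized) {
        if (serialized.length < OFFLOAD_MIN_SIZE) {
            return Optional.empty();            // send inline in the TDD
        }
        return Optional.of(upload(serialized)); // one shared blob-server copy
    }

    // Placeholder for the real blob-server upload; returns a fake key.
    static String upload(byte[] serialized) {
        return "blob-key-" + serialized.length;
    }
}
```

Every TaskDeploymentDescriptor then carries only the small key instead of the full serialized ShuffleDescriptors, which is what removes the per-descriptor heap copies.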
This improvement helps to avoid long garbage collection pauses during task
deployment.
The cached ShuffleDescriptors on the blob server will be removed once the
partitions related to them are no longer valid. This makes sure the blob server
won't fill up with cached ShuffleDescriptors, even if there is a long-running
session on the cluster.
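The cleanup described above amounts to tying the lifetime of each cached blob to the partitions it describes. A minimal sketch, with illustrative names that are not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of releasing a cached ShuffleDescriptor blob when
// the partitions it describes become invalid; names are illustrative.
public class ShuffleCacheSketch {
    // Maps a partition-group id to the serialized descriptors stored
    // on the blob server (modeled here as an in-memory map).
    private final Map<String, byte[]> blobStore = new HashMap<>();

    void cache(String partitionGroupId, byte[] serializedDescriptors) {
        blobStore.put(partitionGroupId, serializedDescriptors);
    }

    // Called when the result partitions of the group are no longer valid,
    // e.g. after the job finishes or the partitions are released.
    void onPartitionsReleased(String partitionGroupId) {
        blobStore.remove(partitionGroupId); // blob-server space is reclaimed
    }

    int cachedEntries() {
        return blobStore.size();
    }
}
```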
In part 3 we will limit the size of the ShuffleDescriptors cached in the
PermanentBlobCache on the TaskExecutor. This makes sure the TaskExecutor won't
run out of space because of cached ShuffleDescriptors. For more details please
see FLINK-23354.
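One plausible shape for that size limit is byte-budgeted LRU eviction. The sketch below is an assumption about how such a limit could work, not the design adopted in FLINK-23354; all names and limits are illustrative:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a size-limited cache in the spirit of what
// FLINK-23354 proposes for PermanentBlobCache; names are made up.
public class SizeLimitedCacheSketch {
    private final long maxBytes;
    private long currentBytes = 0;
    // Access-order map: the least recently used entry is evicted first.
    private final LinkedHashMap<String, byte[]> entries =
            new LinkedHashMap<>(16, 0.75f, true);

    SizeLimitedCacheSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void put(String key, byte[] value) {
        byte[] old = entries.put(key, value);
        if (old != null) {
            currentBytes -= old.length;
        }
        currentBytes += value.length;
        // Evict least recently used entries until under the byte budget.
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            Map.Entry<String, byte[]> eldest = it.next();
            currentBytes -= eldest.getValue().length;
            it.remove();
        }
    }

    boolean contains(String key) {
        return entries.containsKey(key);
    }

    long totalBytes() {
        return currentBytes;
    }
}
```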
> Distribute the ShuffleDescriptors via blob server
> -------------------------------------------------
>
> Key: FLINK-23218
> URL: https://issues.apache.org/jira/browse/FLINK-23218
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Reporter: Zhilong Hong
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.14.0
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)