[ https://issues.apache.org/jira/browse/FLINK-23218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17385585#comment-17385585 ]
Till Rohrmann commented on FLINK-23218:
---------------------------------------

Just for my understanding: When will a {{TaskExecutor}} reread the {{JobInformation}}, {{TaskInformation}} and {{ShuffleDescriptors}}? Won't this only happen after the {{JM}} redeploys the {{Tasks}}? In this case, the {{JM}} could store these blobs again. But I am ok with introducing a not-so-permanent blob entry that is eligible for LRU pruning. I think it is a good idea to solve this on the {{BlobCache}} side by providing an API which allows you to say that the cache does not need to keep this file, because if in doubt it can be downloaded from the server again and it is not actively used during the execution of the {{Task}}.

> Distribute the ShuffleDescriptors via blob server
> -------------------------------------------------
>
>                 Key: FLINK-23218
>                 URL: https://issues.apache.org/jira/browse/FLINK-23218
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>            Reporter: Zhilong Hong
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> _This is part 2 of the optimization related to task deployments. For more
> details about the overall description and part 1, please see FLINK-23005._
>
> For the ShuffleDescriptors of vertices with a parallelism of 8k, the
> serialized value is more than 700 KB. After compression, it is about 200 KB.
> The overall size of 8k TaskDeploymentDescriptors is more than 1.6 GB. Since
> Akka cannot send the messages as fast as the TaskDeploymentDescriptors are
> created, these TaskDeploymentDescriptors pile up and become a heavy burden for
> the garbage collector.
>
> In the TaskDeploymentDescriptor, JobInformation and TaskInformation are
> distributed via the blob server if their sizes exceed a certain threshold
> (defined by {{blob.offload.minsize}}). TaskExecutors request the information
> from the blob server once they begin to process the TaskDeploymentDescriptor.
> This ensures that the JobManager doesn't need to keep all the copies in heap
> memory until all the TaskDeploymentDescriptors have been sent; there is only
> one copy, in the blob server. Like the JobInformation, the cached
> ShuffleDescriptors can be distributed via the blob server if their overall
> size exceeds the threshold.
>
> This improvement can help avoid long garbage collection pauses during task
> deployment.
>
> The cached ShuffleDescriptors in the blob server will be removed once the
> partitions related to them are no longer valid. This ensures that the blob
> server won't fill up with cached ShuffleDescriptors, even if there is a
> long-running session on the cluster.
>
> In part 3, we will limit the size of ShuffleDescriptors in the
> PermanentBlobCache on the TaskExecutor. This ensures that the TaskExecutor
> won't run out of space because of cached ShuffleDescriptors. For more details,
> please see FLINK-23354.
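Till's suggestion can be illustrated with a minimal Java sketch. He proposes a blob entry that is not permanent, is eligible for LRU pruning, and can be downloaded from the server again if in doubt. The same idea also covers the size limit planned for part 3. The sketch assumes an in-memory cache bounded by a byte budget. `EvictableBlobCache`, `getOrFetch` and the `fetchFromServer` function are hypothetical names. This is not Flink's `BlobCache` or `PermanentBlobCache` API, and the real cache keeps blobs as local files rather than byte arrays.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Flink's BlobCache/PermanentBlobCache API): a cache for
 * non-permanent blobs such as serialized ShuffleDescriptors. Entries are evicted in
 * least-recently-used order once a byte budget is exceeded, and a miss is served
 * by fetching the blob from the server again.
 */
final class EvictableBlobCache {
    private final long maxBytes;
    private long currentBytes = 0;
    // accessOrder = true: iteration runs from least- to most-recently accessed entry.
    private final LinkedHashMap<String, byte[]> entries = new LinkedHashMap<>(16, 0.75f, true);

    EvictableBlobCache(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Adds or replaces an entry, then evicts LRU entries until within the byte budget. */
    synchronized void put(String key, byte[] blob) {
        byte[] previous = entries.put(key, blob);
        if (previous != null) {
            currentBytes -= previous.length;
        }
        currentBytes += blob.length;
        // Walk from the least-recently-used end. A single blob larger than maxBytes
        // is evicted as well, i.e. it is simply not retained.
        Iterator<Map.Entry<String, byte[]>> it = entries.entrySet().iterator();
        while (currentBytes > maxBytes && it.hasNext()) {
            currentBytes -= it.next().getValue().length;
            it.remove();
        }
    }

    /**
     * Returns the cached blob or, on a miss (never cached or already evicted),
     * fetches it again via the supplied function and caches the result.
     */
    synchronized byte[] getOrFetch(String key, Function<String, byte[]> fetchFromServer) {
        byte[] blob = entries.get(key); // a hit also marks the entry as recently used
        if (blob == null) {
            blob = fetchFromServer.apply(key);
            put(key, blob);
        }
        return blob;
    }

    synchronized boolean contains(String key) {
        return entries.containsKey(key); // does not change the access order
    }

    synchronized long sizeInBytes() {
        return currentBytes;
    }

    public static void main(String[] args) {
        EvictableBlobCache cache = new EvictableBlobCache(1000);
        cache.put("sd-1", new byte[600]);
        cache.put("sd-2", new byte[300]);
        cache.getOrFetch("sd-1", k -> new byte[600]); // hit: sd-1 becomes most recently used
        cache.put("sd-3", new byte[400]);             // 1300 bytes > 1000: evicts sd-2
        System.out.println(cache.contains("sd-1") + " " + cache.contains("sd-2")
                + " " + cache.contains("sd-3") + " " + cache.sizeInBytes());
        // prints "true false true 1000"
    }
}
```

The access-ordered `LinkedHashMap` keeps the eviction order cheap to maintain. An evicted entry costs a re-download on the next access rather than a failure, which is what makes such entries safe to prune while the `Task` is not actively using them.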