[ https://issues.apache.org/jira/browse/FLINK-32045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17721632#comment-17721632 ]
Zhilong Hong commented on FLINK-32045:
--------------------------------------

Thank you for proposing these optimizations, Weihua!

# For the threshold to enable the distribution of shuffle descriptors via the blob server: originally I was thinking about adding a new configuration option called something like "blob.deployement.offload.minsize" (I forgot the original name). This configuration was eventually dropped, because we didn't want to introduce a new option that requires users to have advanced knowledge before configuring it. However, I think enabling the distribution of shuffle descriptors via the blob server based on the parallelism is a better solution for this situation. It's more understandable and easier to configure, and we can also set a large default value for it. What do you think [~zhuzh]?
# We thought about introducing a cache for shuffle descriptors in the TaskManager earlier. Since users usually won't set a large value for "taskmanager.numberOfTaskSlots", there would only be a few slots in a TaskManager (for example, 8), so there isn't a lot of deserialization work on the TaskManager side. I'm therefore wondering how much performance a cache for shuffle descriptors in the TaskManager would actually gain. Another question arises for the cache: how do we keep it up to date? Currently, the cache in the JobManager is cleared in two scenarios: (1) a ConsumedPartitionGroup is released, and (2) the producer of an IntermediateResult encounters a failover. To clear the caches in the TaskManagers at the same time, we may need to introduce several complicated RPC calls between the JobManager and the TaskManagers, which in my opinion is a bit complicated. The third concern is about session mode: if users submit a lot of jobs to a session at a rapid pace, the cache could fill up the heap in a short time and cause unexpected impact on users' tasks.
We could use an LRU or FIFO cache for this situation. However, it's not easy for us to decide the size of the cache, because we don't know how large the TaskManager will be. In my opinion, introducing a cache for ShuffleDescriptors in the TaskManager requires more discussion. Please correct me if I missed anything or if anything I said is wrong. Thank you.

> optimize task deployment performance for large-scale jobs
> ---------------------------------------------------------
>
> Key: FLINK-32045
> URL: https://issues.apache.org/jira/browse/FLINK-32045
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Reporter: Weihua Hu
> Priority: Major
>
> h1. Background
> In FLINK-21110, we cache shuffle descriptors on the job manager side and
> support using the blob server to offload these descriptors in order to reduce
> the cost of task deployment.
> I think there are also some improvements we could make for large-scale jobs.
> # The default minimum size to enable distribution via the blob server is 1 MB. But
> for a large WordCount job with 20000 parallelism, the size of the serialized
> shuffle descriptors is only 300 KB. This means users need to lower
> "blob.offload.minsize", but the value is hard for users to decide.
> # The task executor side still needs to load blob files and deserialize
> the shuffle descriptors for each task. Since these operations run in the
> main thread, they may block other RPCs from the job manager.
> h1. Propose
> # Enable distributing shuffle descriptors via the blob server automatically. This
> could be decided by the edge count of the current shuffle descriptor: blob
> offload is enabled when the edge count exceeds an internal
> threshold.
> # Introduce a cache of deserialized shuffle descriptors on the task executor
> side. This could reduce the cost of reading from local blob files and
> deserialization. Of course, the cache should have a TTL to avoid occupying too
> much memory.
> And the cache should have the same switch mechanism as the blob
> server offload.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
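The first proposal above (switching to blob-server distribution based on edge count rather than serialized size) can be sketched as follows. This is a hypothetical illustration, not Flink's actual API: the class name, method name, and threshold value are all invented for the example.

```java
// Hypothetical sketch: decide whether to distribute serialized
// ShuffleDescriptors via the blob server based on the number of edges
// (producer subtasks x consumer subtasks) instead of serialized size.
// The threshold constant and all names here are illustrative only.
public class ShuffleDescriptorOffloadDecider {

    // Internal threshold with a large default, so that only genuinely
    // large-scale jobs pay the blob-server round trip.
    static final long EDGE_COUNT_THRESHOLD = 100_000L;

    /**
     * @param numProducers parallelism of the producing job vertex
     * @param numConsumers parallelism of the consuming job vertex
     * @return true if the descriptors should go through the blob server
     */
    public static boolean shouldOffloadToBlobServer(int numProducers, int numConsumers) {
        // Use long arithmetic: edge counts overflow int for large jobs.
        long edgeCount = (long) numProducers * numConsumers;
        return edgeCount > EDGE_COUNT_THRESHOLD;
    }
}
```

The advantage over a size threshold is that parallelism is known before serialization and is easier for users to reason about when a default needs overriding.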
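The TaskManager-side cache discussed in the comment (size-bounded, with a TTL so stale entries from finished or failed-over jobs eventually disappear) could look roughly like the sketch below. This is not Flink's implementation; the class is a generic LRU-plus-TTL cache built on `LinkedHashMap`, with all names invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a task-executor cache for deserialized
// ShuffleDescriptors: bounded by entry count (LRU eviction via an
// access-ordered LinkedHashMap) and by a per-entry TTL.
public class TtlLruCache<K, V> {

    private static final class Entry<V> {
        final V value;
        final long expiresAtMillis;

        Entry(V value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> map;

    public TtlLruCache(final int maxEntries, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // accessOrder=true makes iteration order least-recently-used
        // first; removeEldestEntry drops that entry past the bound.
        this.map = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    public synchronized V get(K key) {
        Entry<V> e = map.get(key);
        if (e == null) {
            return null;
        }
        if (System.currentTimeMillis() >= e.expiresAtMillis) {
            map.remove(key); // expired: treat as a cache miss
            return null;
        }
        return e.value;
    }

    public synchronized void put(K key, V value) {
        map.put(key, new Entry<>(value, System.currentTimeMillis() + ttlMillis));
    }

    public synchronized int size() {
        return map.size();
    }
}
```

The TTL sidesteps the cross-component invalidation RPCs raised in the comment (entries simply age out instead of being explicitly cleared), at the cost of possibly serving a descriptor for up to one TTL after the JobManager has dropped it; the size bound addresses the session-mode heap concern.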