[ https://issues.apache.org/jira/browse/FLINK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17458147#comment-17458147 ]
huntercc commented on FLINK-24295:
----------------------------------

I also think retrying may be a good idea. In my solution, the task retries the partition request to the upstream producer (via the `RemoteInputChannel`) with a penalty mechanism instead of directly triggering a `requestPartitionState` RPC call, which significantly reduces the number of RPC calls the JobMaster has to handle. Only when a task still gets a `PartitionNotFoundException` after several retries does it trigger a `requestPartitionState` RPC call to the JobMaster. In practice, we improved the efficiency of submitting a large-scale job with data shuffle in this way. (A rough sketch of this retry-with-penalty pattern follows the quoted issue description below.)

> Too many requestPartitionState may jam the JobManager during task deployment
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-24295
>                 URL: https://issues.apache.org/jira/browse/FLINK-24295
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.0, 1.15.0
>            Reporter: Zhilong Hong
>            Priority: Major
>
> After the phase 2 optimization we did in FLINK-21110, task deployment has become much faster. However, we find that during task deployment there may be so many {{requestPartitionState}} RPC calls from TaskManagers that they jam the JobManager.
>
> Why would there be so many {{requestPartitionState}} RPC calls? After the optimization, the JobManager can submit tasks to TaskManagers quickly. If the JobManager calls {{submitTask}} faster than the TaskManagers can process {{submitTask}}, some TaskManagers may deploy their tasks faster than others.
>
> When a downstream task is deployed, it tries to request partitions from its upstream tasks, which may be located on a remote TaskManager. If the upstream tasks are not deployed yet, it requests the partition state from the JobManager. In the worst case, the computation and memory complexity is O(N^2).
>
> In our test with a streaming job that has two vertices with a parallelism of 8,000 connected by all-to-all edges, there will be 32,000,000 {{requestPartitionState}} RPC calls in the JobManager in the worst case. Each RPC call requires 1 KiB of heap memory in the JobManager. The overall space cost of {{requestPartitionState}} is therefore 32 GiB, which is a heavy burden for the GC.
>
> In our test, the heap memory of the JobManager is 8 GiB. During task deployment the JobManager runs into more and more full GCs. The JobManager gets stuck, since it spends its time in full GCs and has no time to handle the incoming RPC calls.
>
> The worst part is that there is no log output for this RPC call. When users find that the JobManager is getting slower or stuck, they won't be able to find out why.
>
> Why did this case rarely happen before? Before the optimization, it took a long time to calculate the TaskDeploymentDescriptors and send them to the TaskManagers. In most cases the JobManager called {{submitTask}} more slowly than the TaskManagers could process it. Since the deployment of tasks is topologically sorted, upstream tasks were deployed before their downstream tasks, and this case rarely happened.
>
> In my opinion, the solution to this issue needs more discussion.
> According to the discussion in the pull request ([https://github.com/apache/flink/pull/6680]), it is not safe to simply remove this RPC call, because we cannot guarantee that an upstream task failure will always fail the downstream consumers.
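For illustration, below is a minimal, self-contained sketch (plain Java, not Flink code) of the retry-with-penalty pattern described in the comment above. All names ({{PenaltyRetrySketch}}, {{requestWithPenalty}}, the two functional parameters) and the back-off values are hypothetical stand-ins for the local partition request and the {{requestPartitionState}} RPC fallback.

{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical sketch of the retry-with-penalty idea: keep retrying the local
 * partition request with a growing penalty delay, and only fall back to a
 * JobMaster RPC (requestPartitionState) after the retries are exhausted.
 */
public final class PenaltyRetrySketch {

    /**
     * @param tryRequestPartition      stand-in for the local request to the upstream
     *                                 producer; returns true once the partition is found
     * @param requestPartitionStateRpc stand-in for the fallback RPC to the JobMaster
     */
    public static void requestWithPenalty(
            BooleanSupplier tryRequestPartition,
            Runnable requestPartitionStateRpc,
            int maxRetries,
            Duration initialPenalty,
            Duration maxPenalty) throws InterruptedException {

        Duration penalty = initialPenalty;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (tryRequestPartition.getAsBoolean()) {
                return; // upstream partition found, no JobMaster RPC needed
            }
            if (attempt < maxRetries) {
                // Penalty: wait longer before each subsequent attempt (capped doubling).
                Thread.sleep(penalty.toMillis());
                Duration doubled = penalty.multipliedBy(2);
                penalty = doubled.compareTo(maxPenalty) > 0 ? maxPenalty : doubled;
            }
        }
        // Only after all retries have failed does the task ask the JobMaster.
        requestPartitionStateRpc.run();
    }

    public static void main(String[] args) throws InterruptedException {
        // Toy usage: the first two attempts fail, the third succeeds, so no RPC is sent.
        int[] attempts = {0};
        requestWithPenalty(
                () -> ++attempts[0] >= 3,
                () -> System.out.println("requestPartitionState RPC sent to JobMaster"),
                5,
                Duration.ofMillis(100),
                Duration.ofSeconds(10));
        System.out.println("Partition request succeeded after " + attempts[0] + " attempts");
    }
}
{code}

With this shape, the JobMaster is only contacted for partitions that are still missing after several increasingly delayed local attempts, which is what keeps the number of {{requestPartitionState}} calls low during large-scale deployments.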