[ https://issues.apache.org/jira/browse/FLINK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17458147#comment-17458147 ]

huntercc commented on FLINK-24295:
----------------------------------

I also think retrying may be a good idea. In my solution, the task retries the 
communication with the upstream through its RemoteInputChannel using a penalty 
(backoff) mechanism, instead of directly triggering a `requestPartitionState` 
RPC call, which significantly reduces the number of RPC calls the JobMaster 
needs to handle. Only when a task still gets `PartitionNotFoundException` after 
several retries does it trigger a `requestPartitionState` RPC call to the 
JobMaster. In practice, this approach improved the efficiency of submitting 
large-scale jobs with data shuffle.
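For illustration, a minimal sketch of such a retry-with-penalty policy is below. 
All names in it (PartitionRequester, PartitionStateRpc, requestWithPenalty, and 
so on) are hypothetical placeholders rather than actual Flink classes or APIs; 
the sketch only shows the idea of backing off between partition requests and 
falling back to the JobMaster RPC as a last resort.

{code:java}
import java.time.Duration;

/**
 * Illustrative sketch of a retry-with-penalty (backoff) policy for requesting
 * an upstream partition before falling back to a requestPartitionState RPC.
 * All types and method names here are placeholders, not real Flink classes.
 */
public final class PenalizedPartitionRequest {

    /** Placeholder callback that tries to open the upstream subpartition. */
    interface PartitionRequester {
        void requestSubpartition() throws PartitionNotFoundException;
    }

    /** Placeholder fallback that asks the JobMaster for the partition state. */
    interface PartitionStateRpc {
        void requestPartitionState();
    }

    /** Placeholder exception thrown while the producer is not ready yet. */
    static class PartitionNotFoundException extends Exception {}

    static void requestWithPenalty(
            PartitionRequester requester,
            PartitionStateRpc fallback,
            int maxRetries,
            Duration initialPenalty) throws InterruptedException {

        long penaltyMillis = initialPenalty.toMillis();

        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                requester.requestSubpartition();
                return; // producer found; no RPC to the JobMaster is needed
            } catch (PartitionNotFoundException e) {
                if (attempt == maxRetries) {
                    // Only after the retries are exhausted do we bother the JobMaster.
                    fallback.requestPartitionState();
                    return;
                }
                Thread.sleep(penaltyMillis);
                penaltyMillis *= 2; // grow the penalty between attempts
            }
        }
    }
}
{code}

The penalty could also grow linearly or be capped; the key point is that the 
`requestPartitionState` RPC to the JobMaster is only the last resort.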

> Too many requestPartitionState may jam the JobManager during task deployment
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-24295
>                 URL: https://issues.apache.org/jira/browse/FLINK-24295
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.0, 1.15.0
>            Reporter: Zhilong Hong
>            Priority: Major
>
> After the phase 2 optimization we did in FLINK-21110, task deployment has 
> become much faster. However, we find that during task deployment there may be 
> too many {{requestPartitionState}} RPC calls from TaskManagers, which can jam 
> the JobManager.
>
> Why would there be too many {{requestPartitionState}} RPC calls? After the 
> optimization, the JobManager can submit tasks to TaskManagers quickly. If the 
> JobManager calls {{submitTask}} faster than the TaskManagers can process 
> those calls, some TaskManagers may deploy their tasks faster than others.
>
> When a downstream task is deployed, it tries to request partitions from the 
> upstream tasks, which may be located on a remote TaskManager. If the upstream 
> tasks are not deployed yet, it requests the partition state from the 
> JobManager. In the worst case, the computation and memory complexity is 
> O(N^2).
>
> In our test with a streaming job that has two vertices with a parallelism of 
> 8,000, connected by all-to-all edges, there will be 32,000,000 
> {{requestPartitionState}} RPC calls to the JobManager in the worst case. Each 
> RPC call requires 1 KiB of heap memory in the JobManager, so the overall 
> space cost of {{requestPartitionState}} will be 32 GiB, which is a heavy 
> burden for the GC.
>
> In our test, the heap memory of the JobManager is 8 GiB. During task 
> deployment the JobManager runs into more and more full GCs. The JobManager 
> gets stuck because it spends its time in full GCs and has no time to handle 
> the incoming RPC calls.
>
> The worst thing is that no log is output for this RPC call. When users find 
> that the JobManager has become slow or stuck, they won't be able to find out 
> why.
>
> Why did this case rarely happen before? Before the optimization, it took a 
> long time to calculate the TaskDeploymentDescriptors and send them to the 
> TaskManagers. In most cases the JobManager called {{submitTask}} more slowly 
> than the TaskManagers could process the calls. Since the deployment of tasks 
> is topologically sorted, the upstream tasks were deployed before the 
> downstream tasks, so this case rarely happened.
>
> In my opinion, the solution to this issue needs more discussion. According to 
> the discussion in the pull request 
> ([https://github.com/apache/flink/pull/6680]), it's not safe to remove this 
> RPC call, because we cannot guarantee that an upstream task failure will 
> always fail the downstream consumers.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
