[ https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann reassigned FLINK-15031:
-------------------------------------

    Assignee: Zhu Zhu

> Calculate required shuffle memory before allocating slots in resources specified cases
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-15031
>                 URL: https://issues.apache.org/jira/browse/FLINK-15031
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Zhu Zhu
>            Priority: Major
>             Fix For: 1.10.0
>
>
> In resources specified cases, we expect each operator to declare its required
> resources before using them. In this way, no resource-related error should
> happen as long as no resource is used beyond what was declared. This ensures
> that a deployed task will not fail due to insufficient resources in the TM.
> Such failures are unnecessary and may even cause a job to hang forever,
> failing repeatedly when deploying tasks to a TM with insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Tasks
> require a minimum number of network buffers to work. Currently a task can be
> deployed to a TM with insufficient network buffers and then fail on launch.
> To avoid that, we should calculate the required network memory for a
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required
> network buffers. The number of buffers required by a task (ExecutionVertex)
> is
> {code:java}
> exclusive buffers for input channels (i.e. numInputChannel * buffersPerChannel)
>   + required buffers for result partition buffer pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle
> services, there is currently no way to get the required shuffle memory of a
> task.
> To keep it simple under dynamic slot sharing, the required shuffle memory for
> a task should be the maximum required shuffle memory of all
> {{ExecutionVertex}} instances of the same {{ExecutionJobVertex}}. The required
> shuffle memory for a slot sharing group should then be the sum of the shuffle
> memory of each {{ExecutionJobVertex}} within it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
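
For illustration, the calculation described in the issue can be sketched in Java roughly as follows. This is only a sketch under stated assumptions, not Flink code: the class and method names are hypothetical, each task is assumed to have a single result partition, memory is assumed to be the buffer count multiplied by a fixed buffer size, and the 32 KiB buffer size and 2 buffers per channel used in `main` are assumed example values.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper; none of these names exist in Flink. */
final class ShuffleMemorySketch {

    /**
     * Buffers required by one ExecutionVertex, following the formula in the issue:
     * exclusive input-channel buffers plus the result partition pool minimum.
     * Assumes a single result partition per task.
     */
    static int requiredBuffers(int numInputChannels, int buffersPerChannel, int numSubpartitions) {
        int inputBuffers = numInputChannels * buffersPerChannel;
        int outputBuffers = numSubpartitions + 1;
        return inputBuffers + outputBuffers;
    }

    /**
     * Shuffle memory for one ExecutionJobVertex: the maximum over all of its
     * ExecutionVertex instances, converted to bytes (assumed: buffers * buffer size).
     */
    static long requiredBytesForJobVertex(List<Integer> buffersPerSubtask, long bufferSizeBytes) {
        int maxBuffers = 0;
        for (int buffers : buffersPerSubtask) {
            maxBuffers = Math.max(maxBuffers, buffers);
        }
        return maxBuffers * bufferSizeBytes;
    }

    /** Shuffle memory for a slot sharing group: the sum over its job vertices. */
    static long requiredBytesForGroup(Map<String, List<Integer>> buffersByJobVertex, long bufferSizeBytes) {
        long total = 0L;
        for (List<Integer> buffersPerSubtask : buffersByJobVertex.values()) {
            total += requiredBytesForJobVertex(buffersPerSubtask, bufferSizeBytes);
        }
        return total;
    }

    public static void main(String[] args) {
        long bufferSizeBytes = 32 * 1024; // assumed buffer size
        int buffersPerChannel = 2;        // assumed exclusive buffers per channel

        // A source-like vertex: no input channels, 4 subpartitions.
        int sourceBuffers = requiredBuffers(0, buffersPerChannel, 4);
        // A map-like vertex: 4 input channels, 3 subpartitions.
        int mapBuffers = requiredBuffers(4, buffersPerChannel, 3);

        Map<String, List<Integer>> group = Map.of(
                "source", List.of(sourceBuffers, sourceBuffers),
                "map", List.of(mapBuffers, mapBuffers));

        System.out.println("source buffers per subtask: " + sourceBuffers);
        System.out.println("map buffers per subtask: " + mapBuffers);
        System.out.println("group bytes: " + requiredBytesForGroup(group, bufferSizeBytes));
    }
}
```

Taking the maximum per job vertex means a slot can host any subtask of that vertex, which is what makes the estimate safe under dynamic slot sharing, at the cost of over-reserving when subtasks differ.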