[ https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-15031:
----------------------------
    Summary: Automatically calculate required network memory for fine-grained jobs  (was: Automatically calculate required shuffle memory for fine-grained jobs)

> Automatically calculate required network memory for fine-grained jobs
> ---------------------------------------------------------------------
>
>                 Key: FLINK-15031
>                 URL: https://issues.apache.org/jira/browse/FLINK-15031
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Jin Xing
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare
> its required resources before using them. This way, no resource-related
> error should happen as long as resources are not used beyond what was
> declared. This ensures that a deployed task does not fail due to
> insufficient resources in the TM. Such failures are unnecessary and may even
> cause a job to hang forever, failing repeatedly while deploying tasks to a
> TM with insufficient resources.
> Shuffle memory is the last missing piece for this goal at the moment. Tasks
> need a minimum number of network buffers to work. Currently a task can be
> deployed to a TM with insufficient network buffers and then fail on launch.
> To avoid that, we should calculate the required network memory for a
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> exclusive buffers for input channels (i.e. numInputChannel * buffersPerChannel)
>   + required buffers for result partition buffer pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case.
> For custom shuffle services, there is currently no way to get the required
> shuffle memory of a task.
> To keep things simple under dynamic slot sharing, the required shuffle
> memory for a task should be the maximum required shuffle memory over all
> {{ExecutionVertex}} instances of the same {{ExecutionJobVertex}}. The
> required shuffle memory for a slot sharing group should then be the sum of
> the shuffle memory of each {{ExecutionJobVertex}} instance within it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
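
For illustration, the buffer formula and the max/sum aggregation from the description could be sketched roughly as below. This is not Flink code: the class {{NetworkMemorySketch}}, its method names, and the {{bufferSizeBytes}} parameter are hypothetical. The sketch also assumes that a task's memory is its buffer count multiplied by a fixed buffer size, and that each task has the single set of input channels and the single result partition that the formula describes.

```java
import java.util.List;

/** Hypothetical helper, not part of Flink; it only mirrors the formula in the description. */
final class NetworkMemorySketch {

    /** Buffers required by one task (ExecutionVertex) in the NettyShuffleService case. */
    static int requiredBuffers(int numInputChannels, int buffersPerChannel, int numberOfSubpartitions) {
        // Exclusive buffers for input channels.
        int exclusiveInputBuffers = numInputChannels * buffersPerChannel;
        // Required buffers for the result partition buffer pool.
        int resultPartitionBuffers = numberOfSubpartitions + 1;
        return exclusiveInputBuffers + resultPartitionBuffers;
    }

    /** Assumption: memory is the buffer count times a fixed buffer size in bytes. */
    static long requiredMemoryBytes(int requiredBuffers, int bufferSizeBytes) {
        return (long) requiredBuffers * bufferSizeBytes;
    }

    /** Maximum over all ExecutionVertex instances of one ExecutionJobVertex. */
    static long maxPerJobVertex(List<Long> perVertexBytes) {
        return perVertexBytes.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    /** Sum of the per-ExecutionJobVertex maxima within one slot sharing group. */
    static long sumForSlotSharingGroup(List<List<Long>> jobVerticesInGroup) {
        return jobVerticesInGroup.stream()
                .mapToLong(NetworkMemorySketch::maxPerJobVertex)
                .sum();
    }
}
```

With illustrative numbers, a task with 4 input channels, 2 buffers per channel and 3 subpartitions needs 4 * 2 + (3 + 1) = 12 buffers. Taking the maximum per {{ExecutionJobVertex}} over-reserves for the smaller subtasks, but it keeps the per-slot requirement independent of which subtask ends up in which slot.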