[ 
https://issues.apache.org/jira/browse/FLINK-15031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371290#comment-17371290
 ] 

Zhu Zhu edited comment on FLINK-15031 at 6/29/21, 10:52 AM:
------------------------------------------------------------

Discussed with [~trohrmann] offline. His concern was that the network 
configuration can differ between TMs, and this inconsistency can make the 
announced network memory differ from the network memory actually used. To 
address this, we think the first version should clearly state in the 
documentation that JM/RM/TMs are required to have the same configuration.

Another thing we should be aware of is that the network memory usage code may 
change and result in an inconsistency between requirement and usage. To avoid 
this, we should extract the current network memory usage code block so that it 
is shared by both the requirement calculation and the actual usage.
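A minimal sketch of what such a shared code block could look like, assuming a single helper that both the requirement calculation and the buffer pool setup call. The class and method names ({{NetworkBufferCounts}}, {{requiredInputBuffers}}, {{requiredOutputBuffers}}, {{announcedBuffers}}) are hypothetical, not existing Flink APIs; the formulas follow the issue description (exclusive buffers per input channel, numberOfSubpartitions + 1 for a result partition pool).

{code:java}
// Hypothetical helper shared by the requirement calculation and the code that
// actually creates the buffer pools, so the two cannot silently drift apart.
public final class NetworkBufferCounts {

    private NetworkBufferCounts() {}

    /** Exclusive buffers needed by one input gate: buffersPerChannel for each input channel. */
    public static int requiredInputBuffers(int numInputChannels, int buffersPerChannel) {
        return numInputChannels * buffersPerChannel;
    }

    /** Minimum buffers for one result partition pool: currently numberOfSubpartitions + 1. */
    public static int requiredOutputBuffers(int numberOfSubpartitions) {
        return numberOfSubpartitions + 1;
    }

    /** Requirement side: total buffers announced for a task with the given gates and partitions. */
    public static int announcedBuffers(int[] inputChannelsPerGate, int buffersPerChannel,
                                       int[] subpartitionsPerPartition) {
        int total = 0;
        for (int channels : inputChannelsPerGate) {
            total += requiredInputBuffers(channels, buffersPerChannel);
        }
        for (int subpartitions : subpartitionsPerPartition) {
            total += requiredOutputBuffers(subpartitions);
        }
        return total;
    }
}
{code}

The usage side (where the input gate and result partition buffer pools are created) would call {{requiredInputBuffers}} / {{requiredOutputBuffers}} instead of recomputing the numbers, so any future change to the usage code automatically changes the announced requirement too.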

Regarding the fraction-style config, it actually refers to floating buffers 
excluding the minimum requirements, which is a bit too hard for users (even 
advanced users) to understand. Given that automatically calculating the required 
network memory is a new feature, we think it's fine for the first version to 
always include enough extra floating buffers in the requirement, to avoid 
performance regression and buffer request timeouts, and then see whether users 
complain about the network memory requirements. 
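A minimal sketch of "always include enough extra floating buffers", assuming a fixed number of floating buffers is added per input gate on top of the exclusive minimum instead of exposing a fraction-style option. The constants are assumptions chosen to mirror Flink's documented defaults for {{taskmanager.network.memory.buffers-per-channel}} (2) and {{taskmanager.network.memory.floating-buffers-per-gate}} (8); the class and method names are hypothetical.

{code:java}
// Hypothetical sketch: the announced requirement for an input gate always contains
// a fixed number of floating buffers in addition to the minimum (exclusive) buffers.
public final class RequirementWithFloatingBuffers {

    // Assumed values, mirroring the documented defaults of
    // taskmanager.network.memory.buffers-per-channel and
    // taskmanager.network.memory.floating-buffers-per-gate.
    static final int BUFFERS_PER_CHANNEL = 2;
    static final int FLOATING_BUFFERS_PER_GATE = 8;

    private RequirementWithFloatingBuffers() {}

    /** Buffers announced for one input gate: exclusive minimum plus extra floating buffers. */
    public static int inputGateRequirement(int numInputChannels) {
        int exclusive = numInputChannels * BUFFERS_PER_CHANNEL;
        return exclusive + FLOATING_BUFFERS_PER_GATE;
    }

    public static void main(String[] args) {
        // A gate with 10 channels: 10 * 2 exclusive + 8 floating = 28 buffers.
        System.out.println(inputGateRequirement(10));
    }
}
{code}

Announcing the floating buffers up front trades some over-reservation for not having tasks slow down or time out while waiting for floating buffers at runtime.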

WDYT? [~jinxing6...@126.com]  [~karmagyz]  [~xintongsong]



> Automatically calculate required network memory for fine-grained jobs
> ---------------------------------------------------------------------
>
>                 Key: FLINK-15031
>                 URL: https://issues.apache.org/jira/browse/FLINK-15031
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Zhu Zhu
>            Assignee: Jin Xing
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In cases where resources are specified, we expect each operator to declare 
> its required resources before using them. In this way, no resource-related 
> error should happen as long as resources are not used beyond what was 
> declared. This ensures that a deployed task does not fail due to insufficient 
> resources in the TM, which may otherwise result in unnecessary failures and 
> may even cause a job to hang forever, failing repeatedly while deploying 
> tasks to a TM with insufficient resources.
> Shuffle memory is currently the last missing piece for this goal. Tasks 
> require a minimum number of network buffers to work. Currently a task may be 
> deployed to a TM with insufficient network buffers and then fail on 
> launching.
> To avoid that, we should calculate required network memory for a 
> task/SlotSharingGroup before allocating a slot for it.
> The required shuffle memory can be derived from the number of required 
> network buffers. The number of buffers required by a task (ExecutionVertex) is
> {code:java}
> int requiredBuffers =
>         numInputChannels * buffersPerChannel // exclusive buffers for input channels
>         + (numberOfSubpartitions + 1);       // required buffers for the result partition buffer pool (currently numberOfSubpartitions + 1)
> {code}
> Note that this is for the {{NettyShuffleService}} case. For custom shuffle 
> services, currently there is no way to get the required shuffle memory of a 
> task.
> To keep it simple under dynamic slot sharing, the required shuffle memory for 
> a task should be the maximum required shuffle memory among all 
> {{ExecutionVertex}} instances of the same {{ExecutionJobVertex}}. The required 
> shuffle memory for a slot sharing group should be the sum of the shuffle 
> memory of each {{ExecutionJobVertex}} within it.
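A minimal sketch of the aggregation rules from the issue description (NettyShuffleService case): buffers per ExecutionVertex from the formula, the maximum over the parallel instances of an ExecutionJobVertex, then the sum over the slot sharing group, converted to bytes. It assumes each vertex has at most one input gate and one result partition, as the formula does; {{VertexShape}} and the method names are hypothetical stand-ins, not Flink classes. The example buffer size of 32 KiB corresponds to the default of {{taskmanager.memory.segment-size}}.

{code:java}
import java.util.List;
import java.util.Map;

// Hypothetical sketch: per ExecutionVertex -> max per ExecutionJobVertex -> sum per slot sharing group.
public final class SlotSharingGroupNetworkMemory {

    /** Illustrative stand-in for the per-ExecutionVertex inputs of the buffer formula. */
    public static final class VertexShape {
        final int numInputChannels;
        final int buffersPerChannel;
        final int numberOfSubpartitions;

        public VertexShape(int numInputChannels, int buffersPerChannel, int numberOfSubpartitions) {
            this.numInputChannels = numInputChannels;
            this.buffersPerChannel = buffersPerChannel;
            this.numberOfSubpartitions = numberOfSubpartitions;
        }
    }

    private SlotSharingGroupNetworkMemory() {}

    /** Exclusive input buffers plus the result partition pool minimum (subpartitions + 1). */
    public static int buffersForVertex(VertexShape v) {
        return v.numInputChannels * v.buffersPerChannel + v.numberOfSubpartitions + 1;
    }

    /** Requirement of one ExecutionJobVertex: the max over its parallel ExecutionVertex instances. */
    public static int buffersForJobVertex(List<VertexShape> parallelInstances) {
        int max = 0;
        for (VertexShape v : parallelInstances) {
            max = Math.max(max, buffersForVertex(v));
        }
        return max;
    }

    /** Requirement of a slot sharing group in bytes: sum over its ExecutionJobVertex requirements. */
    public static long bytesForSlotSharingGroup(Map<String, List<VertexShape>> jobVertices,
                                                int bufferSizeBytes) {
        long buffers = 0;
        for (List<VertexShape> instances : jobVertices.values()) {
            buffers += buffersForJobVertex(instances);
        }
        return buffers * bufferSizeBytes;
    }
}
{code}

Taking the maximum per ExecutionJobVertex means any of its parallel instances can be placed in any slot of the group without re-checking, at the cost of over-reserving for instances with fewer channels or subpartitions.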



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
