[ https://issues.apache.org/jira/browse/FLINK-11078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
TisonKun updated FLINK-11078:
-----------------------------
    Description:
In the pre-FLIP-6 context, we start a YARN session with a fixed number of running {{TaskManagers}}. This is good for users who run a series of small jobs on a specific cluster, since it avoids the cost of deploying a cluster per job. In the current code base, we start a YARN session with no pre-allocated TMs and allocate them only once a job is submitted and running.

To get the benefits of both modes, I propose introducing a pair {{(min, max)}} that represents the minimum and maximum number of running {{TaskExecutors}}.

With such an option, setting minimum = maximum = n gives effectively the same behaviour as the pre-FLIP-6 code, and setting minimum = 0, maximum = inf gives effectively the same behaviour as the current code path.

Most of the implementation would consist of passing this option through {{FlinkYarnSessionCli}} and respecting it. Hopefully the changes are not that big, and I will try to draft more details. Suggestions and ideas are welcome.

*UPDATE*: To respect the options, one possibility is the following:

1. Build {{SlotManager}} respecting the options. That is, the options are passed via the CLI and set in the configuration; the code path would be {{env -> YarnEntrypointUtils#loadConfiguration -> YarnSessionClusterEntrypoint -> ... -> ClusterEntrypoint#runCluster -> ... -> SlotManagerConfiguration#fromConfiguration}}. For the new options, we introduce two new keys, {{ResourceManagerOptions#TASK_MANAGER_MAXIMUM}} and {{ResourceManagerOptions#TASK_MANAGER_MINIMUM}}.
2. Starting {{SlotManager}} will also start {{ResourceManagerOptions#TASK_MANAGER_MINIMUM}} new workers.
3.1. In {{SlotManager#allocateResource}}, we ensure the number of TMs does not exceed {{ResourceManagerOptions#TASK_MANAGER_MAXIMUM}}.
3.2. In {{resourceActions#releaseResource}}, we ensure the number of TMs does not drop below {{ResourceManagerOptions#TASK_MANAGER_MINIMUM}}.
From an analysis of the code path, we currently call {{stopWorker}} only from {{SlotManager#checkTaskManagerTimeouts}}. Thus, adding a guard before {{#releaseResource}} in {{#checkTaskManagerTimeouts}} should work.

I made a rough attempt at this in https://github.com/tisonkun/flink/tree/FLINK-11078. More corner cases and conflict scenarios should be taken into consideration.

> Capability to define the numerical range for running TaskExecutors
> ------------------------------------------------------------------
>
>                 Key: FLINK-11078
>                 URL: https://issues.apache.org/jira/browse/FLINK-11078
>             Project: Flink
>          Issue Type: New Feature
>          Components: Client, ResourceManager
>    Affects Versions: 1.8.0
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>             Fix For: 1.8.0
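
As a rough illustration of steps 2, 3.1 and 3.2 in the description, the min/max guards might look like the sketch below. This is not Flink code: {{BoundedWorkerPool}} and its methods are hypothetical names invented for this example, and the real change would live in {{SlotManager}} and read its bounds from the proposed {{ResourceManagerOptions}} keys. The sketch only tracks a worker count; it assumes "maximum = inf" can be modelled as {{Integer.MAX_VALUE}}.

```java
/**
 * Hypothetical sketch of a worker pool bounded by (min, max).
 * It is NOT Flink's SlotManager; it only models the counting logic.
 */
public class BoundedWorkerPool {
    private final int min;
    private final int max;
    private int running;

    public BoundedWorkerPool(int min, int max) {
        if (min < 0 || max < min) {
            throw new IllegalArgumentException("require 0 <= min <= max");
        }
        this.min = min;
        this.max = max;
        // Step 2: starting the pool also starts `min` workers.
        this.running = min;
    }

    /** Step 3.1: allocate a new worker only while below the maximum. */
    public boolean tryAllocate() {
        if (running >= max) {
            return false;
        }
        running++;
        return true;
    }

    /** Step 3.2: release an idle worker only while above the minimum. */
    public boolean tryRelease() {
        if (running <= min) {
            return false;
        }
        running--;
        return true;
    }

    public int running() {
        return running;
    }
}
```

With min = max = n the pool never grows or shrinks, which corresponds to the fixed-size pre-FLIP-6 session; with min = 0 and max = {{Integer.MAX_VALUE}} workers are created purely on demand and every idle worker can be released, as in the current code path.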