[ 
https://issues.apache.org/jira/browse/FLINK-11078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun updated FLINK-11078:
-----------------------------
    Description: 
In the pre-FLIP-6 context, we start a YARN session with a fixed number of running 
{{TaskManagers}}. This is convenient for users who run a series of small jobs on a 
specific cluster, since it reduces the cost of deploying a cluster per job. In the 
current code base, we start a YARN session with no pre-allocated TMs and only 
allocate them once a job is submitted and running.

To get the benefits of both modes, I propose introducing a pair {{(min, max)}} 
that represents the minimum and maximum number of running {{TaskExecutors}}.

With such an option, setting minimum = maximum = n effectively gives the same 
behaviour as the pre-FLIP-6 code, while setting minimum = 0, maximum = inf 
effectively gives the same behaviour as the current code path.

Most of the implementation work would be passing such options through 
{{FlinkYarnSessionCli}} and respecting them.
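
To make the idea concrete, here is a minimal sketch of how the CLI wiring could 
look. The flag names {{-nmin}} / {{-nmax}} and the configuration keys are 
assumptions of mine, not part of the current code base:

{code:java}
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import org.apache.flink.configuration.Configuration;

// Hypothetical additions to FlinkYarnSessionCli: two new command line
// flags that are translated into configuration entries, which the
// cluster entrypoint later reads when building the SlotManager.
public class TaskManagerRangeCliSketch {

	// Flag and key names are assumptions, to be decided in review.
	private static final Option NUM_TM_MIN =
		new Option("nmin", true, "Minimum number of running TaskExecutors");
	private static final Option NUM_TM_MAX =
		new Option("nmax", true, "Maximum number of running TaskExecutors");

	public static void addGeneralOptions(Options options) {
		options.addOption(NUM_TM_MIN);
		options.addOption(NUM_TM_MAX);
	}

	public static void applyCommandLineOptions(CommandLine cmd, Configuration config) {
		if (cmd.hasOption(NUM_TM_MIN.getOpt())) {
			config.setInteger(
				"resourcemanager.taskmanager.minimum", // hypothetical key
				Integer.parseInt(cmd.getOptionValue(NUM_TM_MIN.getOpt())));
		}
		if (cmd.hasOption(NUM_TM_MAX.getOpt())) {
			config.setInteger(
				"resourcemanager.taskmanager.maximum", // hypothetical key
				Integer.parseInt(cmd.getOptionValue(NUM_TM_MAX.getOpt())));
		}
	}
}
{code}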

Hopefully the changes are not that big; I will try to draft more details. 
Glad to hear your suggestions and ideas.

*UPDATE*: As for how to respect the options, one choice is the following:

1. Build {{SlotManager}} respecting the options.

That is, we pass the options via the CLI and set them in the configuration; the 
code path would be {{env -> YarnEntrypointUtils#loadConfiguration -> 
YarnSessionClusterEntrypoint -> ... -> ClusterEntrypoint#runCluster -> ... -> 
SlotManagerConfiguration#fromConfiguration}}. For the new options, we introduce 
two new keys, {{ResourceManagerOptions#TASK_MANAGER_MAXIMUM}} and 
{{ResourceManagerOptions#TASK_MANAGER_MINIMUM}} (see the first sketch after 
these steps).

2. Starting the {{SlotManager}} will also start 
{{ResourceManagerOptions#TASK_MANAGER_MINIMUM}} new workers.

3.1. When {{SlotManager#allocateResource}} is called, we ensure the number of TMs 
does not exceed {{ResourceManagerOptions#TASK_MANAGER_MAXIMUM}}.

3.2. When {{resourceActions#releaseResource}} is called, we ensure the number of 
TMs does not fall below {{ResourceManagerOptions#TASK_MANAGER_MINIMUM}}. From an 
analysis of the code path, we currently call {{stopWorker}} only from 
{{SlotManager#checkTaskManagerTimeouts}}, so adding a guard before 
{{#releaseResource}} in {{#checkTaskManagerTimeouts}} should work (see the 
second sketch after these steps).
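
As a rough illustration of step 1, the two new keys in {{ResourceManagerOptions}} 
could look like the following. The key names and defaults are assumptions; 
defaulting to minimum = 0 and an unbounded maximum keeps the current behaviour 
when nothing is configured:

{code:java}
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

// Sketch of the two proposed keys for ResourceManagerOptions.
// Key names and defaults are assumptions, not final.
public class ResourceManagerOptionsSketch {

	/** Lower bound for the number of running TaskExecutors. */
	public static final ConfigOption<Integer> TASK_MANAGER_MINIMUM =
		key("resourcemanager.taskmanager.minimum").defaultValue(0);

	/** Upper bound for the number of running TaskExecutors. */
	public static final ConfigOption<Integer> TASK_MANAGER_MAXIMUM =
		key("resourcemanager.taskmanager.maximum").defaultValue(Integer.MAX_VALUE);
}
{code}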
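
And for steps 2, 3.1 and 3.2, a self-contained sketch of the guards. 
{{ResourceActions}} below is a stand-in for the {{SlotManager}}'s callback into 
the ResourceManager, and all field and method names are assumptions modelled on 
the existing {{SlotManager}}:

{code:java}
import java.util.function.IntSupplier;

// Self-contained sketch of the proposed min/max guards. The stand-in
// interface hides the real signatures (allocateResource(ResourceProfile),
// releaseResource(InstanceID, ...)) to keep the example compilable.
public class TaskManagerRangeGuardSketch {

	interface ResourceActions {
		void allocateResource();
		void releaseResource(String instanceId);
	}

	private final ResourceActions resourceActions;
	private final IntSupplier registeredTaskManagers; // current number of TMs
	private final int minTaskManagers;
	private final int maxTaskManagers;

	TaskManagerRangeGuardSketch(
			ResourceActions resourceActions,
			IntSupplier registeredTaskManagers,
			int minTaskManagers,
			int maxTaskManagers) {
		this.resourceActions = resourceActions;
		this.registeredTaskManagers = registeredTaskManagers;
		this.minTaskManagers = minTaskManagers;
		this.maxTaskManagers = maxTaskManagers;
	}

	// Step 2: when the SlotManager starts, eagerly request the minimum
	// number of workers.
	void startMinimumWorkers() {
		for (int i = registeredTaskManagers.getAsInt(); i < minTaskManagers; i++) {
			resourceActions.allocateResource();
		}
	}

	// Step 3.1: guard in SlotManager#allocateResource, never go above
	// the maximum; the pending slot request simply stays unfulfilled.
	boolean tryAllocateResource() {
		if (registeredTaskManagers.getAsInt() >= maxTaskManagers) {
			return false;
		}
		resourceActions.allocateResource();
		return true;
	}

	// Step 3.2: guard in SlotManager#checkTaskManagerTimeouts, never
	// release a timed-out TM if that would drop below the minimum.
	void releaseTimedOutTaskManager(String instanceId) {
		if (registeredTaskManagers.getAsInt() > minTaskManagers) {
			resourceActions.releaseResource(instanceId);
		}
	}
}
{code}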

I made a rough attempt at 
https://github.com/tisonkun/flink/tree/FLINK-11078. More corner/conflict 
scenarios should be taken into consideration.



> Capability to define the numerical range for running TaskExecutors
> ------------------------------------------------------------------
>
>                 Key: FLINK-11078
>                 URL: https://issues.apache.org/jira/browse/FLINK-11078
>             Project: Flink
>          Issue Type: New Feature
>          Components: Client, ResourceManager
>    Affects Versions: 1.8.0
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>             Fix For: 1.8.0
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
