[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-03-15 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17059911#comment-17059911
 ] 

Yangze Guo commented on FLINK-15959:


FYI: As there is no response for nearly two weeks. I open FLINK-16605 to work 
on the max limit of the total number of slots.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-03-08 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17054589#comment-17054589
 ] 

Yangze Guo commented on FLINK-15959:


Hi, [~liuyufei]. I'm very interested and pay close attention to max limitation. 
Would you like to share the current progress and when will you finish this 
work? If you do not have enough bandwidth, shall we limit the scope of this 
ticket to min limitation and I'll open another ticket to work on the max? WDYT?

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-20 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041609#comment-17041609
 ] 

Yangze Guo commented on FLINK-15959:


Hi, [~liuyufei]. I have basic some ideas from the perspective of SlotManager to 
share. Hope these could help you.

>From the perspective of SlotManager, the min/max limitation is mainly related 
>to the "slots" and "pendingSlots" of SlotManager. Thus, I want to first 
>summarize the scenarios in which we maintain these two collections:
 *Regarding the "slots"*:
 * The number of elements will be reduced when TaskExecutor closed because of 
heartbeat timeout, idle timeout, etc.
 * The number of elements will be increased only in the registration of new 
TaskExecutor.

*Regarding the "pendingSlots"*:
 * The number of elements will be reduced only in the registration of new 
TaskExecutor.
 * The number of elements will be increased only in the resource allocation.

The union of these two collections defines the total resource/slots the 
SlotManager expect, especially after the FLINK-14106. So, both minimum/maximum 
limitations should act on the total number of slots of both existing and 
pending.
 *Regarding the min limit*, the sum of the two collections would reduce only 
when TaskExecutor closed. We need the check the min limit and decide should we 
allocate new resources in that scenario.
 *Regarding the max limit*,
 * We need to check the max limit before we allocate resource in 
SlotManager#registerSlot
 * When TaskExecutor closed, we also need to check if there is pending 
SlotReuqest which not assign to a slot or pending slot. If there is, we need to 
allocate resources while respecting the max limit.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-19 Thread Till Rohrmann (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040197#comment-17040197
 ] 

Till Rohrmann commented on FLINK-15959:
---

I would strongly discourage to not touch any of the scheduling components on 
the {{JobManager}} side because of the above-mentioned reasons. They are being 
actively worked on and will change quite significantly in the foreseeable 
future.

Instead I would suggest the following (approximative) solution with the 
corresponding approach:

1. Introduce the {{cluster.number-of-slots.min}} and 
{{cluster.number-of-slots.max}} configuration options.
2. Make the {{SlotManager}} respect the min and max number of slots but do not 
block the scheduling on whether {{min}} has reached
3. Have a special {{SlotManager}} implementation which only starts fulfilling 
slot requests if we have equal or more slots acquired than the configured 
minimum

With 1+2 we can already solve the case where we have a Yarn session cluster 
which has enough time to acquire the required solutions before jobs are being 
submitted. With 3. it should also work for the per-job cluster.

The case where we won't necessarily accomplish a perfect scheduling might be 
the failover case. But I think this is fine because one design principle of the 
existing scheduler was that all slots can be treated equally (modulo their 
resource specifications). And in doubt, it is better to make some progress with 
a not optimal scheduling than to make no progress while waiting on the perfect 
scheduling.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-19 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040019#comment-17040019
 ] 

Xintong Song commented on FLINK-15959:
--

[~liuyufei] It's ok. There's nothing to be sorry for. It's always good to have 
such discussions to talk things through. :)

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-19 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039932#comment-17039932
 ] 

YufeiLiu commented on FLINK-15959:
--

[~xintongsong] Sorry, I was wrong about that, it won't get worse no matter how 
many times it's been restarted, only affect by the number of lost TMs. Sorry 
about wasting your time.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-19 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039879#comment-17039879
 ] 

Xintong Song commented on FLINK-15959:
--

[~liuyufei]
bq. but the situation could get worse after a few times restart.
I don't understand why is that. More failovers does not necessarily make {{ 
SlotPool }} caches less slots/TMs. Shouldn't it only fail to cache slots from 
recent lost TMs? Flink should have already launched new TMs for those lost in 
previous attempts, right?

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-19 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039875#comment-17039875
 ] 

YufeiLiu commented on FLINK-15959:
--

[~xintongsong] I'm worried about the uncertainty, lost a TM won't make much 
difference, but the situation could get worse after a few times restart. If 
source operator parallelism is less than others, the source tasks are tend to 
converge on a few TMs after several times TM lost.
I think combine the function of {{SlotPool}} and {{ResourceManager}} is a good 
idea for the long term, and these config will work in any case if JobMaster 
didn't caching slot.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-18 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039668#comment-17039668
 ] 

Xintong Song commented on FLINK-15959:
--

[~liuyufei]

bq. otherwise scheduler will use slots from SlotPool until available slots are 
empty, this also can cause load balance issue.

Not sure about this. In case of job failover, all the slots in {{SlotPool}} 
should be free when recovering the job, and the `evenly-spread-out-slots` 
introduced in FLINK-12122 applies to those slots in {{SlotPool}} as well.

It's true that we cannot always guarantee to evenly spread out slots across all 
the TMs in case of TM lost. We may first spread out slot requests across the 
existing TMs (whose slots are still in {{SlotPool}}), and the remaining 
unfulfilled slot requests will be allocated to new TMs once the minimum slots 
are registered to RM. I think this might provide good enough load balance in 
most cases, especially in failovers cased by only one or a few TMs are lost. 

bq. maybe should implement it in scheduler, allocate enough resource before 
execution scheduling.

I think this is a good idea for solving the load balance problem. My concern it 
that, it might be conflict with some of the community road map plans.
- One thing I'm aware of is that, the community is planning for a declarative 
resource management approach, in which a job declares its all resource 
requirements at once, instead of currently requesting slots for individual 
task/execution separately. This effort is still in early design discussions, 
and may not be finished in the next release. But I would try to avoid making 
many changes to {{Scheduler}} and {{SlotPool}} at this time considering they 
might be changed again in short time.
- Another thing that I heard of, not completely sure about this, is that people 
are considering getting rid of {{SlotPool}}, or at least make it as less 
responsibility as possible. Because currently we do not benefit much from 
caching slots in {{SlotPool}}, but suffers from the complication that resources 
are managed at two places, the {{SlotPool}} and the {{ResourceManager}}. That's 
also why I do not like the idea of adding more responsibility to {{SlotPool}}.

I'm not very familiar {{Scheduler}} and {{SlotPool}} though. Maybe [~trohrmann] 
or [~zhuzh] could chime in.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-18 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039149#comment-17039149
 ] 

YufeiLiu commented on FLINK-15959:
--

[~xintongsong] Hi, I've got a problem and I need some advice.
How should I deal with when job have restarted because of TaskExecutors has 
lost, seems like have to block the scheduling thread until cluster has been 
recovered, otherwise scheduler will use slots from SlotPool until available 
slots are empty, this also can cause load balance issue.
I feel like have a wrong direction, maybe shoule implement it in scheduler, 
allocate enough resource before execution scheduling.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038086#comment-17038086
 ] 

Xintong Song commented on FLINK-15959:
--

bq. How about put it into ClusterOptions, start with "cluster.*".

I personally prefer "slotmanager.*", but I'm also ok with "cluster.*".

I would try to keep {{ClusterOptions}} as concise as possible, maybe only 
common configurations that are related to all distributed components. But my 
opinion on this is not strong. As I said, I'm ok with either of the two ways.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-16 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038072#comment-17038072
 ] 

YufeiLiu commented on FLINK-15959:
--

[~xintongsong]Got it, check total Executors less than maximum before 
startNewWorker, return a empty list if exceed the limitation.
How about put it into ClusterOptions, start with "cluster.*".

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-16 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038071#comment-17038071
 ] 

YufeiLiu commented on FLINK-15959:
--

[~xintongsong] Got it, check total Executors less than maximum before 
startNewWorker, return a empty list if exceed the limitation.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038036#comment-17038036
 ] 

Xintong Song commented on FLINK-15959:
--

Minor:
I would suggest to replace "taskmanager.[minimaum|maximum].numberOfTotalSlots" 
with "slotmanager.[min|max]-slots". "taskmanager.*" are usually per-TM 
configurations, while what we are discussing are cluster level min/max limits. 
Besides, IIUC, the configuration option should be used mostly by the 
resource/slot manager rather than task managers.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-16 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038027#comment-17038027
 ] 

Xintong Song commented on FLINK-15959:
--

[~liuyufei], thanks for updating the proposal.

Regarding the max limit, I think RM should guarantee that it is not exceeded. 
To be specific, RM can check how many worker / slots are already started, 
including registered and pending ones, and reject to start new workers if 
reaching the max limit.

The background of having a max limit is to control the resource consumption a 
bath job, so that it can be executed with much less slots than its parallelism 
without using up the cluster resources.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-16 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037840#comment-17037840
 ] 

Yangze Guo commented on FLINK-15959:


Yes, I think it is equivalent to the "maximum slots number" without the FLIP-56.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-16 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037773#comment-17037773
 ] 

YufeiLiu commented on FLINK-15959:
--

[~karmagyz] Thanks for your suggestion, I did ignore the batch-job scenario. I 
think maybe I should implement a prototype to confirm how much impact it will 
have, I will open a FLIP if it's necessary.
About "maximum resource limitation for task executors", did you mean limit the 
total resource of all task executors? If so, is that equivalent to the "maximum 
slots number" if slot resource is static?

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-15 Thread Yangze Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037719#comment-17037719
 ] 

Yangze Guo commented on FLINK-15959:


Hi, [~liuyufei]. Thanks for proposing this change. 
Regarding your suggestion, I think ResourceManager does not need to throw an 
exception when exceeding the maximum limit. AFAIK, for batch jobs, the job 
graph could be executed without all the slot requests are fulfilled. We may 
just give an information-level log in this scenario. For stream jobs, those 
slot requests could not be fulfilled would fail by the timeout check.

BTW, since it touches the Public interface, I think we need to open a FLIP for 
this change. I also wanna introduce the maximum resource limitation for task 
executors, there could be an interrelationship between the maximum and minimum 
limit. Would you like to work for it together?

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-14 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037130#comment-17037130
 ] 

YufeiLiu commented on FLINK-15959:
--

I've made some modification of description according to the comments, please 
tell me if I've missed anything.
[~sewen] Can I take this task if the improvement is necessary.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-14 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037127#comment-17037127
 ] 

YufeiLiu commented on FLINK-15959:
--

I've made some modification of description according to the comments, please 
tell me if I've missed anything.
Stephan Ewen Can I take this task if the improvement is necessary.

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots

2020-02-14 Thread YufeiLiu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037120#comment-17037120
 ] 

YufeiLiu commented on FLINK-15959:
--

I've made some modification of description according to the comments. Please 
tell me if  I've missed anything. 

> Add min/max number of slots configuration to limit total number of slots
> 
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.11.0
>Reporter: YufeiLiu
>Priority: Major
>
> Flink removed `-n` option after FLIP-6, change to ResourceManager start a new 
> worker when required. But I think maintain a certain amount of slots is 
> necessary. These workers will start immediately when ResourceManager starts 
> and would not release even if all slots are free.
> Here are some resons:
> # Users actually know how many resources are needed when run a single job, 
> initialize all workers when cluster starts can speed up startup process.
> # Job schedule in  topology order,  next operator won't schedule until prior 
> execution slot allocated. The TaskExecutors will start in several batchs in 
> some cases, it might slow down the startup speed.
> # Flink support 
> [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out 
> tasks evenly across all available registered TaskManagers], but it will only 
> effect if all TMs are registered. Start all TMs at begining can slove this 
> problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and 
> "taskmanager.maximum.numberOfTotalSlots", default behavior is still like 
> before.
> * Start plenty number of workers to satisfy minimum slots when 
> ResourceManager accept leadership(subtract recovered workers).
> * Don't comlete slot request until minimum number of slots are registered, 
> and throw exeception when exceed maximum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)