[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17059911#comment-17059911 ]

Yangze Guo commented on FLINK-15959:

FYI: As there has been no response for nearly two weeks, I have opened FLINK-16605 to work on the max limit of the total number of slots.

> Add min/max number of slots configuration to limit total number of slots
>
> Key: FLINK-15959
> URL: https://issues.apache.org/jira/browse/FLINK-15959
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.11.0
> Reporter: YufeiLiu
> Priority: Major
>
> Flink removed the `-n` option after FLIP-6, so the ResourceManager now starts a new worker only when required. But I think maintaining a certain number of slots is necessary. These workers would start immediately when the ResourceManager starts and would not be released even if all slots are free.
> Here are some reasons:
> # Users actually know how many resources are needed when running a single job; initializing all workers when the cluster starts can speed up the startup process.
> # Jobs are scheduled in topology order: the next operator won't be scheduled until the prior execution's slot is allocated. In some cases the TaskExecutors start in several batches, which can slow down startup.
> # Flink supports [FLINK-12122|https://issues.apache.org/jira/browse/FLINK-12122] [Spread out tasks evenly across all available registered TaskManagers], but it only takes effect if all TMs are registered. Starting all TMs at the beginning can solve this problem.
> *suggestion:*
> * Add config "taskmanager.minimum.numberOfTotalSlots" and "taskmanager.maximum.numberOfTotalSlots"; the default behavior stays as before.
> * Start enough workers to satisfy the minimum number of slots when the ResourceManager accepts leadership (subtracting recovered workers).
> * Don't complete slot requests until the minimum number of slots is registered, and throw an exception when the maximum is exceeded.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17054589#comment-17054589 ]

Yangze Guo commented on FLINK-15959:

Hi, [~liuyufei]. I'm very interested in the max limitation and am following it closely. Could you share the current progress and when you expect to finish this work? If you do not have enough bandwidth, shall we limit the scope of this ticket to the min limitation, and I'll open another ticket to work on the max? WDYT?
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17041609#comment-17041609 ]

Yangze Guo commented on FLINK-15959:

Hi, [~liuyufei]. I have some basic ideas to share from the perspective of SlotManager. Hope these help.

From the perspective of SlotManager, the min/max limitation is mainly related to the "slots" and "pendingSlots" of SlotManager. Thus, I want to first summarize the scenarios in which we maintain these two collections:

*Regarding the "slots"*:
* The number of elements is reduced when a TaskExecutor is closed because of heartbeat timeout, idle timeout, etc.
* The number of elements is increased only on the registration of a new TaskExecutor.

*Regarding the "pendingSlots"*:
* The number of elements is reduced only on the registration of a new TaskExecutor.
* The number of elements is increased only on resource allocation.

The union of these two collections defines the total resources/slots the SlotManager expects, especially after FLINK-14106. So both the minimum and maximum limitations should act on the total number of slots, existing plus pending.

*Regarding the min limit*, the sum of the two collections is reduced only when a TaskExecutor is closed. We need to check the min limit and decide whether to allocate new resources in that scenario.

*Regarding the max limit*:
* We need to check the max limit before we allocate resources in SlotManager#registerSlot.
* When a TaskExecutor is closed, we also need to check whether there is a pending SlotRequest that is not assigned to a slot or pending slot. If there is, we need to allocate resources while respecting the max limit.
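The accounting described in this comment can be modelled in a few lines. The following is only a sketch in Python for brevity, not Flink's Java SlotManager: the class `SlotAccounting`, its fields `min_slots`, `max_slots`, `slots_per_worker`, and its methods are all hypothetical names chosen for illustration. It assumes every worker offers the same number of slots and that both limits apply to registered plus pending slots.

```python
from dataclasses import dataclass


@dataclass
class SlotAccounting:
    """Toy model of "slots" and "pendingSlots" (hypothetical, not Flink code)."""

    min_slots: int          # assumed cluster-wide minimum
    max_slots: int          # assumed cluster-wide maximum
    slots_per_worker: int   # assumed identical for every TaskExecutor
    registered: int = 0     # stands in for the "slots" collection
    pending: int = 0        # stands in for the "pendingSlots" collection

    @property
    def total(self) -> int:
        # Both limits act on registered plus pending slots.
        return self.registered + self.pending

    def allocate_worker(self) -> bool:
        """Request one worker unless that would exceed the max limit."""
        if self.total + self.slots_per_worker > self.max_slots:
            return False
        self.pending += self.slots_per_worker
        return True

    def register_worker(self) -> None:
        """A TaskExecutor registers: pending slots become registered slots."""
        self.pending -= min(self.pending, self.slots_per_worker)
        self.registered += self.slots_per_worker

    def close_worker(self) -> int:
        """A TaskExecutor is closed; top up to the min limit if needed.

        Returns the number of replacement workers requested.
        """
        if self.registered < self.slots_per_worker:
            raise ValueError("no registered worker to close")
        self.registered -= self.slots_per_worker
        requested = 0
        while self.total < self.min_slots and self.allocate_worker():
            requested += 1
        return requested
```

In this model the min limit is only re-checked in `close_worker`, which mirrors the observation that the sum of the two collections shrinks only when a TaskExecutor is closed, while the max limit is checked on every allocation.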
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040197#comment-17040197 ]

Till Rohrmann commented on FLINK-15959:

I would strongly discourage touching any of the scheduling components on the {{JobManager}} side because of the above-mentioned reasons. They are being actively worked on and will change quite significantly in the foreseeable future.

Instead I would suggest the following (approximate) solution:

1. Introduce the {{cluster.number-of-slots.min}} and {{cluster.number-of-slots.max}} configuration options.
2. Make the {{SlotManager}} respect the min and max number of slots, but do not block scheduling on whether {{min}} has been reached.
3. Have a special {{SlotManager}} implementation which only starts fulfilling slot requests once the number of acquired slots is equal to or greater than the configured minimum.

With 1+2 we can already solve the case where we have a Yarn session cluster which has enough time to acquire the required slots before jobs are submitted. With 3. it should also work for the per-job cluster.

The case where we won't necessarily achieve a perfect scheduling might be the failover case. But I think this is fine, because one design principle of the existing scheduler was that all slots can be treated equally (modulo their resource specifications). And when in doubt, it is better to make some progress with a non-optimal scheduling than to make no progress while waiting for the perfect scheduling.
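Point 3 of the proposal above, a slot manager that holds back slot requests until the minimum is registered, can be illustrated with a small standalone model. This is a Python sketch under stated assumptions, not the Flink implementation: `GatedSlotManager`, `register_slot`, `request_slot`, and the string ids are hypothetical, and the gate compares registered slots against a single `min_slots` value standing in for the proposed {{cluster.number-of-slots.min}} option.

```python
from collections import deque


class GatedSlotManager:
    """Toy model: queue slot requests until the minimum is registered."""

    def __init__(self, min_slots: int) -> None:
        self.min_slots = min_slots   # stands in for the proposed min option
        self.registered = 0          # total slots registered so far
        self.free_slots = deque()    # registered slot ids not yet assigned
        self.waiting = deque()       # request ids not yet fulfilled
        self.assignments = {}        # request id -> slot id

    def register_slot(self, slot_id: str) -> None:
        self.registered += 1
        self.free_slots.append(slot_id)
        self._fulfil()

    def request_slot(self, request_id: str) -> None:
        self.waiting.append(request_id)
        self._fulfil()

    def _fulfil(self) -> None:
        # Gate: hand out nothing until the configured minimum has registered.
        if self.registered < self.min_slots:
            return
        # Afterwards, match waiting requests to free slots in FIFO order.
        while self.waiting and self.free_slots:
            self.assignments[self.waiting.popleft()] = self.free_slots.popleft()
```

With `min_slots=2`, a request that arrives after the first slot registers stays queued and is fulfilled only when the second slot registers. Once the gate has opened, requests are served as soon as a free slot exists, which matches the "make some progress" principle in the failover case.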
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17040019#comment-17040019 ]

Xintong Song commented on FLINK-15959:

[~liuyufei] It's ok. There's nothing to be sorry for. It's always good to have such discussions to talk things through. :)
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039932#comment-17039932 ]

YufeiLiu commented on FLINK-15959:

[~xintongsong] Sorry, I was wrong about that. It won't get worse no matter how many times the job is restarted; it is only affected by the number of lost TMs. Sorry for wasting your time.
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039879#comment-17039879 ]

Xintong Song commented on FLINK-15959:

[~liuyufei]

bq. but the situation could get worse after a few times restart.

I don't understand why that is. More failovers do not necessarily make {{SlotPool}} cache fewer slots/TMs. Shouldn't it only fail to cache slots from recently lost TMs? Flink should have already launched new TMs for those lost in previous attempts, right?
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039875#comment-17039875 ]

YufeiLiu commented on FLINK-15959:

[~xintongsong] I'm worried about the uncertainty. Losing one TM won't make much difference, but the situation could get worse after a few restarts. If the source operator's parallelism is lower than that of the other operators, the source tasks tend to converge on a few TMs after TMs have been lost several times.

I think combining the functions of {{SlotPool}} and {{ResourceManager}} is a good idea for the long term, and these configs would work in any case if the JobMaster didn't cache slots.
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039668#comment-17039668 ]

Xintong Song commented on FLINK-15959:

[~liuyufei]

bq. otherwise scheduler will use slots from SlotPool until available slots are empty, this also can cause load balance issue.

Not sure about this. In case of job failover, all the slots in {{SlotPool}} should be free when recovering the job, and the `evenly-spread-out-slots` introduced in FLINK-12122 applies to those slots in {{SlotPool}} as well.

It's true that we cannot always guarantee to spread out slots evenly across all the TMs when TMs are lost. We may first spread out slot requests across the existing TMs (whose slots are still in {{SlotPool}}), and the remaining unfulfilled slot requests will be allocated to new TMs once the minimum slots are registered to the RM. I think this might provide good enough load balance in most cases, especially in failovers caused by only one or a few lost TMs.

bq. maybe should implement it in scheduler, allocate enough resource before execution scheduling.

I think this is a good idea for solving the load balance problem. My concern is that it might conflict with some of the community roadmap plans.
- One thing I'm aware of is that the community is planning a declarative resource management approach, in which a job declares all its resource requirements at once, instead of requesting slots for individual tasks/executions separately as it does currently. This effort is still in early design discussions and may not be finished in the next release. But I would try to avoid making many changes to {{Scheduler}} and {{SlotPool}} at this time, considering they might be changed again soon.
- Another thing that I have heard of, though I'm not completely sure about it, is that people are considering getting rid of {{SlotPool}}, or at least giving it as little responsibility as possible. Currently we do not benefit much from caching slots in {{SlotPool}}, but we suffer from the complication that resources are managed in two places, the {{SlotPool}} and the {{ResourceManager}}. That's also why I do not like the idea of adding more responsibility to {{SlotPool}}.

I'm not very familiar with {{Scheduler}} and {{SlotPool}} though. Maybe [~trohrmann] or [~zhuzh] could chime in.
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17039149#comment-17039149 ]

YufeiLiu commented on FLINK-15959:

[~xintongsong] Hi, I've run into a problem and need some advice. How should I handle the case where a job restarts because TaskExecutors have been lost? It seems I would have to block the scheduling thread until the cluster has recovered; otherwise the scheduler will use slots from SlotPool until the available slots are exhausted, which can also cause a load balance issue.

I feel like I'm heading in the wrong direction; maybe this should be implemented in the scheduler, allocating enough resources before execution scheduling.
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038086#comment-17038086 ]

Xintong Song commented on FLINK-15959:

bq. How about put it into ClusterOptions, start with "cluster.*".

I personally prefer "slotmanager.*", but I'm also ok with "cluster.*". I would try to keep {{ClusterOptions}} as concise as possible, maybe only common configurations that are related to all distributed components. But my opinion on this is not strong. As I said, I'm ok with either of the two ways.
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038072#comment-17038072 ]

YufeiLiu commented on FLINK-15959:

[~xintongsong] Got it: check that the total number of executors is less than the maximum before startNewWorker, and return an empty list if the limit is exceeded.

How about putting it into ClusterOptions, starting with "cluster.*"?
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038071#comment-17038071 ]

YufeiLiu commented on FLINK-15959:

[~xintongsong] Got it: check that the total number of executors is less than the maximum before startNewWorker, and return an empty list if the limit is exceeded.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
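For illustration, the guard described in the comment above could look roughly like the following. This is a minimal sketch, not Flink's ResourceManager: the class MaxSlotGuardSketch, its fields, and the fixed slots-per-worker count are all hypothetical, and counting pending workers toward the limit is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a resource manager; not Flink's actual class. */
final class MaxSlotGuardSketch {
    private final int maxSlots;        // assumed cluster-wide upper bound on slots
    private final int slotsPerWorker;  // assumed fixed number of slots per worker
    private int registeredWorkers;     // workers that have already registered
    private int pendingWorkers;        // workers requested but not yet registered

    MaxSlotGuardSketch(int maxSlots, int slotsPerWorker) {
        this.maxSlots = maxSlots;
        this.slotsPerWorker = slotsPerWorker;
    }

    /**
     * Returns the ids of the slots the new worker is expected to offer,
     * or an empty list if starting it would exceed the maximum.
     */
    List<String> startNewWorker() {
        int currentSlots = (registeredWorkers + pendingWorkers) * slotsPerWorker;
        if (currentSlots + slotsPerWorker > maxSlots) {
            return Collections.emptyList(); // limit reached: do not start a worker
        }
        pendingWorkers++;
        int workerIndex = registeredWorkers + pendingWorkers;
        List<String> slotIds = new ArrayList<>();
        for (int i = 0; i < slotsPerWorker; i++) {
            slotIds.add("worker-" + workerIndex + "-slot-" + i);
        }
        return slotIds;
    }

    /** Moves one pending worker to the registered set. */
    void workerRegistered() {
        if (pendingWorkers > 0) {
            pendingWorkers--;
            registeredWorkers++;
        }
    }
}
```

With a maximum of 4 slots and 2 slots per worker, the first two calls to startNewWorker() each return two slot ids and the third returns an empty list.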
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038036#comment-17038036 ]
Xintong Song commented on FLINK-15959:

Minor: I would suggest replacing "taskmanager.[minimum|maximum].numberOfTotalSlots" with "slotmanager.[min|max]-slots". "taskmanager.*" options are usually per-TM configurations, whereas what we are discussing are cluster-level min/max limits. Besides, IIUC, the configuration option would be used mostly by the resource/slot manager rather than by task managers.
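As a rough illustration of the naming suggested above, the two cluster-level limits could be read as follows. This is only a sketch: it uses java.util.Properties rather than Flink's configuration classes, the class SlotLimitConfigSketch is hypothetical, and the defaults (0 and Integer.MAX_VALUE, meaning no constraint) are assumptions chosen so that behavior is unchanged when the keys are absent.

```java
import java.util.Properties;

/** Illustrative holder for the two cluster-level limits; not part of Flink. */
final class SlotLimitConfigSketch {
    // Key names expanded from the "slotmanager.[min|max]-slots" suggestion.
    static final String MIN_SLOTS_KEY = "slotmanager.min-slots";
    static final String MAX_SLOTS_KEY = "slotmanager.max-slots";

    final int minSlots;
    final int maxSlots;

    private SlotLimitConfigSketch(int minSlots, int maxSlots) {
        this.minSlots = minSlots;
        this.maxSlots = maxSlots;
    }

    /** Reads both limits; the assumed defaults leave the cluster unconstrained. */
    static SlotLimitConfigSketch fromProperties(Properties props) {
        int min = Integer.parseInt(props.getProperty(MIN_SLOTS_KEY, "0"));
        int max = Integer.parseInt(
                props.getProperty(MAX_SLOTS_KEY, String.valueOf(Integer.MAX_VALUE)));
        if (min < 0 || max < min) {
            throw new IllegalArgumentException(
                    "Expected 0 <= min <= max, got min=" + min + ", max=" + max);
        }
        return new SlotLimitConfigSketch(min, max);
    }
}
```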
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038027#comment-17038027 ]
Xintong Song commented on FLINK-15959:

[~liuyufei], thanks for updating the proposal. Regarding the max limit, I think the RM should guarantee that it is not exceeded. To be specific, the RM can check how many workers/slots have already been started, including both registered and pending ones, and refuse to start new workers once the max limit is reached. The background of having a max limit is to control the resource consumption of a batch job, so that it can be executed with far fewer slots than its parallelism without using up the cluster resources.
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037840#comment-17037840 ]
Yangze Guo commented on FLINK-15959:

Yes, I think it is equivalent to the "maximum slots number" without FLIP-56.
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037773#comment-17037773 ]
YufeiLiu commented on FLINK-15959:

[~karmagyz] Thanks for your suggestion; I did overlook the batch-job scenario. I think I should implement a prototype to confirm how much impact it will have, and I will open a FLIP if necessary. About the "maximum resource limitation for task executors": did you mean limiting the total resources of all task executors? If so, is that equivalent to the "maximum slots number" when the slot resource is static?
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037719#comment-17037719 ]
Yangze Guo commented on FLINK-15959:

Hi, [~liuyufei]. Thanks for proposing this change. Regarding your suggestion, I think the ResourceManager does not need to throw an exception when the maximum limit is exceeded. AFAIK, for batch jobs, the job graph can be executed without all the slot requests being fulfilled; we may just write an info-level log in this scenario. For streaming jobs, slot requests that cannot be fulfilled will fail through the timeout check. BTW, since this touches a public interface, I think we need to open a FLIP for this change. I also want to introduce a maximum resource limit for task executors, and there could be an interrelationship between the maximum and minimum limits. Would you like to work on it together?
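The behavior suggested in the comment above (no exception at the limit, an info-level log, and unfulfilled requests failing through a timeout check) might be sketched as follows. The class PendingRequestSketch, its methods, and the explicit clock arguments are hypothetical and chosen only to keep the example self-contained; this is not Flink's SlotManager.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;

/** Hypothetical slot bookkeeping that logs instead of throwing at the limit. */
final class PendingRequestSketch {
    private static final Logger LOG = Logger.getLogger("PendingRequestSketch");

    /** A slot request waiting for a free slot, with an absolute deadline. */
    private static final class Request {
        final String id;
        final long deadlineMillis;

        Request(String id, long deadlineMillis) {
            this.id = id;
            this.deadlineMillis = deadlineMillis;
        }
    }

    private final int maxSlots;
    private int allocatedSlots;
    private final List<Request> pending = new ArrayList<>();

    PendingRequestSketch(int maxSlots) {
        this.maxSlots = maxSlots;
    }

    /** Returns true if a slot was allocated; otherwise the request stays pending. */
    boolean requestSlot(String id, long nowMillis, long timeoutMillis) {
        if (allocatedSlots < maxSlots) {
            allocatedSlots++;
            return true;
        }
        // At the limit: log at info level rather than throwing an exception.
        LOG.info("Max slot limit " + maxSlots + " reached; request " + id + " stays pending");
        pending.add(new Request(id, nowMillis + timeoutMillis));
        return false;
    }

    /** Timeout check: removes and returns the ids of requests past their deadline. */
    List<String> failTimedOut(long nowMillis) {
        List<String> failed = new ArrayList<>();
        for (Iterator<Request> it = pending.iterator(); it.hasNext();) {
            Request request = it.next();
            if (request.deadlineMillis <= nowMillis) {
                failed.add(request.id);
                it.remove();
            }
        }
        return failed;
    }
}
```

In this sketch a batch job would simply keep running with the slots it already holds, while a streaming job's extra request is reported as failed once failTimedOut is called after its deadline.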
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037130#comment-17037130 ]
YufeiLiu commented on FLINK-15959:

I've made some modifications to the description according to the comments; please tell me if I've missed anything. [~sewen] Can I take this task if the improvement is necessary?
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037127#comment-17037127 ]
YufeiLiu commented on FLINK-15959:

I've made some modifications to the description according to the comments; please tell me if I've missed anything. Stephan Ewen, can I take this task if the improvement is necessary?
[jira] [Commented] (FLINK-15959) Add min/max number of slots configuration to limit total number of slots
[ https://issues.apache.org/jira/browse/FLINK-15959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037120#comment-17037120 ]
YufeiLiu commented on FLINK-15959:

I've made some modifications to the description according to the comments. Please tell me if I've missed anything.