[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852748#comment-17852748 ]

Matthias Pohl edited comment on FLINK-35035 at 6/6/24 11:47 AM:

Thanks for the pointer, [~dmvk]. We looked into this issue while working on [FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler] (which is related) and plan to do a follow-up FLIP that will align the resource-controlling mechanisms of the {{AdaptiveScheduler}}'s {{WaitingForResources}} and {{Executing}} states.

Currently, the parameters that intervene in rescaling are applied in different places: [j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min] and [j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max] are used in {{Executing}}, while [j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout] is used in {{WaitingForResources}}. Having a {{resource-stabilization}} phase in {{Executing}} should resolve the problem described in this Jira issue.

was (Author: mapohl):

Thanks for the pointer, [~dmvk]. We looked into this issue while working on [FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler] (which is kind of related) and plan to do a follow-up FLIP that will align the resource controlling mechanism of the {{AdaptiveScheduler}}'s {{WaitingForResources}} and {{Executing}} states.

Currently, we have parameters intervening in the rescaling in different places ([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min], [j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max] being utilized in {{Executing}} and [j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout) being utilized in {{WaitingForResources}}). Having a {{resource-stabilization}} phase in {{Executing}} should resolve the problem described in this Jira issue here.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --------------------------------------------------------------------------
>
>                 Key: FLINK-35035
>                 URL: https://issues.apache.org/jira/browse/FLINK-35035
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>    Affects Versions: 1.19.0
>            Reporter: yuanfenghu
>            Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by
> cluster expansion cause tasks to stall for a long time. We should reduce
> this impact.
> As an example:
> I have a job graph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job is executed as [v1 p5] -> [v2 p5].
> When I add slots, the job graph change is triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots are not discovered at the same time (for
> convenience, I assume that each TaskManager has one slot), because in no
> environment can we guarantee that all new slots are added at once. As a
> result, onNewResourcesAvailable is triggered repeatedly. If there is an
> interval between the individual slot additions, the job graph keeps
> changing during that period. What I hope for is a configurable
> stabilization time: job graph changes are triggered only after the number
> of cluster slots has been stable for that period, which avoids this
> situation.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
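The stabilization idea requested in the issue description can be sketched as a small simulation. This is only an illustration under stated assumptions, not Flink code: `eager_rescales`, `stabilized_rescales`, and the `stable_for` parameter are hypothetical names, and slot arrivals are modelled as plain timestamps in seconds.

```python
from typing import List


def eager_rescales(slot_arrivals: List[float]) -> List[float]:
    """Rescale at every slot arrival (the behaviour described in the issue)."""
    return sorted(slot_arrivals)


def stabilized_rescales(slot_arrivals: List[float], stable_for: float) -> List[float]:
    """Rescale only once no new slot has arrived for `stable_for` seconds.

    `stable_for` is a hypothetical setting used for illustration only.
    """
    arrivals = sorted(slot_arrivals)
    triggers = []
    for i, t in enumerate(arrivals):
        deadline = t + stable_for
        is_last = i == len(arrivals) - 1
        # A later arrival at or before the deadline resets the timer,
        # so this arrival does not lead to a rescale of its own.
        if is_last or arrivals[i + 1] > deadline:
            triggers.append(deadline)
    return triggers


# Five TaskManagers (one slot each) registering two seconds apart.
arrivals = [0.0, 2.0, 4.0, 6.0, 8.0]
print(len(eager_rescales(arrivals)), "rescales without stabilization")
print(len(stabilized_rescales(arrivals, 5.0)), "rescale with a 5 s stabilization period")
```

The trade-off in this model is that the single rescale happens later (5 s after the last slot arrives) in exchange for avoiding the intermediate job graph changes.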
[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836515#comment-17836515 ]

Etienne Chauchot edited comment on FLINK-35035 at 4/12/24 9:10 AM:

I think [~dmvk] has started to think about that. He might already have suggestions for improving the overall rescale timeout.

was (Author: echauchot):

I think [~dmvk] has started to think about that.
[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836395#comment-17836395 ]

yuanfenghu edited comment on FLINK-35035 at 4/12/24 6:08 AM:

[~echauchot] Thank you for your reply. I think you are looking at this scenario from the perspective of Reactive Mode, because Reactive Mode uses only the cluster's resources as the criterion for job parallelism. I am not sure I understand it correctly.

My scenario above, however, is in non-Reactive Mode. I just use the adaptive scheduler, and I increase the parallelism of the running job from 10 to 12. Because min-parallelism-increase=5, the rescale is not triggered immediately once the cluster has enough slots for a parallelism of 12. Instead, it has to wait for scaling-interval.max.

My goal is to trigger the rescale as soon as a parallelism of 12 can be satisfied, instead of having to wait for scaling-interval.max or for min-parallelism-increase to be reached.

was (Author: JIRAUSER296932):

[~echauchot] Thank you for your reply. You should look at this issue from the perspective of Reactive Mode, because Reactive Mode uses only the cluster's resources as the criterion for job parallelism. I am not sure I understand it correctly.

My scenario above, however, is in non-Reactive Mode. I use the adaptive scheduler, and I increase the parallelism of the running job from 10 to 12. Because min-parallelism-increase=5, the rescale is not triggered immediately once the cluster has enough slots for a parallelism of 12. Instead, it has to wait for scaling-interval.max.

My goal is to trigger the rescale as soon as a parallelism of 12 can be satisfied, instead of having to wait for scaling-interval.max.
[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835564#comment-17835564 ]

yuanfenghu edited comment on FLINK-35035 at 4/10/24 2:19 AM:

[~echauchot] Thank you for your reply, but I have some questions.

jobmanager.adaptive-scheduler.min-parallelism-increase is a JobManager parameter, so I cannot update its value after the cluster is started. Assuming it is set to 5, this causes a problem:

The original job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]. If I call the REST API, the parallelism is overwritten to [v1 (maxp=12, minp=1)] -> [v2 (maxp=12, minp=1)]. I then add slots to the cluster. I only need 2 additional slots to meet the requirement, but because min-parallelism-increase is not reached, the job does not rescale. It has to wait until scaling-interval.max is reached (and scaling-interval.max has to be set in the first place).

In this case, should we add a configuration option to support triggering the rescale? Maybe we can add a switch similar to jobmanager.adaptive-scheduler.min-parallelism-increase: when resources change, the scheduler checks whether the current resources fully meet the job's parallelism requirements. If they do, rescaling is triggered directly. If they do not, the job is rescaled after scaling-interval.max.

WDYT? [~echauchot] Looking forward to your reply!

was (Author: JIRAUSER296932):

[~echauchot] Thank you for your reply, but I have some questions.

jobmanager.adaptive-scheduler.min-parallelism-increase is a JobManager parameter, so I cannot update its value after the cluster is started. Assuming it is set to 5, this causes a problem:

The original job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]. If I call the REST API, the parallelism is overwritten to [v1 (maxp=12, minp=1)] -> [v2 (maxp=12, minp=1)]. I then add slots to the cluster. I only need 2 additional slots to meet the requirement, but because min-parallelism-increase is not reached, the job does not rescale. It has to wait until scaling-interval.max is reached (and scaling-interval.max has to be set in the first place).

In this case, should we add a configuration option to support triggering the rescale?
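The rule described in the comment above, together with the proposed switch, can be sketched as a single decision function. This is a simplified model of the behaviour as described in this thread, not the AdaptiveScheduler's actual logic. The function name, its parameters, and in particular the `rescale_when_satisfied` flag are hypothetical, and one slot per unit of parallelism is assumed.

```python
from typing import Optional


def should_rescale(
    current_p: int,
    desired_p: int,
    available_slots: int,
    min_parallelism_increase: int,
    waited: float,
    scaling_interval_max: Optional[float],
    rescale_when_satisfied: bool = False,  # hypothetical switch from the proposal
) -> bool:
    """Decide whether a scale-up should be triggered now."""
    achievable_p = min(desired_p, available_slots)
    increase = achievable_p - current_p
    if increase <= 0:
        return False  # nothing to gain from rescaling
    if increase >= min_parallelism_increase:
        return True  # the gain is large enough to rescale right away
    if rescale_when_satisfied and achievable_p == desired_p:
        return True  # proposal: the requested parallelism is fully met
    # Otherwise wait for scaling-interval.max, which must be configured.
    return scaling_interval_max is not None and waited >= scaling_interval_max


# Scenario from the comment: 10 -> 12 with min-parallelism-increase = 5.
print(should_rescale(10, 12, 12, 5, waited=0.0, scaling_interval_max=60.0))
print(should_rescale(10, 12, 12, 5, waited=0.0, scaling_interval_max=60.0,
                     rescale_when_satisfied=True))
```

With the flag off, the 10 -> 12 request is only honoured once `waited` reaches `scaling_interval_max`; with the flag on, it is honoured as soon as 12 slots are available.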
[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835564#comment-17835564 ]

yuanfenghu edited comment on FLINK-35035 at 4/10/24 2:08 AM:

[~echauchot] Thank you for your reply, but I have some questions.

jobmanager.adaptive-scheduler.min-parallelism-increase is a JobManager parameter, so I cannot update its value after the cluster is started. Assuming it is set to 5, this causes a problem:

The original job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]. If I call the REST API, the parallelism is overwritten to [v1 (maxp=12, minp=1)] -> [v2 (maxp=12, minp=1)]. I then add slots to the cluster. I only need 2 additional slots to meet the requirement, but because min-parallelism-increase is not reached, the job does not rescale. It has to wait until scaling-interval.max is reached (and scaling-interval.max has to be set in the first place).

In this case, should we add a configuration option to support triggering the rescale?

was (Author: JIRAUSER296932):

[~echauchot] Thank you for your reply, but I have some questions.

jobmanager.adaptive-scheduler.min-parallelism-increase is a JobManager parameter, so I cannot update its value after the task is started. Assuming it is set to 5, this causes a problem:

The original job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]. If I call the REST API, the parallelism is overwritten to [v1 (maxp=12, minp=1)] -> [v2 (maxp=12, minp=1)]. I then add slots to the cluster. I only need 2 additional slots to meet the requirement, but because min-parallelism-increase is not reached, the job does not rescale. It has to wait until scaling-interval.max is reached (and scaling-interval.max has to be set in the first place).

In this case, should we add a configuration option to support triggering the rescale?
[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode
[ https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835106#comment-17835106 ]

yuanfenghu edited comment on FLINK-35035 at 4/9/24 2:22 AM:

[~bgeng777] Thank you for your comment. As you understood, I hope that if a new TaskManager appears while the job is running, the job does not restart immediately but waits for a period of time. Below I explain the process in detail.

Taking the example I gave at the beginning, assume the current job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)], but the cluster has 5 slots in total, so the job runs as [v1 p5] -> [v2 p5]. I now add 5 slots, hoping the job will run as [v1 p10] -> [v2 p10].

When the number of cluster slots becomes 6, the job immediately triggers a restart. During the restart phase, the job waits for resources for the period set by jobmanager.adaptive-scheduler.resource-stabilization-timeout. If the slot count does not reach the target of 10 during this period, the job runs with a lower parallelism. But my slots will reach 10 over time, so this triggers another rescale and restart. This process costs me one extra restart and one extra wait for resources.

Why don't we wait before the first restart? Could we wait for a period of time, or determine that the number of slots satisfies p=10, before triggering the restart (scale-up)?

[~echauchot] Hi, could you help me look into this issue? It seems similar to FLINK-21883. Thanks.

was (Author: JIRAUSER296932):

[~bgeng777] Thank you for your comment. As you understood, I hope that if a new TaskManager appears while the job is running, the job does not restart immediately but waits for a period of time. Below I explain the process in detail.

Taking the example I gave at the beginning, assume the current job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)], but the cluster has 5 slots in total, so the job runs as [v1 p5] -> [v2 p5]. I now add 5 slots, hoping the job will run as [v1 p10] -> [v2 p10].

When the number of cluster slots becomes 6, the job immediately triggers a restart. During the restart phase, the job waits for resources for the period set by jobmanager.adaptive-scheduler.resource-stabilization-timeout. If the slot count does not reach the target of 10 during this period, the job runs with a lower parallelism. But my slots will reach 10 over time, so this triggers another rescale and restart. This process costs me one extra restart and one extra wait for resources.

Why don't we wait before the first restart? Could we wait for a period of time, or determine that the number of slots satisfies p=10, before triggering the restart (scale-up)?

[~echauchot] Hi, could you help me look into this issue? It seems similar to FLINK-21883. Thanks.
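The extra restart described in the comment above can be made concrete with a small model. It rests on simplifying assumptions that are not taken from Flink's implementation: the first new slot seen while the job is running triggers a restart, the restart waits a fixed window of `stabilization_timeout` seconds from that moment (or until the target is reached), and the job then resumes with whatever slots it has. The function and parameter names are hypothetical.

```python
from typing import List


def count_restarts(
    arrivals: List[float],
    initial_slots: int,
    target_slots: int,
    stabilization_timeout: float,
) -> int:
    """Count restarts when each new slot seen while running triggers a restart."""
    arrivals = sorted(arrivals)
    slots = initial_slots
    restarts = 0
    i = 0  # index of the next slot arrival that has not been used yet
    while i < len(arrivals) and slots < target_slots:
        restarts += 1  # a new slot shows up while the job is running
        deadline = arrivals[i] + stabilization_timeout
        # Slots arriving before the deadline are picked up by this restart.
        while i < len(arrivals) and arrivals[i] <= deadline and slots < target_slots:
            slots += 1
            i += 1
    return restarts


# 5 -> 10 slots: two TaskManagers arrive early, three arrive 30 s later.
arrivals = [0.0, 5.0, 30.0, 35.0, 40.0]
print(count_restarts(arrivals, 5, 10, stabilization_timeout=10.0))
print(count_restarts(arrivals, 5, 10, stabilization_timeout=60.0))
```

In this model the first call needs two restarts (one at 7 slots, one at 10), which matches the "one extra restart" in the comment. Waiting until all 10 slots are present before the first restart would need only one, at the cost of running at parallelism 5 for longer.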