[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-06-06 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852748#comment-17852748
 ] 

Matthias Pohl edited comment on FLINK-35035 at 6/6/24 11:47 AM:


Thanks for the pointer, [~dmvk]. We looked into this issue while working on 
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]
 (which is kind of related) and plan to do a follow-up FLIP that will align the 
resource controlling mechanism of the {{AdaptiveScheduler}}'s 
{{WaitingForResources}} and {{Executing}} states.

Currently, we have parameters intervening in the rescaling in different places 
([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min],
 
[j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max]
 being utilized in {{Executing}} and 
[j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout]
 being utilized in {{WaitingForResources}}). Having a 
{{resource-stabilization}} phase in {{Executing}} should resolve the problem 
described in this Jira issue.
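
To illustrate the idea of a {{resource-stabilization}} phase in {{Executing}}, here is a minimal Python sketch. It is not Flink code: it assumes that every newly discovered slot restarts a stabilization timer and that a rescale fires only once that timer expires. The function name and all numbers are hypothetical.

```python
from typing import List


def debounced_rescale_times(arrivals: List[float], stabilization_timeout: float) -> List[float]:
    """Times at which a rescale would fire under the assumed debounce model.

    arrivals: times (in seconds) at which new slots are discovered.
    stabilization_timeout: quiet period required before rescaling (assumption).
    """
    fire_times = []
    deadline = None  # when the currently pending rescale would fire
    for t in sorted(arrivals):
        if deadline is not None and t >= deadline:
            # The timer ran out before this slot showed up, so that rescale fired.
            fire_times.append(deadline)
        # Every arrival (re)starts the stabilization timer.
        deadline = t + stabilization_timeout
    if deadline is not None:
        # The last pending timer eventually expires as well.
        fire_times.append(deadline)
    return fire_times
```

With five slots arriving 2 seconds apart and a 5 second timeout, `debounced_rescale_times([0, 2, 4, 6, 8], 5)` yields a single rescale at t=13, whereas reacting to each arrival would rescale five times.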


was (Author: mapohl):
Thanks for the pointer, [~dmvk]. We looked into this issue while working on 
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]
 (which is kind of related) and plan to do a follow-up FLIP that will align the 
resource controlling mechanism of the {{AdaptiveScheduler}}'s 
{{WaitingForResources}} and {{Executing}} states. 

Currently, we have parameters intervening in the rescaling in different places 
([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min],
 
[j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max]
 being utilized in {{Executing}} and 
[j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout)
 being utilized in {{WaitingForResources}}). Having a 
{{resource-stabilization}} phase in {{Executing}} should resolve the problem 
described in this Jira issue here.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Affects Versions: 1.19.0
> Reporter: yuanfenghu
> Priority: Minor
>
> When 'jobmanager.scheduler = adaptive', job graph changes triggered by 
> cluster expansion cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have a jobgraph: [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job is executed as [v1 p5] -> [v2 p5].
> When I add slots, the jobgraph change is triggered by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable.
> However, the five new slots I added are not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot). No matter what 
> environment we run in, we cannot guarantee that the new slots are added 
> all at once, so onNewResourcesAvailable is triggered repeatedly.
> If the slots arrive at intervals, the jobgraph keeps changing during this 
> period. What I hope for is a stabilization time for the cluster resources: 
> trigger the jobgraph change only after the number of cluster slots has been 
> stable for a certain period of time, to avoid this situation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836515#comment-17836515
 ] 

Etienne Chauchot edited comment on FLINK-35035 at 4/12/24 9:10 AM:
---

I think [~dmvk] has started to think about that. He might already have 
suggestions for improving the overall rescale timeout.


was (Author: echauchot):
I think [~dmvk] has started to think about that. 



[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17836395#comment-17836395
 ] 

yuanfenghu edited comment on FLINK-35035 at 4/12/24 6:08 AM:
-

[~echauchot] 

Thank you for your reply.
I think you are looking at this scenario from the perspective of Reactive 
Mode, because Reactive Mode uses only the resources of the cluster as the 
criterion for job parallelism. Please correct me if I misunderstand.

My scenario above is not in Reactive Mode, though. I only use the adaptive 
scheduler, and I increase the parallelism of the running job from 10 to 12. 
However, because min-parallelism-increase=5, the rescale is not triggered 
immediately even once the cluster has enough slots for a parallelism of 12. 
It has to wait for scaling-interval.max before the rescale can be triggered. 
My goal is to trigger the rescale as soon as the parallelism of 12 can be 
satisfied, instead of having to wait for scaling-interval.max or for 
min-parallelism-increase to be reached.
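
The behavior described in this comment can be written down as a small decision function. This is a simplified Python model of what the thread describes, not the AdaptiveScheduler's actual code. The function name and return values are hypothetical, and the cooldown imposed by scaling-interval.min is ignored.

```python
from typing import Optional


def rescale_decision(current_p: int, achievable_p: int,
                     min_parallelism_increase: int,
                     scaling_interval_max_s: Optional[float]) -> str:
    """Classify a scale-up opportunity as 'now', 'deferred' or 'never' (assumed model)."""
    increase = achievable_p - current_p
    if increase <= 0:
        return "never"  # nothing to gain from rescaling
    if increase >= min_parallelism_increase:
        return "now"  # the gain is large enough to rescale right away
    if scaling_interval_max_s is not None:
        # Small gain: applied only once scaling-interval.max has elapsed.
        return "deferred"
    return "never"  # small gain and no scaling-interval.max configured
```

For the numbers in this comment, `rescale_decision(10, 12, 5, 60.0)` returns "deferred" (60 seconds is an arbitrary sample value): the increase of 2 is below the threshold of 5, so the job waits even though 12 slots are available.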


was (Author: JIRAUSER296932):
[~echauchot] 

Thank you for your reply.

You should look at this issue from the perspective of Reactive Mode, because 
Reactive Mode only uses the resources of the cluster as a criterion for task 
parallelism. I don’t know if I understand it correctly.

But my above scenario is in non-Reactive Mode. But I use the adaptive 
scheduler, which means that I increase the parallelism of the running task from 
10 to 12. However, because min-parallelism-increase=5, I am satisfied in the 
cluster slot. When the condition of 12 is met, the expansion of the task cannot 
be triggered immediately, but it needs to wait for scaling-interval.max before 
the expansion can be triggered. My purpose is to trigger the expansion when the 
parallelism of 12 is met, instead of having to after scaling-interval.max



[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-09 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835564#comment-17835564
 ] 

yuanfenghu edited comment on FLINK-35035 at 4/10/24 2:19 AM:
-

[~echauchot] 
Thank you for your reply, but I have some questions:

jobmanager.adaptive-scheduler.min-parallelism-increase is a JobManager 
parameter, so I cannot update its value after the cluster is started. 
Assuming it is set to 5, it causes the following problem:

The original job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]. If I 
call the REST API to override the parallelism to [v1 (maxp=12, minp=1)] -> 
[v2 (maxp=12, minp=1)] and then add slots to the cluster, I only need 2 more 
slots to meet the requirements. But because min-parallelism-increase is not 
reached, the job does not rescale. It has to wait until scaling-interval.max 
is reached before rescaling is triggered (and scaling-interval.max needs to be 
set first). In this case, should a configuration option be added to support 
triggering it?

Maybe we can add a switch similar to 
jobmanager.adaptive-scheduler.min-parallelism-increase. When the resources 
change, it checks whether the current resources fully meet the parallelism 
requirements of the job. If they do, rescheduling is triggered directly. If 
they do not, the job is rescheduled after scaling-interval.max. WDYT? 
[~echauchot] 

Looking forward to your reply!
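
The switch proposed above could look roughly like the following Python sketch. It only illustrates the suggestion and is not an existing Flink option: the flag `rescale_when_fully_satisfied` and the function name are hypothetical.

```python
def should_rescale_immediately(current_p: int, achievable_p: int, desired_p: int,
                               min_parallelism_increase: int,
                               rescale_when_fully_satisfied: bool) -> bool:
    """Decide whether a scale-up should be triggered right away (proposal sketch)."""
    increase = achievable_p - current_p
    if increase <= 0:
        return False  # no additional parallelism available
    if increase >= min_parallelism_increase:
        return True  # existing rule: the gain is large enough
    # Proposed rule: also rescale when the job's full requirement can be met,
    # even though the gain is below min-parallelism-increase.
    return rescale_when_fully_satisfied and achievable_p >= desired_p
```

With the switch enabled, going from 10 to the requested 12 triggers immediately although min-parallelism-increase is 5. With the switch disabled, the same situation still falls back to waiting for scaling-interval.max.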

 


was (Author: JIRAUSER296932):
[~echauchot] 
Thank you for your reply, but I have some questions:

jobmanager.adaptive-scheduler.min-parallelism-increase is a parameter on 
jobmanager, so I cannot update this value after the cluster is started. 
Assuming it is set to 5, this time it causes some problems:

The original task is [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]. If I 
call restapi, the parallelism is overwritten to the new [v1 (maxp=12 minp = 1)] 
-> [v2 (maxp=12, minp=1)], then I added slots to the cluster, but obviously I 
only need to add 2 slots to meet the requirements, but because 
min-parallelism-increase was not reached, So this will not cause the task to 
trigger expansion. It needs to wait until scaling-interval.max is reached 
before triggering (scaling-interval.max needs to be set first). I think in this 
case, should I add a configuration item to support its triggering?
 



[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-09 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835564#comment-17835564
 ] 

yuanfenghu edited comment on FLINK-35035 at 4/10/24 2:08 AM:
-

[~echauchot] 
Thank you for your reply, but I have some questions:

jobmanager.adaptive-scheduler.min-parallelism-increase is a JobManager 
parameter, so I cannot update its value after the cluster is started. 
Assuming it is set to 5, it causes the following problem:

The original job is [v1 (maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)]. If I 
call the REST API to override the parallelism to [v1 (maxp=12, minp=1)] -> 
[v2 (maxp=12, minp=1)] and then add slots to the cluster, I only need 2 more 
slots to meet the requirements. But because min-parallelism-increase is not 
reached, the job does not rescale. It has to wait until scaling-interval.max 
is reached before rescaling is triggered (and scaling-interval.max needs to be 
set first). In this case, should a configuration option be added to support 
triggering it?
 


was (Author: JIRAUSER296932):
[~echauchot] 
Thank you for your reply, but I have some questions:

jobmanager.adaptive-scheduler.min-parallelism-increase is a parameter on 
jobmanager, so I cannot update this value after the task is started. Assuming 
it is set to 5, this time it causes some problems:

The original task is [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]. If I 
call restapi, the parallelism is overwritten to the new [v1 (maxp=12 minp = 1)] 
-> [v2 (maxp=12, minp=1)], then I added slots to the cluster, but obviously I 
only need to add 2 slots to meet the requirements, but because 
min-parallelism-increase was not reached, So this will not cause the task to 
trigger expansion. It needs to wait until scaling-interval.max is reached 
before triggering (scaling-interval.max needs to be set first). I think in this 
case, should I add a configuration item to support its triggering?
 



[jira] [Comment Edited] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-08 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17835106#comment-17835106
 ] 

yuanfenghu edited comment on FLINK-35035 at 4/9/24 2:22 AM:


[~bgeng777] 
Thank you for your comment. As you understood, I hope that if a new TM appears 
while the job is running, the job does not restart immediately but waits for 
a period of time.
Below I explain the process in detail:
Taking the example I gave at the beginning, assume that the current job is [v1 
(maxp=10, minp=1)] -> [v2 (maxp=10, minp=1)], but the total number of slots in 
the cluster is 5, so the job runs as [v1 p5] -> [v2 p5]. I now add 5 slots and 
hope the job will run as [v1 p10] -> [v2 p10]. When the number of cluster 
slots becomes 6, the job immediately triggers a restart. During the restart 
phase, the job then waits for resources for a period of time, according to the 
jobmanager.adaptive-scheduler.resource-stabilization-timeout parameter. If 
the number of slots does not reach the target of 10 during this period, the 
job runs with a lower parallelism. My slots will grow to 10 over a period of 
time, though, so this triggers another scale-up and restart. In this process, 
I get one more restart and one more resource-waiting phase. Why don't we wait 
before the first restart instead? Could we wait for a period of time, or 
determine that the number of slots meets my p=10, before triggering the 
restart (scale up)?

 
 [~echauchot] Hi, could you help me look into this issue? It seems similar to 
FLINK-21883.
 
Thanks
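
The restart sequence described above can be counted with a small Python simulation. It is a simplified model of the behavior as described in this comment, not Flink code: a slot that arrives while the job is executing triggers a restart at once, and slots that arrive during the following resource-waiting phase are absorbed without a further restart. The function name and the sample timings are hypothetical.

```python
from typing import List


def count_restarts(initial_slots: int, target_slots: int,
                   arrivals: List[float], stabilization_timeout: float) -> int:
    """Count restarts when every new slot seen while executing triggers one (assumed model)."""
    slots = initial_slots
    restarts = 0
    waiting_until = None  # end of the current resource-waiting phase, if any
    for t in sorted(arrivals):
        if slots >= target_slots:
            break  # the job already runs at its target parallelism
        if waiting_until is None or t > waiting_until:
            # The job is executing, so this slot triggers a restart right away.
            restarts += 1
            waiting_until = t + stabilization_timeout
        # Slots arriving inside the waiting phase are picked up without a new restart.
        slots += 1
    return restarts
```

With 5 initial slots, a target of 10 and a 10 second timeout, slots arriving at `[0, 5, 30, 35, 40]` cause two restarts, while slots arriving at `[0, 1, 2, 3, 4]` cause only one. Waiting for the resources to stabilize before the first restart would bring the first case down to one restart as well.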
 
 


was (Author: JIRAUSER296932):
[~bgeng777] 
Thank you for your comment. As you understand, I hope that if a new tm occurs 
during the running of the task, the task should not restart immediately, but 
wait for a period of time.
Below I will explain this process in detail:
Taking the example I gave at the beginning, assume that the current task is [v1 
(maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)], but the total number of slots in 
the cluster is 5, so the task is running [v1 p5]->[v2 p5] runs. I have now 
added 5 slots. I hope the task will run with [v1 p10]->[v2 p10]. When the 
number of cluster slots becomes 6, the task will immediately trigger a restart. 
At this time, according to the 
jobmanager.adaptive-scheduler.resource-stabilization-timeout parameter, the 
task will wait for a period of time during the restart phase for resources. If 
the slot does not reach the target slot number of 10 during this period, the 
task will run with a lower degree of parallelism. , but my slots will be added 
to 10 over a period of time, so this will trigger another expansion and restart 
process. In this process, I have one more restart process and one more resource 
waiting process. Why don't we start before the first restart? Should I wait for 
a period of time or determine that the number of slots meets my p=10 before 
triggering the restart (scale up) action?

 
 [~echauchot]  hi, can you help me look into this issue, it seems similar to 
FLINK-21883.
 
Thanks
 
 
