[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-08-08 Thread Zdenek Tison (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872011#comment-17872011
 ] 

Zdenek Tison commented on FLINK-35035:
--

This task will be used as a parent task for changes proposed in 
[FLIP-472|https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states]

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Assignee: Zdenek Tison
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-07-14 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865776#comment-17865776
 ] 

yuanfenghu commented on FLINK-35035:


[~ztison] 
Thanks, I will look into FLIP in detail

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-07-12 Thread Zdenek Tison (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865350#comment-17865350
 ] 

Zdenek Tison commented on FLINK-35035:
--

Hi, [~echauchot] [~heigebupahei]

I took over [~mapohl]'s work and prepared the FLIP document on the discussed 
topic. Please take a look if the topic is still relevant to you. Thanks

https://lists.apache.org/thread/krnjv8fm62nbnrljmk3bfoons86pc1dw

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-06-06 Thread Matthias Pohl (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852748#comment-17852748
 ] 

Matthias Pohl commented on FLINK-35035:
---

Thanks for the pointer, [~dmvk]. We looked into this issue while working on 
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]
 (which is kind of related) and plan to do a follow-up FLIP that will align the 
resource controlling mechanism of the {{AdaptiveScheduler}}'s 
{{WaitingForResources}} and {{Executing}} states. 

Currently, we have parameters intervening in the rescaling in different places 
([j.a.scaling-interval.min|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-min],
 
[j.a.scaling-interval.max|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-scaling-interval-max]
 being utilized in {{Executing}} and 
[j.a.resource-stabilization-timeout|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#jobmanager-adaptive-scheduler-resource-stabilization-timeout)
 being utilized in {{WaitingForResources}}). Having a 
{{resource-stabilization}} phase in {{Executing}} should resolve the problem 
described in this Jira issue here.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-06-06 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852668#comment-17852668
 ] 

David Morávek commented on FLINK-35035:
---

Yes, we have this on our radar. [~mapohl] is currently looking into this 
direction. The main idea is to move resource stabilization timeout from 
WaitingForResources into Executing state.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836515#comment-17836515
 ] 

Etienne Chauchot commented on FLINK-35035:
--

I think [~dmvk] has started to think about that. 

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836508#comment-17836508
 ] 

yuanfenghu commented on FLINK-35035:


> It is not slot per slot rescale, there will be only one rescale in these 
>cases:
 * if the TM comes with 2 slots at once
 * if the second slot comes during the stabilization timeout

 
Yes, there is only one scaling in both cases. However, if a tm has only one 
slot and the time interval between registration and jm is relatively long, it 
will be triggered twice.
 
> That being said, I know there is an ongoing reflection in the community to 
> decrease the overall timeouts during rescale.

good, Can you please help me @ the person who worked on this? Maybe can follow 
up on this question, thanks!
 
 

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836507#comment-17836507
 ] 

Etienne Chauchot commented on FLINK-35035:
--

> if min-parallelism-increase=1 Then my job may trigger the scaling process 
> twice when I change the number of slots from 10 to 12

It is not slot per slot rescale, there will be only one rescale in these cases:
 * if the TM comes with 2 slots at once
 * if the second slot comes during the stabilization timeout

That being said, I know there is an ongoing reflection in the community to 
decrease the overall timeouts during rescale.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836504#comment-17836504
 ] 

yuanfenghu commented on FLINK-35035:


[~echauchot] 
Thank you for your patience in tracking this issue!
 
> The only thing is that you will have more frequent rescales (each time a slot 
> is added to the cluster) modulo slots that are added during the stabilization 
> period that do not lead to a rescale.

 
This is the problem. Imagine that I originally wanted to adjust the parallelism 
degree from 10 to 12. My execution step is to first adjust the maximum 
parallelism degree of my job to 12 through the rest api, and then I add tm to 
the cluster. If min-parallelism-increase=1 Then my job may trigger the scaling 
process twice when I change the number of slots from 10 to 12. This process may 
last for minutes, if min-parallelism-increase > 2, such as 5, Then my job has 
to wait until scaling-interval.max before scaling. I think we can optimize this 
process ,let the job trigger scaling exactly when slot becomes 12
 
 

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-12 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836462#comment-17836462
 ] 

Etienne Chauchot commented on FLINK-35035:
--

with adaptive scheduler, the jobMaster declares resources needed with a min and 
a max. The only difference with reactive mode is that the max is +INF. Here we 
are talking about declaring min resources needed. So unless there is something 
I missed, I'm not sure reactive mode is relevant here.

If I understand correctly,  what you want in the end is to use whatever new 
slots arrive in the cluster with a minimal waiting period.  So why not just 
leave default min-parallelism-increase=1, leave default scaling-interval.max 
unset and change default scaling-interval.min of 30s to 0s ?

The only thing is that you will have more frequent rescales (each time a slot 
is added to the cluster) modulo slots that are added during the stabilization 
period that do not lead to a rescale.

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-11 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836395#comment-17836395
 ] 

yuanfenghu commented on FLINK-35035:


[~echauchot] 

Thank you for your reply.

You should look at this issue from the perspective of Reactive Mode, because 
Reactive Mode only uses the resources of the cluster as a criterion for task 
parallelism. I don’t know if I understand it correctly.

But my above scenario is in non-Reactive Mode. But I use the adaptive 
scheduler, which means that I increase the parallelism of the running task from 
10 to 12. However, because min-parallelism-increase=5, I am satisfied in the 
cluster slot. When the condition of 12 is met, the expansion of the task cannot 
be triggered immediately, but it needs to wait for scaling-interval.max before 
the expansion can be triggered. My purpose is to trigger the expansion when the 
parallelism of 12 is met, instead of having to after scaling-interval.max

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-10 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835677#comment-17835677
 ] 

Etienne Chauchot commented on FLINK-35035:
--

> It needs to wait until scaling-interval.max is reached before triggering

Yes or wait until 5 slots are there. 

>When the resource changes, it will be judged whether the current resource 
>fully meets the parallelism requirements of the job. If it is satisfied, 
>rescheduling will be triggered directly. If it is not satisfied, it will be 
>rescheduled in after scaling-interval.max

It is already how things work when you set min-parallelism-increase and 
scaling-interval.max

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-09 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835564#comment-17835564
 ] 

yuanfenghu commented on FLINK-35035:


[~echauchot] 
Thank you for your reply, but I have some questions:

jobmanager.adaptive-scheduler.min-parallelism-increase is a parameter on 
jobmanager, so I cannot update this value after the task is started. Assuming 
it is set to 5, this time it causes some problems:

The original task is [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]. If I 
call restapi, the parallelism is overwritten to the new [v1 (maxp=12 minp = 1)] 
-> [v2 (maxp=12, minp=1)], then I added slots to the cluster, but obviously I 
only need to add 2 slots to meet the requirements, but because 
min-parallelism-increase was not reached, So this will not cause the task to 
trigger expansion. It needs to wait until scaling-interval.max is reached 
before triggering (scaling-interval.max needs to be set first). I think in this 
case, should I add a configuration item to support its triggering?
 

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-09 Thread Etienne Chauchot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835434#comment-17835434
 ] 

Etienne Chauchot commented on FLINK-35035:
--

FLINK-21883 cooldown period was mainly designed to avoid too frequent rescales. 
Here is how it works when new slots are available:
 - Flink should rescale immediately only if last rescale was done more than 
scaling-interval.min (default 30s) ago.
 - Otherwise it should schedule a rescale at (now + scaling-interval.min) point 
in time.
The rescale is done like this:
 - if minimum scaling requirements are met (AdaptiveScheduler#shouldRescale 
default to minimum 1 slot added), the job is restarted with new parallelism
 - if minimum scaling requirements are not met
 -- if last rescale was done more than scaling-interval.max ago (default 
disabled), a rescale is forced.
 -- otherwise, schedule a forced rescale in scaling-interval.max

So in your case of slots arriving partially during the resource stabilization 
timeout leading to a rescale with only a portion of the ideal number of slots, 
what I see is that you can either:
1. increase the stabilization timeout hopping you'll get all the slots during 
that time
2. set min-parallelism-increase to 5 instead of default 1 and set 
scaling-interval.max. That way the first slots additions will not trigger a 
rescale but the rescale will be issued only when the 5th slot arrives and you 
will still get a security force rescale scheduled no matter what (as long as 
the parallelism has changed) after scaling-interval.max

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-08 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835106#comment-17835106
 ] 

yuanfenghu commented on FLINK-35035:


[~bgeng777] 
Thank you for your comment. As you understand, I hope that if a new tm occurs 
during the running of the task, the task should not restart immediately, but 
wait for a period of time.
Below I will explain this process in detail:
Taking the example I gave at the beginning, assume that the current task is [v1 
(maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)], but the total number of slots in 
the cluster is 5, so the task is running [v1 p5]->[v2 p5] runs. I have now 
added 5 slots. I hope the task will run with [v1 p10]->[v2 p10]. When the 
number of cluster slots becomes 6, the task will immediately trigger a restart. 
At this time, according to the 
jobmanager.adaptive-scheduler.resource-stabilization-timeout parameter, the 
task will wait for a period of time during the restart phase for resources. If 
the slot does not reach the target slot number of 10 during this period, the 
task will run with a lower degree of parallelism. , but my slots will be added 
to 10 over a period of time, so this will trigger another expansion and restart 
process. In this process, I have one more restart process and one more resource 
waiting process. Why don't we start before the first restart? Should I wait for 
a period of time or determine that the number of slots meets my p=10 before 
triggering the restart (scale up) action?

 
 [~echauchot]  hi, can you help me look into this issue, it seems similar to 
FLINK-21883.
 
Thanks
 
 

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35035) Reduce job pause time when cluster resources are expanded in adaptive mode

2024-04-07 Thread Biao Geng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834752#comment-17834752
 ] 

Biao Geng commented on FLINK-35035:
---

I am not very familiar with adaptive scheduler, maybe others can share more 
insights. I just want to ask a question to make sure we are on the same page.
Do you mean that instead of triggering onNewResourcesAvailable repeatedly once 
a new slot is found(in your example, 5 new slots so 5 times of rescheduling), 
you are expecting the JM can discover the 5 new slots at the same time after a 
configurable period of time and only trigger 1 time of rescheduling?

> Reduce job pause time when cluster resources are expanded in adaptive mode
> --
>
> Key: FLINK-35035
> URL: https://issues.apache.org/jira/browse/FLINK-35035
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.19.0
>Reporter: yuanfenghu
>Priority: Minor
>
> When 'jobmanager.scheduler = adaptive' , job graph changes triggered by 
> cluster expansion will cause long-term task stagnation. We should reduce this 
> impact.
> As an example:
> I have jobgraph for : [v1 (maxp=10 minp = 1)] -> [v2 (maxp=10, minp=1)]
> When my cluster has 5 slots, the job will be executed as [v1 p5]->[v2 p5]
> When I add slots the task will trigger jobgraph changes,by
> org.apache.flink.runtime.scheduler.adaptive.ResourceListener#onNewResourcesAvailable,
> However, the five new slots I added were not discovered at the same time (for 
> convenience, I assume that a taskmanager has one slot), because no matter 
> what environment we add, we cannot guarantee that the new slots will be added 
> at once, so this will cause onNewResourcesAvailable triggers repeatedly
> ,If each new slot action has a certain interval, then the jobgraph will 
> continue to change during this period. What I hope is that there will be a 
> stable time to configure the cluster resources  and then go to it after the 
> number of cluster slots has been stable for a certain period of time. Trigger 
> jobgraph changes to avoid this situation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)