[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling

2024-09-07 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17880026#comment-17880026 ]

Maximilian Michels commented on FLINK-36018:


Thanks Rui! Great new feature. Looking forward to trying it out soon. I haven’t 
looked at the code yet. How does it behave when the existing grace-period is 
used?

> Support lazy scale down to avoid frequent rescaling
> ---
>
> Key: FLINK-36018
> URL: https://issues.apache.org/jira/browse/FLINK-36018
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.10.0
>
>
> {*}{color:#de350b}Core idea{color}{*}: Make scaling up sensitive to prevent 
> lags, and make scaling down insensitive to reduce restart frequency.
> h1. Background & Motivation
> We enabled autoscaler scaling for a few Flink production jobs. It works with 
> the Adaptive Scheduler and the Rescale API.
> Scaling results:
>  * The recommended parallelism meets expectations most of the time
>  * When the source traffic increases, the autoscaler scales up the job in 
> time to prevent lags.
>  * When the source traffic decreases, the autoscaler scales down the job in 
> time to save resources.
>  * {color:#de350b}*Pain point:*{color} Each job rescales more than 20 times a 
> day (job.autoscaler.metrics.window=15 min by default).
> The job will be unavailable for a while during a restart, for several 
> reasons:
>  * Cancel job
>  * Request resources 
> ([FLIP-472|https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states]
>  is optimizing this)
>  * Initialize task
>  * Restore state
>  * Catch up lag during restart
>  * etc
> *{color:#de350b}Expectations:{color}*
>  * Scaling up in time to prevent lags.
>  * Lazy scaling down to reduce downtime and ensure resources can be released 
> later.
> h1. Solution:
>  * Introduce job.autoscaler.scale-down.interval, the default value could be 1 
> hour.
>  * Replace job.autoscaler.scale-up.grace-period with 
> job.autoscaler.scale-down.interval
> Detailed strategies:
>  * Record the start time of the first scale-down event for each vertex 
> separately. For example:
>  ** vertex1: 2024-08-09 01:35:02
>  ** vertex2: 2024-08-09 01:38:02
>  * Scaling down will be triggered in the following cases:
>  ** Any vertex needs to scale up
>  *** A job restart cannot be avoided anyway, so trigger the pending scale 
> down for other vertices as well, if needed
>  *** After the scale down, clean up the recorded scale-down start times.
>  ** The scale-down interval has elapsed for some vertex:
>  *** current time - min(start time for each vertex) > scale-down.interval
>  *** This means that no scale up happened during the scale-down interval
> Note1: If the recommended parallelism >= current parallelism, the recorded 
> start time of scale-down is cleaned up for the current vertex.
> Note2: The recommended parallelism still comes from the latest 15-minute 
> metrics. For example:
>  * The current parallelism of vertex1 is 100, and traffic decreases at 
> night.
>  * 2024-08-09 01:00:00, the recommended parallelism is 60.
>  ** The start time of scale down is 2024-08-09 01:00:00.
>  * 2024-08-09 01:15:00, the recommended parallelism is 50.
>  ** Still within the scale-down interval.
>  ** Don't update the start time of scale-down.
>  * 2024-08-09 01:31:00, the recommended parallelism is 40.
>  ** Outside the scale-down interval, so trigger a rescale and use 40 as the 
> new parallelism.
>  ** job.autoscaler.metrics.window is 15 min, so the metrics from 2024-08-09 
> 01:16:00 to 2024-08-09 01:31:00 are used.
> Note3: If users set job.autoscaler.scale-down.interval <=0, we scale down 
> directly.
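The per-vertex bookkeeping in the strategy above can be sketched roughly as follows. This is a simplified standalone model under stated assumptions, not the actual autoscaler code; the class and method names are made up for illustration:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Sketch of the lazy scale-down bookkeeping described in this issue. */
class LazyScaleDownTracker {
    private final Duration scaleDownInterval;           // job.autoscaler.scale-down.interval
    private final Map<String, Instant> firstScaleDown = new HashMap<>();

    LazyScaleDownTracker(Duration scaleDownInterval) {
        this.scaleDownInterval = scaleDownInterval;
    }

    /** Returns true if the job should rescale now. */
    boolean onRecommendation(String vertex, int current, int recommended, Instant now) {
        if (scaleDownInterval.isZero() || scaleDownInterval.isNegative()) {
            return recommended != current; // Note3: interval <= 0 scales down directly
        }
        if (recommended >= current) {
            // Note1: no scale-down is pending for this vertex anymore.
            firstScaleDown.remove(vertex);
            return recommended > current;  // any scale-up triggers a restart immediately
        }
        // Record only the FIRST scale-down time; later recommendations don't reset it.
        firstScaleDown.putIfAbsent(vertex, now);
        // Trigger once the oldest pending scale-down has waited past the interval.
        Instant oldest = firstScaleDown.values().stream().min(Instant::compareTo).get();
        return Duration.between(oldest, now).compareTo(scaleDownInterval) > 0;
    }

    void clearAfterRescale() {
        firstScaleDown.clear();  // "After the scale down, clean up the start times"
    }
}
```

Replaying the Note2 timeline (01:00 → 60, 01:15 → 50, 01:31 → 40 with a 30-minute interval) yields no rescale, no rescale, rescale.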



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.

2024-09-02 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878606#comment-17878606 ]

Maximilian Michels commented on FLINK-36192:


Thanks for reporting this issue [~heigebupahei]!

If I understand correctly, we try to fix an issue when num_source_partitions > 
max_parallelism, because the current logic will effectively forget the number 
of partitions due to capping it at the vertex maximum parallelism. See [update 
logic|https://github.com/apache/flink-kubernetes-operator/blob/d8568ae28b13f5cc649a83f174dbc88449f0c602/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L287]
 and [capping 
logic|https://github.com/apache/flink-kubernetes-operator/blob/d8568ae28b13f5cc649a83f174dbc88449f0c602/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java#L104].

Due to this prior issue, the rest of the key group alignment logic doesn't work 
correctly, because it assumes that, for sources, the maximum parallelism is the 
number of partitions. See [alignment 
logic|https://github.com/apache/flink-kubernetes-operator/blob/d8568ae28b13f5cc649a83f174dbc88449f0c602/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L299].

To fix the issue, we may have to add an explicit metric for the number of 
source partitions. I don't really see other options, because we need both the 
maximum parallelism and the number of source partitions to perform the key 
group alignment. We can't lower the maximum parallelism, as we have previously 
done, if we want to support use cases where the number of partitions exceeds 
the maximum parallelism; at the same time, the maximum parallelism must be 
preserved so that the chosen parallelism never exceeds it.
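If an explicit partition-count metric existed as suggested above, the source parallelism could be chosen roughly like this. This is a hypothetical helper sketched under that assumption, not the operator's actual logic:

```java
/** Hypothetical helper, not the operator's actual code. */
final class SourceParallelismPicker {
    /**
     * Choose a source parallelism that divides the partition count evenly,
     * capped by both the partition count and the vertex max parallelism.
     */
    static int pick(int targetParallelism, int numPartitions, int maxParallelism) {
        // Never run more subtasks than partitions, and never exceed max parallelism.
        int upper = Math.min(numPartitions, maxParallelism);
        // Search upward from the target for a divisor of the partition count,
        // so every subtask reads the same number of partitions.
        for (int p = Math.min(targetParallelism, upper); p <= upper; p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        return upper; // no even split exists below the cap; take the cap
    }
}
```

For the examples in this issue: with 64 partitions and a target of 48 this picks 64 (one partition per subtask) instead of an uneven 48, and with 200 partitions and a max parallelism of 128 the cap is respected while still preferring even splits like 100.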

> Optimize the logic to make it the common divisor of the partition number of 
> the data source when determining the parallelism of the source task.
> 
>
> Key: FLINK-36192
> URL: https://issues.apache.org/jira/browse/FLINK-36192
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Minor
>
> *Description:*
> When the number of partitions of the Kafka source is known, the parallelism 
> of the tasks that consume Kafka should, as far as possible, be a divisor of 
> the partition count, so that partitions are distributed evenly across the 
> consuming subtasks.
>  
> {*}current logic{*}:
> Currently, the parallelism of tasks in the autoscaler is determined as 
> follows:
> step1: Calculate the target processing rate of the task and the 
> corresponding parallelism p1.
> step2: Starting from p1, search upward for a divisor p2 of the operator's 
> maximum parallelism. If p2 <= maxParallelism / 2, use p2 as the final 
> parallelism; otherwise use p1 as the final parallelism.
> If the task to be scaled consumes Kafka or Pulsar, its maximum parallelism 
> is determined first: if the number of partitions < the maximum parallelism 
> of the task, the maximum parallelism becomes the number of Kafka or Pulsar 
> partitions; otherwise the maximum parallelism remains unchanged. This leads 
> to the following situations:
> When the number of partitions in Kafka or Pulsar is less than the maximum 
> parallelism of the operator:
> 1. If the parallelism calculated in step 1 is <= the number of partitions / 
> 2, the requirement is met and consumption can be balanced.
> 2. If the parallelism calculated in step 1 > the number of Kafka or Pulsar 
> partitions / 2, the step-1 parallelism is used directly, and consumption 
> becomes unbalanced. For example, with 64 Kafka partitions and a step-1 
> parallelism of 48, the final parallelism is 48.
> When the number of partitions in Kafka or Pulsar is greater than the 
> maximum parallelism of the operator:
> The parallelism is calculated entirely according to the logic of step 1. For 
> example, one of my Kafka topics has 200 partitions and the maximum 
> parallelism of the operator is 128. The candidate parallelisms are then 2, 
> 4, 8, 16..., so it is very likely that Kafka cannot be consumed evenly.
>  
> {*}expected logic{*}:
>  * When the number of partitions is less than the maximum parallelism, 
> choose the task parallelism to be a divisor of the number of 
> partitions.
>  * When the number of partitions is greater than the maximum parallelism, the 

[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling

2024-08-12 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872861#comment-17872861 ]

Maximilian Michels commented on FLINK-36018:


Yes, that’s fine with me 👍



[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling

2024-08-12 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872850#comment-17872850 ]

Maximilian Michels commented on FLINK-36018:


How about autoscaler.scale-down.schedule-interval? A value of zero would mean 
immediate scale-down execution.



[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling

2024-08-12 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872848#comment-17872848 ]

Maximilian Michels commented on FLINK-36018:


Name sounds good to me. Slight preference for *.interval over *.delay, because 
a delay would imply a fixed delay between deciding to scale down and executing 
it, whereas this is rather a fixed schedule at a regular interval.



[jira] [Commented] (FLINK-36022) When scaling.enabled=false, adjust the values of some parameters to provide better recommendation values.

2024-08-09 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872279#comment-17872279 ]

Maximilian Michels commented on FLINK-36022:


The idea of having the option to disable scaling execution was to have a 
dry-run mode to test the behavior of the autoscaler with its current 
configuration. 

I understand the idea behind changing certain configuration values in the 
dry-run mode, but it would also mean it's no longer a pure dry-run mode. 
That's why I would suggest adding a separate setting for this kind of 
behavior, but perhaps it's easiest to just set the desired values alongside 
the scaling=false setting.

On another note, I'm not sure setting the restart time to zero would be 
correct even in a dedicated suggestions mode. Rescaling will always take some 
time, and not accounting for it could invalidate the scaling suggestion we 
print.

> When scaling.enabled=false, adjust the values of some parameters to 
> provide better recommendation values.
> -
>
> Key: FLINK-36022
> URL: https://issues.apache.org/jira/browse/FLINK-36022
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Priority: Major
>
> h1. Background
> We have enabled the autoscaler in some scenarios, but we have not enabled 
> job.autoscaler.scaling.enabled, because we only want to use the autoscaler 
> to obtain resource recommendations for jobs. However, some parameters cause 
> these recommendations to be inaccurate.
> For example:
>  * job.autoscaler.scale-down.max-factor
> If set to 0.5, a vertex can be reduced to at most 50% of its current 
> parallelism in one scaling. If job.autoscaler.scaling.enabled is off, the 
> recommended value for a vertex with parallelism 200 will therefore never go 
> below 100, even though 50 or even fewer may be enough during low-traffic 
> periods.
>  * job.autoscaler.restart.time
> This parameter factors the restart time into the resources required to catch 
> up data after a rescale, which makes the recommended resources too large. 
> However, if job.autoscaler.scaling.enabled=false, the actual restart time 
> will be 0.
>  
> h1. Solution:
> When job.autoscaler.scaling.enabled = false, actively set the above 
> parameters to job.autoscaler.scale-down.max-factor=1 and 
> job.autoscaler.restart.time=0.
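The workaround of setting the desired values alongside the disabled scaling flag would amount to a configuration fragment like the following. The keys are the ones named in this issue; the exact value syntax (e.g. the duration format) depends on how the job configuration is supplied and is an assumption here:

```
job.autoscaler.scaling.enabled: false
job.autoscaler.scale-down.max-factor: 1.0
job.autoscaler.restart.time: 0s
```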





[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling

2024-08-09 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872269#comment-17872269 ]

Maximilian Michels commented on FLINK-36018:


Thanks for clarifying! I think what you propose makes sense. It improves on the 
original idea of the grace period by optimizing the scheduling of the downscale 
operations. 



[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling

2024-08-09 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872208#comment-17872208 ]

Maximilian Michels commented on FLINK-36018:


There may already be an option which more or less does what you propose: 
job.autoscaler.scale-up.grace-period

See 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/



[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-25 Thread Maximilian Michels (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868752#comment-17868752 ]

Maximilian Michels commented on FLINK-35285:


To add, key group alignment ensures that we hit the target processing rate 
because we achieve the best possible parallel processing (assuming there are no 
hot keys). If the key groups or source partitions are not aligned, the 
processing may be limited by the vertices which have the most keys/source 
partitions. In the case of a 25% scale down, this could mean we are effectively 
scaled down by 50%, as Gyula showed in his example. That is not what we wanted 
in the first place. 

On another note, key group alignment is also beneficial because it keeps the 
parallelism changes somewhat stable across the different divisors of the max 
parallelism.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
>     if ((scaleFactor < 1 && p < currentParallelism)
>             || (scaleFactor > 1 && p > currentParallelism)) {
>         if (maxParallelism % p == 0) {
>             return p;
>         }
>     }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]
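The hunting behaviour described in the ticket can be reproduced with a minimal loop (illustrative only; the real `JobVertexScaler.scale` takes more parameters and applies additional bounds):

```java
// Minimal reproduction of the key-group alignment "hunt": starting from the
// target parallelism, search upwards for a divisor of maxParallelism. During
// scale down this can land back on the current parallelism, blocking progress.
public class KeyGroupAlignment {

    public static int align(int target, int maxParallelism) {
        for (int p = target; p <= maxParallelism / 2; p++) {
            if (maxParallelism % p == 0) {
                return p; // first parallelism that spreads key groups evenly
            }
        }
        return target;
    }
}
```

With maxParallelism 360 and a 20% scale-down target of 48 (from a current parallelism of 60), the first divisor found at or above 48 is 60, so the vertex never scales down.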





[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor

2024-07-24 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868546#comment-17868546
 ] 

Maximilian Michels commented on FLINK-35285:



Hi! Great discussion. Thanks for looping me in! I think it makes sense to add 
an option to disable the key group alignment logic. I created a JIRA for this a 
while ago: FLINK-32120. 

Key group alignment is an important pillar to ensure stability of the 
algorithm. I understand your concerns about sticky parallelisms, but it is 
likely that the number of scalings will increase without key group alignment, 
at least in some situations. 

Estimating the partition skew in the absence of key group alignment and 
compensating for it by only a slightly higher parallelism, is an interesting 
idea that we might want to test. I was contemplating something like that (see: 
FLINK-32119). We should make sure, though, that the adjusted parallelism still matches 
the desired target utilization and its throughput. Without this, the resulting 
scaling decision may turn out to be less stable because the lower throughput 
bound will be hit faster. 

In short, I think we can independently work on:

# FLINK-32120
# FLINK-32119 (or use this issue): Add a new skew estimation logic which 
adjusts the parallelism just enough to match the desired target utilization for 
scenarios where we don’t use the key group alignment hammer. This would also 
apply to scenarios where key group alignment is not possible.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> --
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Trystan
>Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization 
> can prevent a vertex from scaling down at all. It will hunt from target 
> upwards to maxParallelism/2, and will always find currentParallelism again.
>  
> A simple test trying to scale down from a parallelism of 60 with a 
> scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 
> 360)); {code}
>  
> It seems reasonable to make a good attempt to spread data across subtasks, 
> but not at the expense of total deadlock. The problem is that during scale 
> down it doesn't actually ensure that newParallelism will be < 
> currentParallelism. The only workaround is to set a scale down factor large 
> enough such that it finds the next lowest divisor of the maxParallelism.
>  
> Clunky, but something to ensure it can make at least some progress. There is 
> another test that now fails, but just to illustrate the point:
> {code:java}
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
>     if ((scaleFactor < 1 && p < currentParallelism)
>             || (scaleFactor > 1 && p > currentParallelism)) {
>         if (maxParallelism % p == 0) {
>             return p;
>         }
>     }
> } {code}
>  
> Perhaps this is by design and not a bug, but total failure to scale down in 
> order to keep optimized key groups does not seem ideal.
>  
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]





[jira] [Commented] (FLINK-35814) Don't scale up continuously when the throughput cannot be increased after scaling up

2024-07-11 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865084#comment-17865084
 ] 

Maximilian Michels commented on FLINK-35814:


I experienced false positives which led to pipelines building huge backlog. I 
think the feature needs to be made more robust before being enabled by default. 

> Don't scale up continuously when the throughput cannot be increased after 
> scaling up
> 
>
> Key: FLINK-35814
> URL: https://issues.apache.org/jira/browse/FLINK-35814
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> h2. Motivation
> Currently, the parallelism will be increased continuously in some cases, such 
> as data skew or a bottleneck in another system.
> In these cases, the throughput (processing rate) cannot be increased even if 
> we increase the parallelism.
> h2. Solution
> We don't need to scale up the task continuously when the throughput cannot be 
> increased after scaling up. 
> And it's better to trigger some events to remind users to fix the issue 
> manually.





[jira] [Commented] (FLINK-35489) Metaspace size can be too little after autotuning change memory setting

2024-06-10 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853606#comment-17853606
 ] 

Maximilian Michels commented on FLINK-35489:


Hey [~ashangit]! Good idea to switch the order between budgeting heap and 
metaspace, but metaspace should be allowed to grow and not be fixed. The reason 
is that metaspace is often configured too low. We can prevent metaspace related 
errors by detecting and increasing metaspace. 

> Metaspace size can be too little after autotuning change memory setting
> ---
>
> Key: FLINK-35489
> URL: https://issues.apache.org/jira/browse/FLINK-35489
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: 1.8.0
>Reporter: Nicolas Fraison
>Priority: Major
>  Labels: pull-request-available
>
> We have enabled the autotuning feature on one of our flink jobs with the 
> below config
> {code:java}
> # Autoscaler configuration
> job.autoscaler.enabled: "true"
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.metrics.window: 10m
> job.autoscaler.target.utilization: "0.8"
> job.autoscaler.target.utilization.boundary: "0.1"
> job.autoscaler.restart.time: 2m
> job.autoscaler.catch-up.duration: 10m
> job.autoscaler.memory.tuning.enabled: true
> job.autoscaler.memory.tuning.overhead: 0.5
> job.autoscaler.memory.tuning.maximize-managed-memory: true{code}
> During a scale down, the autotuning decided to give all the memory to the JVM 
> (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b.
> Here is the config that was computed by the autotuning for a TM running on a 
> 4GB pod:
> {code:java}
> taskmanager.memory.network.max: 4063232b
> taskmanager.memory.network.min: 4063232b
> taskmanager.memory.jvm-overhead.max: 433791712b
> taskmanager.memory.task.heap.size: 3699934605b
> taskmanager.memory.framework.off-heap.size: 134217728b
> taskmanager.memory.jvm-metaspace.size: 22960020b
> taskmanager.memory.framework.heap.size: "0 bytes"
> taskmanager.memory.flink.size: 3838215565b
> taskmanager.memory.managed.size: 0b {code}
> This has led to issues starting the TM because we rely on a javaagent that 
> performs memory allocation outside of the JVM (relying on C bindings).
> Tuning the overhead or disabling scale-down-compensation.enabled could 
> have helped for that particular event, but this can lead to other issues, as 
> it could lead to too little heap size being computed.
> It would be interesting to be able to set a min memory.managed.size to be 
> taken into account by the autotuning.
> What do you think about this? Do you think that some other specific config 
> should have been applied to avoid this issue?
>  
> Edit see this comment that leads to the metaspace issue: 
> https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694
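As a sanity check of the numbers reported above: per Flink's documented TaskManager memory model, `taskmanager.memory.flink.size` is the sum of framework heap, task heap, managed, framework off-heap, task off-heap (0 here) and network memory. A small check with the values from the ticket (the helper name is illustrative):

```java
// Verifies that the ticket's reported config is internally consistent:
// flink.size = framework.heap + task.heap + managed + framework.off-heap + network
// (task.off-heap is 0 in this config). All values in bytes.
public class TmMemoryBudget {

    public static long flinkSize(long frameworkHeap, long taskHeap, long managed,
                                 long frameworkOffHeap, long network) {
        return frameworkHeap + taskHeap + managed + frameworkOffHeap + network;
    }
}
```

Plugging in the ticket's values (framework heap 0, task heap 3699934605, managed 0, framework off-heap 134217728, network 4063232) reproduces the reported flink.size of 3838215565b, confirming that the entire managed-memory budget was moved into the heap.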





[jira] [Updated] (FLINK-33799) Add e2e's for tls enabled operator

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33799:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Add e2e's for tls enabled operator
> --
>
> Key: FLINK-33799
> URL: https://issues.apache.org/jira/browse/FLINK-33799
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Minor
> Fix For: kubernetes-operator-1.9.0
>
>
> It would be good to create some E2E tests to ensure a tls enabled flink 
> operator works, so that we don't break anything in the future





[jira] [Updated] (FLINK-34151) Integrate Karpenter resource limits into cluster capacity check

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34151:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Integrate Karpenter resource limits into cluster capacity check
> ---
>
> Key: FLINK-34151
> URL: https://issues.apache.org/jira/browse/FLINK-34151
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> FLINK-33771 added cluster capacity checking for Flink Autoscaling decisions. 
> The checks respect the scaling limits of the Kubernetes Cluster Autoscaler. 
> We should also support Karpenter-based resource checks, as Karpenter is the 
> preferred method of expanding the cluster size in some environments.





[jira] [Updated] (FLINK-32120) Add autoscaler config option to disable parallelism key group alignment

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-32120:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Add autoscaler config option to disable parallelism key group alignment
> ---
>
> Key: FLINK-32120
> URL: https://issues.apache.org/jira/browse/FLINK-32120
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> After choosing the target parallelism for a vertex, we choose a higher 
> parallelism if that parallelism leads to evenly spreading the number of key 
> groups. The number of key groups is derived from the max parallelism.
> The amount of actual skew we would introduce if we did not do the alignment 
> would usually be pretty low. In fact, the data itself can have an uneven load 
> distribution across the keys (hot keys). In this case, the key group 
> alignment is not effective.
> For experiments, we should allow disabling the key group alignment via a 
> configuration option.





[jira] [Updated] (FLINK-32119) Revise source partition skew logic

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-32119:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Revise source partition skew logic 
> ---
>
> Key: FLINK-32119
> URL: https://issues.apache.org/jira/browse/FLINK-32119
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> After choosing the target parallelism for a vertex, we choose a higher 
> parallelism if that parallelism leads to evenly spreading the number of key 
> groups (=max parallelism).
> Sources don't have keyed state, so this adjustment does not make sense for 
> key groups. However, we internally limit the max parallelism of sources to 
> the number of partitions discovered. This prevents partition skew. 
> The partition skew logic currently doesn’t work correctly when there are 
> multiple topics because we use the total number of partitions discovered. 
> Using a single max parallelism doesn’t yield a skew-free partition 
> distribution then. However, this is also true for a single topic when the 
> number of partitions is a prime number or another not easily divisible 
> number. 
> Hence, we should add an option to guarantee a skew-free partition 
> distribution, which means using the total number of partitions when another 
> configuration is not possible. 
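The single-topic case is easy to illustrate: with partitions distributed as evenly as possible, the busiest subtask reads ceil(partitions / parallelism) partitions. A sketch (assumed helper, not operator code):

```java
// Illustrates source partition skew: when the partition count does not divide
// evenly by the parallelism, some subtasks read one partition more than others
// and become the bottleneck. A prime partition count can never divide evenly.
public class PartitionSkew {

    /** Partitions read by the busiest subtask under an even distribution. */
    public static int maxPartitionsPerSubtask(int partitions, int parallelism) {
        return (partitions + parallelism - 1) / parallelism; // ceiling division
    }
}
```

With 7 partitions (a prime) and parallelism 3, the busiest subtask reads 3 partitions while the others read 2, about 50% more load; only parallelism 7 (or 1) gives a skew-free distribution.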





[jira] [Updated] (FLINK-34540) Tune number of task slots

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34540:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Tune number of task slots
> -
>
> Key: FLINK-34540
> URL: https://issues.apache.org/jira/browse/FLINK-34540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>






[jira] [Updated] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-31502:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Limit the number of concurrent scale operations to reduce cluster churn
> ---
>
> Key: FLINK-31502
> URL: https://issues.apache.org/jira/browse/FLINK-31502
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> Until we move to using the upcoming Rescale API, which recycles pods, we need 
> to be mindful of how many deployments we scale at the same time, because 
> each of them is going to give up all its pods and request the new number of 
> required pods. 
> This can cause churn in the cluster and temporarily lead to "unallocatable" 
> pods, which triggers the k8s cluster autoscaler to add more cluster nodes. 
> That is often not desirable because the actual resources required after the 
> scaling has settled are lower.





[jira] [Updated] (FLINK-33789) Expose restart time as a metric

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33789:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Expose restart time as a metric
> ---
>
> Key: FLINK-33789
> URL: https://issues.apache.org/jira/browse/FLINK-33789
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Alexander Fedulov
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> FLINK-30593 added restart time tracking. It would be convenient to also 
> report it as a metric.





[jira] [Updated] (FLINK-33773) Add fairness to scaling decisions

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33773:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Add fairness to scaling decisions
> -
>
> Key: FLINK-33773
> URL: https://issues.apache.org/jira/browse/FLINK-33773
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Deployment / Kubernetes
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> The current scaling logic is inherently unfair. In a scenario of heavy 
> backlog, whichever pipelines come first, they will end up taking most of the 
> resources. Some kind of fairness should be introduced, for example:
> * Cap the max number of resulting pods at a % of the cluster resources
> * Allow scale up round-robin across all pipelines





[jira] [Updated] (FLINK-33673) SizeLimits not being set on emptyDir

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33673:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> SizeLimits not being set on emptyDir
> 
>
> Key: FLINK-33673
> URL: https://issues.apache.org/jira/browse/FLINK-33673
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Tony Garrard
>Priority: Minor
> Fix For: kubernetes-operator-1.9.0
>
>
> The operator should set a sizeLimit on any emptyDir's it creates. See 
> [https://main.kyverno.io/policies/other/a/add-emptydir-sizelimit/add-emptydir-sizelimit/]
> This issue is to set a sizeLimit. The default one in question is for 
> artifacts. My initial guess at a setting would be around 512Mb





[jira] [Updated] (FLINK-34539) Tune JobManager memory

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> Tune JobManager memory
> --
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.9.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.





[jira] [Updated] (FLINK-34389) JdbcAutoscalerStateStore explicitly writes update_time

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34389:
---
Fix Version/s: kubernetes-operator-1.9.0
   (was: kubernetes-operator-1.8.0)

> JdbcAutoscalerStateStore explicitly writes update_time
> --
>
> Key: FLINK-34389
> URL: https://issues.apache.org/jira/browse/FLINK-34389
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: kubernetes-operator-1.8.0
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
>
> JdbcAutoscalerStateStore explicitly writes update_time instead of relying on 
> the database to update it.
> Some databases don't support updating the timestamp column automatically. For 
> example, Derby doesn't update the update_time automatically when we update 
> any data. It was hard to do a general test while developing the test for 
> JdbcAutoscalerEventHandler.
>  
> As a common, open-source service, it's better to handle this inside the 
> service in order to support all databases well.
>  
> In order to unify the design of JdbcAutoscalerEventHandler and 
> JdbcAutoscalerStateStore, we update the design of JdbcAutoscalerStateStore in 
> this JIRA.





[jira] [Resolved] (FLINK-34152) Tune TaskManager memory

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-34152.

Release Note: TaskManager memory (heap, network, metaspace, managed) is 
optimized together with autoscaling decisions.
  Resolution: Fixed

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Resolved] (FLINK-34619) Do not wait for scaling completion in UPGRADE state with in-place scaling

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-34619.

Release Note: Merged via 8938658ed245545e6436ff22cbb8b2fabd4047f1
  Resolution: Fixed

> Do not wait for scaling completion in UPGRADE state with in-place scaling
> -
>
> Key: FLINK-34619
> URL: https://issues.apache.org/jira/browse/FLINK-34619
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The operator currently puts the resource into upgrading state after 
> triggering in-place scaling and keeps observing until the desired parallelism 
> is reached before moving to deployed / stable. 
> However, this means that, due to how the adaptive scheduler works, this 
> parallelism may never be reached, and this is expected.
> We should simplify the logic to consider scaling "done" once the resource 
> requirements have been set correctly, and then leave the rest to the adaptive 
> scheduler.





[jira] [Resolved] (FLINK-34104) Improve the ScalingReport format of autoscaling

2024-03-14 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-34104.

Resolution: Fixed

> Improve the ScalingReport format of autoscaling
> ---
>
> Key: FLINK-34104
> URL: https://issues.apache.org/jira/browse/FLINK-34104
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Currently, the scaling report format is 
> {color:#6a8759} Vertex ID %s | Parallelism %d -> %d | Processing capacity 
> %.2f -> %.2f | Target data rate %.2f{color}
> {color:#172b4d}It has 2 disadvantages:{color}
>  # {color:#172b4d}When one job has multiple vertices, the report of all 
> vertices are mixed together without any separator{color}{color:#172b4d}, here 
> is an example:{color}
>  ** {color:#172b4d}Scaling execution enabled, begin scaling vertices: Vertex 
> ID ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 2 -> 1 | Processing 
> capacity 800466.67 -> 320186.67 | Target data rate 715.10 Vertex ID 
> bc764cd8ddf7a0cff126f51c16239658 | Parallelism 2 -> 1 | Processing capacity 
> 79252.08 -> 31700.83 | Target data rate 895.93 Vertex ID 
> 0a448493b4782967b150582570326227 | Parallelism 8 -> 16 | Processing capacity 
> 716.05 -> 1141.00 | Target data rate 715.54{color}
>  ** {color:#172b4d}We can see the Vertex ID is the beginning of each vertex 
> report, it doesn't have any {color}{color:#172b4d}separator with the last 
> vertex.{color}
>  # {color:#172b4d}This format is non-standard{color}{color:#172b4d}, so it's 
> hard to deserialize.{color}
>  ** {color:#172b4d}Some jobs enable the autoscaler but disable the 
> scaling.{color}
>  ** {color:#172b4d}Flink platform maintainers want to show the scaling report 
> in the WebUI; the report result is helpful for flink users.{color}
>  ** {color:#172b4d}So easy deserialization is useful for these flink 
> platforms.{color}
> h2. {color:#172b4d}Solution:{color}
>  * {color:#172b4d}Adding { and } as the separator between 
> multiple vertices. {color}
>  * {color:#172b4d}Adding the {{AutoscalerEventUtils}} to easy deserialize the 
> {{ScalingReport}} message.{color}
>  
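The proposed separator change can be sketched as follows. This is an illustrative formatter only; the real scaling report also includes the processing capacity and target data rate fields, and the class name is made up.

```java
// Hypothetical sketch of the separator fix: wrapping each vertex report in
// { ... } makes the per-vertex boundaries unambiguous and easy to parse.
public class ScalingReportFormat {

    public static String format(String vertexId, int oldParallelism, int newParallelism) {
        return String.format(
            "{ Vertex ID %s | Parallelism %d -> %d }",
            vertexId, oldParallelism, newParallelism);
    }
}
```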





[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15

2024-03-13 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826015#comment-17826015
 ] 

Maximilian Michels commented on FLINK-34655:


Thanks for raising awareness for the Flink version compatibility, [~fanrui]! 
Although we've been using Flink Autoscaling with 1.16, it is true that only 
Flink 1.17 supports it out of the box.
{quote}In the short term, we only use the autoscaler to give suggestion instead 
of scaling directly. After our users think the parallelism calculation is 
reliable, they will have stronger motivation to upgrade the flink version.
{quote}
I understand the idea behind providing suggestions. However, it is difficult to 
assess the quality of Autoscaling decisions without applying them 
automatically. The reason is that suggestions become stale very quickly if the 
load pattern is not completely static. Even for static load patterns, if the 
user doesn't redeploy in a matter of minutes, the suggestions might already be 
stale again when the number of pending records increased too much. In any case, 
production load patterns are rarely static, which means that autoscaling will 
inevitably trigger multiple times a day, but that is where its real power is 
unleashed. It would be great to hear about any concerns your users have for 
turning on automatic scaling. We've been operating it in production for about a 
year now.

Back to the issue here, should we think about a patch release for 1.15 / 1.16 
to add support for overriding vertex parallelism?

> Autoscaler doesn't work for flink 1.15
> --
>
> Key: FLINK-34655
> URL: https://issues.apache.org/jira/browse/FLINK-34655
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> flink-kubernetes-operator is committed to supporting the latest 4 flink minor 
> versions, and the autoscaler is a part of flink-kubernetes-operator. 
> Currently, the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18.
> But the autoscaler doesn't work for flink 1.15.
> h2. Root cause: 
> * FLINK-28310 added some properties in IOMetricsInfo in flink-1.16
> * IOMetricsInfo is a part of JobDetailsInfo
> * JobDetailsInfo is necessary for autoscaler [1]
> * flink's RestClient doesn't allow any property to be missing when 
> deserializing the JSON
> That means that the RestClient after 1.15 cannot fetch JobDetailsInfo for 
> 1.15 jobs.
> h2. How to fix it properly?
> - [[FLINK-34655](https://issues.apache.org/jira/browse/FLINK-34655)] Copy 
> IOMetricsInfo to flink-autoscaler-standalone module
> - Removing them after 1.15 are not supported
> [1] 
> https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance





[jira] [Comment Edited] (FLINK-34152) Tune TaskManager memory

2024-03-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822790#comment-17822790
 ] 

Maximilian Michels edited comment on FLINK-34152 at 3/2/24 9:11 AM:


{quote}What I'm trying to say is that I didn't notice any updated configuration 
for the memory request and limit at the pod template level for the taskmanager. 
Therefore, I assume that the pod's memory allocation won't automatically adjust 
to reflect changes in the taskmanager's heap size, unless I've missed something.
{quote}
I would ask you to check again. When the tuning is applied, the pod resource 
requests/limits of the TaskManager pods will be adjusted. So changes will be 
directly reflected in terms of resource usage in Kubernetes.
{quote}Indeed, by implementing bin-packing, we can optimize resource 
utilization, which is now clearer to me. However, its management becomes more 
complex (K8s upgrade, daily node restart/eviction) for sure, especially when 
there are other application components in the same Kubernetes cluster IMO
{quote}
You are right, there is more complexity involved in realizing Flink Autoscaling 
benefits end to end. However, there are also great resource savings and 
convenience for the user that come out of it. We have seen 60% fewer nodes 
after enabling Flink Autoscaling while maintaining the same level of service and 
drastically decreasing the maintenance burden for our users, who would otherwise 
have to adjust parallelism constantly to run cost-efficiently. They usually did 
not want to do that, and thus all jobs ran very over-provisioned.
{quote}Can you take a look on it 
https://issues.apache.org/jira/browse/FLINK-34563 and 
[https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if 
you think it's making sense, thanks :)
{quote}
Thank you, I'll review in the next days!


was (Author: mxm):
{quote}What I'm trying to say is that I didn't notice any updated configuration 
for the memory request and limit at the pod template level for the taskmanager. 
Therefore, I assume that the pod's memory allocation won't automatically adjust 
to reflect changes in the taskmanager's heap size, unless I've missed something.
{quote}
I would ask you to check again. When the tuning is applied, the pod resource 
requests/limits of the TaskManager pods will be adjusted. So changes will be 
directly reflected in terms of resource usage in Kubernetes.
{quote}Indeed, by implementing bin-packing, we can optimize resource 
utilization, which is now clearer to me. However, its management becomes more 
complex (K8s upgrade, daily node restart/eviction) for sure, especially when 
there are other application components in the same Kubernetes cluster IMO
{quote}
You are right, there is more complexity involved in realizing Flink Autoscaling 
benefits end to end. However, there are also great resource savings and 
convenience for the user that come out of it. We have seen 60% fewer nodes 
after enabling Flink Autoscaling while maintaining the same level of service and 
drastically decreasing the maintenance burden for our users, who would otherwise 
have to adjust parallelism constantly to run cost-efficiently. They usually did 
not want to do that, and thus all jobs ran very over-provisioned.
{quote}Can you take a look on it 
https://issues.apache.org/jira/browse/FLINK-34563 and 
[https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if 
you think it's making sense, thanks :)
{quote}
Thank you, I'll review in the next days. Generally, this idea has crossed my 
mind before. I wasn't really sure how exactly to adjust the parallelism to 
fill the TaskManagers. Adjusting only the vertex with the highest parallelism 
might be unfair to other vertices. I think spreading the unused task slots 
across the vertices with the lowest parallelism might be more beneficial for 
stability. We have seen more instability with lower parallelisms because the 
metrics are less precise.
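The slot-spreading idea above is not an implemented policy; a minimal sketch of it (hypothetical vertex names, greedy assignment) could look like:

```python
def spread_spare_slots(parallelism: dict, spare: int) -> dict:
    """Give each spare task slot to the currently least-parallel vertex."""
    result = dict(parallelism)
    for _ in range(spare):
        lowest = min(result, key=result.get)  # vertex with lowest parallelism
        result[lowest] += 1
    return result

# Three spare slots go to the low-parallelism vertices, not the busiest one.
print(spread_spare_slots({"source": 2, "window": 8, "sink": 1}, 3))
```

Raising the lowest parallelisms first also helps the stability concern mentioned above, since metrics at low parallelism are the least precise.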

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.

[jira] [Commented] (FLINK-34152) Tune TaskManager memory

2024-03-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822790#comment-17822790
 ] 

Maximilian Michels commented on FLINK-34152:


{quote}What I'm trying to say is that I didn't notice any updated configuration 
for the memory request and limit at the pod template level for the taskmanager. 
Therefore, I assume that the pod's memory allocation won't automatically adjust 
to reflect changes in the taskmanager's heap size, unless I've missed something.
{quote}
I would ask you to check again. When the tuning is applied, the pod resource 
requests/limits of the TaskManager pods will be adjusted. So changes will be 
directly reflected in terms of resource usage in Kubernetes.
{quote}Indeed, by implementing bin-packing, we can optimize resource 
utilization, which is now clearer to me. However, its management becomes more 
complex (K8s upgrade, daily node restart/eviction) for sure, especially when 
there are other application components in the same Kubernetes cluster IMO
{quote}
You are right, there is more complexity involved in realizing Flink Autoscaling 
benefits end to end. However, there are also great resource savings and 
convenience for the user that come out of it. We have seen 60% fewer nodes 
after enabling Flink Autoscaling while maintaining the same level of service and 
drastically decreasing the maintenance burden for our users, who would otherwise 
have to adjust parallelism constantly to run cost-efficiently. They usually did 
not want to do that, and thus all jobs ran very over-provisioned.
{quote}Can you take a look on it 
https://issues.apache.org/jira/browse/FLINK-34563 and 
[https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if 
you think it's making sense, thanks :)
{quote}
Thank you, I'll review in the next days. Generally, this idea has crossed my 
mind before. I wasn't really sure how exactly to adjust the parallelism to 
fill the TaskManagers. Adjusting only the vertex with the highest parallelism 
might be unfair to other vertices. I think spreading the unused task slots 
across the vertices with the lowest parallelism might be more beneficial for 
stability. We have seen more instability with lower parallelisms because the 
metrics are less precise.

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  
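The description proposes doing for memory what the autoscaler already does for CPU: measure usage and size accordingly. A toy heap-sizing rule (hypothetical function name and margins, not the operator's actual algorithm) illustrates both failure modes it guards against:

```python
def tuned_heap_mb(observed_max_heap_mb: float,
                  safety_margin: float = 0.2,
                  min_heap_mb: float = 512.0) -> float:
    """Scale the measured peak heap by a safety margin, with a floor.

    The margin addresses underprovisioning (OutOfMemoryErrors on scale
    down); shrinking toward observed usage addresses overprovisioning.
    """
    return max(observed_max_heap_mb * (1.0 + safety_margin), min_heap_mb)

print(tuned_heap_mb(3000.0))  # shrink an over-provisioned 8 GiB heap to ~3.6 GiB
print(tuned_heap_mb(200.0))   # the floor protects small jobs on scale down
```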





[jira] [Commented] (FLINK-34152) Tune TaskManager memory

2024-03-01 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822512#comment-17822512
 ] 

Maximilian Michels commented on FLINK-34152:


Hi [~yang]! Thanks for taking a look at the recent changes. There have been two 
more follow-up PRs since the initial PR you linked. I'm very curious to hear 
your feedback.
{quote}We may need to dynamically adjust the Kubernetes CPU and memory limits 
for both the job manager and task manager eventually, to align with the 
automatically tuned memory and CPU parameters and prevent unnecessary resource 
allocation.
{quote}
Tuning JobManager memory is still pending, but I agree that tuning only 
TaskManagers is not enough. As for tuning CPU, I think we eventually want to 
tune the number of task slots to fit them to the CPUs assigned. As for scaling 
CPU itself, that is already taken care of by the autoscaler which essentially 
scales based on the CPU usage of TaskManagers.
{quote}In our specific use-case, our Flink cluster is deployed on a dedicated 
node group with predefined CPU and memory settings, unlike a typical Kubernetes 
cluster. Consequently, this auto-tuning feature might not aid in reducing 
infrastructure costs, as billing is based on the allocated nodes behind the 
scenes.
{quote}
Autoscaling assumes some sort of Kubernetes Cluster Autoscaling to be active. 
When fewer resources are allocated, that should result in fewer nodes, but in 
practice it isn't quite that easy. It requires a bit of extra work for nodes to 
get released when fewer resources are used. The default Kubernetes scheduler 
doesn't bin-pack, but it can be reconfigured to do bin-packing instead of its 
default behavior of spreading pods out evenly.

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Updated] (FLINK-34152) Tune TaskManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory  (was: Tune TaskManager memory of 
autoscaled jobs)

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Updated] (FLINK-34152) Tune TaskManager memory of austoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of austoscaled jobs  (was: Tune 
TaskManager memory)

> Tune TaskManager memory of austoscaled jobs
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Assigned] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-34540:
--

Assignee: (was: Maximilian Michels)

> Tune number of task slots
> -
>
> Key: FLINK-34540
> URL: https://issues.apache.org/jira/browse/FLINK-34540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>






[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of autoscaled jobs  (was: Tune TaskManager 
memory of austoscaled jobs)

> Tune TaskManager memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Updated] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34540:
---
Description: (was: Adjustments similar to FLINK-34152, but simpler 
because we only need to adjust heap memory and metaspace for the JobManager.)

> Tune number of task slots
> -
>
> Key: FLINK-34540
> URL: https://issues.apache.org/jira/browse/FLINK-34540
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>






[jira] [Updated] (FLINK-34539) Tune JobManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Summary: Tune JobManager memory  (was: Tune JobManager memory of autoscaled 
jobs)

> Tune JobManager memory
> --
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.





[jira] [Updated] (FLINK-34152) Tune TaskManager memory

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory  (was: Tune TaskManager memory of 
autoscaled jobs)

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Created] (FLINK-34540) Tune number of task slots

2024-02-28 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34540:
--

 Summary: Tune number of task slots
 Key: FLINK-34540
 URL: https://issues.apache.org/jira/browse/FLINK-34540
 Project: Flink
  Issue Type: Sub-task
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


Adjustments similar to FLINK-34152, but simpler because we only need to adjust 
heap memory and metaspace for the JobManager.





[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: Umbrella issue to tackle tuning the Flink configuration as 
part of Flink Autoscaling.  (was: Umbrella issue to tackle)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>
> Umbrella issue to tackle tuning the Flink configuration as part of Flink 
> Autoscaling.





[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Labels:   (was: pull-request-available)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>
> Umbrella issue to tackle





[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: Umbrella issue to tackle  (was: The current autoscaling 
algorithm adjusts the parallelism of the job task vertices according to the 
processing needs. By adjusting the parallelism, we systematically scale the 
amount of CPU for a task. At the same time, we also indirectly change the 
amount of memory tasks have at their disposal. However, there are some problems 
with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

[https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit])

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Umbrella issue to tackle





[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Fix Version/s: (was: kubernetes-operator-1.8.0)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
>
> Umbrella issue to tackle





[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Summary: Tune Flink config of autoscaled jobs  (was: Tune memory of 
autoscaled jobs)

> Tune Flink config of autoscaled jobs
> 
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]





[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Labels:   (was: pull-request-available)

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.





[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Description: Adjustments similar to FLINK-34152, but simpler because we 
only need to adjust heap memory and metaspace for the JobManager.  (was: 
Similarly to)

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Adjustments similar to FLINK-34152, but simpler because we only need to 
> adjust heap memory and metaspace for the JobManager.





[jira] [Updated] (FLINK-34538) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

[https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34539:
---
Description: Similarly to  (was: The current autoscaling algorithm adjusts 
the parallelism of the job task vertices according to the processing needs. By 
adjusting the parallelism, we systematically scale the amount of CPU for a 
task. At the same time, we also indirectly change the amount of memory tasks 
have at their disposal. However, there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory/CPU ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 )

> Tune JobManager memory of autoscaled jobs
> -
>
> Key: FLINK-34539
> URL: https://issues.apache.org/jira/browse/FLINK-34539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Similarly to





[jira] [Created] (FLINK-34539) Tune JobManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34539:
--

 Summary: Tune JobManager memory of autoscaled jobs
 Key: FLINK-34539
 URL: https://issues.apache.org/jira/browse/FLINK-34539
 Project: Flink
  Issue Type: Sub-task
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory/CPU ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 





[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Parent: FLINK-34538
Issue Type: Sub-task  (was: New Feature)

> Tune TaskManager memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: Sub-task
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory/CPU ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Created] (FLINK-34538) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34538:
--

 Summary: Tune memory of autoscaled jobs
 Key: FLINK-34538
 URL: https://issues.apache.org/jira/browse/FLINK-34538
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory/CPU ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 





[jira] [Updated] (FLINK-34538) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34538:
---
Description: (was: The current autoscaling algorithm adjusts the 
parallelism of the job task vertices according to the processing needs. By 
adjusting the parallelism, we systematically scale the amount of CPU for a 
task. At the same time, we also indirectly change the amount of memory tasks 
have at their disposal. However, there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory/CPU ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 )

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34538
> URL: https://issues.apache.org/jira/browse/FLINK-34538
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>






[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune TaskManager memory of autoscaled jobs  (was: Tune memory of 
autoscaled jobs)

> Tune TaskManager memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory/CPU ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  






[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune memory of autoscaled jobs  (was: Tune TaskManager memory of 
autoscaled jobs)

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory/CPU ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory/CPU ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

 

  was:
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory/CPU ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

 


> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory/CPU ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit
>  





[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-02-28 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory/CPU ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

 

  was:
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory/CPU ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:
h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
This ratio is surprisingly constant based on empirical data.
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. 
The max memory per TM should be equal to the initially defined user-specified 
limit from the ResourceSpec. 
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm) {noformat}
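The three steps above can be sketched as a small function (an illustrative sketch only; the function name, parameter names, and sample units are assumptions, not part of the design doc):

```python
def tune_tm_memory(heap_usage, records_processed,
                   expected_records_per_sec, num_task_managers,
                   min_limit, max_limit):
    """Sketch of the tuning algorithm described above.

    heap_usage and records_processed are parallel samples observed over
    the metric window (step 1, the memory baseline).
    """
    # Step 2: estimate the memory needed per record from observed usage.
    memory_per_rec = sum(heap_usage) / sum(records_processed)
    # Step 3: scale memory proportionally to the expected processing load.
    memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers
    # Clamp between the minimum limit and the user-specified maximum
    # from the ResourceSpec.
    memory_per_tm = max(min_limit, memory_per_tm)
    memory_per_tm = min(max_limit, memory_per_tm)
    return memory_per_tm
```

For example, with observed heap samples summing to 3000 bytes over 30 records, the per-record estimate is 100 bytes, and the result is clamped to whatever limits the user configured.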


> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their dispense. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
>  





[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-22 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819591#comment-17819591
 ] 

Maximilian Michels commented on FLINK-34471:


Yes, that is exactly what I meant: Moving from FORWARD to RESCALE. The 
workaround you described makes sense.

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing





[jira] [Comment Edited] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-22 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819578#comment-17819578
 ] 

Maximilian Michels edited comment on FLINK-34471 at 2/22/24 10:18 AM:
--

Thanks! That would be very helpful – I don’t mind at all. I realized point-to-point 
connections might be a bit tricky when the parallelism between tasks 
changes from being equal to being different, because at runtime we then switch 
to a different partitioner. So in the worst case we could run out of network 
buffers in this scenario. I’m curious how you want to solve this. We probably 
need to add extra buffers to account for this edge case. 


was (Author: mxm):
Thanks! That would be very helpful – I don’t mind at all. I realized point to 
point connections might be a bit tricky when the parallelism between tasks 
changes from being equal to changing because at runtime we then switch to a 
different partitioner. So worst case we could run out of network buffers in 
this scenario. I’m curious how you want to solve this. We probably need to add 
extra buffers to account for this edge case. 

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing





[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-22 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819578#comment-17819578
 ] 

Maximilian Michels commented on FLINK-34471:


Thanks! That would be very helpful – I don’t mind at all. I realized point to 
point connections might be a bit tricky when the parallelism between tasks 
changes from being equal to changing because at runtime we then switch to a 
different partitioner. So worst case we could run out of network buffers in 
this scenario. I’m curious how you want to solve this. We probably need to add 
extra buffers to account for this edge case. 

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing





[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-21 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819306#comment-17819306
 ] 

Maximilian Michels commented on FLINK-34471:


I think in addition to the fine-grained approach described in the doc, we can 
do a first implementation which simply uses 
[https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/#network-buffer-lifecycle]
 and assumes an ALL_TO_ALL relationship. This may not optimize down to the last 
byte but still gives great savings over the default.
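A rough illustration of that first-cut estimate, following the buffer lifecycle described in the linked docs (the defaults below — 32 KiB memory segments, 2 exclusive buffers per channel, 8 floating buffers per gate — match Flink's documented defaults, but treat this as a sketch rather than the operator's actual code):

```python
def network_bytes_per_gate(num_channels, buffers_per_channel=2,
                           floating_buffers=8, segment_size=32 * 1024):
    """Estimate network memory for one input gate / result partition.

    Each channel gets its exclusive buffers, plus a pool of floating
    buffers shared across the gate.
    """
    return (num_channels * buffers_per_channel + floating_buffers) * segment_size

# With an ALL_TO_ALL edge, each subtask has one channel per subtask of
# the remote vertex, so num_channels equals that vertex's parallelism.
```

Summing this over all gates of the tasks on one TaskManager gives an upper bound that is usually far below the default network memory fraction.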

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing





[jira] [Updated] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning

2024-02-21 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34471:
---
Summary: Tune network memory as part of Autoscaler Memory Tuning  (was: 
Tune the network memory in Autoscaler)

> Tune network memory as part of Autoscaler Memory Tuning
> ---
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing





[jira] [Updated] (FLINK-34471) Tune the network memory in Autoscaler

2024-02-21 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34471:
---
Summary: Tune the network memory in Autoscaler  (was: Tune the network 
memroy in Autoscaler)

> Tune the network memory in Autoscaler
> -
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing





[jira] [Commented] (FLINK-34471) Tune the network memroy in Autoscaler

2024-02-21 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819233#comment-17819233
 ] 

Maximilian Michels commented on FLINK-34471:


Thanks Rui!

> Tune the network memroy in Autoscaler
> -
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing





[jira] [Assigned] (FLINK-34471) Tune the network memroy in Autoscaler

2024-02-21 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-34471:
--

Assignee: Maximilian Michels

> Tune the network memroy in Autoscaler
> -
>
> Key: FLINK-34471
> URL: https://issues.apache.org/jira/browse/FLINK-34471
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Maximilian Michels
>Priority: Major
>
> Design doc: 
> https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing





[jira] [Commented] (FLINK-33092) Improve the resource-stabilization-timeout mechanism when rescale a job for Adaptive Scheduler

2024-01-31 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812778#comment-17812778
 ] 

Maximilian Michels commented on FLINK-33092:


+1 waiting on resources in the Executing state.

I think we just need to change the RescalingController to delay triggering the 
actual rescale process: 
[https://github.com/apache/flink/blob/cb9e220c2291088459f0281aa8e8e8584436a9b2/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/RescalingController.java#L37]

Right now, it triggers immediately on parallelism change. [~dmvk] can probably 
answer this.

> Improve the resource-stabilization-timeout mechanism when rescaling a job for 
> Adaptive Scheduler
> --
>
> Key: FLINK-33092
> URL: https://issues.apache.org/jira/browse/FLINK-33092
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
> Attachments: image-2023-09-15-14-43-35-104.png
>
>
> !image-2023-09-15-14-43-35-104.png|width=916,height=647!
> h1. 1. Propose
> The above is the state transition graph when rescaling a job in Adaptive 
> Scheduler.
> In brief, when we trigger a rescale, the job waits for 
> _*resource-stabilization-timeout*_ in the WaitingForResources state when it has 
> sufficient resources but not the desired resources.
> If the _*resource-stabilization-timeout mechanism*_ is moved into the 
> Executing state, the rescale downtime will be significantly reduced.
> h1. 2. Why is the downtime long?
> Currently, when rescaling a job:
>  * Executing transitions to Restarting.
>  * Restarting cancels the job first.
>  * Restarting transitions to WaitingForResources after the whole job 
> is terminal.
>  * When the job has sufficient resources but not the desired resources, 
> WaitingForResources needs to wait for 
> _*resource-stabilization-timeout*_.
>  * WaitingForResources transitions to CreatingExecutionGraph after the 
> resource-stabilization-timeout.
> The problem is that the job isn't running during the 
> resource-stabilization-timeout phase.
> h1. 3. How to reduce the downtime?
> We can move the _*resource-stabilization-timeout mechanism*_ into the 
> Executing state when a rescale is triggered. That means:
>  * When the job has the desired resources, Executing can rescale directly.
>  * When the job has sufficient resources but not the desired resources, 
> it can rescale after _*resource-stabilization-timeout*_.
>  * WaitingForResources will ignore the resource-stabilization-timeout 
> after this improvement.
> The resource-stabilization-timeout now elapses before the job is cancelled, so 
> the rescale downtime will be significantly reduced.
>  
> Note: the resource-stabilization-timeout still applies in WaitingForResources 
> when starting a job; only the rescaling path changes.





[jira] [Commented] (FLINK-34266) Output ratios should be computed over the whole metric window instead of averaged

2024-01-29 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811988#comment-17811988
 ] 

Maximilian Michels commented on FLINK-34266:


Yes, using the total sum and subtracting the start observation from the end 
observation is the way to go for maximum precision. 

> Output ratios should be computed over the whole metric window instead of 
> averaged
> -
>
> Key: FLINK-34266
> URL: https://issues.apache.org/jira/browse/FLINK-34266
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Gyula Fora
>Priority: Critical
>
> Currently, output ratios are computed during metric collection based on the 
> current in/out metrics and stored as part of the collected metrics.
> During evaluation, the output ratios previously computed are then averaged 
> together over the metric window. This, however, leads to incorrect results 
> due to the nature of the computation and averaging.
> Example:
> Let's look at a window operator that simply sorts and re-emits events in 
> windows. During the window collection phase, the output ratio will be computed 
> and stored as 0. During the window computation, the output ratio will be 
> last_input_rate / window_size. Depending on the last input rate observation, 
> this can be off in either direction when averaged.





[jira] [Commented] (FLINK-34266) Output ratios should be computed over the whole metric window instead of averaged

2024-01-29 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811921#comment-17811921
 ] 

Maximilian Michels commented on FLINK-34266:


The way I understand the code is that for every observation, we will store the 
total output rate of every vertex. During metric window evaluation, we will 
average all of those. That is in line with how all the code works.

I agree 100% that all metrics should be observed over the entire metric window. 
So rates should be computed by measuring the number of records produced at the 
start and at the end of the window, then subtracting one from the other.

This request seems analogous to FLINK-34213, but for rates instead of busy time. 
Is that fair to say?
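The rate computation described above can be sketched as follows. This is a hypothetical illustration (the class and method names are not from the autoscaler code): a per-second rate derived by diffing accumulated record counters observed at the window boundaries, instead of averaging per-observation rates.

```java
public class WindowRate {
    /**
     * Records per second over the metric window [startTsMs, endTsMs],
     * computed from accumulated record counters at the window boundaries.
     */
    public static double ratePerSecond(
            long recordsAtStart, long recordsAtEnd, long startTsMs, long endTsMs) {
        long elapsedMs = endTsMs - startTsMs;
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("window must have positive length");
        }
        // Diff the accumulated counters, normalize to records/second.
        return (recordsAtEnd - recordsAtStart) * 1000.0 / elapsedMs;
    }
}
```

For example, 60,000 records accumulated over a 60-second window yields a rate of 1000 records/second, regardless of how the rate fluctuated within the window.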

> Output ratios should be computed over the whole metric window instead of 
> averaged
> -
>
> Key: FLINK-34266
> URL: https://issues.apache.org/jira/browse/FLINK-34266
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Gyula Fora
>Priority: Critical
>
> Currently Output ratios are computed during metric collection based on the 
> current in/out metrics an stored as part of the collected metrics.
> During evaluation the output ratios previously computed are then averaged 
> together in the metric window. This however leads to incorrect computation 
> due to the nature of the computation and averaging.
> Example:
> Let's look at a window operator that simply sorts and re-emits events in 
> windows. During the window collection phase, output ratio will be computed 
> and stored as 0. During the window computation the output ratio will be 
> last_input_rate / window_size.  Depending on the last input rate observation 
> this can be off when averaged into any direction.





[jira] [Commented] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond

2024-01-23 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810006#comment-17810006
 ] 

Maximilian Michels commented on FLINK-34213:


If we had to query metrics per vertex, that would be too expensive, but it 
seems like that is not necessary. Here is an example REST API response from the 
{{/jobs/}} endpoint:

{noformat}
{
  "jid": "b4f918c2a0312de9fe7369a7db093e96",
  "name": "-",
  "isStoppable": false,
  "state": "RUNNING",
  "start-time": 1705094021727,
  "end-time": -1,
  "duration": 928985186,
  "maxParallelism": 1,
  "now": 1706023006913,
  "timestamps": {
    "SUSPENDED": 0,
    "RUNNING": 1705094036134,
    "FAILING": 0,
    "CANCELED": 0,
    "CANCELLING": 0,
    "CREATED": 1705094035034,
    "INITIALIZING": 1705094021727,
    "FAILED": 0,
    "RESTARTING": 0,
    "RECONCILING": 0,
    "FINISHED": 0
  },
  "vertices": [
    {
      "id": "db1f263dc155338dc2a9622a2e06d115",
      "name": "",
      "maxParallelism": 1,
      "parallelism": 18,
      "status": "RUNNING",
      "start-time": 1705094037437,
      "end-time": -1,
      "duration": 928969476,
      "tasks": {
        "CANCELED": 0,
        "DEPLOYING": 0,
        "CANCELING": 0,
        "RECONCILING": 0,
        "FINISHED": 0,
        "SCHEDULED": 0,
        "CREATED": 0,
        "INITIALIZING": 0,
        "FAILED": 0,
        "RUNNING": 18
      },
      "metrics": {
        "read-bytes": 0,
        "read-bytes-complete": true,
        "write-bytes": 2907138853415272,
        "write-bytes-complete": true,
        "read-records": 0,
        "read-records-complete": true,
        "write-records": 229589536334,
        "write-records-complete": true,
        "accumulated-backpressured-time": 1533744940,
        "accumulated-idle-time": 10026044858,
        "accumulated-busy-time": 5161601268
      }
    },
    ...
  ]
}
{noformat}

Note the accumulated backpressure/idle time.
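As a sketch of the proposal, the accumulated busy time from two such REST observations can be diffed and normalized. The class and method names below are purely illustrative; only the idea of diffing the {{accumulated-busy-time}} field follows the response above.

```java
public class BusyTime {
    /**
     * Average busy milliseconds per second of a vertex between two
     * observations of its "accumulated-busy-time" metric, instead of
     * sampling the instantaneous busyMsPerSecond value.
     */
    public static double busyMsPerSecond(
            long accBusyMsStart, long accBusyMsEnd, long startTsMs, long endTsMs) {
        long elapsedMs = endTsMs - startTsMs;
        if (elapsedMs <= 0) {
            throw new IllegalArgumentException("observations must be time-ordered");
        }
        // Busy time accrued in the window, normalized to ms per second.
        return (accBusyMsEnd - accBusyMsStart) * 1000.0 / elapsedMs;
    }
}
```

For example, 5,000 ms of busy time accrued over a 10-second span corresponds to an average of 500 busy ms/second over the whole window.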

> Consider using accumulated busy time instead of busyMsPerSecond
> ---
>
> Key: FLINK-34213
> URL: https://issues.apache.org/jira/browse/FLINK-34213
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Priority: Minor
>
> We might achieve much better accuracy if we used the accumulated busy time 
> metrics from Flink, instead of the momentarily collected ones.
> We would use the diff between the last accumulated and the current 
> accumulated busy time.





[jira] [Created] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond

2024-01-23 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34213:
--

 Summary: Consider using accumulated busy time instead of 
busyMsPerSecond
 Key: FLINK-34213
 URL: https://issues.apache.org/jira/browse/FLINK-34213
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels


We might achieve much better accuracy if we used the accumulated busy time 
metrics from Flink, instead of the momentarily collected ones.

We would use the diff between the last accumulated and the current accumulated 
busy time.





[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-01-22 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Description: 
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:
h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
This ratio is surprisingly constant based on empirical data.
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. 
The max memory per TM should be equal to the initially defined user-specified 
limit from the ResourceSpec. 
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm) {noformat}

  was:
The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory then we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:
h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
This ratio is surprisingly constant based on empirical data.
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. 
The max memory per TM should be equal to the initially defined user-specified 
limit from the ResourceSpec. 
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm) {noformat}


> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfM

[jira] [Created] (FLINK-34152) Tune memory of autoscaled jobs

2024-01-18 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34152:
--

 Summary: Tune memory of autoscaled jobs
 Key: FLINK-34152
 URL: https://issues.apache.org/jira/browse/FLINK-34152
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The current autoscaling algorithm adjusts the parallelism of the job task 
vertices according to the processing needs. By adjusting the parallelism, we 
systematically scale the amount of CPU for a task. At the same time, we also 
indirectly change the amount of memory tasks have at their disposal. However, 
there are some problems with this.
 # Memory is overprovisioned: On scale up we may add more memory than we 
actually need. Even on scale down, the memory / cpu ratio can still be off and 
too much memory is used.
 # Memory is underprovisioned: For stateful jobs, we risk running into 
OutOfMemoryErrors on scale down. Even before running out of memory, too little 
memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. 
In the same way that we measure CPU usage and size the tasks accordingly, we 
need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:
h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.
h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
This ratio is surprisingly constant based on empirical data.
h2. 3. Scale memory proportionally to the per-record memory needs
{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. 
The max memory per TM should be equal to the initially defined user-specified 
limit from the ResourceSpec. 
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm) {noformat}
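The three steps above reduce to plain arithmetic. The following is a minimal sketch of the quoted formulas; all class, method, and parameter names are illustrative, not the operator's API.

```java
public class MemoryTuning {
    /** Step 2: memory_per_rec = sum(heap_usage) / sum(records_processed). */
    public static double memoryPerRecord(double totalHeapUsage, double totalRecordsProcessed) {
        return totalHeapUsage / totalRecordsProcessed;
    }

    /**
     * Step 3: memory_per_tm = expected_records_per_sec * memory_per_rec
     * / num_task_managers, clamped to [minLimit, maxLimit] so memory is
     * neither scaled down too far nor above the user-specified limit.
     */
    public static double memoryPerTm(
            double expectedRecordsPerSec,
            double memoryPerRec,
            int numTaskManagers,
            double minLimit,
            double maxLimit) {
        double mem = expectedRecordsPerSec * memoryPerRec / numTaskManagers;
        return Math.min(maxLimit, Math.max(minLimit, mem));
    }
}
```

For example, an expected 1,000 records/sec at 1,024 bytes/record spread across 4 task managers proposes 256,000 bytes of heap per TM before clamping.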





[jira] [Updated] (FLINK-34152) Tune heap memory of autoscaled jobs

2024-01-18 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune heap memory of autoscaled jobs  (was: Tune memory of 
autoscaled jobs)

> Tune heap memory of autoscaled jobs
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> A tuning algorithm could look like this:
> h2. 1. Establish a memory baseline
> We observe the average heap memory usage at task managers.
> h2. 2. Calculate memory usage per record
> The memory requirements per record can be estimated by calculating this ratio:
> {noformat}
> memory_per_rec = sum(heap_usage) / sum(records_processed)
> {noformat}
> This ratio is surprisingly constant based on empirical data.
> h2. 3. Scale memory proportionally to the per-record memory needs
> {noformat}
> memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
> {noformat}
> A minimum memory limit needs to be added to avoid scaling down memory too 
> much. The max memory per TM should be equal to the initially defined 
> user-specified limit from the ResourceSpec. 
> {noformat}
> memory_per_tm = max(min_limit, memory_per_tm)
> memory_per_tm = min(max_limit, memory_per_tm) {noformat}





[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs

2024-01-18 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-34152:
---
Summary: Tune memory of autoscaled jobs  (was: Tune heap memory of 
autoscaled jobs)

> Tune memory of autoscaled jobs
> --
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU for a task. At the same time, we also 
> indirectly change the amount of memory tasks have at their disposal. However, 
> there are some problems with this.
>  # Memory is overprovisioned: On scale up we may add more memory than we 
> actually need. Even on scale down, the memory / cpu ratio can still be off 
> and too much memory is used.
>  # Memory is underprovisioned: For stateful jobs, we risk running into 
> OutOfMemoryErrors on scale down. Even before running out of memory, too 
> little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> A tuning algorithm could look like this:
> h2. 1. Establish a memory baseline
> We observe the average heap memory usage at task managers.
> h2. 2. Calculate memory usage per record
> The memory requirements per record can be estimated by calculating this ratio:
> {noformat}
> memory_per_rec = sum(heap_usage) / sum(records_processed)
> {noformat}
> This ratio is surprisingly constant based on empirical data.
> h2. 3. Scale memory proportionally to the per-record memory needs
> {noformat}
> memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
> {noformat}
> A minimum memory limit needs to be added to avoid scaling down memory too 
> much. The max memory per TM should be equal to the initially defined 
> user-specified limit from the ResourceSpec. 
> {noformat}
> memory_per_tm = max(min_limit, memory_per_tm)
> memory_per_tm = min(max_limit, memory_per_tm) {noformat}





[jira] [Created] (FLINK-34151) Integrate Karpenter resource limits into cluster capacity check

2024-01-18 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-34151:
--

 Summary: Integrate Karpenter resource limits into cluster capacity 
check
 Key: FLINK-34151
 URL: https://issues.apache.org/jira/browse/FLINK-34151
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


FLINK-33771 added cluster capacity checking for Flink Autoscaling decisions. 
The checks respect the scaling limits of the Kubernetes Cluster Autoscaler. 

We should also support Karpenter-based resource checks, as Karpenter is the 
preferred method of expanding the cluster size in some environments.





[jira] [Resolved] (FLINK-33771) Add cluster capacity awareness to Autoscaler

2024-01-18 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-33771.

Resolution: Fixed

> Add cluster capacity awareness to Autoscaler
> 
>
> Key: FLINK-33771
> URL: https://issues.apache.org/jira/browse/FLINK-33771
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> To avoid starvation of pipelines when the Kubernetes cluster runs out of 
> resources, new scaling attempts should be stopped. 
> The Rescaling API will probably prevent most of these cases, but we will also 
> have to double-check there. 
> For the config-based parallelism overrides, we have pretty good heuristics in 
> the operator to check in Kubernetes for the approximate number of free 
> cluster resources, the max cluster scaleup for the Cluster Autoscaler, and 
> the required scaling costs.





[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary

2024-01-11 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805650#comment-17805650
 ] 

Maximilian Michels commented on FLINK-31977:


I think this is related to FLINK-33993. The name of the configuration option is 
a bit misleading, as effectiveness detection is always on but scalings are only 
blocked when the option is set to {{true}}.

> If scaling.effectiveness.detection.enabled is false, the call to the 
> detectIneffectiveScaleUp() function is unnecessary
> ---
>
> Key: FLINK-31977
> URL: https://issues.apache.org/jira/browse/FLINK-31977
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Affects Versions: 1.17.0
>Reporter: Tan Kim
>Priority: Minor
>
> The code below is a function to detect ineffective scale-ups.
> It only checks the value of SCALING_EFFECTIVENESS_DETECTION_ENABLED 
> (scaling.effectiveness.detection.enabled) after performing all of the 
> computations needed for detection, so when the option is false those 
> computations are wasted.
> {code:java}
> JobVertexScaler.java #175
> private boolean detectIneffectiveScaleUp(
>         AbstractFlinkResource resource,
>         JobVertexID vertex,
>         Configuration conf,
>         Map evaluatedMetrics,
>         ScalingSummary lastSummary) {
>     double lastProcRate =
>             lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 22569.315633422066
>     double lastExpectedProcRate =
>             lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 37340.0
>     var currentProcRate =
>             evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
> // To judge the effectiveness of the scale up operation we compute how 
> much of the expected
> // increase actually happened. For example if we expect a 100 increase in 
> proc rate and only
> // got an increase of 10 we only accomplished 10% of the desired 
> increase. If this number is
> // below the threshold, we mark the scaling ineffective.
> double expectedIncrease = lastExpectedProcRate - lastProcRate;
> double actualIncrease = currentProcRate - lastProcRate;
> boolean withinEffectiveThreshold =
> (actualIncrease / expectedIncrease)
> >= 
> conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
> if (withinEffectiveThreshold) {
> return false;
> }
> var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
> eventRecorder.triggerEvent(
> resource,
> EventRecorder.Type.Normal,
> EventRecorder.Reason.IneffectiveScaling,
> EventRecorder.Component.Operator,
> message);
> if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
> LOG.info(message);
> return true;
> } else {
> return false;
> }
> } {code}
> In the call to the detectIneffectiveScaleUp function, I would suggest 
> checking SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows.
> {code:java}
> // JobVertexScaler.java #150
> if (currentParallelism == lastSummary.getNewParallelism()) {
>     if (lastSummary.isScaledUp()) {
>         if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>             return detectIneffectiveScaleUp(
>                     resource, vertex, conf, evaluatedMetrics, lastSummary);
>         } else {
>             return true;
>         }
>     } else {
>         return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
>     }
> }
> {code}





[jira] [Closed] (FLINK-33993) Ineffective scaling detection events are misleading

2024-01-05 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-33993.
--
Resolution: Fixed

> Ineffective scaling detection events are misleading
> ---
>
> Key: FLINK-33993
> URL: https://issues.apache.org/jira/browse/FLINK-33993
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> When the ineffective scaling decision feature is turned off, events are 
> generated which look like this:
> {noformat}
> Skipping further scale up after ineffective previous scale up for 
> 65c763af14a952c064c400d516c25529
> {noformat}
> This is misleading because no action will be taken. It is fair to inform 
> users about ineffective scale up even when the feature is disabled but a 
> different message should be printed to convey that no action will be taken.





[jira] [Created] (FLINK-33993) Ineffective scaling detection events are misleading

2024-01-04 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33993:
--

 Summary: Ineffective scaling detection events are misleading
 Key: FLINK-33993
 URL: https://issues.apache.org/jira/browse/FLINK-33993
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


When the ineffective scaling decision feature is turned off, events are 
generated which look like this:

{noformat}
Skipping further scale up after ineffective previous scale up for 
65c763af14a952c064c400d516c25529
{noformat}

This is misleading because no action will be taken. It is fair to inform users 
about ineffective scale up even when the feature is disabled but a different 
message should be printed to convey that no action will be taken.





[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801843#comment-17801843
 ] 

Maximilian Michels commented on FLINK-33940:


[~Zhanghao Chen] Even though the factor only affects operators with parallelism 
above 840, I wonder whether we need to leave more room for scale-up. But I don't 
have a strong opinion.

{quote}
IIUC, when the parallelism of one job is very small (e.g. 1 or 2) and the max 
parallelism is 1024, one subtask will have 1024 keyGroups. From the state backend 
side, too many key groups may affect performance. (This is my concern about 
changing it by default in the Flink Community.)
{quote}

[~fanrui] I think we need to find out how big the performance impact actually 
is when jumping from 128 to 840 key groups. But 128 may just have been a very 
conservative number.

> Update the auto-derivation rule of max parallelism for enlarged upscaling 
> space
> ---
>
> Key: FLINK-33940
> URL: https://issues.apache.org/jira/browse/FLINK-33940
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Core
>Reporter: Zhanghao Chen
>Assignee: Zhanghao Chen
>Priority: Major
>
> *Background*
> The choice of the max parallelism of a stateful operator is important, as it 
> limits the upper bound of the parallelism of the operator while also adding 
> extra overhead when set too large. Currently, the max parallelism 
> of an operator is either fixed to a value specified by the API core / pipeline 
> option or auto-derived with the following rule:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}}
> *Problem*
> Recently, the elasticity of Flink jobs is becoming more and more valued by 
> users. The current auto-derived max parallelism was introduced a long time 
> ago and only allows the operator parallelism to be roughly doubled, which is 
> not desired for elasticity. Setting a max parallelism manually may not be 
> desirable either: users may not have sufficient expertise to select a good 
> max-parallelism value.
> *Proposal*
> Update the auto-derivation rule of max parallelism to derive a larger max 
> parallelism for a better out-of-the-box elasticity experience. A candidate is 
> as follows:
> {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), 
> 32767)}}
> Looking forward to your opinions on this.
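For illustration, the current and proposed formulas quoted above can be written 
out as a small sketch ({{roundUpToPowerOfTwo}} is a plain helper here, not 
Flink's internal implementation):

```java
class MaxParallelismDerivation {
    static final int UPPER_BOUND = 32767;

    // Smallest power of two >= x (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        int v = 1;
        while (v < x) {
            v <<= 1;
        }
        return v;
    }

    // Current rule: min(max(roundUpToPowerOfTwo(p * 1.5), 128), 32767)
    static int currentRule(int parallelism) {
        int candidate = roundUpToPowerOfTwo((int) Math.ceil(parallelism * 1.5));
        return Math.min(Math.max(candidate, 128), UPPER_BOUND);
    }

    // Proposed rule: min(max(roundUpToPowerOfTwo(p * 5), 1024), 32767)
    static int proposedRule(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism * 5);
        return Math.min(Math.max(candidate, 1024), UPPER_BOUND);
    }

    public static void main(String[] args) {
        // For parallelism 100, the current rule caps headroom at 256,
        // while the proposed rule allows scaling up to 1024.
        System.out.println(currentRule(100));  // 256
        System.out.println(proposedRule(100)); // 1024
    }
}
```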





[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space

2024-01-02 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801790#comment-17801790
 ] 

Maximilian Michels commented on FLINK-33940:


[~Zhanghao Chen] Thank you for the proposal. I agree with using highly 
composite numbers, as this will provide more flexibility to the autoscaler. I'm 
not sure about the {{operatorParallelism * 5}}. What is the rationale for 
selecting this factor? Why not {{*10}}? 






[jira] [Resolved] (FLINK-33795) Add new config to forbid autoscaling in certain periods of a day

2024-01-02 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-33795.

Fix Version/s: kubernetes-operator-1.8.0
 Assignee: yonghua jian
   Resolution: Fixed

> Add new config to forbid autoscaling in certain periods of a day
> 
>
> Key: FLINK-33795
> URL: https://issues.apache.org/jira/browse/FLINK-33795
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yonghua jian
>Assignee: yonghua jian
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Add a new config to forbid autoscaling during certain periods of the day so 
> that the Flink job remains unaffected by the autoscaler's job-restart 
> behavior during these periods.





[jira] [Resolved] (FLINK-33917) IllegalArgumentException: hostname can't be null

2024-01-02 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-33917.

Fix Version/s: kubernetes-operator-1.8.0
 Assignee: Tom
   Resolution: Fixed

> IllegalArgumentException: hostname can't be null
> 
>
> Key: FLINK-33917
> URL: https://issues.apache.org/jira/browse/FLINK-33917
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Reporter: Tom
>Assignee: Tom
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> In certain scenarios, if the hostname contains certain characters it will 
> throw an exception when it tries to initialize the `InetSocketAddress`
>  
> {code:java}
> java.lang.IllegalArgumentException: hostname can't be null    at 
> java.base/java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149)
>     at 
> java.base/java.net.InetSocketAddress.<init>(InetSocketAddress.java:216) {code}
>  
> [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L236]
>  
> {code:java}
>        @Override
>     public boolean isJobManagerPortReady(Configuration config) {
>         final URI uri;
>         try (var clusterClient = getClusterClient(config)) {
>             uri = URI.create(clusterClient.getWebInterfaceURL());
>         } catch (Exception ex) {
>             throw new FlinkRuntimeException(ex);
>         }
>         SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), 
> uri.getPort());
>         Socket socket = new Socket();
>         try {
>             socket.connect(socketAddress, 1000);
>             socket.close();
>             return true;
>         } catch (IOException e) {
>             return false;
>         }
>     }
>   {code}
>  
> Here's a simple test to reproduce
>  
> URL
> {code:java}
> @ParameterizedTest
> @ValueSource(
> strings = {"http://127.0.0.1:8081", "http://123-dev:8081", 
> "http://dev-test.abc:8081", "http://dev-test.1a:8081", 
> "http://dev-test.abc01:8081"})
> void testURLAddresses(String inputAddress) {
> assertDoesNotThrow(
> () -> {
> final URL url = new URL(inputAddress);
> new InetSocketAddress(url.getHost(), url.getPort());
> });
> } {code}
>  
> URI
>  
> {code:java}
> @ParameterizedTest
> @ValueSource(
> strings = {"http://127.0.0.1:8081", "http://123-dev:8081", 
> "http://dev-test.abc:8081", "http://dev-test.1a:8081", 
> "http://dev-test.abc01:8081"})
> void testURIAddresses(String inputAddress) {
> assertDoesNotThrow(
> () -> {
> final URI uri = new URI(inputAddress);
> new InetSocketAddress(uri.getHost(), uri.getPort());
> });
> }  {code}
>  
> All test cases pass except for "http://dev-test.1a:8081", which is a valid 
> Flink host URL but not a valid URI
>  





[jira] [Commented] (FLINK-33917) IllegalArgumentException: hostname can't be null

2023-12-22 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799701#comment-17799701
 ] 

Maximilian Michels commented on FLINK-33917:


{{new URI("123-test").getHost()}} returns null. 

I’m not 100% sure this is a JDK bug. There may be some ambiguity when resolving 
URIs without all spec parts. But let’s see what upstream says. 






[jira] [Commented] (FLINK-33917) IllegalArgumentException: hostname can't be null

2023-12-21 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799352#comment-17799352
 ] 

Maximilian Michels commented on FLINK-33917:


The description doesn’t describe under which circumstances the host name can be 
parsed as null. One example is {{new URI("123-test")}} which will return a null 
host name because the string is parsed as a URI path. Flink itself returns a 
stringified URL object. So using URL instead works fine. 
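The behavior described above can be reproduced directly against the JDK (a 
minimal sketch using the hostnames from this ticket):

```java
import java.net.URI;
import java.net.URL;

class HostParsingDemo {
    public static void main(String[] args) throws Exception {
        // "123-test" has no scheme, so URI treats it as a path: host is null.
        System.out.println(new URI("123-test").getHost()); // null

        // "dev-test.1a" is not a valid RFC 2396 hostname (the top label
        // starts with a digit), so URI.getHost() also returns null here.
        System.out.println(new URI("http://dev-test.1a:8081").getHost()); // null

        // URL parses the authority more leniently and recovers the host.
        System.out.println(new URL("http://dev-test.1a:8081").getHost()); // dev-test.1a
    }
}
```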






[jira] [Resolved] (FLINK-33770) Autoscaler logs are full of deprecated key warnings

2023-12-11 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-33770.

Resolution: Fixed

> Autoscaler logs are full of deprecated key warnings
> ---
>
> Key: FLINK-33770
> URL: https://issues.apache.org/jira/browse/FLINK-33770
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> We moved all autoscaler configuration from 
> {{kubernetes.operator.job.autoscaler.*}} to {{job.autoscaler.*}}. 
> With the latest release, the logs are full with logs like this:
> {noformat}
> level:  WARN 
> logger:  org.apache.flink.configuration.Configuration 
> message:  Config uses deprecated configuration key 
> 'kubernetes.operator.job.autoscaler.target.utilization' instead of proper key 
> 'job.autoscaler.target.utilization' 
> {noformat}
> The reason is that the configuration is loaded for every reconciliation.
> This configuration is already widely adopted across hundreds of pipelines. I 
> propose to remove the deprecation from the config keys and make them 
> "fallback" keys instead which removes the deprecation warning.
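The difference between a deprecated key and a fallback key can be sketched 
with a simplified resolver (illustrative only, not Flink's actual 
{{Configuration}} class): both consult the legacy key, but only the deprecated 
variant logs a warning on every lookup:

```java
import java.util.Map;
import java.util.Optional;

class FallbackKeyResolution {
    // Resolve the proper key first, then the legacy key. A fallback key
    // is consulted silently; a deprecated key would emit a WARN line on
    // every reconciliation, which is the log spam described above.
    static Optional<String> resolve(
            Map<String, String> conf,
            String properKey,
            String legacyKey,
            boolean legacyIsDeprecated) {
        if (conf.containsKey(properKey)) {
            return Optional.of(conf.get(properKey));
        }
        if (conf.containsKey(legacyKey)) {
            if (legacyIsDeprecated) {
                System.out.println("WARN Config uses deprecated configuration key '"
                        + legacyKey + "' instead of proper key '" + properKey + "'");
            }
            return Optional.of(conf.get(legacyKey));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> conf =
                Map.of("kubernetes.operator.job.autoscaler.target.utilization", "0.7");
        // Treated as a fallback key (false): same value, no warning.
        System.out.println(resolve(conf,
                "job.autoscaler.target.utilization",
                "kubernetes.operator.job.autoscaler.target.utilization",
                false).get());
    }
}
```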





[jira] [Assigned] (FLINK-31400) Add autoscaler integration for Iceberg source

2023-12-07 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned FLINK-31400:
--

Assignee: Mason Chen

> Add autoscaler integration for Iceberg source
> -
>
> Key: FLINK-31400
> URL: https://issues.apache.org/jira/browse/FLINK-31400
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Mason Chen
>Priority: Major
>
> A very critical part in the scaling algorithm is setting the source 
> processing rate correctly such that the Flink pipeline can keep up with the 
> ingestion rate. The autoscaler does that by looking at the {{pendingRecords}} 
> Flink source metric. Even if that metric is not available, the source can 
> still be sized according to the busyTimeMsPerSecond metric, but there will be 
> no backlog information available. For Kafka, the autoscaler also determines 
> the number of partitions to avoid scaling higher than the maximum number of 
> partitions.
> In order to support a wider range of use cases, we should investigate an 
> integration with the Iceberg source. As far as I know, it does not expose the 
> pendingRecords metric, nor does the autoscaler know about other constraints, 
> e.g. the maximum number of open files.





[jira] [Updated] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn

2023-12-07 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-31502:
---
Fix Version/s: kubernetes-operator-1.8.0
   (was: kubernetes-operator-1.5.0)

> Limit the number of concurrent scale operations to reduce cluster churn
> ---
>
> Key: FLINK-31502
> URL: https://issues.apache.org/jira/browse/FLINK-31502
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Until we move to using the upcoming Rescale API, which recycles pods, we need 
> to be mindful of how many deployments we scale at the same time, because each 
> of them gives up all its pods and then requests the new number of required 
> pods. 
> This can cause churn in the cluster and temporarily lead to "unallocatable" 
> pods, which triggers the k8s cluster autoscaler to add more cluster nodes. 
> That is often not desirable because the actual resources required after the 
> scaling has settled are lower.
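One way to bound concurrent scale operations is a plain permit counter (an 
illustrative sketch; the operator's reconciliation loop is not actually 
structured like this):

```java
import java.util.concurrent.Semaphore;

class ScaleOperationLimiter {
    private final Semaphore permits;

    ScaleOperationLimiter(int maxConcurrentScaleOps) {
        this.permits = new Semaphore(maxConcurrentScaleOps);
    }

    // Try to start a scale operation; deployments that do not get a
    // permit are skipped this round and retried on a later reconciliation,
    // which caps the number of deployments releasing pods at once.
    boolean tryStartScaling() {
        return permits.tryAcquire();
    }

    // Called once the rescaled deployment has settled.
    void scalingFinished() {
        permits.release();
    }
}
```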





[jira] [Updated] (FLINK-33771) Add cluster capacity awareness to Autoscaler

2023-12-07 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33771:
---
Description: 
To avoid starvation of pipelines when the Kubernetes cluster runs out of 
resources, new scaling attempts should be stopped. 

The Rescaling API will probably prevent most of these cases, but we will also 
have to double-check there. 

For the config-based parallelism overrides, we have pretty good heuristics in 
the operator to check in Kubernetes for the approximate number of free cluster 
resources, the max cluster scaleup for the Cluster Autoscaler, and the required 
scaling costs.

  was:
To avoid starvation of pipelines when the Kubernetes cluster runs out of 
resources, new scaling attempts should be stopped. 

The Rescaling API will probably prevent most of this cases but we will also 
have to double-check there. 

For the config-based parallelism overrides, we have pretty good heuristics in 
the operator to check in Kubernetes for the approximate number of free cluster 
resources and the required scaling costs.


> Add cluster capacity awareness to Autoscaler
> 
>
> Key: FLINK-33771
> URL: https://issues.apache.org/jira/browse/FLINK-33771
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: kubernetes-operator-1.8.0
>
>
> To avoid starvation of pipelines when the Kubernetes cluster runs out of 
> resources, new scaling attempts should be stopped. 
> The Rescaling API will probably prevent most of these cases, but we will 
> also have to double-check there. 
> For the config-based parallelism overrides, we have pretty good heuristics in 
> the operator to check in Kubernetes for the approximate number of free 
> cluster resources, the max cluster scaleup for the Cluster Autoscaler, and 
> the required scaling costs.





[jira] [Created] (FLINK-33773) Add fairness to scaling decisions

2023-12-07 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33773:
--

 Summary: Add fairness to scaling decisions
 Key: FLINK-33773
 URL: https://issues.apache.org/jira/browse/FLINK-33773
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Deployment / Kubernetes
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The current scaling logic is inherently unfair. In a scenario of heavy backlog, 
the pipelines that come first end up taking most of the resources. Some kind of 
fairness should be introduced, for example:

* Cap the max number of resulting pods at a % of the cluster resources
* Allow scale up round-robin across all pipelines





[jira] [Created] (FLINK-33771) Add cluster capacity awareness to Autoscaler

2023-12-07 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33771:
--

 Summary: Add cluster capacity awareness to Autoscaler
 Key: FLINK-33771
 URL: https://issues.apache.org/jira/browse/FLINK-33771
 Project: Flink
  Issue Type: New Feature
  Components: Autoscaler, Kubernetes Operator
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


To avoid starvation of pipelines when the Kubernetes cluster runs out of 
resources, new scaling attempts should be stopped. 

The Rescaling API will probably prevent most of these cases, but we will also 
have to double-check there. 

For the config-based parallelism overrides, we have pretty good heuristics in 
the operator to check in Kubernetes for the approximate number of free cluster 
resources and the required scaling costs.





[jira] [Reopened] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn

2023-12-07 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reopened FLINK-31502:


Reopening because this is an actual issue.

> Limit the number of concurrent scale operations to reduce cluster churn
> ---
>
> Key: FLINK-31502
> URL: https://issues.apache.org/jira/browse/FLINK-31502
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.5.0
>
>





[jira] [Created] (FLINK-33770) Autoscaler logs are full of deprecated key warnings

2023-12-07 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33770:
--

 Summary: Autoscaler logs are full of deprecated key warnings
 Key: FLINK-33770
 URL: https://issues.apache.org/jira/browse/FLINK-33770
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


We moved all autoscaler configuration from 
{{kubernetes.operator.job.autoscaler.*}} to {{job.autoscaler.*}}. 

With the latest release, the logs are full with logs like this:

{noformat}
level:  WARN 
logger:  org.apache.flink.configuration.Configuration 
message:  Config uses deprecated configuration key 
'kubernetes.operator.job.autoscaler.target.utilization' instead of proper key 
'job.autoscaler.target.utilization' 
{noformat}

The reason is that the configuration is loaded for every reconciliation.

This configuration is already widely adopted across hundreds of pipelines. I 
propose to remove the deprecation from the config keys and make them "fallback" 
keys instead which removes the deprecation warning.





[jira] [Commented] (FLINK-33710) Autoscaler redeploys pipeline for a NOOP parallelism change

2023-12-06 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793800#comment-17793800
 ] 

Maximilian Michels commented on FLINK-33710:


Additional fix via ca1d8472d1a1e817268950dae079592581fa5b8f to prevent any 
existing deployments from being affected.

> Autoscaler redeploys pipeline for a NOOP parallelism change
> ---
>
> Key: FLINK-33710
> URL: https://issues.apache.org/jira/browse/FLINK-33710
> Project: Flink
>  Issue Type: Bug
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> The operator supports two modes to apply autoscaler changes:
> # Use the internal Flink config {{pipeline.jobvertex-parallelism-overrides}} 
> # Make use of Flink's Rescale API 
> For (1), a string has to be generated for the Flink config with the actual 
> overrides. This string has to be deterministic for a given map. But it is not.
> Consider the following observed log:
> {noformat}
>   >>> Event  | Info| SPECCHANGED | SCALE change(s) detected (Diff: 
> FlinkDeploymentSpec[flinkConfiguration.pipeline.jobvertex-parallelism-overrides
>  : 
> 92542d1280187bd464274368a5f86977:3,9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,1652184ffd0522859c7840a24936847c:1
>  -> 
> 9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,92542d1280187bd464274368a5f86977:3,1652184ffd0522859c7840a24936847c:1]),
>  starting reconciliation. 
> {noformat}
> The overrides are identical, but the order is different, which triggers a 
> redeploy. This does not seem to happen often, but deterministic string 
> generation (e.g. sorting by key) is required to prevent any NOOP updates.
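The deterministic string generation suggested above can be sketched like this 
(an illustrative helper, not the operator's actual code): sorting by vertex id 
makes the override string independent of map iteration order, so identical 
overrides never register as a spec change:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class ParallelismOverrides {
    // TreeMap iterates keys in sorted order, so equal maps always
    // produce the same "vertexId:parallelism,..." string.
    static String toOverrideString(Map<String, Integer> overrides) {
        return new TreeMap<>(overrides).entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        Map<String, Integer> a = new LinkedHashMap<>();
        a.put("92542d1280187bd464274368a5f86977", 3);
        a.put("9f979ed859083299d29f281832cb5be0", 1);
        Map<String, Integer> b = new LinkedHashMap<>();
        b.put("9f979ed859083299d29f281832cb5be0", 1);
        b.put("92542d1280187bd464274368a5f86977", 3);
        // Different insertion order, same deterministic string: no
        // spurious SPECCHANGED / redeploy.
        System.out.println(toOverrideString(a).equals(toOverrideString(b))); // true
    }
}
```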





[jira] [Updated] (FLINK-33522) Savepoint upgrade mode fails despite the savepoint succeeding

2023-12-01 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33522:
---
Fix Version/s: kubernetes-operator-1.8.0
   (was: kubernetes-operator-1.7.0)

> Savepoint upgrade mode fails despite the savepoint succeeding
> -
>
> Key: FLINK-33522
> URL: https://issues.apache.org/jira/browse/FLINK-33522
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.6.1
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Under certain circumstances, savepoint creation can succeed but the job fails 
> afterwards. One example is when there are messages being distributed by the 
> source coordinator to finished tasks. This is possibly a Flink bug although 
> it's not clear yet how to solve the issue.
> After the savepoint succeeded Flink fails the job like this:
> {noformat}
> Source (1/2) 
> (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
> switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
> {noformat}
> {noformat}
> An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
> task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
> targetTask: Source (1/2) - execution #0
> Caused by:
> org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
> is not running, but in state FINISHED
>at 
> org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
>at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
> {noformat}
> Inside the operator this is processed as:
> {noformat}
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
>  A savepoint has been created at: s3://..., but the corresponding job 
> 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
> consistent, but might have uncommitted transactions. If you want to commit 
> the transaction please restart a job from this savepoint. 
>   
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
>   
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
>   
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
>  
>   
> org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
>  
>   
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
>  
>   
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
>  
>  
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
>   
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
>  
>   
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>  
>   
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>  
>   
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>  
>   
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  
>   
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  
>   java.la

[jira] [Commented] (FLINK-33522) Savepoint upgrade mode fails despite the savepoint succeeding

2023-12-01 Thread Maximilian Michels (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17791975#comment-17791975
 ] 

Maximilian Michels commented on FLINK-33522:


Additional fix required via 51a91049b5f17f8a0b21e11feceb4410a97c50c1.

> Savepoint upgrade mode fails despite the savepoint succeeding
> -
>
> Key: FLINK-33522
> URL: https://issues.apache.org/jira/browse/FLINK-33522
> Project: Flink
>  Issue Type: Bug
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.6.1
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Under certain circumstances, savepoint creation can succeed but the job fails 
> afterwards. One example is when there are messages being distributed by the 
> source coordinator to finished tasks. This is possibly a Flink bug although 
> it's not clear yet how to solve the issue.
> After the savepoint succeeded Flink fails the job like this:
> {noformat}
> Source (1/2) 
> (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) 
> switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). 
> {noformat}
> {noformat}
> An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering 
> task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', 
> targetTask: Source (1/2) - execution #0
> Caused by:
> org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task 
> is not running, but in state FINISHED
>at 
> org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502)
>at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask
> {noformat}
> Inside the operator this is processed as:
> {noformat}
> java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException:
>  A savepoint has been created at: s3://..., but the corresponding job 
> 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is 
> consistent, but might have uncommitted transactions. If you want to commit 
> the transaction please restart a job from this savepoint. 
>   
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
>   
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) 
>   
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319)
>  
>   
> org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121)
>  
>   
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223)
>  
>   
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122)
>  
>  
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163)
>   
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136)
>  
>   
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>  
>   
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>  
>   
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>  
>   
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>  
>   
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>  
>   
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  
>   
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6

[jira] [Created] (FLINK-33710) Autoscaler redeploys pipeline for a NOOP parallelism change

2023-11-30 Thread Maximilian Michels (Jira)
Maximilian Michels created FLINK-33710:
--

 Summary: Autoscaler redeploys pipeline for a NOOP parallelism 
change
 Key: FLINK-33710
 URL: https://issues.apache.org/jira/browse/FLINK-33710
 Project: Flink
  Issue Type: Bug
  Components: Autoscaler, Kubernetes Operator
Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.6.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
 Fix For: kubernetes-operator-1.8.0


The operator supports two modes to apply autoscaler changes:

# Use the internal Flink config {{pipeline.jobvertex-parallelism-overrides}} 
# Make use of Flink's Rescale API 

For (1), a string has to be generated for the Flink config with the actual 
overrides. This string has to be deterministic for a given map. But it is not.

Consider the following observed log:

{noformat}
  >>> Event  | Info| SPECCHANGED | SCALE change(s) detected (Diff: 
FlinkDeploymentSpec[flinkConfiguration.pipeline.jobvertex-parallelism-overrides 
: 
92542d1280187bd464274368a5f86977:3,9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,1652184ffd0522859c7840a24936847c:1
 -> 
9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,92542d1280187bd464274368a5f86977:3,1652184ffd0522859c7840a24936847c:1]),
 starting reconciliation. 
{noformat}

The overrides are identical, but the order is different, which triggers a 
redeploy. This does not seem to happen often, but deterministic string 
generation (e.g. sorting by key) is required to prevent any NOOP updates.
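The sorting-by-key fix suggested above can be sketched as follows. This is an illustrative helper, not the operator's actual code; the class and method names are hypothetical:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper: renders pipeline.jobvertex-parallelism-overrides
// deterministically by sorting entries by vertex id before joining, so
// equal maps always produce the same config string.
public class OverrideRenderer {

    public static String render(Map<String, Integer> overrides) {
        // TreeMap orders keys lexicographically, making the output
        // independent of the input map's iteration order.
        return new TreeMap<>(overrides).entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }
}
```

With this, two maps containing the same entries in different insertion order render identically, so no spurious SPECCHANGED event is triggered.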



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-30593) Determine restart time on the fly for Autoscaler

2023-11-24 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-30593.

Fix Version/s: kubernetes-operator-1.8.0
   Resolution: Fixed

> Determine restart time on the fly for Autoscaler
> 
>
> Key: FLINK-30593
> URL: https://issues.apache.org/jira/browse/FLINK-30593
> Project: Flink
>  Issue Type: New Feature
>  Components: Autoscaler, Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Alexander Fedulov
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
>
> Currently the autoscaler uses a preconfigured restart time for the job. We 
> should dynamically adjust this based on the observed restart times of scale 
> operations.
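One way to track observed restart times is a bounded sample window with a configured fallback until enough data is available. This is a minimal sketch under that assumption, not the autoscaler's actual implementation:

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical tracker: keeps the last N observed restart durations and
// estimates restart time as their average, falling back to the
// preconfigured value while no observations exist.
public class RestartTimeTracker {

    private final Deque<Duration> samples = new ArrayDeque<>();
    private final int maxSamples;
    private final Duration fallback;

    public RestartTimeTracker(int maxSamples, Duration fallback) {
        this.maxSamples = maxSamples;
        this.fallback = fallback;
    }

    public void observe(Duration restartTime) {
        if (samples.size() == maxSamples) {
            samples.removeFirst(); // drop the oldest sample
        }
        samples.addLast(restartTime);
    }

    public Duration estimate() {
        if (samples.isEmpty()) {
            return fallback; // preconfigured restart time until data arrives
        }
        long avgMillis = (long) samples.stream()
                .mapToLong(Duration::toMillis)
                .average()
                .getAsDouble();
        return Duration.ofMillis(avgMillis);
    }
}
```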



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state

2023-11-16 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels closed FLINK-33572.
--
Resolution: Fixed

Implemented via 02840b96ef3116ea95a440af4f945398900d89df and 
ab0ec081eac86619a22632616fa4c01c074ecd33.

> Minimize ConfigMap API operations for autoscaler state
> --
>
> Key: FLINK-33572
> URL: https://issues.apache.org/jira/browse/FLINK-33572
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The flush operation, newly introduced after refactoring the autoscaler 
> interfaces, optimizes the number of write operations to the underlying state 
> store. A couple of further optimizations:
> 1. Any writes should be deferred until flush is called.
> 2. The flush routine should detect whether a write is needed and skip 
> writing if there are no changes.
> 3. Clearing state should only require one write operation.
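The three optimizations above amount to a write-behind cache with a dirty flag. A minimal sketch of the pattern, with a plain map standing in for the ConfigMap and all names purely illustrative (not the operator's actual interfaces):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: writes and clears only touch the in-memory cache and set a
// dirty flag; flush() performs at most one bulk write to the backing
// store and skips entirely when nothing changed.
public class BufferedStateStore {

    private final Map<String, String> backing = new HashMap<>();
    private final Map<String, String> cache = new HashMap<>();
    private boolean dirty = false;
    public int apiWrites = 0; // counts simulated ConfigMap updates

    public void put(String key, String value) {
        cache.put(key, value); // deferred until flush (optimization 1)
        dirty = true;
    }

    public void clear() {
        cache.clear();         // one write on the next flush (optimization 3)
        dirty = true;
    }

    public void flush() {
        if (!dirty) {
            return;            // no changes, no API call (optimization 2)
        }
        backing.clear();
        backing.putAll(cache);
        apiWrites++;           // single bulk write
        dirty = false;
    }

    public Map<String, String> backing() {
        return backing;
    }
}
```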



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state

2023-11-16 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-33572:
---
Release Note: Minimize ConfigMap operations for autoscaler state  (was: 
Implemented via 02840b96ef3116ea95a440af4f945398900d89df and 
ab0ec081eac86619a22632616fa4c01c074ecd33.)

> Minimize ConfigMap API operations for autoscaler state
> --
>
> Key: FLINK-33572
> URL: https://issues.apache.org/jira/browse/FLINK-33572
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The flush operation, newly introduced after refactoring the autoscaler 
> interfaces, optimizes the number of write operations to the underlying state 
> store. A couple of further optimizations:
> 1. Any writes should be deferred until flush is called.
> 2. The flush routine should detect whether a write is needed and skip 
> writing if there are no changes.
> 3. Clearing state should only require one write operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state

2023-11-16 Thread Maximilian Michels (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reopened FLINK-33572:


> Minimize ConfigMap API operations for autoscaler state
> --
>
> Key: FLINK-33572
> URL: https://issues.apache.org/jira/browse/FLINK-33572
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler, Kubernetes Operator
>Affects Versions: kubernetes-operator-1.6.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.7.0
>
>
> The flush operation, newly introduced after refactoring the autoscaler 
> interfaces, optimizes the number of write operations to the underlying state 
> store. A couple of further optimizations:
> 1. Any writes should be deferred until flush is called.
> 2. The flush routine should detect whether a write is needed and skip 
> writing if there are no changes.
> 3. Clearing state should only require one write operation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

