[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling
[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17880026#comment-17880026 ] Maximilian Michels commented on FLINK-36018: Thanks Rui! Great new feature. Looking forward to trying it out soon. I haven't looked at the code yet. How does it behave when the existing grace-period is used?

> Support lazy scale down to avoid frequent rescaling
> ---
>
> Key: FLINK-36018
> URL: https://issues.apache.org/jira/browse/FLINK-36018
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.10.0
>
> *Core idea*: Make scaling up sensitive to prevent lags, and make scaling down insensitive to reduce restart frequency.
> h1. Background & Motivation
> We enabled autoscaler scaling for a few Flink production jobs. It works with the Adaptive Scheduler and the Rescale API.
> Scaling results:
> * The recommended parallelism meets expectations most of the time.
> * When the source traffic increases, the autoscaler scales up the job in time to prevent lags.
> * When the source traffic decreases, the autoscaler scales down the job in time to save resources.
> * *Pain point:* Each job rescales more than 20 times a day (job.autoscaler.metrics.window=15 min by default).
> As we all know, the job will be unavailable for a while during a restart for several reasons:
> * Cancel job
> * Request resources ([FLIP-472|https://cwiki.apache.org/confluence/display/FLINK/FLIP-472%3A+Aligning+timeout+logic+in+the+AdaptiveScheduler%27s+WaitingForResources+and+Executing+states] is optimizing this)
> * Initialize task
> * Restore state
> * Catch up lag accumulated during the restart
> * etc.
> *Expectations:*
> * Scale up in time to prevent lags.
> * Scale down lazily to reduce downtime while ensuring resources can still be released later.
> h1. Solution:
> * Introduce job.autoscaler.scale-down.interval; the default value could be 1 hour.
> * Replace job.autoscaler.scale-up.grace-period with job.autoscaler.scale-down.interval.
> Detailed strategies:
> * Record the start time of the first scale-down event for each vertex separately. For example:
> ** vertex1: 2024-08-09 01:35:02
> ** vertex2: 2024-08-09 01:38:02
> * Scaling down is triggered in the following cases:
> ** Any vertex needs to scale up:
> *** The job restart cannot be avoided, so trigger the scale down for other vertices as well if needed.
> *** After the scale down, clean up the scale-down start times.
> ** The scale-down lazy period of some vertex has elapsed:
> *** current time - min(start time for each vertex) > scale-down.lazy-period
> *** This means there was no scale up during the scale-down lazy period.
> Note1: If the recommended parallelism >= current parallelism, the scale-down start time is cleaned up for the current vertex.
> Note2: The recommended parallelism still comes from the latest 15-minute metrics. For example:
> * The current parallelism of vertex1 is 100, and traffic decreases at night.
> * 2024-08-09 01:00:00, the recommended parallelism is 60.
> ** The start time of the scale down is 2024-08-09 01:00:00.
> * 2024-08-09 01:15:00, the recommended parallelism is 50.
> ** Still within the scale-down lazy period.
> ** Don't update the scale-down start time.
> * 2024-08-09 01:31:00, the recommended parallelism is 40.
> ** Outside of scale-down.lazy-period: trigger the rescale and use 40 as the recommended parallelism.
> ** The job.autoscaler.metrics.window is 15 min, so metrics from 2024-08-09 01:16:00 to 2024-08-09 01:31:00 are used.
> Note3: If users set job.autoscaler.scale-down.interval <= 0, we scale down directly.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
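The per-vertex bookkeeping described in the ticket can be sketched roughly as follows. This is a minimal illustration with hypothetical class and method names (LazyScaleDownTracker, shouldApplyScaleDown); it is not the actual flink-kubernetes-operator implementation:

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class LazyScaleDownTracker {

    private final Duration scaleDownInterval; // job.autoscaler.scale-down.interval
    private final Map<String, Instant> firstScaleDownTime = new HashMap<>();

    public LazyScaleDownTracker(Duration scaleDownInterval) {
        this.scaleDownInterval = scaleDownInterval;
    }

    /** Called once per evaluation with the recommendation for one vertex. */
    public boolean shouldApplyScaleDown(
            String vertexId,
            int currentParallelism,
            int recommended,
            Instant now,
            boolean anyVertexScalesUp) {
        if (recommended >= currentParallelism) {
            // Note1: the recommendation is not a scale-down, reset the timer.
            firstScaleDownTime.remove(vertexId);
            return recommended > currentParallelism; // scale up applies immediately
        }
        // Record the start time of the first scale-down event for this vertex.
        Instant start = firstScaleDownTime.computeIfAbsent(vertexId, k -> now);
        boolean lazyPeriodElapsed =
                Duration.between(start, now).compareTo(scaleDownInterval) >= 0;
        // Apply if a restart happens anyway, the lazy period elapsed,
        // or the interval is <= 0 (Note3: scale down directly).
        if (anyVertexScalesUp
                || lazyPeriodElapsed
                || scaleDownInterval.isZero()
                || scaleDownInterval.isNegative()) {
            firstScaleDownTime.remove(vertexId); // clean up after applying
            return true;
        }
        return false; // stay at the current parallelism for now
    }
}
{code}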
[jira] [Commented] (FLINK-36192) Optimize the logic to make it the common divisor of the partition number of the data source when determining the parallelism of the source task.
[ https://issues.apache.org/jira/browse/FLINK-36192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17878606#comment-17878606 ] Maximilian Michels commented on FLINK-36192: Thanks for reporting this issue [~heigebupahei]! If I understand correctly, we try to fix an issue when num_source_partitions > max_parallelism, because the current logic will effectively forget the number of partitions due to capping it at the vertex maximum parallelism. See [update logic|https://github.com/apache/flink-kubernetes-operator/blob/d8568ae28b13f5cc649a83f174dbc88449f0c602/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L287] and [capping logic|https://github.com/apache/flink-kubernetes-operator/blob/d8568ae28b13f5cc649a83f174dbc88449f0c602/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java#L104]. Due to this prior issue, the rest of the key group alignment logic doesn't work correctly, because it assumes that, for sources, the maximum parallelism is the number of partitions. See [alignment logic|https://github.com/apache/flink-kubernetes-operator/blob/d8568ae28b13f5cc649a83f174dbc88449f0c602/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L299]. To fix the issue, we may have to add an explicit metric for the number of source partitions. I don't really see other options, because we need both the maximum parallelism and the number of source partitions to perform the key alignment. We can't lower the maximum parallelism, as we have previously done, if we want to support use cases where the number of partitions exceeds the maximum parallelism; we also need to preserve the maximum parallelism to prevent exceeding it.

> Optimize the logic to make it the common divisor of the partition number of
> the data source when determining the parallelism of the source task.
> ---
>
> Key: FLINK-36192
> URL: https://issues.apache.org/jira/browse/FLINK-36192
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler
> Reporter: yuanfenghu
> Priority: Minor
>
> *Description:*
> When we know the number of partitions of the Kafka source, we would like the parallelism of the tasks consuming Kafka to be a divisor of the partition count whenever possible, so that consumption across partitions is balanced.
>
> *Current logic:*
> Currently, the parallelism of tasks in the autoscaler is determined as follows:
> step1: Calculate the target processing rate of the task and the corresponding parallelism p1.
> step2: Starting from p1, search upwards for a parallelism p2 that evenly divides the operator's maximum parallelism. If p2 <= maxParallelism / 2, use p2 as the final parallelism; otherwise use p1.
> If the task consumes Kafka or Pulsar, the maximum parallelism of the task is adjusted first: if the number of partitions < the maximum parallelism of the task, the maximum parallelism becomes the number of Kafka or Pulsar partitions; otherwise the maximum parallelism remains unchanged. This leads to the following situations:
> When the number of partitions in Kafka or Pulsar is less than the maximum parallelism of the operator:
> 1. If the parallelism calculated in step 1 is <= the number of partitions / 2, step 2 finds a divisor of the partition count, the demand is met, and consumption can be balanced.
> 2. If the parallelism calculated in step 1 is > the number of Kafka or Pulsar partitions / 2, the parallelism from step 1 is used directly. At this point, consumption becomes unbalanced. For example, if the number of partitions in Kafka is 64 and the expected parallelism calculated in step 1 is 48, the final task parallelism is 48.
> When the number of partitions in Kafka or Pulsar is greater than the maximum parallelism of the operator:
> The parallelism is calculated entirely according to the logic of step 1. For example, one of my Kafka topics has 200 partitions and the maximum parallelism of the operator is 128. Then the aligned parallelism candidates are 2, 4, 8, 16, ... and it is very likely that Kafka cannot be consumed evenly.
>
> *Expected logic:*
> * When the number of partitions is less than the maximum parallelism, determine the parallelism of the task as a divisor of the number of partitions.
> * When the number of partitions is greater than the maximum parallelism, the
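A sketch of the requested alignment, under the assumption that we simply search downwards from the step-1 target for a divisor of the partition count (the helper name alignToPartitions is hypothetical):

{code:java}
public final class PartitionAlignedParallelism {

    static int alignToPartitions(int targetParallelism, int numPartitions) {
        if (targetParallelism >= numPartitions) {
            return numPartitions; // at most one subtask per partition
        }
        // Search downwards for a divisor so every subtask reads
        // the same number of partitions.
        for (int p = targetParallelism; p >= 1; p--) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        return 1;
    }

    public static void main(String[] args) {
        // 64 Kafka partitions, step-1 target of 48 -> 32 (2 partitions per subtask)
        System.out.println(alignToPartitions(48, 64));
        // 200 partitions, step-1 target of 48 -> 40 (5 partitions per subtask)
        System.out.println(alignToPartitions(48, 200));
    }
}
{code}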
[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling
[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872861#comment-17872861 ] Maximilian Michels commented on FLINK-36018: Yes, that's fine with me 👍 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling
[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872850#comment-17872850 ] Maximilian Michels commented on FLINK-36018: How about autoscaler.scale-down.schedule-interval? A value of zero would mean immediate scale-down execution. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling
[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872848#comment-17872848 ] Maximilian Michels commented on FLINK-36018: Name sounds good to me. Slight preference for *.interval over *.delay, because a delay would imply that we have a fixed delay between deciding to scale down and the execution, whereas it is rather a fixed schedule at a regular interval. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36022) When scaling.enabled=false, adjust the values of some parameters to provide better recommendation values.
[ https://issues.apache.org/jira/browse/FLINK-36022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872279#comment-17872279 ] Maximilian Michels commented on FLINK-36022: The idea of having the option to disable scaling execution was to have a dry-run mode to test the behavior of the autoscaler with its current configuration. I understand the idea behind changing certain configuration values in the dry-run mode, but it would also mean it's not a pure dry-run mode anymore. That's why I would suggest adding a different setting for this kind of behavior, but perhaps it's easiest to just set the desired values alongside the scaling=false setting. On another note, I'm not sure setting the rescale time to zero would be correct even in a dedicated new mode for suggestions. Rescaling will always take some time, and not accounting for it could void the scaling suggestion we print.

> When scaling.enabled=false, adjust the values of some parameters to
> provide better recommendation values.
> ---
>
> Key: FLINK-36022
> URL: https://issues.apache.org/jira/browse/FLINK-36022
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler
> Reporter: yuanfenghu
> Priority: Major
>
> h1. Background
> We have enabled the autoscaler in some scenarios, but we have not enabled job.autoscaler.scaling.enabled, because we only want to use the autoscaler to obtain resource recommendations for tasks. However, some parameters can cause these recommendations to be inaccurate.
> Examples:
> * job.autoscaler.scale-down.max-factor
> If set to 0.5, the vertex parallelism can be reduced to at most 50% of the original value in one scaling. If we do not turn on the job.autoscaler.scaling.enabled parameter, the recommended value for a vertex with parallelism 200 will only be 100, even though 50 or even fewer resources may suffice during low-traffic periods.
> * job.autoscaler.restart.time
> This parameter causes the restart time to be included when calculating the resources required to catch up on data during scale-up, resulting in the recommended resources being too large. However, if job.autoscaler.scaling.enabled=false, no restart happens, so the restart time would be 0.
>
> h1. Solution:
> When job.autoscaler.scaling.enabled=false, actively set the above parameters to job.autoscaler.scale-down.max-factor=1 and job.autoscaler.restart.time=0.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
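For reference, a rough sketch of the workaround suggested above: set the desired values alongside scaling.enabled=false instead of changing the dry-run semantics. The option keys are the real autoscaler options quoted in this issue; the surrounding helper is purely illustrative:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class SuggestionModeConfig {
    public static Map<String, String> suggestionOverrides() {
        Map<String, String> conf = new HashMap<>();
        conf.put("job.autoscaler.scaling.enabled", "false");
        // Allow recommendations to drop below the usual scale-down cap.
        conf.put("job.autoscaler.scale-down.max-factor", "1.0");
        // Caveat from the comment above: restart.time=0 may void the
        // suggestion, since a real rescale always takes some time.
        conf.put("job.autoscaler.restart.time", "0s");
        return conf;
    }
}
{code}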
[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling
[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872269#comment-17872269 ] Maximilian Michels commented on FLINK-36018: Thanks for clarifying! I think what you propose makes sense. It improves on the original idea of the grace period by optimizing the scheduling of the downscale operations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36018) Support lazy scale down to avoid frequent rescaling
[ https://issues.apache.org/jira/browse/FLINK-36018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872208#comment-17872208 ] Maximilian Michels commented on FLINK-36018: There may already be an option which more or less does what you propose: job.autoscaler.scale-up.grace-period. See https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868752#comment-17868752 ] Maximilian Michels commented on FLINK-35285: To add, key group alignment ensures that we hit the target processing rate because we achieve the best possible parallel processing (assuming there are no hot keys). If the key groups or source partitions are not aligned, processing may be limited by the subtasks which have the most key groups/source partitions. In the case of a 25% scale down, this could mean we are effectively scaled down by 50%, as Gyula showed in his example. That is not what we wanted in the first place. On another note, key group alignment is also beneficial because it keeps the parallelism changes somewhat stable between the different divisors of the max parallelism.

> Autoscaler key group optimization can interfere with scale-down.max-factor
> ---
>
> Key: FLINK-35285
> URL: https://issues.apache.org/jira/browse/FLINK-35285
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Reporter: Trystan
> Priority: Minor
>
> When setting a less aggressive scale down limit, the key group optimization can prevent a vertex from scaling down at all. It will hunt from the target upwards to maxParallelism/2, and will always find currentParallelism again.
>
> A simple test trying to scale down from a parallelism of 60 with a scale-down.max-factor of 0.2:
> {code:java}
> assertEquals(48, JobVertexScaler.scale(60, inputShipStrategies, 360, .8, 8, 360));
> {code}
>
> It seems reasonable to make a good attempt to spread data across subtasks, but not at the expense of total deadlock. The problem is that during scale down it doesn't actually ensure that newParallelism will be < currentParallelism. The only workaround is to set a scale down factor large enough such that it finds the next lowest divisor of the maxParallelism.
>
> Clunky, but something to ensure it can make at least some progress. There is another test that now fails, but just to illustrate the point:
> {code:java}
> for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
>     if ((scaleFactor < 1 && p < currentParallelism) || (scaleFactor > 1 && p > currentParallelism)) {
>         if (maxParallelism % p == 0) {
>             return p;
>         }
>     }
> }
> {code}
>
> Perhaps this is by design and not a bug, but total failure to scale down in order to keep optimized key groups does not seem ideal.
>
> Key group optimization block:
> [https://github.com/apache/flink-kubernetes-operator/blob/fe3d24e4500d6fcaed55250ccc816546886fd1cf/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L296C1-L303C10]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
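The throughput argument can be illustrated with a small calculation. Assuming Flink's usual key-group-range assignment (each subtask gets either floor or ceil of maxParallelism/p key groups) and uniform load per key group, the fraction of ideal throughput achievable at an unaligned parallelism is roughly:

{code:java}
public class KeyGroupSkewDemo {
    // Fraction of ideal throughput when maxParallelism key groups are
    // spread over p subtasks: limited by the busiest subtask.
    static double effectiveCapacity(int maxParallelism, int p) {
        int maxGroupsPerSubtask = (int) Math.ceil(maxParallelism / (double) p);
        double idealGroupsPerSubtask = maxParallelism / (double) p;
        return idealGroupsPerSubtask / maxGroupsPerSubtask;
    }

    public static void main(String[] args) {
        // Aligned: 128 key groups over 32 subtasks -> 4 groups each, full capacity.
        System.out.println(effectiveCapacity(128, 32)); // 1.0
        // Unaligned: 128 key groups over 48 subtasks -> 2 or 3 groups each;
        // the 3-group subtasks cap throughput at ~89% of ideal.
        System.out.println(effectiveCapacity(128, 48)); // ~0.89
    }
}
{code}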
[jira] [Commented] (FLINK-35285) Autoscaler key group optimization can interfere with scale-down.max-factor
[ https://issues.apache.org/jira/browse/FLINK-35285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868546#comment-17868546 ] Maximilian Michels commented on FLINK-35285: Hi! Great discussion. Thanks for looping me in! I think it makes sense to add an option to disable the key group alignment logic. I created a JIRA for this a while ago: FLINK-32120. Key group alignment is an important pillar to ensure stability of the algorithm. I understand your concerns about sticky parallelisms, but it is likely that the number of scalings will increase without key group alignment, at least in some situations. Estimating the partition skew in the absence of key group alignment, and compensating for it by only a slightly higher parallelism, is an interesting idea that we might want to test. I was contemplating something like that (see FLINK-32119). We should make sure, though, that the adjusted parallelism still matches the desired target utilization and its throughput. Without this, the resulting scaling decision may turn out to be less stable, because the lower throughput bound will be hit faster. In short, I think we can independently work on:
# FLINK-32120
# FLINK-32119 (or use this issue): Add new skew estimation logic which adjusts the parallelism just enough to match the desired target utilization for scenarios where we don't use the key group alignment hammer. This would also apply to scenarios where key group alignment is not possible.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
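A possible shape for the skew-estimation idea (FLINK-32119): instead of forcing a divisor of maxParallelism, raise the parallelism just enough that the busiest subtask is no more loaded than the even-spread target assumed by step 1. This is a speculative sketch, not the proposed implementation:

{code:java}
public class SkewCompensation {
    /**
     * p1 is the parallelism computed assuming a perfectly even
     * key-group spread; return the smallest p >= p1 whose busiest
     * subtask is not more loaded than that assumption.
     */
    static int compensate(int p1, int maxParallelism) {
        double allowedGroupsPerSubtask = maxParallelism / (double) p1;
        for (int p = p1; p <= maxParallelism; p++) {
            int busiest = (int) Math.ceil(maxParallelism / (double) p);
            if (busiest <= allowedGroupsPerSubtask) {
                return p;
            }
        }
        return maxParallelism;
    }

    public static void main(String[] args) {
        // maxParallelism 360, even-spread target 48 -> 52 already keeps the
        // busiest subtask (7 key groups) at or below the target (7.5 groups).
        System.out.println(compensate(48, 360)); // 52
    }
}
{code}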
[jira] [Commented] (FLINK-35814) Don't scale up continuously when the throughput cannot be increased after scaling up
[ https://issues.apache.org/jira/browse/FLINK-35814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865084#comment-17865084 ] Maximilian Michels commented on FLINK-35814: I experienced false positives which led to pipelines building up a huge backlog. I think the feature needs to be made more robust before being enabled by default.

> Don't scale up continuously when the throughput cannot be increased after
> scaling up
> ---
>
> Key: FLINK-35814
> URL: https://issues.apache.org/jira/browse/FLINK-35814
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
>
> h2. Motivation
> Currently, the parallelism will be increased continuously in some cases, such as data skew or a bottleneck in another system.
> In these cases, the throughput (processing rate) cannot be increased even if we increase the parallelism.
> h2. Solution
> We don't need to keep scaling up the task when the throughput cannot be increased by scaling up.
> It's also better to trigger events to remind users to fix the issue manually.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
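Conceptually, such a detection compares the observed processing-rate gain against the gain expected from the parallelism increase, and blocks further scale-ups when the gain is marginal. A hedged sketch, with an illustrative threshold name and value:

{code:java}
public class IneffectiveScaleUpDetector {
    private static final double MIN_EFFECTIVENESS = 0.1; // hypothetical threshold

    static boolean scaleUpWasIneffective(
            double rateBefore, double rateAfter,
            int parallelismBefore, int parallelismAfter) {
        // Linear scaling assumption: doubling parallelism should
        // roughly double the processing rate.
        double expectedRate = rateBefore * parallelismAfter / (double) parallelismBefore;
        double expectedGain = expectedRate - rateBefore;
        double actualGain = rateAfter - rateBefore;
        // e.g. data skew or an external bottleneck: capacity barely moved.
        return expectedGain > 0 && actualGain / expectedGain < MIN_EFFECTIVENESS;
    }
}
{code}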
[jira] [Commented] (FLINK-35489) Metaspace size can be too small after autotuning changes memory settings
[ https://issues.apache.org/jira/browse/FLINK-35489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853606#comment-17853606 ] Maximilian Michels commented on FLINK-35489: Hey [~ashangit]! Good idea to switch the order between budgeting heap and metaspace, but metaspace should be allowed to grow and not be fixed. The reason is that metaspace is often configured too low. We can prevent metaspace-related errors by detecting and increasing metaspace.

> Metaspace size can be too small after autotuning changes memory settings
> ---
>
> Key: FLINK-35489
> URL: https://issues.apache.org/jira/browse/FLINK-35489
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Affects Versions: 1.8.0
> Reporter: Nicolas Fraison
> Priority: Major
> Labels: pull-request-available
>
> We have enabled the autotuning feature on one of our flink jobs with the below config:
> {code:java}
> # Autoscaler configuration
> job.autoscaler.enabled: "true"
> job.autoscaler.stabilization.interval: 1m
> job.autoscaler.metrics.window: 10m
> job.autoscaler.target.utilization: "0.8"
> job.autoscaler.target.utilization.boundary: "0.1"
> job.autoscaler.restart.time: 2m
> job.autoscaler.catch-up.duration: 10m
> job.autoscaler.memory.tuning.enabled: true
> job.autoscaler.memory.tuning.overhead: 0.5
> job.autoscaler.memory.tuning.maximize-managed-memory: true{code}
> During a scale down, the autotuning decided to give all the memory to the JVM (scaling the heap by 2), setting taskmanager.memory.managed.size to 0b.
> Here is the config that was computed by the autotuning for a TM running on a 4GB pod:
> {code:java}
> taskmanager.memory.network.max: 4063232b
> taskmanager.memory.network.min: 4063232b
> taskmanager.memory.jvm-overhead.max: 433791712b
> taskmanager.memory.task.heap.size: 3699934605b
> taskmanager.memory.framework.off-heap.size: 134217728b
> taskmanager.memory.jvm-metaspace.size: 22960020b
> taskmanager.memory.framework.heap.size: "0 bytes"
> taskmanager.memory.flink.size: 3838215565b
> taskmanager.memory.managed.size: 0b {code}
> This has led to issues starting the TM, because we rely on a javaagent that performs some memory allocation outside of the JVM (it relies on some C bindings).
> Tuning the overhead or disabling scale-down-compensation.enabled could have helped for that particular event, but this can lead to other issues, as it could result in too small a heap size being computed.
> It would be interesting to be able to set a minimum memory.managed.size to be taken into account by the autotuning.
> What do you think about this? Do you think that some other specific config should have been applied to avoid this issue?
>
> Edit: see this comment that leads to the metaspace issue:
> https://issues.apache.org/jira/browse/FLINK-35489?focusedCommentId=17850694&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17850694

-- This message was sent by Atlassian Jira (v8.20.10#820010)
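The budgeting order under discussion might look roughly like this: reserve metaspace first (with headroom so it can grow), then hand the remainder to the heap. The names and the 20% growth factor are assumptions, not the implemented algorithm:

{code:java}
public class MemoryBudget {
    /** Returns {heapBytes, metaspaceBytes} for a given pod budget. */
    static long[] budget(long totalProcessBytes, long overheadBytes, long measuredMetaspaceBytes) {
        // Let metaspace grow: provision measured usage plus 20% headroom
        // instead of fixing it to whatever the heap calculation leaves over.
        long metaspace = (long) (measuredMetaspaceBytes * 1.2);
        long heap = totalProcessBytes - overheadBytes - metaspace;
        if (heap <= 0) {
            throw new IllegalArgumentException(
                    "Pod too small for measured metaspace plus overhead");
        }
        return new long[] {heap, metaspace};
    }
}
{code}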
[jira] [Updated] (FLINK-33799) Add e2e's for tls enabled operator
[ https://issues.apache.org/jira/browse/FLINK-33799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-33799: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0) > Add e2e's for tls enabled operator > -- > > Key: FLINK-33799 > URL: https://issues.apache.org/jira/browse/FLINK-33799 > Project: Flink > Issue Type: Technical Debt > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Tony Garrard >Priority: Minor > Fix For: kubernetes-operator-1.9.0 > > > It would be good to create some E2E tests to ensure a tls enabled flink > operator works, so that we don't break anything in the future -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34151) Integrate Karpenter resource limits into cluster capacity check
[ https://issues.apache.org/jira/browse/FLINK-34151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34151: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0) > Integrate Karpenter resource limits into cluster capacity check > --- > > Key: FLINK-34151 > URL: https://issues.apache.org/jira/browse/FLINK-34151 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.9.0 > > > FLINK-33771 added cluster capacity checking for Flink Autoscaling decisions. > The checks respect the scaling limits of the Kubernetes Cluster Autoscaler. > We should also support Karpenter-based resource checks, as Karpenter is the > preferred method of expanding the cluster size in some environments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32120) Add autoscaler config option to disable parallelism key group alignment
[ https://issues.apache.org/jira/browse/FLINK-32120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-32120: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0) > Add autoscaler config option to disable parallelism key group alignment > --- > > Key: FLINK-32120 > URL: https://issues.apache.org/jira/browse/FLINK-32120 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.9.0 > > > After choosing the target parallelism for a vertex, we choose a higher > parallelism if that parallelism leads to evenly spreading the number of key > groups. The number of key groups is derived from the max parallelism. > The amount of actual skew we would introduce if we did not do the alignment > would usually be pretty low. In fact, the data itself can have an uneven load > distribution across the keys (hot keys). In this case, the key group > alignment is not effective. > For experiments, we should allow disabling the key group alignment via a > configuration option. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-32119) Revise source partition skew logic
[ https://issues.apache.org/jira/browse/FLINK-32119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-32119: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0) > Revise source partition skew logic > --- > > Key: FLINK-32119 > URL: https://issues.apache.org/jira/browse/FLINK-32119 > Project: Flink > Issue Type: Bug > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.9.0 > > > After choosing the target parallelism for a vertex, we choose a higher > parallelism if that parallelism leads to evenly spreading the number of key > groups (=max parallelism). > Sources don't have keyed state, so this adjustment does not make sense for > key groups. However, we internally limit the max parallelism of sources to > the number of partitions discovered. This prevents partition skew. > The partition skew logic currently doesn’t work correctly when there are > multiple topics because we use the total number of partitions discovered. > Using a single max parallelism doesn’t yield skew free partition distribution > then. However, this is also true for a single topic when the number of > partitions is a prime number or a not easily divisible number. > Hence, we should add an option to guarantee skew free partition distribution > which means using the total number of partitions when another configuration > is not possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34540) Tune number of task slots
[ https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34540: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0) > Tune number of task slots > - > > Key: FLINK-34540 > URL: https://issues.apache.org/jira/browse/FLINK-34540 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn
[ https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-31502: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0)

> Limit the number of concurrent scale operations to reduce cluster churn
> ---
>
> Key: FLINK-31502
> URL: https://issues.apache.org/jira/browse/FLINK-31502
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler, Kubernetes Operator
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
> Until we move to the upcoming Rescale API which recycles pods, we need to be mindful of how many deployments we scale at the same time, because each of them gives up all its pods and then requests the new number of required pods.
> This can cause churn in the cluster and temporarily lead to "unallocatable" pods, which triggers the k8s cluster autoscaler to add more cluster nodes. That is often not desirable, because the actually required resources, after the scaling has settled, are lower.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33789) Expose restart time as a metric
[ https://issues.apache.org/jira/browse/FLINK-33789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-33789: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0)

> Expose restart time as a metric
> ---
>
> Key: FLINK-33789
> URL: https://issues.apache.org/jira/browse/FLINK-33789
> Project: Flink
> Issue Type: New Feature
> Components: Autoscaler, Kubernetes Operator
> Reporter: Alexander Fedulov
> Assignee: Alexander Fedulov
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
> FLINK-30593 added restart time tracking. It would be convenient to also report it as a metric.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33773) Add fairness to scaling decisions
[ https://issues.apache.org/jira/browse/FLINK-33773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-33773: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0) > Add fairness to scaling decisions > - > > Key: FLINK-33773 > URL: https://issues.apache.org/jira/browse/FLINK-33773 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Deployment / Kubernetes >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.9.0 > > > The current scaling logic is inherently unfair. In a scenario of heavy > backlog, whichever pipelines come first, they will end up taking most of the > resources. Some kind of fairness should be introduced, for example: > * Cap the max number of resulting pods at a % of the cluster resources > * Allow scale up round-robin across all pipelines -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33673) SizeLimits not being set on emptyDir
[ https://issues.apache.org/jira/browse/FLINK-33673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-33673: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0)

> SizeLimits not being set on emptyDir
> ---
>
> Key: FLINK-33673
> URL: https://issues.apache.org/jira/browse/FLINK-33673
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.7.0
> Reporter: Tony Garrard
> Priority: Minor
> Fix For: kubernetes-operator-1.9.0
>
> The operator should set a sizeLimit on any emptyDirs it creates. See [https://main.kyverno.io/policies/other/a/add-emptydir-sizelimit/add-emptydir-sizelimit/]
> This issue is to set a sizeLimit. The default one in question is for artifacts. My initial guess at a setting would be around 512Mb.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34539) Tune JobManager memory
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0) > Tune JobManager memory > -- > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.9.0 > > > Adjustments similar to FLINK-34152, but simpler because we only need to > adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34389) JdbcAutoscalerStateStore explicitly writes update_time
[ https://issues.apache.org/jira/browse/FLINK-34389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34389: --- Fix Version/s: kubernetes-operator-1.9.0 (was: kubernetes-operator-1.8.0)

> JdbcAutoscalerStateStore explicitly writes update_time
> ---
>
> Key: FLINK-34389
> URL: https://issues.apache.org/jira/browse/FLINK-34389
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler
> Affects Versions: kubernetes-operator-1.8.0
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.9.0
>
> JdbcAutoscalerStateStore explicitly writes update_time instead of relying on the database to update it.
> Some databases don't support updating the timestamp column automatically. For example, Derby doesn't update update_time automatically when we update any data. This made it hard to write a general test while developing the tests for JdbcAutoscalerEventHandler.
> As a common, open-source service, it's better to handle this inside of the service in order to support all databases well.
> In order to unify the design of JdbcAutoscalerEventHandler and JdbcAutoscalerStateStore, we update the design of JdbcAutoscalerStateStore in this JIRA.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
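The change boils down to setting update_time from the client, which works on every JDBC database regardless of ON UPDATE column support. A sketch, where the table and column names are assumptions based on the issue text:

{code:java}
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;

public class StateStoreWriter {
    void updateState(Connection conn, String jobKey, String state) throws SQLException {
        // update_time is written explicitly instead of relying on a
        // database-side ON UPDATE default (which e.g. Derby lacks).
        String sql = "UPDATE t_flink_autoscaler_state_store "
                + "SET state_value = ?, update_time = ? WHERE job_key = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, state);
            ps.setTimestamp(2, Timestamp.from(Instant.now())); // explicit, DB-agnostic
            ps.setString(3, jobKey);
            ps.executeUpdate();
        }
    }
}
{code}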
[jira] [Resolved] (FLINK-34152) Tune TaskManager memory
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-34152. Release Note: TaskManager memory (heap, network, metaspace, managed) is optimized together with autoscaling decisions. Resolution: Fixed

> Tune TaskManager memory
> ---
>
> Key: FLINK-34152
> URL: https://issues.apache.org/jira/browse/FLINK-34152
> Project: Flink
> Issue Type: Sub-task
> Components: Autoscaler, Kubernetes Operator
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
> The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this.
> # Memory is overprovisioned: On scale up, we may add more memory than we actually need. Even on scale down, the memory/CPU ratio can still be off and too much memory is used.
> # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size.
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit

-- This message was sent by Atlassian Jira (v8.20.10#820010)
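At its core, heap tuning of this kind can be reduced to sizing from observed usage plus headroom, capped by the pod budget. A simplified sketch; the names are assumptions and the factor mirrors the job.autoscaler.memory.tuning.overhead option quoted earlier, so see the linked design doc for the actual algorithm:

{code:java}
public class HeapTuner {
    static long tunedHeapBytes(
            long maxObservedHeapUsage, double overheadFactor, long podMemoryBudget) {
        // e.g. overheadFactor = 0.5 -> 50% headroom on top of observed usage
        long proposed = (long) (maxObservedHeapUsage * (1 + overheadFactor));
        // Never exceed what the pod can actually provide.
        return Math.min(proposed, podMemoryBudget);
    }
}
{code}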
[jira] [Resolved] (FLINK-34619) Do not wait for scaling completion in UPGRADE state with in-place scaling
[ https://issues.apache.org/jira/browse/FLINK-34619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-34619. Release Note: Merged via 8938658ed245545e6436ff22cbb8b2fabd4047f1 Resolution: Fixed

> Do not wait for scaling completion in UPGRADE state with in-place scaling
> ---
>
> Key: FLINK-34619
> URL: https://issues.apache.org/jira/browse/FLINK-34619
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.7.0
> Reporter: Gyula Fora
> Assignee: Gyula Fora
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
> The operator currently puts the resource into upgrading state after triggering in-place scaling and keeps observing until the desired parallelism is reached before moving to deployed/stable.
> However, this means that, due to how the adaptive scheduler works, this parallelism may never be reached, and that is expected.
> We should simplify the logic to consider the scaling "done" once the resource requirements have been set correctly, and then leave the rest to the adaptive scheduler.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-34104) Improve the ScalingReport format of autoscaling
[ https://issues.apache.org/jira/browse/FLINK-34104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-34104. Resolution: Fixed

> Improve the ScalingReport format of autoscaling
> ---
>
> Key: FLINK-34104
> URL: https://issues.apache.org/jira/browse/FLINK-34104
> Project: Flink
> Issue Type: Improvement
> Components: Autoscaler
> Reporter: Rui Fan
> Assignee: Rui Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.8.0
>
> Currently, the scaling report format is:
> Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f
> It has 2 disadvantages:
> # When one job has multiple vertices, the reports of all vertices are concatenated without any separator. Here is an example:
> ** Scaling execution enabled, begin scaling vertices: Vertex ID ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 2 -> 1 | Processing capacity 800466.67 -> 320186.67 | Target data rate 715.10 Vertex ID bc764cd8ddf7a0cff126f51c16239658 | Parallelism 2 -> 1 | Processing capacity 79252.08 -> 31700.83 | Target data rate 895.93 Vertex ID 0a448493b4782967b150582570326227 | Parallelism 8 -> 16 | Processing capacity 716.05 -> 1141.00 | Target data rate 715.54
> ** We can see the Vertex ID is the beginning of each vertex report; there is no separator from the previous vertex.
> # This format is non-standard and hard to deserialize.
> ** Some jobs enable the autoscaler but disable scaling.
> ** Flink platform maintainers want to show the scaling report in the WebUI; the report results are helpful for Flink users.
> ** So easy deserialization is useful for these Flink platforms.
> h2. Solution:
> * Add { and } as separators around the report of each vertex.
> * Add an AutoscalerEventUtils class to easily deserialize the ScalingReport message.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
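Until the separators land, the old report can only be recovered with pattern matching. A sketch of a regex-based parser for the per-vertex format quoted above (this is not the AutoscalerEventUtils mentioned in the issue):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScalingReportParser {
    private static final Pattern VERTEX = Pattern.compile(
            "Vertex ID (\\S+) \\| Parallelism (\\d+) -> (\\d+) \\| "
                    + "Processing capacity ([\\d.]+) -> ([\\d.]+) \\| Target data rate ([\\d.]+)");

    /** Returns one {id, oldP, newP, oldCapacity, newCapacity, targetRate} tuple per vertex. */
    static List<String[]> parse(String report) {
        List<String[]> vertices = new ArrayList<>();
        Matcher m = VERTEX.matcher(report);
        while (m.find()) {
            vertices.add(new String[] {
                m.group(1), m.group(2), m.group(3), m.group(4), m.group(5), m.group(6)
            });
        }
        return vertices;
    }
}
{code}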
[jira] [Commented] (FLINK-34655) Autoscaler doesn't work for flink 1.15
[ https://issues.apache.org/jira/browse/FLINK-34655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826015#comment-17826015 ] Maximilian Michels commented on FLINK-34655: Thanks for raising awareness of the Flink version compatibility, [~fanrui]! Although we've been using Flink Autoscaling with 1.16, it is true that only Flink 1.17 supports it out of the box. {quote}In the short term, we only use the autoscaler to give suggestions instead of scaling directly. After our users think the parallelism calculation is reliable, they will have stronger motivation to upgrade the flink version. {quote} I understand the idea behind providing suggestions. However, it is difficult to assess the quality of Autoscaling decisions without applying them automatically. The reason is that suggestions become stale very quickly if the load pattern is not completely static. Even for static load patterns, if the user doesn't redeploy in a matter of minutes, the suggestions might already be stale again once the number of pending records has increased too much. In any case, production load patterns are rarely static, which means that autoscaling will inevitably trigger multiple times a day, but that is where its real power is unleashed. It would be great to hear about any concerns your users have about turning on automatic scaling. We've been operating it in production for about a year now. Back to the issue here, should we think about a patch release for 1.15 / 1.16 to add support for overriding vertex parallelism? > Autoscaler doesn't work for flink 1.15 > -- > > Key: FLINK-34655 > URL: https://issues.apache.org/jira/browse/FLINK-34655 > Project: Flink > Issue Type: Bug > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > flink-kubernetes-operator is committed to supporting the latest 4 flink minor > versions, and the autoscaler is a part of flink-kubernetes-operator. Currently, > the latest 4 flink minor versions are 1.15, 1.16, 1.17 and 1.18. > But the autoscaler doesn't work for flink 1.15. > h2. Root cause: > * FLINK-28310 added some properties to IOMetricsInfo in flink-1.16 > * IOMetricsInfo is a part of JobDetailsInfo > * JobDetailsInfo is necessary for the autoscaler [1] > * Flink's RestClient doesn't allow any property to be missing when > deserializing the JSON > That means that the RestClient of versions after 1.15 cannot fetch > JobDetailsInfo from 1.15 jobs. > h2. How to fix it properly? > - [FLINK-34655|https://issues.apache.org/jira/browse/FLINK-34655] Copy > IOMetricsInfo to the flink-autoscaler-standalone module > - Remove it once 1.15 is no longer supported > [1] > https://github.com/apache/flink-kubernetes-operator/blob/ede1a610b3375d31a2e82287eec67ace70c4c8df/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java#L109 > [2] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-401%3A+REST+API+JSON+response+deserialization+unknown+field+tolerance -- This message was sent by Atlassian Jira (v8.20.10#820010)
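The failure mode can be reproduced with a few lines of Jackson. The class and field names below are made up for illustration; the point is that a strict mapper (in the spirit of the strict mapper Flink uses for REST responses) rejects a payload that lacks a creator property, while the tolerant direction explored in FLIP-401 [2] simply maps it to null.
{noformat}
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

/** Illustration of the root cause with made-up field names: a newer client
 *  model requires a property that a 1.15 response does not contain. */
public class MissingPropertyExample {

    static class IoMetrics {
        final long bytesRead;
        final Long newField; // added in a later version, absent from 1.15 JSON

        @JsonCreator
        IoMetrics(@JsonProperty("bytesRead") long bytesRead,
                  @JsonProperty("newField") Long newField) {
            this.bytesRead = bytesRead;
            this.newField = newField;
        }
    }

    public static void main(String[] args) throws Exception {
        String json115 = "{\"bytesRead\":42}"; // no "newField" in a 1.15 payload

        ObjectMapper strict = new ObjectMapper()
                .enable(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
        try {
            strict.readValue(json115, IoMetrics.class); // throws: missing creator property
        } catch (Exception e) {
            System.out.println("strict mapper fails: " + e.getMessage());
        }

        // Tolerant mapping leaves the missing property null instead of failing.
        ObjectMapper tolerant = new ObjectMapper();
        System.out.println(tolerant.readValue(json115, IoMetrics.class).bytesRead);
    }
}
{noformat}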
[jira] [Comment Edited] (FLINK-34152) Tune TaskManager memory
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822790#comment-17822790 ] Maximilian Michels edited comment on FLINK-34152 at 3/2/24 9:11 AM: {quote}What I'm trying to say is that I didn't notice any updated configuration for the memory request and limit at the pod template level for the taskmanager. Therefore, I assume that the pod's memory allocation won't automatically adjust to reflect changes in the taskmanager's heap size, unless I've missed something. {quote} I would ask you to check again. When the tuning is applied, the pod resource requests/limits of the TaskManager pods will be adjusted. So changes will be directly reflected in terms of resource usage in Kubernetes. {quote}Indeed, by implementing bin-packing, we can optimize resource utilization, which is now clearer to me. However, its management becomes more complex (K8s upgrade, daily node restart/eviction) for sure, especially when there are other application components in the same Kubernetes cluster IMO {quote} You are right, there is more complexity to realize Flink Autoscaling benefits end to end. However, there is also a great amount of resource savings and convenience for the user that comes out of it. We have seen 60% fewer nodes after enabling Flink Autoscaling while maintaining the same level of service and drastically decreasing the maintenance for our users, who would otherwise have to adjust parallelism constantly to run cost-efficiently. They usually did not want to do that, and thus all jobs ran very over-provisioned. {quote}Can you take a look at it https://issues.apache.org/jira/browse/FLINK-34563 and [https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if you think it's making sense, thanks :) {quote} Thank you, I'll review in the next days! was (Author: mxm): {quote}What I'm trying to say is that I didn't notice any updated configuration for the memory request and limit at the pod template level for the taskmanager. Therefore, I assume that the pod's memory allocation won't automatically adjust to reflect changes in the taskmanager's heap size, unless I've missed something. {quote} I would ask you to check again. When the tuning is applied, the pod resource requests/limits of the TaskManager pods will be adjusted. So changes will be directly reflected in terms of resource usage in Kubernetes. {quote}Indeed, by implementing bin-packing, we can optimize resource utilization, which is now clearer to me. However, its management becomes more complex (K8s upgrade, daily node restart/eviction) for sure, especially when there are other application components in the same Kubernetes cluster IMO {quote} You are right, there is more complexity to realize Flink Autoscaling benefits end to end. However, there is also a great amount of resource savings and convenience for the user that comes out of it. We have seen 60% fewer nodes after enabling Flink Autoscaling while maintaining the same level of service and drastically decreasing the maintenance for our users, who would otherwise have to adjust parallelism constantly to run cost-efficiently. They usually did not want to do that, and thus all jobs ran very over-provisioned. {quote}Can you take a look at it https://issues.apache.org/jira/browse/FLINK-34563 and [https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if you think it's making sense, thanks :) {quote} Thank you, I'll review in the next days. Generally, I've been wondering about this; the idea has crossed my mind before. 
I wasn't really sure how exactly to adjust the parallelism to fill the TaskManagers. Adjusting only the vertex with the highest parallelism might be unfair to other vertices. I think spreading out the unused task slots to the vertices with the lowest parallelism might be more beneficial for stability. We have seen more instability with lower parallelisms because the metrics are less precise. > Tune TaskManager memory > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34152) Tune TaskManager memory
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822790#comment-17822790 ] Maximilian Michels commented on FLINK-34152: {quote}What I'm trying to say is that I didn't notice any updated configuration for the memory request and limit at the pod template level for the taskmanager. Therefore, I assume that the pod's memory allocation won't automatically adjust to reflect changes in the taskmanager's heap size, unless I've missed something. {quote} I would ask you to check again. When the tuning is applied, the pod resource requests/limits of the TaskManager pods will be adjusted. So changes will be directly reflected in terms of resource usage in Kubernetes. {quote}Indeed, by implementing bin-packing, we can optimize resource utilization, which is now clearer to me. However, its management becomes more complex (K8s upgrade, daily node restart/eviction) for sure, especially when there are other application components in the same Kubernetes cluster IMO {quote} You are right, there is more complexity to realize Flink Autoscaling benefits end to end. However, there is also a great amount of resource savings and convenience for the user that comes out of it. We have seen 60% fewer nodes after enabling Flink Autoscaling while maintaining the same level of service and drastically decreasing the maintenance for our users, who would otherwise have to adjust parallelism constantly to run cost-efficiently. They usually did not want to do that, and thus all jobs ran very over-provisioned. {quote}Can you take a look at it https://issues.apache.org/jira/browse/FLINK-34563 and [https://github.com/apache/flink-kubernetes-operator/pull/787] ? And tell me if you think it's making sense, thanks :) {quote} Thank you, I'll review in the next days. Generally, I've been wondering about this; the idea has crossed my mind before. I wasn't really sure how exactly to adjust the parallelism to fill the TaskManagers. Adjusting only the vertex with the highest parallelism might be unfair to other vertices. I think spreading out the unused task slots to the vertices with the lowest parallelism might be more beneficial for stability. We have seen more instability with lower parallelisms because the metrics are less precise. > Tune TaskManager memory > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
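To make the slot-spreading idea from the comment above concrete, here is a rough sketch with assumed data structures (not the autoscaler's real model): spare task slots are handed to the vertex with the currently lowest parallelism, one at a time.
{noformat}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Rough sketch: give spare task slots to the lowest-parallelism vertices first. */
public class SlotSpreader {

    record Vertex(String id, int parallelism) {}

    static Map<String, Integer> spread(List<Vertex> vertices, int unusedSlots) {
        Map<String, Integer> target = new HashMap<>();
        vertices.forEach(v -> target.put(v.id(), v.parallelism()));
        // Repeatedly bump the currently-lowest vertex by one slot. Lower
        // parallelism tends to mean noisier metrics, so extra capacity there
        // should buy the most stability.
        for (int i = 0; i < unusedSlots; i++) {
            String lowest = target.entrySet().stream()
                    .min(Map.Entry.comparingByValue())
                    .orElseThrow()
                    .getKey();
            target.merge(lowest, 1, Integer::sum);
        }
        return target;
    }

    public static void main(String[] args) {
        System.out.println(spread(List.of(
                new Vertex("source", 2), new Vertex("join", 8), new Vertex("sink", 1)), 5));
    }
}
{noformat}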
[jira] [Commented] (FLINK-34152) Tune TaskManager memory
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822512#comment-17822512 ] Maximilian Michels commented on FLINK-34152: Hi [~yang]! Thanks for taking a look at the recent changes. There have been two more follow-up PRs since the initial PR you linked. I'm very curious to hear your feedback. {quote}We may need to dynamically adjust the Kubernetes CPU and memory limits for both the job manager and task manager eventually, to align with the automatically tuned memory and CPU parameters and prevent unnecessary resource allocation. {quote} Tuning JobManager memory is still pending, but I agree that tuning only TaskManagers is not enough. As for tuning CPU, I think we eventually want to tune the number of task slots to fit them to the CPUs assigned. As for scaling CPU itself, that is already taken care of by the autoscaler, which essentially scales based on the CPU usage of TaskManagers. {quote}In our specific use-case, our Flink cluster is deployed on a dedicated node group with predefined CPU and memory settings, unlike a typical Kubernetes cluster. Consequently, this auto-tuning feature might not aid in reducing infrastructure costs, as billing is based on the allocated nodes behind the scenes. {quote} Autoscaling assumes some sort of Kubernetes Cluster Autoscaling to be active. When fewer resources are allocated, that should result in fewer nodes, but in practice it isn't quite that easy. It requires a bit of extra work for nodes to get released when fewer resources are used. The default Kubernetes scheduler doesn't bin-pack, but it can be reconfigured to do bin-packing as opposed to its default behavior of evenly spreading out pods. > Tune TaskManager memory > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune TaskManager memory
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory (was: Tune TaskManager memory of autoscaled jobs) > Tune TaskManager memory > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune TaskManager memory of austoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory of austoscaled jobs (was: Tune TaskManager memory) > Tune TaskManager memory of austoscaled jobs > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34540) Tune number of task slots
[ https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-34540: -- Assignee: (was: Maximilian Michels) > Tune number of task slots > - > > Key: FLINK-34540 > URL: https://issues.apache.org/jira/browse/FLINK-34540 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory of autoscaled jobs (was: Tune TaskManager memory of austoscaled jobs) > Tune TaskManager memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34540) Tune number of task slots
[ https://issues.apache.org/jira/browse/FLINK-34540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34540: --- Description: (was: Adjustments similar to FLINK-34152, but simpler because we only need to adjust heap memory and metaspace for the JobManager.) > Tune number of task slots > - > > Key: FLINK-34540 > URL: https://issues.apache.org/jira/browse/FLINK-34540 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34539) Tune JobManager memory
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Summary: Tune JobManager memory (was: Tune JobManager memory of autoscaled jobs) > Tune JobManager memory > -- > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > > Adjustments similar to FLINK-34152, but simpler because we only need to > adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune TaskManager memory
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory (was: Tune TaskManager memory of autoscaled jobs) > Tune TaskManager memory > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34540) Tune number of task slots
Maximilian Michels created FLINK-34540: -- Summary: Tune number of task slots Key: FLINK-34540 URL: https://issues.apache.org/jira/browse/FLINK-34540 Project: Flink Issue Type: Sub-task Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 Adjustments similar to FLINK-34152, but simpler because we only need to adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Description: Umbrella issue to tackle tuning the Flink configuration as part of Flink Autoscaling. (was: Umbrella issue to tackle) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > > Umbrella issue to tackle tuning the Flink configuration as part of Flink > Autoscaling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Labels: (was: pull-request-available) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > > Umbrella issue to tackle -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Description: Umbrella issue to tackle (was: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their dispense. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit]) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Umbrella issue to tackle -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Fix Version/s: (was: kubernetes-operator-1.8.0) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > > Umbrella issue to tackle -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune Flink config of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Summary: Tune Flink config of autoscaled jobs (was: Tune memory of autoscaled jobs) > Tune Flink config of autoscaled jobs > > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Labels: (was: pull-request-available) > Tune JobManager memory of autoscaled jobs > - > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > > Adjustments similar to FLINK-34152, but simpler because we only need to > adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Description: Adjustments similar to FLINK-34152, but simpler because we only need to adjust heap memory and metaspace for the JobManager. (was: Similarly to) > Tune JobManager memory of autoscaled jobs > - > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Adjustments similar to FLINK-34152, but simpler because we only need to > adjust heap memory and metaspace for the JobManager. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Description: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their dispense. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit] > Tune memory of autoscaled jobs > -- > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > [https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34539) Tune JobManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34539: --- Description: Similarly to (was: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their dispense. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit ) > Tune JobManager memory of autoscaled jobs > - > > Key: FLINK-34539 > URL: https://issues.apache.org/jira/browse/FLINK-34539 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Similarly to -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34539) Tune JobManager memory of autoscaled jobs
Maximilian Michels created FLINK-34539: -- Summary: Tune JobManager memory of autoscaled jobs Key: FLINK-34539 URL: https://issues.apache.org/jira/browse/FLINK-34539 Project: Flink Issue Type: Sub-task Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Parent: FLINK-34538 Issue Type: Sub-task (was: New Feature) > Tune TaskManager memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: Sub-task > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34538) Tune memory of autoscaled jobs
Maximilian Michels created FLINK-34538: -- Summary: Tune memory of autoscaled jobs Key: FLINK-34538 URL: https://issues.apache.org/jira/browse/FLINK-34538 Project: Flink Issue Type: New Feature Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34538) Tune memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34538: --- Description: (was: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their dispense. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit ) > Tune memory of autoscaled jobs > -- > > Key: FLINK-34538 > URL: https://issues.apache.org/jira/browse/FLINK-34538 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory of autoscaled jobs (was: Tune memory of autoscaled jobs) > Tune TaskManager memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune TaskManager memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune TaskManager memory of autoscaled jobs (was: Tune memory of autoscaled jobs) > Tune TaskManager memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune memory of autoscaled jobs (was: Tune TaskManager memory of autoscaled jobs) > Tune memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Description: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their dispense. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit was: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their dispense. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. > Tune memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their dispense. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. 
> https://docs.google.com/document/d/19GXHGL_FvN6WBgFvLeXpDABog2H_qqkw1_wrpamkFSc/edit > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Description: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. was: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. A tuning algorithm could look like this: h2. 1. Establish a memory baseline We observe the average heap memory usage at task managers. h2. 2. Calculate memory usage per record The memory requirements per record can be estimated by calculating this ratio: {noformat} memory_per_rec = sum(heap_usage) / sum(records_processed) {noformat} This ratio is surprisingly constant based on empirical data. h2. 3. Scale memory proportionally to the per-record memory needs {noformat} memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers {noformat} A minimum memory limit needs to be added to avoid scaling down memory too much. The max memory per TM should be equal to the initially defined user-specified limit from the ResourceSpec. {noformat} memory_per_tm = max(min_limit, memory_per_tm) memory_per_tm = min(max_limit, memory_per_tm) {noformat} > Tune memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. 
At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
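The tuning formulas quoted above translate almost directly into code. The following is a sketch under the stated assumptions (inputs come from the collected autoscaler metrics; names are illustrative), not the operator's actual implementation:
{noformat}
/** Direct transcription of the tuning formulas above; a sketch only. */
public class MemoryTuningSketch {

    static long tuneMemoryPerTm(
            double sumHeapUsageBytes,     // baseline: sum(heap_usage)
            double sumRecordsProcessed,   // sum(records_processed), same window
            double expectedRecordsPerSec, // target rate after rescaling
            int numTaskManagers,
            long minLimitBytes,           // floor to avoid shrinking memory too much
            long maxLimitBytes) {         // user-specified limit from the ResourceSpec
        // 2. Memory usage per record; empirically fairly constant per job.
        double memoryPerRec = sumHeapUsageBytes / sumRecordsProcessed;
        // 3. Scale memory proportionally to the per-record memory needs.
        double memoryPerTm = expectedRecordsPerSec * memoryPerRec / numTaskManagers;
        // Clamp to [min_limit, max_limit].
        return (long) Math.min(maxLimitBytes, Math.max(minLimitBytes, memoryPerTm));
    }

    public static void main(String[] args) {
        long mib = 1024 * 1024;
        System.out.println(tuneMemoryPerTm(
                8_000L * mib, 2_000_000, 50_000, 4, 512 * mib, 4_096 * mib) / mib
                + " MiB per TaskManager");
    }
}
{noformat}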
[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning
[ https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819591#comment-17819591 ] Maximilian Michels commented on FLINK-34471: Yes, that is exactly what I meant: Moving from FORWARD to RESCALE. The workaround you described makes sense. > Tune network memory as part of Autoscaler Memory Tuning > --- > > Key: FLINK-34471 > URL: https://issues.apache.org/jira/browse/FLINK-34471 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Maximilian Michels >Priority: Major > > Design doc: > https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning
[ https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819578#comment-17819578 ] Maximilian Michels edited comment on FLINK-34471 at 2/22/24 10:18 AM: -- Thanks! That would be very helpful – I don’t mind at all. I realized point to point connections might be a bit tricky when the parallelism between tasks changes from being equal to being different because at runtime we then switch to a different partitioner. So worst case we could run out of network buffers in this scenario. I’m curious how you want to solve this. We probably need to add extra buffers to account for this edge case. was (Author: mxm): Thanks! That would be very helpful – I don’t mind at all. I realized point to point connections might be a bit tricky when the parallelism between tasks changes from being equal to changing because at runtime we then switch to a different partitioner. So worst case we could run out of network buffers in this scenario. I’m curious how you want to solve this. We probably need to add extra buffers to account for this edge case. > Tune network memory as part of Autoscaler Memory Tuning > --- > > Key: FLINK-34471 > URL: https://issues.apache.org/jira/browse/FLINK-34471 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Maximilian Michels >Priority: Major > > Design doc: > https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning
[ https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819578#comment-17819578 ] Maximilian Michels commented on FLINK-34471: Thanks! That would be very helpful – I don’t mind at all. I realized point to point connections might be a bit tricky when the parallelism between tasks changes from being equal to changing because at runtime we then switch to a different partitioner. So worst case we could run out of network buffers in this scenario. I’m curious how you want to solve this. We probably need to add extra buffers to account for this edge case. > Tune network memory as part of Autoscaler Memory Tuning > --- > > Key: FLINK-34471 > URL: https://issues.apache.org/jira/browse/FLINK-34471 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Maximilian Michels >Priority: Major > > Design doc: > https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning
[ https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819306#comment-17819306 ] Maximilian Michels commented on FLINK-34471: I think in addition to the fine-grained approach described in the doc, we can do a first implementation which simply uses [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/#network-buffer-lifecycle] and assumes an ALL_TO_ALL relationship. This may not optimize down to the last byte but still gives great savings over the default. > Tune network memory as part of Autoscaler Memory Tuning > --- > > Key: FLINK-34471 > URL: https://issues.apache.org/jira/browse/FLINK-34471 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Maximilian Michels >Priority: Major > > Design doc: > https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
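For illustration, a coarse ALL_TO_ALL estimate along the lines of the linked docs could look like the sketch below. The defaults in the comments (2 exclusive buffers per channel, 8 floating buffers per gate, 32 KiB segments) are Flink's documented defaults; the helper itself is an assumption, not the eventual implementation.

{code:java}
/** Hedged sketch: rough per-gate network memory for an ALL_TO_ALL exchange. */
static long estimateInputGateBytes(int upstreamParallelism) {
    final int exclusivePerChannel = 2;        // taskmanager.network.memory.buffers-per-channel (default)
    final int floatingPerGate = 8;            // taskmanager.network.memory.floating-buffers-per-gate (default)
    final long segmentSizeBytes = 32 * 1024;  // taskmanager.memory.segment-size (default 32kb)
    // ALL_TO_ALL: the gate has one channel per upstream subtask.
    long buffers = (long) upstreamParallelism * exclusivePerChannel + floatingPerGate;
    return buffers * segmentSizeBytes;
}
{code}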
[jira] [Updated] (FLINK-34471) Tune network memory as part of Autoscaler Memory Tuning
[ https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34471: --- Summary: Tune network memory as part of Autoscaler Memory Tuning (was: Tune the network memory in Autoscaler) > Tune network memory as part of Autoscaler Memory Tuning > --- > > Key: FLINK-34471 > URL: https://issues.apache.org/jira/browse/FLINK-34471 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Maximilian Michels >Priority: Major > > Design doc: > https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34471) Tune the network memory in Autoscaler
[ https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34471: --- Summary: Tune the network memory in Autoscaler (was: Tune the network memroy in Autoscaler) > Tune the network memory in Autoscaler > - > > Key: FLINK-34471 > URL: https://issues.apache.org/jira/browse/FLINK-34471 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Maximilian Michels >Priority: Major > > Design doc: > https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34471) Tune the network memroy in Autoscaler
[ https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819233#comment-17819233 ] Maximilian Michels commented on FLINK-34471: Thanks Rui! > Tune the network memroy in Autoscaler > - > > Key: FLINK-34471 > URL: https://issues.apache.org/jira/browse/FLINK-34471 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Maximilian Michels >Priority: Major > > Design doc: > https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-34471) Tune the network memroy in Autoscaler
[ https://issues.apache.org/jira/browse/FLINK-34471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-34471: -- Assignee: Maximilian Michels > Tune the network memroy in Autoscaler > - > > Key: FLINK-34471 > URL: https://issues.apache.org/jira/browse/FLINK-34471 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Maximilian Michels >Priority: Major > > Design doc: > https://docs.google.com/document/d/19HYamwMaYYYOeH3NRbk6l9P-bBLBfgzMYjfGEPWEbeo/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33092) Improve the resource-stabilization-timeout mechanism when rescale a job for Adaptive Scheduler
[ https://issues.apache.org/jira/browse/FLINK-33092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812778#comment-17812778 ] Maximilian Michels commented on FLINK-33092: +1 waiting on resources in the Executing state. I think we need to just change the RescalingController to delay triggering the actual rescale process: [https://github.com/apache/flink/blob/cb9e220c2291088459f0281aa8e8e8584436a9b2/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/scalingpolicy/RescalingController.java#L37] Right now, it triggers immediately on parallelism change. [~dmvk] can probably answer this. > Improve the resource-stabilization-timeout mechanism when rescale a job for > Adaptive Scheduler > -- > > Key: FLINK-33092 > URL: https://issues.apache.org/jira/browse/FLINK-33092 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > Attachments: image-2023-09-15-14-43-35-104.png > > > !image-2023-09-15-14-43-35-104.png|width=916,height=647! > h1. 1. Proposal > The above is the state transition graph when rescaling a job in the Adaptive > Scheduler. > In brief, when we trigger a rescale, the job will wait for the > _*resource-stabilization-timeout*_ in the WaitingForResources state when it has > sufficient resources but doesn't have the desired resources. > If the _*resource-stabilization-timeout mechanism*_ is moved into the > Executing state, the rescale downtime will be significantly reduced. > h1. 2. Why is the downtime long? > Currently, when rescaling a job: > * Executing transitions to Restarting > * Restarting cancels the job first. > * Restarting transitions to WaitingForResources after the whole job > is terminal. > * When the job has sufficient resources but doesn't have the desired > resources, WaitingForResources needs to wait for the > _*resource-stabilization-timeout*_ . > * WaitingForResources transitions to CreatingExecutionGraph after the > resource-stabilization-timeout. > The problem is that the job isn't running during the > resource-stabilization-timeout phase. > h1. 3. How to reduce the downtime? > We can move the _*resource-stabilization-timeout mechanism*_ into the > Executing state when triggering a rescale. This means: > * When the job has the desired resources, Executing can rescale directly. > * When the job has sufficient resources but doesn't have the desired > resources, we can rescale after the _*resource-stabilization-timeout.*_ > * WaitingForResources will ignore the resource-stabilization-timeout > after this improvement. > The resource-stabilization-timeout then elapses before the job is cancelled, so the rescale > downtime will be significantly reduced. > > Note: the resource-stabilization-timeout still works in WaitingForResources > when starting a job. It is only changed when rescaling a job. -- This message was sent by Atlassian Jira (v8.20.10#820010)
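A delayed trigger along these lines could look roughly like the following sketch. The class and method names are assumptions for illustration, not the actual RescalingController API linked in the comment.

{code:java}
import java.time.Duration;
import java.time.Instant;

/** Illustrative sketch: only trigger a rescale once the desired parallelism
 *  change has been stable for the stabilization timeout. Names are assumptions. */
class DelayedRescaleDecider {
    private final Duration stabilizationTimeout;
    private Instant firstChangeSeen; // null while no change is pending

    DelayedRescaleDecider(Duration stabilizationTimeout) {
        this.stabilizationTimeout = stabilizationTimeout;
    }

    boolean shouldRescale(boolean parallelismChanged, Instant now) {
        if (!parallelismChanged) {
            firstChangeSeen = null; // desired parallelism matches again, reset the clock
            return false;
        }
        if (firstChangeSeen == null) {
            firstChangeSeen = now; // start the stabilization clock
        }
        return Duration.between(firstChangeSeen, now).compareTo(stabilizationTimeout) >= 0;
    }
}
{code}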
[jira] [Commented] (FLINK-34266) Output ratios should be computed over the whole metric window instead of averaged
[ https://issues.apache.org/jira/browse/FLINK-34266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811988#comment-17811988 ] Maximilian Michels commented on FLINK-34266: Yes, using the total sum and subtracting the start from the end observation is the way to go for maximum precision. > Output ratios should be computed over the whole metric window instead of > averaged > - > > Key: FLINK-34266 > URL: https://issues.apache.org/jira/browse/FLINK-34266 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Gyula Fora >Priority: Critical > > Currently Output ratios are computed during metric collection based on the > current in/out metrics and stored as part of the collected metrics. > During evaluation the output ratios previously computed are then averaged > together in the metric window. This however leads to incorrect computation > due to the nature of the computation and averaging. > Example: > Let's look at a window operator that simply sorts and re-emits events in > windows. During the window collection phase, the output ratio will be computed > and stored as 0. During the window computation the output ratio will be > last_input_rate / window_size. Depending on the last input rate observation > this can be off in either direction when averaged. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34266) Output ratios should be computed over the whole metric window instead of averaged
[ https://issues.apache.org/jira/browse/FLINK-34266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811921#comment-17811921 ] Maximilian Michels commented on FLINK-34266: The way I understand the code is that for every observation, we will store the total output rate of every vertex. During metric window evaluation, we will average all of those. That is in line with how all the code works. I agree 100% that all metrics should be observed over the entire metric window. So rates should be computed by measuring the number of records produced at the start and at the end of the window, then subtracting one from the other. This request seems analogous to FLINK-34213 but for rates instead of busy time. Is that fair to say? > Output ratios should be computed over the whole metric window instead of > averaged > - > > Key: FLINK-34266 > URL: https://issues.apache.org/jira/browse/FLINK-34266 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Gyula Fora >Priority: Critical > > Currently Output ratios are computed during metric collection based on the > current in/out metrics and stored as part of the collected metrics. > During evaluation the output ratios previously computed are then averaged > together in the metric window. This however leads to incorrect computation > due to the nature of the computation and averaging. > Example: > Let's look at a window operator that simply sorts and re-emits events in > windows. During the window collection phase, the output ratio will be computed > and stored as 0. During the window computation the output ratio will be > last_input_rate / window_size. Depending on the last input rate observation > this can be off in either direction when averaged. -- This message was sent by Atlassian Jira (v8.20.10#820010)
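In code, the windowed approach amounts to differencing an accumulated counter at the window boundaries instead of averaging momentary rates; a sketch with illustrative names:

{code:java}
import java.time.Duration;
import java.time.Instant;

/** Sketch: rate over the whole metric window from two observations of an
 *  accumulated counter (illustrative, not autoscaler code). */
static double ratePerSecond(long countAtStart, long countAtEnd, Instant start, Instant end) {
    long delta = countAtEnd - countAtStart;                         // records produced in the window
    double seconds = Duration.between(start, end).toMillis() / 1000.0;
    return delta / seconds;
}
{code}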
[jira] [Commented] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond
[ https://issues.apache.org/jira/browse/FLINK-34213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810006#comment-17810006 ] Maximilian Michels commented on FLINK-34213: If we had to query metrics per vertex, that would be too expensive, but it seems like that is not necessary. Here is an exemplary REST API response to the {{/jobs/}} endpoint: {noformat} { "jid": "b4f918c2a0312de9fe7369a7db093e96", "name": "-", "isStoppable": false, "state": "RUNNING", "start-time": 1705094021727, "end-time": -1, "duration": 928985186, "maxParallelism": 1, "now": 1706023006913, "timestamps": { "SUSPENDED": 0, "RUNNING": 1705094036134, "FAILING": 0, "CANCELED": 0, "CANCELLING": 0, "CREATED": 1705094035034, "INITIALIZING": 1705094021727, "FAILED": 0, "RESTARTING": 0, "RECONCILING": 0, "FINISHED": 0 }, "vertices": [ { "id": "db1f263dc155338dc2a9622a2e06d115", "name": "", "maxParallelism": 1, "parallelism": 18, "status": "RUNNING", "start-time": 1705094037437, "end-time": -1, "duration": 928969476, "tasks": { "CANCELED": 0, "DEPLOYING": 0, "CANCELING": 0, "RECONCILING": 0, "FINISHED": 0, "SCHEDULED": 0, "CREATED": 0, "INITIALIZING": 0, "FAILED": 0, "RUNNING": 18 }, "metrics": { "read-bytes": 0, "read-bytes-complete": true, "write-bytes": 2907138853415272, "write-bytes-complete": true, "read-records": 0, "read-records-complete": true, "write-records": 229589536334, "write-records-complete": true, "accumulated-backpressured-time": 1533744940, "accumulated-idle-time": 10026044858, "accumulated-busy-time": 5161601268 } }, ... ] } {noformat} Note the accumulated backpressure/idle time. > Consider using accumulated busy time instead of busyMsPerSecond > --- > > Key: FLINK-34213 > URL: https://issues.apache.org/jira/browse/FLINK-34213 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Priority: Minor > > We might achieve much better accuracy if we used the accumulated busy time > metrics from Flink, instead of the momentarily collected ones. > We would use the diff between the last accumulated and the current > accumulated busy time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
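Given two such REST observations, the average busy time over the interval falls out of a simple difference; a sketch using the accumulated-busy-time field shown above (the helper name and the numbers in the comment are illustrative):

{code:java}
/** Sketch: average busy time (ms per second, summed across the vertex's subtasks)
 *  derived from two observations of the "accumulated-busy-time" vertex metric. */
static double busyMsPerSecond(long accBusyMsFirst, long accBusyMsSecond, long intervalMs) {
    // e.g. a delta of 600000 ms over a 60s interval => 10000 ms/s across 18 subtasks
    return (double) (accBusyMsSecond - accBusyMsFirst) / intervalMs * 1000.0;
}
{code}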
[jira] [Created] (FLINK-34213) Consider using accumulated busy time instead of busyMsPerSecond
Maximilian Michels created FLINK-34213: -- Summary: Consider using accumulated busy time instead of busyMsPerSecond Key: FLINK-34213 URL: https://issues.apache.org/jira/browse/FLINK-34213 Project: Flink Issue Type: Improvement Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels We might achieve much better accuracy if we used the accumulated busy time metrics from Flink, instead of the momentarily collected ones. We would use the diff between the last accumulated and the current accumulated busy time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Description: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. A tuning algorithm could look like this: h2. 1. Establish a memory baseline We observe the average heap memory usage at task managers. h2. 2. Calculate memory usage per record The memory requirements per record can be estimated by calculating this ratio: {noformat} memory_per_rec = sum(heap_usage) / sum(records_processed) {noformat} This ratio is surprisingly constant based on empirical data. h2. 3. Scale memory proportionally to the per-record memory needs {noformat} memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers {noformat} A minimum memory limit needs to be added to avoid scaling down memory too much. The max memory per TM should be equal to the initially defined user-specified limit from the ResourceSpec. {noformat} memory_per_tm = max(min_limit, memory_per_tm) memory_per_tm = min(max_limit, memory_per_tm) {noformat} was: The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory then we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. A tuning algorithm could look like this: h2. 1. Establish a memory baseline We observe the average heap memory usage at task managers. h2. 2. Calculate memory usage per record The memory requirements per record can be estimated by calculating this ratio: {noformat} memory_per_rec = sum(heap_usage) / sum(records_processed) {noformat} This ratio is surprisingly constant based on empirical data. h2. 3. 
Scale memory proportionally to the per-record memory needs {noformat} memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers {noformat} A minimum memory limit needs to be added to avoid scaling down memory too much. The max memory per TM should be equal to the initially defined user-specified limit from the ResourceSpec. {noformat} memory_per_tm = max(min_limit, memory_per_tm) memory_per_tm = min(max_limit, memory_per_tm) {noformat} > Tune memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory than we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfM
[jira] [Created] (FLINK-34152) Tune memory of autoscaled jobs
Maximilian Michels created FLINK-34152: -- Summary: Tune memory of autoscaled jobs Key: FLINK-34152 URL: https://issues.apache.org/jira/browse/FLINK-34152 Project: Flink Issue Type: New Feature Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this. # Memory is overprovisioned: On scale up we may add more memory then we actually need. Even on scale down, the memory / cpu ratio can still be off and too much memory is used. # Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling. We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size. A tuning algorithm could look like this: h2. 1. Establish a memory baseline We observe the average heap memory usage at task managers. h2. 2. Calculate memory usage per record The memory requirements per record can be estimated by calculating this ratio: {noformat} memory_per_rec = sum(heap_usage) / sum(records_processed) {noformat} This ratio is surprisingly constant based on empirical data. h2. 3. Scale memory proportionally to the per-record memory needs {noformat} memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers {noformat} A minimum memory limit needs to be added to avoid scaling down memory too much. The max memory per TM should be equal to the initially defined user-specified limit from the ResourceSpec. {noformat} memory_per_tm = max(min_limit, memory_per_tm) memory_per_tm = min(max_limit, memory_per_tm) {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune heap memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune heap memory of autoscaled jobs (was: Tune memory of autoscaled jobs) > Tune heap memory of autoscaled jobs > --- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory then we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > A tuning algorithm could look like this: > h2. 1. Establish a memory baseline > We observe the average heap memory usage at task managers. > h2. 2. Calculate memory usage per record > The memory requirements per record can be estimated by calculating this ratio: > {noformat} > memory_per_rec = sum(heap_usage) / sum(records_processed) > {noformat} > This ratio is surprisingly constant based on empirical data. > h2. 3. Scale memory proportionally to the per-record memory needs > {noformat} > memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers > {noformat} > A minimum memory limit needs to be added to avoid scaling down memory too > much. The max memory per TM should be equal to the initially defined > user-specified limit from the ResourceSpec. > {noformat} > memory_per_tm = max(min_limit, memory_per_tm) > memory_per_tm = min(max_limit, memory_per_tm) {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34152) Tune memory of autoscaled jobs
[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-34152: --- Summary: Tune memory of autoscaled jobs (was: Tune heap memory of autoscaled jobs) > Tune memory of autoscaled jobs > -- > > Key: FLINK-34152 > URL: https://issues.apache.org/jira/browse/FLINK-34152 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > > The current autoscaling algorithm adjusts the parallelism of the job task > vertices according to the processing needs. By adjusting the parallelism, we > systematically scale the amount of CPU for a task. At the same time, we also > indirectly change the amount of memory tasks have at their disposal. However, > there are some problems with this. > # Memory is overprovisioned: On scale up we may add more memory then we > actually need. Even on scale down, the memory / cpu ratio can still be off > and too much memory is used. > # Memory is underprovisioned: For stateful jobs, we risk running into > OutOfMemoryErrors on scale down. Even before running out of memory, too > little memory can have a negative impact on the effectiveness of the scaling. > We lack the capability to tune memory proportionally to the processing needs. > In the same way that we measure CPU usage and size the tasks accordingly, we > need to evaluate memory usage and adjust the heap memory size. > A tuning algorithm could look like this: > h2. 1. Establish a memory baseline > We observe the average heap memory usage at task managers. > h2. 2. Calculate memory usage per record > The memory requirements per record can be estimated by calculating this ratio: > {noformat} > memory_per_rec = sum(heap_usage) / sum(records_processed) > {noformat} > This ratio is surprisingly constant based on empirical data. > h2. 3. Scale memory proportionally to the per-record memory needs > {noformat} > memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers > {noformat} > A minimum memory limit needs to be added to avoid scaling down memory too > much. The max memory per TM should be equal to the initially defined > user-specified limit from the ResourceSpec. > {noformat} > memory_per_tm = max(min_limit, memory_per_tm) > memory_per_tm = min(max_limit, memory_per_tm) {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34151) Integrate Karpenter resource limits into cluster capacity check
Maximilian Michels created FLINK-34151: -- Summary: Integrate Karpenter resource limits into cluster capacity check Key: FLINK-34151 URL: https://issues.apache.org/jira/browse/FLINK-34151 Project: Flink Issue Type: Improvement Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 FLINK-33771 added cluster capacity checking for Flink Autoscaling decisions. The checks respect the scaling limits of the Kubernetes Cluster Autoscaler. We should also support Karpenter-based resource checks, as Karpenter is the preferred method of expanding the cluster size in some environments. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33771) Add cluster capacity awareness to Autoscaler
[ https://issues.apache.org/jira/browse/FLINK-33771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-33771. Resolution: Fixed > Add cluster capacity awareness to Autoscaler > > > Key: FLINK-33771 > URL: https://issues.apache.org/jira/browse/FLINK-33771 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > To avoid starvation of pipelines when the Kubernetes cluster runs out of > resources, new scaling attempts should be stopped. > The Rescaling API will probably prevent most of these cases but we will also > have to double-check there. > For the config-based parallelism overrides, we have pretty good heuristics in > the operator to check in Kubernetes for the approximate number of free > cluster resources, the max cluster scaleup for the Cluster Autoscaler, and > the required scaling costs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-31977) If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary
[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805650#comment-17805650 ] Maximilian Michels commented on FLINK-31977: I think this is related to FLINK-33993. The name of the configuration option is a bit misleading, as effectiveness detection is always on but scalings are only blocked when the option is set to {{true}}. > If scaling.effectiveness.detection.enabled is false, the call to the > detectIneffectiveScaleUp() function is unnecessary > --- > > Key: FLINK-31977 > URL: https://issues.apache.org/jira/browse/FLINK-31977 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Affects Versions: 1.17.0 >Reporter: Tan Kim >Priority: Minor > > The code below is a function to detect ineffective scale-ups. > It only checks the value of SCALING_EFFECTIVENESS_DETECTION_ENABLED > (scaling.effectiveness.detection.enabled) after all the necessary > computations for detection have already been performed, so when the option is false > those computations are unnecessary. > {code:java} > JobVertexScaler.java #175 > private boolean detectIneffectiveScaleUp( > AbstractFlinkResource<?, ?> resource, > JobVertexID vertex, > Configuration conf, > Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics, > ScalingSummary lastSummary) { > double lastProcRate = > lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // > 22569.315633422066 > double lastExpectedProcRate = > > lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // > 37340.0 > var currentProcRate = > evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage(); > // To judge the effectiveness of the scale up operation we compute how > much of the expected > // increase actually happened. For example if we expect a 100 increase in > proc rate and only > // got an increase of 10 we only accomplished 10% of the desired > increase. If this number is > // below the threshold, we mark the scaling ineffective. > double expectedIncrease = lastExpectedProcRate - lastProcRate; > double actualIncrease = currentProcRate - lastProcRate; > boolean withinEffectiveThreshold = > (actualIncrease / expectedIncrease) > >= > conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD); > if (withinEffectiveThreshold) { > return false; > } > var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex); > eventRecorder.triggerEvent( > resource, > EventRecorder.Type.Normal, > EventRecorder.Reason.IneffectiveScaling, > EventRecorder.Component.Operator, > message); > if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) { > LOG.info(message); > return true; > } else { > return false; > } > } {code} > In the call to the detectIneffectiveScaleUp function, I would suggest > checking SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows. > {code:java} > JobVertexScaler.java #150 > if (currentParallelism == lastSummary.getNewParallelism() && > lastSummary.isScaledUp()) { > if (scaledUp) { > > if(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) { > return detectIneffectiveScaleUp(resource, vertex, conf, > evaluatedMetrics, lastSummary); > } else { > return true; > } > } else { > return detectImmediateScaleDownAfterScaleUp(vertex, conf, > lastScalingTs); > } > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-33993) Ineffective scaling detection events are misleading
[ https://issues.apache.org/jira/browse/FLINK-33993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed FLINK-33993. -- Resolution: Fixed > Ineffective scaling detection events are misleading > --- > > Key: FLINK-33993 > URL: https://issues.apache.org/jira/browse/FLINK-33993 > Project: Flink > Issue Type: Bug > Components: Autoscaler, Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > When the ineffective scaling decision feature is turned off, events are > regenerated which look like this: > {noformat} > Skipping further scale up after ineffective previous scale up for > 65c763af14a952c064c400d516c25529 > {noformat} > This is misleading because no action will be taken. It is fair to inform > users about ineffective scale up even when the feature is disabled but a > different message should be printed to convey that no action will be taken. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33993) Ineffective scaling detection events are misleading
Maximilian Michels created FLINK-33993: -- Summary: Ineffective scaling detection events are misleading Key: FLINK-33993 URL: https://issues.apache.org/jira/browse/FLINK-33993 Project: Flink Issue Type: Bug Components: Autoscaler, Kubernetes Operator Affects Versions: kubernetes-operator-1.7.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 When the ineffective scaling decision feature is turned off, events are regenerated which look like this: {noformat} Skipping further scale up after ineffective previous scale up for 65c763af14a952c064c400d516c25529 {noformat} This is misleading because no action will be taken. It is fair to inform users about ineffective scale up even when the feature is disabled but a different message should be printed to convey that no action will be taken. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
[ https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801843#comment-17801843 ] Maximilian Michels commented on FLINK-33940: [~Zhanghao Chen] Even though the factor only affects high parallelism operators > 840, I wonder whether we need to leave more room for scaleup. But I don't have a strong opinion. {quote} IIUC, when the parallelism of one job is very small (it's 1 or 2) and the max parallelism is 1024, one subtask will have 1024 keyGroups. From the state backend side, too many key groups may affect the performance. (This is my concern with changing it by default in the Flink community.) {quote} [~fanrui] I think we need to find out how big the performance impact actually is when jumping from 128 to 840 key groups. But 128 may just have been a very conservative number. > Update the auto-derivation rule of max parallelism for enlarged upscaling > space > --- > > Key: FLINK-33940 > URL: https://issues.apache.org/jira/browse/FLINK-33940 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > > *Background* > The choice of the max parallelism of a stateful operator is important as it > limits the upper bound of the parallelism of the operator while it can also > add extra overhead when set too large. Currently, the max parallelism > of an operator is either fixed to a value specified by API core / pipeline > option or auto-derived with the following rules: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}} > *Problem* > Recently, the elasticity of Flink jobs is becoming more and more valued by > users. The current auto-derived max parallelism was introduced a long time > ago and only allows the operator parallelism to be roughly doubled, which is > not desired for elasticity. Setting a max parallelism manually may not be > desirable either: users may not have sufficient expertise to select a good > max-parallelism value. > *Proposal* > Update the auto-derivation rule of max parallelism to derive larger max > parallelism for a better elasticity experience out of the box. A candidate is > as follows: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), > 32767)}} > Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33940) Update the auto-derivation rule of max parallelism for enlarged upscaling space
[ https://issues.apache.org/jira/browse/FLINK-33940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801790#comment-17801790 ] Maximilian Michels commented on FLINK-33940: [~Zhanghao Chen] Thank you for the proposal. I agree with using highly composite numbers, as this will provide more flexibility to the autoscaler. I'm not sure about the {{operatorParallelism * 5}}. What is the rationale for selecting this factor? Why not {{*10}}? > Update the auto-derivation rule of max parallelism for enlarged upscaling > space > --- > > Key: FLINK-33940 > URL: https://issues.apache.org/jira/browse/FLINK-33940 > Project: Flink > Issue Type: Improvement > Components: API / Core >Reporter: Zhanghao Chen >Assignee: Zhanghao Chen >Priority: Major > > *Background* > The choice of the max parallelism of a stateful operator is important as it > limits the upper bound of the parallelism of the operator while it can also > add extra overhead when set too large. Currently, the max parallelism > of an operator is either fixed to a value specified by API core / pipeline > option or auto-derived with the following rules: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * 1.5), 128), 32767)}} > *Problem* > Recently, the elasticity of Flink jobs is becoming more and more valued by > users. The current auto-derived max parallelism was introduced a long time > ago and only allows the operator parallelism to be roughly doubled, which is > not desired for elasticity. Setting a max parallelism manually may not be > desirable either: users may not have sufficient expertise to select a good > max-parallelism value. > *Proposal* > Update the auto-derivation rule of max parallelism to derive larger max > parallelism for a better elasticity experience out of the box. A candidate is > as follows: > {{min(max(roundUpToPowerOfTwo(operatorParallelism * {*}5{*}), {*}1024{*}), > 32767)}} > Looking forward to your opinions on this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
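For comparison, the current rule and the proposed candidate differ only in the factor and the floor; a sketch where roundUpToPowerOfTwo mirrors the behavior of Flink's MathUtils helper:

{code:java}
/** Sketch comparing the two derivation rules quoted above (illustrative helpers). */
static int roundUpToPowerOfTwo(int x) {
    return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
}

static int currentRule(int operatorParallelism) { // factor 1.5, floor 128
    return Math.min(Math.max(roundUpToPowerOfTwo((int) Math.ceil(operatorParallelism * 1.5)), 128), 32767);
}

static int proposedRule(int operatorParallelism) { // factor 5, floor 1024
    return Math.min(Math.max(roundUpToPowerOfTwo(operatorParallelism * 5), 1024), 32767);
}
{code}

For example, a vertex running at parallelism 100 gets a max parallelism of 256 under the current rule but 1024 under the proposed one, leaving far more headroom for upscaling.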
[jira] [Resolved] (FLINK-33795) Add new config to forbid autoscaling in certain periods of a day
[ https://issues.apache.org/jira/browse/FLINK-33795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-33795. Fix Version/s: kubernetes-operator-1.8.0 Assignee: yonghua jian Resolution: Fixed > Add new config to forbid autoscaling in certain periods of a day > > > Key: FLINK-33795 > URL: https://issues.apache.org/jira/browse/FLINK-33795 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: yonghua jian >Assignee: yonghua jian >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Add new config to forbid autoscaling in certain periods of a day so that we > keep the Flink job unaffected by the autoscaler's job restart behavior during these > periods -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33917) IllegalArgumentException: hostname can't be null
[ https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-33917. Fix Version/s: kubernetes-operator-1.8.0 Assignee: Tom Resolution: Fixed > IllegalArgumentException: hostname can't be null > > > Key: FLINK-33917 > URL: https://issues.apache.org/jira/browse/FLINK-33917 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Tom >Assignee: Tom >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > In certain scenarios, if the hostname contains certain characters it will > throw an exception when it tries to initialize the `InetSocketAddress` > > {code:java} > java.lang.IllegalArgumentException: hostname can't be null at > java.base/java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149) > at > java.base/java.net.InetSocketAddress.<init>(InetSocketAddress.java:216) {code} > > [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L236] > > {code:java} > @Override > public boolean isJobManagerPortReady(Configuration config) { > final URI uri; > try (var clusterClient = getClusterClient(config)) { > uri = URI.create(clusterClient.getWebInterfaceURL()); > } catch (Exception ex) { > throw new FlinkRuntimeException(ex); > } > SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), > uri.getPort()); > Socket socket = new Socket(); > try { > socket.connect(socketAddress, 1000); > socket.close(); > return true; > } catch (IOException e) { > return false; > } > } > {code} > > Here's a simple test to reproduce > > URL > {code:java} > @ParameterizedTest > @ValueSource( > strings = {"http://127.0.0.1:8081", "http://123-dev:8081", > "http://dev-test.abc:8081", "http://dev-test.1a:8081", > "http://dev-test.abc01:8081"}) > void testURLAddresses(String inputAddress) { > assertDoesNotThrow( > () -> { > final URL url = new URL(inputAddress); > new InetSocketAddress(url.getHost(), url.getPort()); > }); > } {code} > > URI > > {code:java} > @ParameterizedTest > @ValueSource( > strings = {"http://127.0.0.1:8081", "http://123-dev:8081", > "http://dev-test.abc:8081", "http://dev-test.1a:8081", > "http://dev-test.abc01:8081"}) > void testURIAddresses(String inputAddress) { > assertDoesNotThrow( > () -> { > final URI uri = new URI(inputAddress); > new InetSocketAddress(uri.getHost(), uri.getPort()); > }); > } {code} > > All test cases pass except for "http://dev-test.1a:8081", which is a valid > Flink host URL, but not a valid URI > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33917) IllegalArgumentException: hostname can't be null
[ https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799701#comment-17799701 ] Maximilian Michels commented on FLINK-33917: {{new URI("123-test").getHost()}} returns null. I’m not 100% sure this is a JDK bug. There may be some ambiguity when resolving URIs without all spec parts. But let’s see what upstream says. > IllegalArgumentException: hostname can't be null > > > Key: FLINK-33917 > URL: https://issues.apache.org/jira/browse/FLINK-33917 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Tom >Priority: Major > Labels: pull-request-available > > In certain scenarios, if the hostname contains certain characters it will > throw an exception when it tries to initialize the `InetSocketAddress` > > {code:java} > java.lang.IllegalArgumentException: hostname can't be null at > java.base/java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149) > at > java.base/java.net.InetSocketAddress.<init>(InetSocketAddress.java:216) {code} > > [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L236] > > {code:java} > @Override > public boolean isJobManagerPortReady(Configuration config) { > final URI uri; > try (var clusterClient = getClusterClient(config)) { > uri = URI.create(clusterClient.getWebInterfaceURL()); > } catch (Exception ex) { > throw new FlinkRuntimeException(ex); > } > SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), > uri.getPort()); > Socket socket = new Socket(); > try { > socket.connect(socketAddress, 1000); > socket.close(); > return true; > } catch (IOException e) { > return false; > } > } > {code} > > Here's a simple test to reproduce > > URL > {code:java} > @ParameterizedTest > @ValueSource( > strings = {"http://127.0.0.1:8081", "http://123-dev:8081", > "http://dev-test.abc:8081", "http://dev-test.1a:8081", > "http://dev-test.abc01:8081"}) > void testURLAddresses(String inputAddress) { > assertDoesNotThrow( > () -> { > final URL url = new URL(inputAddress); > new InetSocketAddress(url.getHost(), url.getPort()); > }); > } {code} > > URI > > {code:java} > @ParameterizedTest > @ValueSource( > strings = {"http://127.0.0.1:8081", "http://123-dev:8081", > "http://dev-test.abc:8081", "http://dev-test.1a:8081", > "http://dev-test.abc01:8081"}) > void testURIAddresses(String inputAddress) { > assertDoesNotThrow( > () -> { > final URI uri = new URI(inputAddress); > new InetSocketAddress(uri.getHost(), uri.getPort()); > }); > } {code} > > All test cases pass except for "http://dev-test.1a:8081", which is a valid > Flink host URL, but not a valid URI > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33917) IllegalArgumentException: hostname can't be null
[ https://issues.apache.org/jira/browse/FLINK-33917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799352#comment-17799352 ] Maximilian Michels commented on FLINK-33917: The description doesn’t describe under which circumstances the host name can be parsed as null. One example is {{new URI("123-test")}}, which will return a null host name because the string is parsed as a URI path. Flink itself returns a stringified URL object. So using URL instead works fine. > IllegalArgumentException: hostname can't be null > > > Key: FLINK-33917 > URL: https://issues.apache.org/jira/browse/FLINK-33917 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Reporter: Tom >Priority: Major > Labels: pull-request-available > > In certain scenarios, if the hostname contains certain characters it will > throw an exception when it tries to initialize the `InetSocketAddress` > > {code:java} > java.lang.IllegalArgumentException: hostname can't be null at > java.base/java.net.InetSocketAddress.checkHost(InetSocketAddress.java:149) > at > java.base/java.net.InetSocketAddress.<init>(InetSocketAddress.java:216) {code} > > [https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java|https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java#L236] > > {code:java} > @Override > public boolean isJobManagerPortReady(Configuration config) { > final URI uri; > try (var clusterClient = getClusterClient(config)) { > uri = URI.create(clusterClient.getWebInterfaceURL()); > } catch (Exception ex) { > throw new FlinkRuntimeException(ex); > } > SocketAddress socketAddress = new InetSocketAddress(uri.getHost(), > uri.getPort()); > Socket socket = new Socket(); > try { > socket.connect(socketAddress, 1000); > socket.close(); > return true; > } catch (IOException e) { > return false; > } > } > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-33770) Autoscaler logs are full of deprecated key warnings
[ https://issues.apache.org/jira/browse/FLINK-33770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-33770. Resolution: Fixed > Autoscaler logs are full of deprecated key warnings > --- > > Key: FLINK-33770 > URL: https://issues.apache.org/jira/browse/FLINK-33770 > Project: Flink > Issue Type: Bug > Components: Autoscaler, Kubernetes Operator >Affects Versions: kubernetes-operator-1.7.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > We moved all autoscaler configuration from > {{kubernetes.operator.job.autoscaler.*}} to {{job.autoscaler.*}}. > With the latest release, the logs are full of warnings like this: > {noformat} > level: WARN > logger: org.apache.flink.configuration.Configuration > message: Config uses deprecated configuration key > 'kubernetes.operator.job.autoscaler.target.utilization' instead of proper key > 'job.autoscaler.target.utilization' > {noformat} > The reason is that the configuration is loaded for every reconciliation. > This configuration is already widely adopted across hundreds of pipelines. I > propose to remove the deprecation from the config keys and make them > "fallback" keys instead, which removes the deprecation warning. -- This message was sent by Atlassian Jira (v8.20.10#820010)
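The fallback-key mechanism looks roughly like this with Flink's ConfigOptions builder; the option shown is one of the affected autoscaler keys, and the default value here is an assumption for illustration:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Sketch: a fallback key resolves under the old name without logging the
 *  deprecated-key warning (unlike withDeprecatedKeys). Default is assumed. */
public class AutoscalerOptionSketch {
    static final ConfigOption<Double> TARGET_UTILIZATION =
            ConfigOptions.key("job.autoscaler.target.utilization")
                    .doubleType()
                    .defaultValue(0.7)
                    .withFallbackKeys("kubernetes.operator.job.autoscaler.target.utilization");
}
{code}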
[jira] [Assigned] (FLINK-31400) Add autoscaler integration for Iceberg source
[ https://issues.apache.org/jira/browse/FLINK-31400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reassigned FLINK-31400: -- Assignee: Mason Chen > Add autoscaler integration for Iceberg source > - > > Key: FLINK-31400 > URL: https://issues.apache.org/jira/browse/FLINK-31400 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Mason Chen >Priority: Major > > A very critical part in the scaling algorithm is setting the source > processing rate correctly such that the Flink pipeline can keep up with the > ingestion rate. The autoscaler does that by looking at the {{pendingRecords}} > Flink source metric. Even if that metric is not available, the source can > still be sized according to the busyTimeMsPerSecond metric, but there will be > no backlog information available. For Kafka, the autoscaler also determines > the number of partitions to avoid scaling higher than the maximum number of > partitions. > In order to support a wider range of use cases, we should investigate an > integration with the Iceberg source. As far as I know, it does not expose the > pendingRecords metric, nor does the autoscaler know about other constraints, > e.g. the maximum number of open files. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn
[ https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-31502: --- Fix Version/s: kubernetes-operator-1.8.0 (was: kubernetes-operator-1.5.0) > Limit the number of concurrent scale operations to reduce cluster churn > --- > > Key: FLINK-31502 > URL: https://issues.apache.org/jira/browse/FLINK-31502 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Until we move to using the upcoming Rescale API which recycles pods, we need > to be mindful of how many deployments we scale at the same time because > each of them is going to give up all its pods and request its new > required number of pods. > This can cause churn in the cluster and temporarily lead to "unallocatable" > pods, which triggers the k8s cluster autoscaler to add more cluster nodes. > That is often not desirable because the resources actually required once the > scaling has settled are lower. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33771) Add cluster capacity awareness to Autoscaler
[ https://issues.apache.org/jira/browse/FLINK-33771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-33771: --- Description: To avoid starvation of pipelines when the Kubernetes cluster runs out of resources, new scaling attempts should be stopped. The Rescaling API will probably prevent most of these cases but we will also have to double-check there. For the config-based parallelism overrides, we have pretty good heuristics in the operator to check in Kubernetes for the approximate number of free cluster resources, the max cluster scaleup for the Cluster Autoscaler, and the required scaling costs. was: To avoid starvation of pipelines when the Kubernetes cluster runs out of resources, new scaling attempts should be stopped. The Rescaling API will probably prevent most of these cases but we will also have to double-check there. For the config-based parallelism overrides, we have pretty good heuristics in the operator to check in Kubernetes for the approximate number of free cluster resources and the required scaling costs. > Add cluster capacity awareness to Autoscaler > > > Key: FLINK-33771 > URL: https://issues.apache.org/jira/browse/FLINK-33771 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: kubernetes-operator-1.8.0 > > > To avoid starvation of pipelines when the Kubernetes cluster runs out of > resources, new scaling attempts should be stopped. > The Rescaling API will probably prevent most of these cases but we will also > have to double-check there. > For the config-based parallelism overrides, we have pretty good heuristics in > the operator to check in Kubernetes for the approximate number of free > cluster resources, the max cluster scaleup for the Cluster Autoscaler, and > the required scaling costs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33773) Add fairness to scaling decisions
Maximilian Michels created FLINK-33773: -- Summary: Add fairness to scaling decisions Key: FLINK-33773 URL: https://issues.apache.org/jira/browse/FLINK-33773 Project: Flink Issue Type: New Feature Components: Autoscaler, Deployment / Kubernetes Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 The current scaling logic is inherently unfair. In a scenario of heavy backlog, whichever pipelines are scaled first will end up taking most of the resources. Some kind of fairness should be introduced, for example: * Cap the max number of resulting pods at a % of the cluster resources * Allow scale-ups round-robin across all pipelines -- This message was sent by Atlassian Jira (v8.20.10#820010)
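One way the two ideas could combine, as an illustrative sketch only (names and cap semantics are assumptions): rotate through the pipelines round-robin and skip any pipeline that already holds more than its share of cluster pods.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.ToIntFunction;

// Illustrative round-robin dispenser of scale-up permissions with a
// per-pipeline cap expressed as a fraction of cluster pods.
public class FairScaleUpScheduler {

    private final Deque<String> roundRobin = new ArrayDeque<>();
    private final double maxClusterShare; // e.g. 0.25 = 25% of cluster pods

    public FairScaleUpScheduler(double maxClusterShare) {
        this.maxClusterShare = maxClusterShare;
    }

    public void enqueue(String pipeline) {
        if (!roundRobin.contains(pipeline)) {
            roundRobin.addLast(pipeline);
        }
    }

    /** Picks the next pipeline allowed to scale up, or null if all are capped. */
    public String next(int clusterPods, ToIntFunction<String> podsOf) {
        for (int i = 0; i < roundRobin.size(); i++) {
            String candidate = roundRobin.pollFirst();
            roundRobin.addLast(candidate); // rotate for round-robin fairness
            if (podsOf.applyAsInt(candidate) < clusterPods * maxClusterShare) {
                return candidate;
            }
        }
        return null; // everyone is at their cap
    }
}
{code}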
[jira] [Created] (FLINK-33771) Add cluster capacity awareness to Autoscaler
Maximilian Michels created FLINK-33771: -- Summary: Add cluster capacity awareness to Autoscaler Key: FLINK-33771 URL: https://issues.apache.org/jira/browse/FLINK-33771 Project: Flink Issue Type: New Feature Components: Autoscaler, Kubernetes Operator Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 To avoid starvation of pipelines when the Kubernetes cluster runs out of resources, new scaling attempts should be stopped. The Rescaling API will probably prevent most of these cases but we will also have to double-check there. For the config-based parallelism overrides, we have pretty good heuristics in the operator to check Kubernetes for the approximate number of free cluster resources and the required scaling costs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-31502) Limit the number of concurrent scale operations to reduce cluster churn
[ https://issues.apache.org/jira/browse/FLINK-31502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reopened FLINK-31502: Reopening because this is an actual issue. > Limit the number of concurrent scale operations to reduce cluster churn > --- > > Key: FLINK-31502 > URL: https://issues.apache.org/jira/browse/FLINK-31502 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.5.0 > > > Until we move to using the upcoming Rescale API which recycles pods, we need > to be mindful of how many deployments we scale at the same time because > each of them is going to give up all of its pods and then request its new set > of pods. > This can cause churn in the cluster and temporarily lead to "unallocatable" > pods, which triggers the k8s cluster autoscaler to add more cluster nodes. > That is often not desirable because the resources actually required once the > scaling has settled are lower. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33770) Autoscaler logs are full of deprecated key warnings
Maximilian Michels created FLINK-33770: -- Summary: Autoscaler logs are full of deprecated key warnings Key: FLINK-33770 URL: https://issues.apache.org/jira/browse/FLINK-33770 Project: Flink Issue Type: Bug Components: Autoscaler, Kubernetes Operator Affects Versions: kubernetes-operator-1.7.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 We moved all autoscaler configuration from {{kubernetes.operator.job.autoscaler.*}} to {{job.autoscaler.*}}. With the latest release, the logs are full of warnings like this: {noformat} level: WARN logger: org.apache.flink.configuration.Configuration message: Config uses deprecated configuration key 'kubernetes.operator.job.autoscaler.target.utilization' instead of proper key 'job.autoscaler.target.utilization' {noformat} The reason is that the configuration is loaded for every reconciliation. This configuration is already widely adopted across hundreds of pipelines. I propose to remove the deprecation from the config keys and make them "fallback" keys instead, which removes the deprecation warning. -- This message was sent by Atlassian Jira (v8.20.10#820010)
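Flink's {{ConfigOption}} API already supports this distinction: {{withDeprecatedKeys}} logs the warning above on every read, while {{withFallbackKeys}} resolves the old key silently. A sketch of the proposed change; the option and default value shown are illustrative and may differ from the operator's actual definitions:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class AutoScalerOptionsExample {

    public static final ConfigOption<Double> TARGET_UTILIZATION =
            ConfigOptions.key("job.autoscaler.target.utilization")
                    .doubleType()
                    .defaultValue(0.7) // illustrative default
                    // was: .withDeprecatedKeys(...), which logs a WARN on every read
                    .withFallbackKeys("kubernetes.operator.job.autoscaler.target.utilization")
                    .withDescription("Target vertex utilization for the autoscaler.");
}
{code}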
[jira] [Commented] (FLINK-33710) Autoscaler redeploys pipeline for a NOOP parallelism change
[ https://issues.apache.org/jira/browse/FLINK-33710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793800#comment-17793800 ] Maximilian Michels commented on FLINK-33710: Additional fix via ca1d8472d1a1e817268950dae079592581fa5b8f to prevent any existing deployments from being affected. > Autoscaler redeploys pipeline for a NOOP parallelism change > --- > > Key: FLINK-33710 > URL: https://issues.apache.org/jira/browse/FLINK-33710 > Project: Flink > Issue Type: Bug > Components: Autoscaler, Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.7.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > The operator supports two modes to apply autoscaler changes: > # Use the internal Flink config {{pipeline.jobvertex-parallelism-overrides}} > # Make use of Flink's Rescale API > For (1), a string has to be generated for the Flink config with the actual > overrides. This string has to be deterministic for a given map. But it is not. > Consider the following observed log: > {noformat} > >>> Event | Info| SPECCHANGED | SCALE change(s) detected (Diff: > FlinkDeploymentSpec[flinkConfiguration.pipeline.jobvertex-parallelism-overrides > : > 92542d1280187bd464274368a5f86977:3,9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,1652184ffd0522859c7840a24936847c:1 > -> > 9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,92542d1280187bd464274368a5f86977:3,1652184ffd0522859c7840a24936847c:1]), > starting reconciliation. > {noformat} > The overrides are identical but the order is different, which triggers a > redeploy. This does not seem to happen often but some deterministic string > generation (e.g. sorting by key) is required to prevent any NOOP updates. -- This message was sent by Atlassian Jira (v8.20.10#820010)
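The sorting fix can be sketched in a few lines: serialize from a {{TreeMap}} so equal maps always yield equal strings, regardless of insertion order. (Illustrative sketch; the operator's actual serialization code may differ.)

{code:java}
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public final class ParallelismOverrides {

    /** Serializes vertexId -> parallelism deterministically by sorting keys. */
    public static String serialize(Map<String, Integer> overrides) {
        return new TreeMap<>(overrides).entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }
}
{code}

With this, the two orderings from the log above produce the same string, so no spec change is detected and no redeploy is triggered.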
[jira] [Updated] (FLINK-33522) Savepoint upgrade mode fails despite the savepoint succeeding
[ https://issues.apache.org/jira/browse/FLINK-33522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-33522: --- Fix Version/s: kubernetes-operator-1.8.0 (was: kubernetes-operator-1.7.0) > Savepoint upgrade mode fails despite the savepoint succeeding > - > > Key: FLINK-33522 > URL: https://issues.apache.org/jira/browse/FLINK-33522 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.6.1 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Under certain circumstances, savepoint creation can succeed but the job fails > afterwards. One example is when there are messages being distributed by the > source coordinator to finished tasks. This is possibly a Flink bug although > it's not clear yet how to solve the issue. > After the savepoint succeeded Flink fails the job like this: > {noformat} > Source (1/2) > (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) > switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). > {noformat} > {noformat} > An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering > task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', > targetTask: Source (1/2) - execution #0 > Caused by: > org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task > is not running, but in state FINISHED >at > org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502) >at > org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask > {noformat} > Inside the operator this is processed as: > {noformat} > java.util.concurrent.CompletionException: > org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException: > A savepoint has been created at: s3://..., but the corresponding job > 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is > consistent, but might have uncommitted transactions. If you want to commit > the transaction please restart a job from this savepoint. 
> > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) > > org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319) > > > org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121) > > > org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223) > > > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122) > > > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163) > > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136) > > > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56) > > > io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138) > > > io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96) > > > org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80) > > > io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95) > > > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139) > > > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119) > > > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89) > > > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62) > > > io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414) > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > > java.la
[jira] [Commented] (FLINK-33522) Savepoint upgrade mode fails despite the savepoint succeeding
[ https://issues.apache.org/jira/browse/FLINK-33522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17791975#comment-17791975 ] Maximilian Michels commented on FLINK-33522: Additional fix required via 51a91049b5f17f8a0b21e11feceb4410a97c50c1. > Savepoint upgrade mode fails despite the savepoint succeeding > - > > Key: FLINK-33522 > URL: https://issues.apache.org/jira/browse/FLINK-33522 > Project: Flink > Issue Type: Bug > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.0, kubernetes-operator-1.6.1 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Under certain circumstances, savepoint creation can succeed but the job fails > afterwards. One example is when there are messages being distributed by the > source coordinator to finished tasks. This is possibly a Flink bug although > it's not clear yet how to solve the issue. > After the savepoint succeeded Flink fails the job like this: > {noformat} > Source (1/2) > (cd4d56ddb71c0e763cc400bcfe2fd8ac_4081cf0163fcce7fe6af0cf07ad2d43c_0_0) > switched from RUNNING to FAILED on host-taskmanager-1-1 @ ip(dataPort=36519). > {noformat} > {noformat} > An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering > task failover to ensure consistency. Event: 'AddSplitEvents[[[B@722a23fa]]', > targetTask: Source (1/2) - execution #0 > Caused by: > org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task > is not running, but in state FINISHED >at > org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1502) >at > org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask > {noformat} > Inside the operator this is processed as: > {noformat} > java.util.concurrent.CompletionException: > org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException: > A savepoint has been created at: s3://..., but the corresponding job > 1b1a3061194c62ded6e2fe823b61b2ea failed during stopping. The savepoint is > consistent, but might have uncommitted transactions. If you want to commit > the transaction please restart a job from this savepoint. 
> > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) > > org.apache.flink.kubernetes.operator.service.AbstractFlinkService.cancelJob(AbstractFlinkService.java:319) > > > org.apache.flink.kubernetes.operator.service.NativeFlinkService.cancelJob(NativeFlinkService.java:121) > > > org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.cancelJob(ApplicationReconciler.java:223) > > > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:122) > > > org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:163) > > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:136) > > > org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56) > > > io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138) > > > io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96) > > > org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80) > > > io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95) > > > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139) > > > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119) > > > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89) > > > io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62) > > > io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414) > > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6
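One conceivable handling, sketched under assumptions: since the error message states the savepoint is consistent, the operator could treat {{StopWithSavepointStoppingException}} as a successful savepoint and proceed with the upgrade. This is not the actual fix from the commits above, and it assumes the exception exposes the savepoint path via a {{getSavepointPath()}} accessor.

{code:java}
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.util.ExceptionUtils;

public class StopWithSavepointHandling {

    /** Returns the savepoint path even if the job failed while stopping. */
    public static String stopAndGetSavepoint(CompletableFuture<String> stopFuture)
            throws Exception {
        try {
            return stopFuture.get();
        } catch (ExecutionException e) {
            Optional<StopWithSavepointStoppingException> swse =
                    ExceptionUtils.findThrowable(
                            e, StopWithSavepointStoppingException.class);
            if (swse.isPresent()) {
                // The savepoint completed; only the subsequent shutdown failed.
                // Assumed accessor: the savepoint path carried by the exception.
                return swse.get().getSavepointPath();
            }
            throw e;
        }
    }
}
{code}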
[jira] [Created] (FLINK-33710) Autoscaler redeploys pipeline for a NOOP parallelism change
Maximilian Michels created FLINK-33710: -- Summary: Autoscaler redeploys pipeline for a NOOP parallelism change Key: FLINK-33710 URL: https://issues.apache.org/jira/browse/FLINK-33710 Project: Flink Issue Type: Bug Components: Autoscaler, Kubernetes Operator Affects Versions: kubernetes-operator-1.7.0, kubernetes-operator-1.6.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: kubernetes-operator-1.8.0 The operator supports two modes to apply autoscaler changes: # Use the internal Flink config {{pipeline.jobvertex-parallelism-overrides}} # Make use of Flink's Rescale API For (1), a string has to be generated for the Flink config with the actual overrides. This string has to be deterministic for a given map. But it is not. Consider the following observed log: {noformat} >>> Event | Info| SPECCHANGED | SCALE change(s) detected (Diff: FlinkDeploymentSpec[flinkConfiguration.pipeline.jobvertex-parallelism-overrides : 92542d1280187bd464274368a5f86977:3,9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,1652184ffd0522859c7840a24936847c:1 -> 9f979ed859083299d29f281832cb5be0:1,84881d7bda0dc3d44026e37403420039:1,92542d1280187bd464274368a5f86977:3,1652184ffd0522859c7840a24936847c:1]), starting reconciliation. {noformat} The overrides are identical but the order is different, which triggers a redeploy. This does not seem to happen often but some deterministic string generation (e.g. sorting by key) is required to prevent any NOOP updates. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-30593) Determine restart time on the fly for Autoscaler
[ https://issues.apache.org/jira/browse/FLINK-30593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-30593. Fix Version/s: kubernetes-operator-1.8.0 Resolution: Fixed > Determine restart time on the fly for Autoscaler > > > Key: FLINK-30593 > URL: https://issues.apache.org/jira/browse/FLINK-30593 > Project: Flink > Issue Type: New Feature > Components: Autoscaler, Kubernetes Operator >Reporter: Gyula Fora >Assignee: Alexander Fedulov >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.8.0 > > > Currently the autoscaler uses a preconfigured restart time for the job. We > should dynamically adjust this based on the observed restart times of scale > operations. -- This message was sent by Atlassian Jira (v8.20.10#820010)
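An illustrative tracker, not the merged implementation: record the observed duration of each restart and return a conservative estimate, falling back to the preconfigured value until enough observations exist.

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

public class RestartTimeTracker {

    private final int window;            // number of recent restarts to keep
    private final Duration fallback;     // preconfigured restart time
    private final Deque<Duration> observed = new ArrayDeque<>();

    public RestartTimeTracker(int window, Duration fallback) {
        this.window = window;
        this.fallback = fallback;
    }

    /** Records one observed restart, from scaling trigger to RUNNING again. */
    public void recordRestart(Instant scalingTriggered, Instant jobRunningAgain) {
        observed.addLast(Duration.between(scalingTriggered, jobRunningAgain));
        if (observed.size() > window) {
            observed.removeFirst();
        }
    }

    /** Conservative estimate: the max over the recent window, else the fallback. */
    public Duration estimate() {
        return observed.stream().max(Duration::compareTo).orElse(fallback);
    }
}
{code}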
[jira] [Closed] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state
[ https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels closed FLINK-33572. -- Resolution: Fixed Implemented via 02840b96ef3116ea95a440af4f945398900d89df and ab0ec081eac86619a22632616fa4c01c074ecd33. > Minimize ConfigMap API operations for autoscaler state > -- > > Key: FLINK-33572 > URL: https://issues.apache.org/jira/browse/FLINK-33572 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.7.0 > > > The flush operation, newly introduced after refactoring the autoscaler > interfaces, optimizes the number of write operations to the underlying state > store. A couple of further optimizations: > 1. Any writes should be deferred until flush is called. > 2. The flush routine should detect whether a write is needed and skip writing > if there are no changes. > 3. Clearing state should only require one write operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
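The three points map naturally onto a small write-buffering wrapper. A sketch, with a hypothetical writer callback standing in for the ConfigMap-backed store:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class BufferedStateStore {

    private final Map<String, String> cache = new HashMap<>();
    private final Consumer<Map<String, String>> configMapWriter;
    private boolean dirty;

    public BufferedStateStore(Consumer<Map<String, String>> configMapWriter) {
        this.configMapWriter = configMapWriter;
    }

    public void put(String key, String value) {
        // (1) defer the write; only mark dirty if the value actually changed
        dirty |= !value.equals(cache.put(key, value));
    }

    public void clear() {
        // (3) clearing is buffered too, so it costs one write at flush time
        cache.clear();
        dirty = true;
    }

    public void flush() {
        // (2) skip the API call entirely when nothing changed
        if (dirty) {
            configMapWriter.accept(new HashMap<>(cache));
            dirty = false;
        }
    }
}
{code}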
[jira] [Updated] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state
[ https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-33572: --- Release Note: Minimize ConfigMap operations for autoscaler state (was: Implemented via 02840b96ef3116ea95a440af4f945398900d89df and ab0ec081eac86619a22632616fa4c01c074ecd33.) > Minimize ConfigMap API operations for autoscaler state > -- > > Key: FLINK-33572 > URL: https://issues.apache.org/jira/browse/FLINK-33572 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.7.0 > > > The flush operation, newly introduced after refactoring the autoscaler > interfaces, optimizes the number of write operations to the underlying state > store. A couple of further optimizations: > 1. Any writes should be deferred until flush is called. > 2. The flush routine should detect whether a write is needed and skip writing > if there are no changes. > 3. Clearing state should only require one write operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-33572) Minimize ConfigMap API operations for autoscaler state
[ https://issues.apache.org/jira/browse/FLINK-33572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels reopened FLINK-33572: > Minimize ConfigMap API operations for autoscaler state > -- > > Key: FLINK-33572 > URL: https://issues.apache.org/jira/browse/FLINK-33572 > Project: Flink > Issue Type: Improvement > Components: Autoscaler, Kubernetes Operator >Affects Versions: kubernetes-operator-1.6.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.7.0 > > > The flush operation, newly introduced after refactoring the autoscaler > interfaces, optimizes the number of write operations to the underlying state > store. A couple of further optimizations: > 1. Any writes should be deferred until flush is called. > 2. The flush routine should detect whether a write is needed and skip writing > if there are no changes. > 3. Clearing state should only require one write operation. -- This message was sent by Atlassian Jira (v8.20.10#820010)