This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
     new ccf3639c  [docs][autoscaler] Autoscaler docs and default config improvement
ccf3639c is described below

commit ccf3639c87400e3bbd94f64ad13dc5d40fd55faf
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Thu Aug 17 09:55:59 2023 +0200

    [docs][autoscaler] Autoscaler docs and default config improvement
---
 docs/content/docs/custom-resource/autoscaler.md    | 93 +++++++++++++++++----
 .../generated/auto_scaler_configuration.html       |  2 +-
 .../static/img/custom-resource/autoscaler_fig1.png | Bin 0 -> 168727 bytes
 .../static/img/custom-resource/autoscaler_fig2.png | Bin 0 -> 181157 bytes
 .../static/img/custom-resource/autoscaler_fig3.png | Bin 0 -> 178610 bytes
 .../autoscaler/config/AutoScalerOptions.java       |  2 +-
 6 files changed, 78 insertions(+), 19 deletions(-)

diff --git a/docs/content/docs/custom-resource/autoscaler.md b/docs/content/docs/custom-resource/autoscaler.md
index 201ab3d8..4140122a 100644
--- a/docs/content/docs/custom-resource/autoscaler.md
+++ b/docs/content/docs/custom-resource/autoscaler.md
@@ -26,7 +26,7 @@ under the License.
 
 # Autoscaler
 
-The operator provides a job autoscaler functionality that collects various metrics from running Flink jobs and automatically scales individual job vertexes (chained operator groups) to eliminate backpressure and satisfy the utilization and catch-up duration target set by the user.
+The operator provides a job autoscaler functionality that collects various metrics from running Flink jobs and automatically scales individual job vertexes (chained operator groups) to eliminate backpressure and satisfy the utilization target set by the user. By adjusting parallelism on a job vertex level (in contrast to job parallelism) we can efficiently autoscale complex and heterogeneous streaming applications.
 Key benefits to the user:
@@ -35,26 +35,78 @@ Key benefits to the user:
 - Automatic adaptation to changing load patterns
 - Detailed utilization metrics for performance debugging
 
-Job requirements:
- - The autoscaler currently only works with the latest [Flink 1.17](https://hub.docker.com/_/flink) or after backporting the following fixes to your 1.15/1.16 Flink image
-   - Job vertex parallelism overrides (must have)
-     - [Add option to override job vertex parallelisms during job submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
-     - [Change ForwardPartitioner to RebalancePartitioner on parallelism changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
-     - [Fix logic for determining downstream subtasks for partitioner replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
-   - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have)
- - Source scaling only supports modern sources which
-   - use the new [Source API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) that exposes the busy time metric (must have, most common connectors already do)
-   - expose the [standardized connector metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) for accessing backlog information (good to have, extra capacity will be added for catching up with backlog)
+## Overview
 
-In the current state the autoscaler works best with Kafka sources, as they expose all the standardized metrics. It also comes with some additional benefits when using Kafka such as automatically detecting and limiting source max parallelism to the number of Kafka partitions.
+The autoscaler relies on the metrics exposed by the Flink metric system for the individual tasks. The metrics are queried directly from the Flink job.
-{{< hint info >}}
-The autoscaler also supports a passive/metrics-only mode where it only collects and evaluates scaling related performance metrics but does not trigger any job upgrades.
-This can be used to gain confidence in the module without any impact on the running applications.
+Collected metrics:
+ - Backlog information at each source
+ - Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
+ - Number of records processed per second in each job vertex
+ - Busy time per second of each job vertex (current utilization)
 
-To disable scaling actions, set: `kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< hint info >}}
+Please note that we are not using any container memory / CPU utilization metrics directly here. High utilization will be reflected in the processing rate and busy time metrics of the individual job vertexes.
 {{< /hint >}}
 
+The algorithm starts from the sources and recursively computes the required processing capacity (target data rate) for each operator in the pipeline. At the source vertices, the target data rate is equal to the incoming data rate (from the Kafka topic).
+
+For downstream operators we compute the target data rate as the sum of the input (upstream) operators' output data rates along the given edge in the processing graph.
+
+{{< img src="/img/custom-resource/autoscaler_fig1.png" alt="Computing Target Data Rates" >}}
+
+Users configure the target utilization percentage of the operators in the pipeline, e.g. keep all operators between 60% - 80% busy. The autoscaler then finds a parallelism configuration such that the output rates of all operators match the input rates of all their downstream operators at the targeted utilization.
+
+In this example we see an upscale operation:
+
+{{< img src="/img/custom-resource/autoscaler_fig2.png" alt="Scaling Up" >}}
+
+Similarly, as load decreases, the autoscaler adjusts individual operator parallelism levels to match the current rate over time.
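[Editor's note] The recursive target-rate computation described in the added docs can be sketched as follows. This is an illustrative simplification only, not the operator's actual code: it assumes every record in produces one record out and ignores the utilization target, and all class and variable names here are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: propagate target data rates from sources to downstream vertices.
// Sources get the incoming data rate; each downstream vertex gets the sum
// of its upstream vertices' output rates along the connecting edges.
public class TargetRateSketch {

    /**
     * @param edges       upstream vertex -> list of downstream vertices
     * @param sourceRates source vertex -> incoming data rate (records/sec)
     * @return target data rate for every reachable vertex
     */
    public static Map<String, Double> targetRates(
            Map<String, List<String>> edges, Map<String, Double> sourceRates) {
        // Count incoming edges so vertices are processed in topological order.
        Map<String, Integer> indegree = new HashMap<>();
        for (List<String> outs : edges.values()) {
            for (String v : outs) {
                indegree.merge(v, 1, Integer::sum);
            }
        }

        Map<String, Double> target = new HashMap<>(sourceRates);
        Deque<String> ready = new ArrayDeque<>(sourceRates.keySet());
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            double out = target.getOrDefault(vertex, 0.0);
            for (String down : edges.getOrDefault(vertex, List.of())) {
                // Downstream target rate = sum of upstream output rates.
                target.merge(down, out, Double::sum);
                // Enqueue once all of the downstream vertex's inputs are known.
                if (indegree.merge(down, -1, Integer::sum) == 0) {
                    ready.add(down);
                }
            }
        }
        return target;
    }
}
```

For a hypothetical topology where `src1` (100 records/sec) and `src2` (50 records/sec) both feed `join`, which feeds `sink`, the join and sink vertices both get a target rate of 150 records/sec.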
+
+{{< img src="/img/custom-resource/autoscaler_fig3.png" alt="Scaling Down" >}}
+
+The autoscaler approach is based on [Three steps is all you need: fast, accurate, automatic scaling decisions for distributed streaming dataflows](https://www.usenix.org/system/files/osdi18-kalavri.pdf) by Kalavri et al.
+
+## Executing rescaling operations
+
+By default the autoscaler uses the built-in job upgrade mechanism from the operator to perform the rescaling as detailed in [Job Management and Stateful upgrades]({{< ref "docs/custom-resource/job-management" >}}).
+
+### Flink 1.18 and in-place scaling support
+
+The upcoming Flink 1.18 release brings very significant improvements to the speed of scaling operations through the new resource requirements REST endpoint.
+This allows the autoscaler to scale vertices in-place without performing a full job upgrade cycle.
+
+To try this experimental feature, please use the currently available Flink 1.18 snapshot base image to build your application docker image.
+Furthermore, make sure you set the Flink version to `v1_18` in your FlinkDeployment yaml and enable the adaptive scheduler, which is required for this feature.
+
+```
+jobmanager.scheduler: adaptive
+```
+
+## Job Requirements and Limitations
+
+### Requirements
+
+The autoscaler currently only works with [Flink 1.17](https://hub.docker.com/_/flink) and later Flink images, or after backporting the following fixes to your 1.15/1.16 Flink images:
+
+ - Job vertex parallelism overrides (must have)
+   - [Add option to override job vertex parallelisms during job submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
+   - [Change ForwardPartitioner to RebalancePartitioner on parallelism changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
+   - [Fix logic for determining downstream subtasks for partitioner replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
+ - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have)
+
+### Limitations
+
+By default the autoscaler can work for all job vertices in the processing graph.
+
+However, source scaling requires that the sources:
+
+ - Use the new [Source API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) that exposes the busy time metric (must have, most common connectors already do)
+ - Expose the [standardized connector metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) for accessing backlog information (good to have, extra capacity will be added for catching up with backlog)
+
+In the current state the autoscaler works best with Kafka sources, as they expose all the standardized metrics. It also comes with some additional benefits when using Kafka, such as automatically detecting and limiting source max parallelism to the number of Kafka partitions.
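[Editor's note] Putting the Flink 1.18 instructions above together, a minimal FlinkDeployment fragment could look like the sketch below. The metadata name and image are placeholders, not values from the commit:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-job                       # placeholder name
spec:
  image: my-flink-1.18-snapshot-image     # hypothetical image built on the 1.18 snapshot base
  flinkVersion: v1_18
  flinkConfiguration:
    jobmanager.scheduler: adaptive        # required for in-place scaling
```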
+
+
 ## Configuration guide
 
 Depending on your environment and job characteristics there are a few very important configurations that will affect how well the autoscaler works.
@@ -67,6 +119,13 @@ Key configuration areas:
 
 The defaults might work reasonably well for many applications, but some tuning may be required in this early stage of the autoscaler module.
 
+{{< hint info >}}
+The autoscaler also supports a passive/metrics-only mode where it only collects and evaluates scaling related performance metrics but does not trigger any job upgrades.
+This can be used to gain confidence in the module without any impact on the running applications.
+
+To disable scaling actions, set: `kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< /hint >}}
+
 ### Job and per operator max parallelism
 
 When computing the scaled parallelism, the autoscaler always considers the max parallelism settings for each job vertex to ensure that it doesn't introduce unnecessary data skew.
@@ -109,7 +168,7 @@ The amount of extra capacity is determined automatically by the following 2 conf
 
 In the future the autoscaler may be able to automatically determine the restart time, but the target catch-up duration depends on the user's SLO.
 By lowering the catch-up duration the autoscaler will have to reserve more extra capacity for the scaling actions.
 
-We suggest setting this based on your actual objective, such us 1, 5, 10 minutes etc.
+We suggest setting this based on your actual objective, such as 10, 30, 60 minutes etc.
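[Editor's note] To make the restart-time / catch-up-duration trade-off concrete, here is a back-of-the-envelope sketch. It is an illustration under simplifying assumptions, not the operator's actual formula: it ignores the utilization target and any pre-existing lag, and all names are hypothetical.

```java
import java.time.Duration;

// Sketch: how much processing capacity a vertex needs so that the backlog
// accumulated while the job restarts is drained within the catch-up duration.
public class CatchUpCapacitySketch {

    /**
     * @param inputRate       steady incoming rate (records/sec)
     * @param restartTime     expected downtime of a scaling operation
     * @param catchUpDuration target time to fully process the backlog
     *                        (assumed positive; the real option treats 0 as
     *                        "disable backlog based scaling")
     * @return records/sec the vertex must sustain after the rescale
     */
    public static double requiredRate(
            double inputRate, Duration restartTime, Duration catchUpDuration) {
        // Backlog built up while the job was down.
        double backlogFromRestart = inputRate * restartTime.getSeconds();
        // Steady input plus the share of backlog to drain per second.
        return inputRate + backlogFromRestart / catchUpDuration.getSeconds();
    }
}
```

With a 1000 records/sec input, a 5 minute restart time and the new 15 minute default catch-up duration, the vertex must be provisioned for roughly 1333 records/sec; halving the catch-up duration pushes that to about 1667, which illustrates why lowering the catch-up duration forces the autoscaler to reserve more extra capacity.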
 ### Basic configuration example
 
 ```yaml
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index a3e7b069..95af5541 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -16,7 +16,7 @@
         </tr>
         <tr>
             <td><h5>kubernetes.operator.job.autoscaler.catch-up.duration</h5></td>
-            <td style="word-wrap: break-word;">5 min</td>
+            <td style="word-wrap: break-word;">15 min</td>
             <td>Duration</td>
             <td>The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.</td>
         </tr>
diff --git a/docs/static/img/custom-resource/autoscaler_fig1.png b/docs/static/img/custom-resource/autoscaler_fig1.png
new file mode 100644
index 00000000..5cb37222
Binary files /dev/null and b/docs/static/img/custom-resource/autoscaler_fig1.png differ
diff --git a/docs/static/img/custom-resource/autoscaler_fig2.png b/docs/static/img/custom-resource/autoscaler_fig2.png
new file mode 100644
index 00000000..9047e96c
Binary files /dev/null and b/docs/static/img/custom-resource/autoscaler_fig2.png differ
diff --git a/docs/static/img/custom-resource/autoscaler_fig3.png b/docs/static/img/custom-resource/autoscaler_fig3.png
new file mode 100644
index 00000000..739c84f0
Binary files /dev/null and b/docs/static/img/custom-resource/autoscaler_fig3.png differ
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index b31a9bec..df11e28a 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -109,7 +109,7 @@ public class AutoScalerOptions {
     public static final ConfigOption<Duration> CATCH_UP_DURATION =
             autoScalerConfig("catch-up.duration")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(5))
+                    .defaultValue(Duration.ofMinutes(15))
                     .withDescription(
                             "The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.");
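[Editor's note] Since this commit raises the default catch-up duration from 5 to 15 minutes, deployments that relied on the previous behaviour can pin the option explicitly. A sketch, assuming the key is set in the FlinkDeployment's `flinkConfiguration` map with a Flink-style duration string:

```yaml
flinkConfiguration:
  kubernetes.operator.job.autoscaler.catch-up.duration: "5 min"
```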