This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ccf3639c [docs][autoscaler] Autoscaler docs and default config 
improvement
ccf3639c is described below

commit ccf3639c87400e3bbd94f64ad13dc5d40fd55faf
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Thu Aug 17 09:55:59 2023 +0200

    [docs][autoscaler] Autoscaler docs and default config improvement
---
 docs/content/docs/custom-resource/autoscaler.md    |  93 +++++++++++++++++----
 .../generated/auto_scaler_configuration.html       |   2 +-
 .../static/img/custom-resource/autoscaler_fig1.png | Bin 0 -> 168727 bytes
 .../static/img/custom-resource/autoscaler_fig2.png | Bin 0 -> 181157 bytes
 .../static/img/custom-resource/autoscaler_fig3.png | Bin 0 -> 178610 bytes
 .../autoscaler/config/AutoScalerOptions.java       |   2 +-
 6 files changed, 78 insertions(+), 19 deletions(-)

diff --git a/docs/content/docs/custom-resource/autoscaler.md 
b/docs/content/docs/custom-resource/autoscaler.md
index 201ab3d8..4140122a 100644
--- a/docs/content/docs/custom-resource/autoscaler.md
+++ b/docs/content/docs/custom-resource/autoscaler.md
@@ -26,7 +26,7 @@ under the License.
 
 # Autoscaler
 
-The operator provides a job autoscaler functionality that collects various 
metrics from running Flink jobs and automatically scales individual job 
vertexes (chained operator groups) to eliminate backpressure and satisfy the 
utilization and catch-up duration target set by the user.
+The operator provides a job autoscaler functionality that collects various 
metrics from running Flink jobs and automatically scales individual job 
vertexes (chained operator groups) to eliminate backpressure and satisfy the 
utilization target set by the user.
 By adjusting parallelism on a job vertex level (in contrast to job 
parallelism) we can efficiently autoscale complex and heterogeneous streaming 
applications.
 
 Key benefits to the user:
@@ -35,26 +35,78 @@ Key benefits to the user:
  - Automatic adaptation to changing load patterns
  - Detailed utilization metrics for performance debugging
 
-Job requirements:
- - The autoscaler currently only works with the latest [Flink 
1.17](https://hub.docker.com/_/flink) or after backporting the following fixes 
to your 1.15/1.16 Flink image
-   - Job vertex parallelism overrides (must have)
-     - [Add option to override job vertex parallelisms during job 
submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
-     - [Change ForwardPartitioner to RebalancePartitioner on parallelism 
changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
-     - [Fix logic for determining downstream subtasks for partitioner 
replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
-   - [Support timespan for busyTime 
metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35)
 (good to have)
- - Source scaling only supports modern sources which
-   - use the new [Source 
API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 that exposes the busy time metric (must have, most common connectors already 
do)
-   - expose the [standardized connector 
metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics)
 for accessing backlog information (good to have, extra capacity will be added 
for catching up with backlog)
+## Overview
 
-In the current state the autoscaler works best with Kafka sources, as they 
expose all the standardized metrics. It also comes with some additional 
benefits when using Kafka such as automatically detecting and limiting source 
max parallelism to the number of Kafka partitions.
+The autoscaler relies on the metrics exposed by the Flink metric system for 
the individual tasks. The metrics are queried directly from the Flink job.
 
-{{< hint info >}}
-The autoscaler also supports a passive/metrics-only mode where it only 
collects and evaluates scaling related performance metrics but does not trigger 
any job upgrades.
-This can be used to gain confidence in the module without any impact on the 
running applications.
+Collected metrics:
+ - Backlog information at each source
+ - Incoming data rate at the sources (e.g. records/sec written into the Kafka 
topic)
+ - Number of records processed per second in each job vertex
+ - Busy time per second of each job vertex (current utilization)
 
-To disable scaling actions, set: 
`kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< hint info >}}
+Please note that we are not using any container memory / CPU utilization 
metrics directly here. High utilization will be reflected in the processing 
rate and busy time metrics of the individual job vertexes.
 {{< /hint >}}
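To make the busy time metric concrete, here is a small illustrative calculation (not the operator's actual code) of the rate a vertex could sustain at full utilization:

```python
def true_processing_rate(records_per_sec, busy_time_ms_per_sec):
    """Estimate the rate a vertex could sustain at 100% utilization.

    busy_time_ms_per_sec corresponds to Flink's busyTimeMsPerSecond metric:
    how many milliseconds out of each second the task spent processing.
    """
    busy_ratio = busy_time_ms_per_sec / 1000.0
    return records_per_sec / busy_ratio


# A vertex handling 500 rec/s while busy only 250 ms/s has plenty of headroom:
print(true_processing_rate(500, 250))  # 2000.0
```

A vertex whose true processing rate is far above its target data rate is a candidate for scaling down, and vice versa.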
 
+The algorithm starts from the sources and recursively computes the required processing capacity (target data rate) for each operator in the pipeline. At the source vertices, the target data rate equals the incoming data rate (e.g. from the Kafka topic).
+
+For downstream operators, we compute the target data rate as the sum of the input (upstream) operators' output data rates along the given edges in the processing graph.
+
+{{< img src="/img/custom-resource/autoscaler_fig1.png" alt="Computing Target 
Data Rates" >}}
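The propagation described above can be sketched in a few lines (a toy model; the real autoscaler operates on Flink's JobGraph and additionally accounts for utilization targets and busy time):

```python
def target_rates(source_rates, edges):
    """Propagate target data rates downstream through the processing graph.

    source_rates: vertex -> incoming data rate (e.g. records/sec from Kafka)
    edges:        (upstream, downstream) pairs; a downstream vertex's target
                  rate is the sum of its upstream vertices' output rates.
    Assumes the graph is a DAG (no cycles).
    """
    rates = dict(source_rates)
    changed = True
    while changed:  # toy fixpoint loop in place of a topological sort
        changed = False
        for _, down in edges:
            incoming = sum(rates[u] for u, d in edges if d == down and u in rates)
            if rates.get(down) != incoming:
                rates[down] = incoming
                changed = True
    return rates
```

For a two-source join feeding a sink, the join and sink vertices each end up with a target rate equal to the combined source rates.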
+
+Users configure the target utilization percentage of the operators in the pipeline, e.g. keep all operators between 60% and 80% busy. The autoscaler then finds a parallelism configuration such that the output rates of all operators match the input rates of all their downstream operators at the targeted utilization.
+
+In this example we see an upscale operation:
+
+{{< img src="/img/custom-resource/autoscaler_fig2.png" alt="Scaling Up" >}}
+
+Similarly, as load decreases, the autoscaler adjusts individual operator parallelism levels to match the current rate over time.
+
+{{< img src="/img/custom-resource/autoscaler_fig3.png" alt="Scaling Down" >}}
+
+The autoscaler approach is based on [Three steps is all you need: fast, 
accurate, automatic scaling decisions for distributed streaming 
dataflows](https://www.usenix.org/system/files/osdi18-kalavri.pdf) by Kalavri 
et al.
+
+## Executing rescaling operations
+
+By default, the autoscaler uses the operator's built-in job upgrade mechanism to perform rescaling, as detailed in [Job Management and Stateful upgrades]({{< ref "docs/custom-resource/job-management" >}}).
+
+### Flink 1.18 and in-place scaling support
+
+The upcoming Flink 1.18 release brings significant improvements to the speed of scaling operations through the new resource requirements REST endpoint.
+This allows the autoscaler to scale vertices in-place without performing a full job upgrade cycle.
+
+To try this experimental feature, please use the currently available Flink 1.18 snapshot base image to build your application Docker image.
+Furthermore, make sure you set the Flink version to `v1_18` in your FlinkDeployment yaml and enable the adaptive scheduler, which is required for this feature:
+
+```yaml
+jobmanager.scheduler: adaptive
+```
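As an illustration, a minimal FlinkDeployment spec combining these settings might look like the following (the image name is a placeholder; field names follow the operator's CRD):

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  # Placeholder: a Flink 1.18 snapshot-based image you built yourself
  image: my-flink-1.18-snapshot:latest
  flinkVersion: v1_18
  flinkConfiguration:
    # Required for in-place scaling
    jobmanager.scheduler: adaptive
```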
+
+## Job Requirements and Limitations
+
+### Requirements
+
+The autoscaler currently only works with [Flink 1.17](https://hub.docker.com/_/flink) and later Flink images, or after backporting the following fixes to your 1.15/1.16 Flink images:
+
+ - Job vertex parallelism overrides (must have)
+   - [Add option to override job vertex parallelisms during job 
submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
+   - [Change ForwardPartitioner to RebalancePartitioner on parallelism 
changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
+   - [Fix logic for determining downstream subtasks for partitioner 
replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
+ - [Support timespan for busyTime 
metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35)
 (good to have)
+
+### Limitations
+
+By default, the autoscaler can work for all job vertices in the processing graph.
+
+However, source scaling requires that the sources:
+
+   - Use the new [Source 
API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 that exposes the busy time metric (must have, most common connectors already 
do)
+   - Expose the [standardized connector 
metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics)
 for accessing backlog information (good to have, extra capacity will be added 
for catching up with backlog)
+
+In its current state, the autoscaler works best with Kafka sources, as they expose all the standardized metrics. Using Kafka also brings additional benefits, such as automatically detecting and limiting the source max parallelism to the number of Kafka partitions.
+
+
 ## Configuration guide
 
 Depending on your environment and job characteristics there are a few very 
important configurations that will affect how well the autoscaler works.
@@ -67,6 +119,13 @@ Key configuration areas:
 
 The defaults might work reasonably well for many applications, but some tuning 
may be required in this early stage of the autoscaler module.
 
+{{< hint info >}}
+The autoscaler also supports a passive/metrics-only mode where it only 
collects and evaluates scaling related performance metrics but does not trigger 
any job upgrades.
+This can be used to gain confidence in the module without any impact on the 
running applications.
+
+To disable scaling actions, set: 
`kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< /hint >}}
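For example, in the `flinkConfiguration` section of your deployment (a sketch; only the `scaling.enabled` key is taken from this page, the `enabled` key is assumed from the operator's option naming):

```yaml
flinkConfiguration:
  # Keep collecting and evaluating scaling metrics...
  kubernetes.operator.job.autoscaler.enabled: "true"
  # ...but never trigger an actual job upgrade (metrics-only mode)
  kubernetes.operator.job.autoscaler.scaling.enabled: "false"
```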
+
 ### Job and per operator max parallelism
 
 When computing the scaled parallelism, the autoscaler always considers the max 
parallelism settings for each job vertex to ensure that it doesn't introduce 
unnecessary data skew.
@@ -109,7 +168,7 @@ The amount of extra capacity is determined automatically by 
the following 2 conf
 In the future the autoscaler may be able to automatically determine the 
restart time, but the target catch-up duration depends on the users SLO.
 
 By lowering the catch-up duration the autoscaler will have to reserve more 
extra capacity for the scaling actions.
-We suggest setting this based on your actual objective, such us 1, 5, 10 
minutes etc.
+We suggest setting this based on your actual objective, such as 10, 30, or 60 minutes.
 
 ### Basic configuration example
 ```yaml
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index a3e7b069..95af5541 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -16,7 +16,7 @@
         </tr>
         <tr>
             
<td><h5>kubernetes.operator.job.autoscaler.catch-up.duration</h5></td>
-            <td style="word-wrap: break-word;">5 min</td>
+            <td style="word-wrap: break-word;">15 min</td>
             <td>Duration</td>
             <td>The target duration for fully processing any backlog after a 
scaling operation. Set to 0 to disable backlog based scaling.</td>
         </tr>
diff --git a/docs/static/img/custom-resource/autoscaler_fig1.png 
b/docs/static/img/custom-resource/autoscaler_fig1.png
new file mode 100644
index 00000000..5cb37222
Binary files /dev/null and 
b/docs/static/img/custom-resource/autoscaler_fig1.png differ
diff --git a/docs/static/img/custom-resource/autoscaler_fig2.png 
b/docs/static/img/custom-resource/autoscaler_fig2.png
new file mode 100644
index 00000000..9047e96c
Binary files /dev/null and 
b/docs/static/img/custom-resource/autoscaler_fig2.png differ
diff --git a/docs/static/img/custom-resource/autoscaler_fig3.png 
b/docs/static/img/custom-resource/autoscaler_fig3.png
new file mode 100644
index 00000000..739c84f0
Binary files /dev/null and 
b/docs/static/img/custom-resource/autoscaler_fig3.png differ
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index b31a9bec..df11e28a 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -109,7 +109,7 @@ public class AutoScalerOptions {
     public static final ConfigOption<Duration> CATCH_UP_DURATION =
             autoScalerConfig("catch-up.duration")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(5))
+                    .defaultValue(Duration.ofMinutes(15))
                     .withDescription(
                             "The target duration for fully processing any 
backlog after a scaling operation. Set to 0 to disable backlog based scaling.");
 
