[
https://issues.apache.org/jira/browse/FLINK-38538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Varun updated FLINK-38538:
--------------------------
Description:
*Components:* Autoscaler, Flink Kubernetes Operator
*Affects Versions:*
* Flink: 1.19.2
* Flink Kubernetes Operator: 1.13.0
*Environment:*
* Flink job launched as a FlinkDeployment on an on-premise Kubernetes cluster
managed by Rancher, in application mode.
Typical deployment config (excerpt):
{code:yaml}
job.autoscaler.enabled: "true"
jobmanager.scheduler: adaptive
job.autoscaler.stabilization.interval: "1m"
job.autoscaler.metrics.window: "2m"
job.autoscaler.utilization.target: "0.4"
job.autoscaler.utilization.max: "0.6"
job.autoscaler.utilization.min: "0.2"
job.autoscaler.vertex.min-parallelism: "10"
job.autoscaler.vertex.max-parallelism: "200"
job.autoscaler.scale-up.grace-period: "1m"
job.autoscaler.metrics.busy-time.aggregator: "MAX"{code}
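For reference, our reading of the utilization settings above (paraphrasing the
documented semantics; the sketch below is illustrative, not operator code): the
autoscaler aims to keep each vertex's busy ratio at 0.4 and only rescales once
it leaves the [0.2, 0.6] band.
{code:java}
// Illustrative reading of the utilization band in the config above;
// our paraphrase of the documented semantics, not actual operator code.
public class UtilizationBandSketch {
    public static void main(String[] args) {
        double target = 0.4, max = 0.6, min = 0.2; // job.autoscaler.utilization.*
        double busyRatio = 1.0; // busyTimeMsPerSecond / 1000, MAX-aggregated
        // Rescale only when the busy ratio leaves the [min, max] band...
        boolean rescale = busyRatio > max || busyRatio < min;
        // ...and size the new parallelism so utilization lands back on target.
        System.out.println(rescale + ", scale factor ~" + busyRatio / target); // true, ~2.5
    }
}{code}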
*Problem Statement:*
In the current autoscaler implementation, scaling decisions are made using both
operator busy time ({*}busyTimeMsPerSecond{*}) and the *output ratio* (i.e.,
the ratio of output records to input records per edge), combined as part of the
recursive target data rate calculation.
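Schematically, our understanding of that recursion is as follows (a
simplified, hypothetical sketch; the Vertex type and edge-ratio map are our
own names, not the operator's API): a vertex's target rate is derived from its
upstream vertices' target rates scaled by each edge's observed output ratio,
while busy time only sizes the parallelism needed to meet that target.
{code:java}
import java.util.List;
import java.util.Map;

// Simplified sketch of the recursive target-rate calculation (assumed
// semantics for illustration; not the actual autoscaler code).
class TargetRateSketch {
    record Vertex(String id, List<Vertex> upstream, double sourceRate) {}

    static double targetRate(Vertex v, Map<String, Double> edgeRatio) {
        if (v.upstream().isEmpty()) {
            return v.sourceRate(); // sources: observed ingest rate (records/s)
        }
        double target = 0.0;
        for (Vertex up : v.upstream()) {
            // ratio for edge up->v = records emitted on the edge / records read by 'up'
            double ratio = edgeRatio.getOrDefault(up.id() + "->" + v.id(), 0.0);
            target += targetRate(up, edgeRatio) * ratio;
        }
        return target; // collapses to 0.0 when every incoming ratio is 0.0
    }
}{code}
Under this model, a zero ratio on a vertex's only incoming edge zeroes its
target rate regardless of how busy it is, which matches the logs below.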
However, we observe cases where an operator/sub-job remains *100% busy* across
all subtasks, yet is aggressively scaled _down_, sometimes to the configured
minimum parallelism, purely because the autoscaler's recursively computed
output ratio (or downstream demand) is low. This arises in scenarios with
heavy filtering, aggregations, or temporarily slow/blocked sinks.
There is {*}currently no way to configure the Flink autoscaler to prioritize,
or exclusively use, the busy metric for scaling decisions{*}, even though this
would be more appropriate for certain classes of workloads.
----
*Behavior Observed (with Log Snapshots):*
We run a multi-stage pipeline in which one "hot" vertex is heavily loaded:
* *Vertex scaling metrics* (all subtasks busy):
{code}
Vertex scaling metrics for 20b73495bccfee0c65322e5852f3f496:
{ACCUMULATED_BUSY_TIME=4021203.0, NUM_RECORDS_IN=47229.0, LOAD=1.0,
NUM_RECORDS_OUT=7834.0}{code}
* *Autoscaler parallelism overrides* (initially scaling up due to the busy metric):
{code}
[DEBUG][flink/example-flink-pipeline] Applying parallelism overrides: { ...,
20b73495bccfee0c65322e5852f3f496=44, ... }{code}
* *Output ratio computed as zero:*
{code}
Computed output ratio for edge (f134ca61df556898f135a2c691a13fc5 ->
20b73495bccfee0c65322e5852f3f496): 0.0{code}
* *Target processing rate is forced to zero, so scale-down is triggered:*
{code}
Vertex 20b73495bccfee0c65322e5852f3f496 processing rate 404.562 is outside
(0.0, 0.0)
[DEBUG] Applying parallelism overrides: {...,
20b73495bccfee0c65322e5852f3f496=18, ...}
[DEBUG] Applying parallelism overrides: {...,
20b73495bccfee0c65322e5852f3f496=10, ...}{code}
* *Even at parallelism 10, the stage remains 100% busy:*
{code}
[ { "id": "busyTimeMsPerSecond",
    "min": 1000,
    "max": 1000,
    "avg": 1000,
    "sum": 10000 } ]{code}
----
*Interpretation:*
Even though the operator is maxed out, the autoscaler derives a required
target rate of 0 from the output ratio/backpropagated demand, scales
parallelism down to the minimum, and leaves the vertex overloaded.
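Plugging the logged numbers into the configured utilization target of 0.4 (our
arithmetic, for illustration):
{code:java}
// All 10 subtasks report busyTimeMsPerSecond = 1000 (sum = 10000): fully busy.
public class BusyDrivenSketch {
    public static void main(String[] args) {
        int currentParallelism = 10;
        double busyRatio = 1.0;         // 1000 ms busy per second of wall time
        double utilizationTarget = 0.4; // job.autoscaler.utilization.target
        // A purely busy-driven policy would ask for roughly:
        int busyDriven = (int) Math.ceil(currentParallelism * busyRatio / utilizationTarget);
        System.out.println(busyDriven); // 25
        // Instead the ratio-driven target band is (0.0, 0.0), so the observed
        // 404.562 rec/s is "outside" it and the vertex is pinned at
        // job.autoscaler.vertex.min-parallelism = 10.
    }
}{code}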
*Request:*
Can we have a configuration or scaling policy/mode for the Flink autoscaler
that allows scaling (up or down) {*}primarily or solely on the busy metric{*}?
* Optionally, allow users to disable or strongly de-emphasize the use of the
output ratio and propagate scaling strictly based on busyTime, for selected
vertices or globally (a hypothetical option sketch follows below).
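For illustration only, a hypothetical sketch of such an option in the style of
Flink's ConfigOptions (the key name and semantics are invented; no such option
exists today):
{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical option sketch; the key name below is invented for illustration.
public class BusyTimeOnlyOptions {
    public static final ConfigOption<Boolean> BUSY_TIME_ONLY =
            ConfigOptions.key("job.autoscaler.busy-time-only.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "If true, scale vertices purely on busyTimeMsPerSecond and "
                                    + "ignore output-ratio/target-rate backpropagation.");
}{code}
Users would then set {{job.autoscaler.busy-time-only.enabled: "true"}} in the
FlinkDeployment config, per vertex or globally.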
> Flink Autoscaler: Need Option to Autoscale Based Primarily (or Solely) on
> Busy Metric, Not Output Ratio
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38538
> URL: https://issues.apache.org/jira/browse/FLINK-38538
> Project: Flink
> Issue Type: Technical Debt
> Components: Autoscaler
> Affects Versions: 1.19.0
> Reporter: Varun
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)