[ 
https://issues.apache.org/jira/browse/FLINK-38538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Varun updated FLINK-38538:
--------------------------
    Description: 
*Components:* Autoscaler, Flink Kubernetes Operator

*Affects Versions:*
 * Flink: 1.19
 * Flink Kubernetes Operator: 1.13.1

*Environment:*
 * Flink job launched as a FlinkDeployment on an on-premise Kubernetes cluster 
managed by Rancher, in application mode.


Typical deployment config (excerpt):

 
{code:yaml}
job.autoscaler.enabled: "true"
jobmanager.scheduler: adaptive
job.autoscaler.stabilization.interval: "1m"
job.autoscaler.metrics.window: "2m"
job.autoscaler.utilization.target: "0.4"
job.autoscaler.utilization.max: "0.6"
job.autoscaler.utilization.min: "0.2"
job.autoscaler.vertex.min-parallelism: "10"
job.autoscaler.vertex.max-parallelism: "200"
job.autoscaler.scale-up.grace-period: "1m"
job.autoscaler.metrics.busy-time.aggregator: "MAX"
{code}
*Problem Statement:*
In the current autoscaler implementation, scaling decisions are based on both 
operator busy time ({*}busyTimeMsPerSecond{*}) and the *output ratio* (the 
ratio of output records to input records per edge), which are combined in the 
recursive target data rate calculation.

However, we observe cases where an operator/sub-job remains *100% busy* across 
all subtasks, yet is aggressively scaled _down_, sometimes to the minimum 
parallelism, purely because the autoscaler's recursively computed output ratio 
(or downstream demand) is low. This occurs in scenarios with heavy filtering, 
aggregations, or temporarily slow/blocked sinks.

There is {*}currently no way to configure the Flink autoscaler to prioritize, 
or exclusively use, the busy metric for scaling decisions{*}, even though this 
would be more appropriate for certain classes of workloads.
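
To make the failure mode concrete, below is a minimal sketch of the interaction we believe we are seeing. It is *not* the operator's actual autoscaler code; the vertex names, rates, and the recursion are simplified placeholders. It only illustrates how a propagated output ratio of 0 collapses the computed target rate of a vertex whose subtasks are 100% busy.
{code:java}
// Simplified illustration only -- not the actual Flink autoscaler implementation.
// Shows how a recursively propagated output ratio of 0 zeroes out the target
// rate of a vertex even when its subtasks are 100% busy.
import java.util.List;
import java.util.Map;

public class TargetRateSketch {

    /**
     * Target input rate of a vertex = sum over its upstream edges of
     * (upstream target rate * output ratio of that edge).
     * Source vertices simply use their observed ingestion rate.
     */
    static double targetRate(String vertex,
                             Map<String, List<String>> upstreamOf,
                             Map<String, Double> sourceRates,
                             Map<String, Double> edgeOutputRatio) {
        List<String> upstream = upstreamOf.getOrDefault(vertex, List.of());
        if (upstream.isEmpty()) {
            return sourceRates.getOrDefault(vertex, 0.0);
        }
        double rate = 0.0;
        for (String up : upstream) {
            double ratio = edgeOutputRatio.getOrDefault(up + "->" + vertex, 0.0);
            rate += targetRate(up, upstreamOf, sourceRates, edgeOutputRatio) * ratio;
        }
        return rate;
    }

    public static void main(String[] args) {
        // Placeholder topology: one upstream source feeding the "hot" vertex.
        // The logs below show "Computed output ratio ... : 0.0" for that edge.
        Map<String, List<String>> upstreamOf = Map.of("hotVertex", List.of("upstream"));
        Map<String, Double> sourceRates = Map.of("upstream", 1000.0);
        Map<String, Double> edgeOutputRatio = Map.of("upstream->hotVertex", 0.0);

        double busyTimeMsPerSecond = 1000.0; // vertex is 100% busy
        double target = targetRate("hotVertex", upstreamOf, sourceRates, edgeOutputRatio);

        // target == 0.0, so the acceptable processing-rate band collapses to (0.0, 0.0)
        // and the vertex is scaled down to min-parallelism despite being fully busy.
        System.out.printf("busy=%.0f ms/s, computed target rate=%.1f%n",
                busyTimeMsPerSecond, target);
    }
}
{code}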
----
*Behavior Observed (with Log Snapshots):*
We run a multi-stage pipeline where one "hot" vertex is highly loaded:
 * *Vertex scaling metrics* (all subtasks busy):
{code:java}
Vertex scaling metrics for 20b73495bccfee0c65322e5852f3f496: 
{ACCUMULATED_BUSY_TIME=4021203.0, NUM_RECORDS_IN=47229.0, LOAD=1.0, 
NUM_RECORDS_OUT=7834.0}{code}

 * *Autoscaler parallelism overrides* (initially scaling up due to the busy metric):

{code:java}
[DEBUG][flink/example-flink-pipeline] Applying parallelism overrides: { ..., 20b73495bccfee0c65322e5852f3f496=44, ... }
{code}
 * *Output ratio computed as 0.0:*

{code:java}
Computed output ratio for edge (f134ca61df556898f135a2c691a13fc5 -> 
20b73495bccfee0c65322e5852f3f496): 0.0{code}
 * *Target processing capacity collapses to zero, so scaling down is 
triggered:*

{code:java}
Vertex 20b73495bccfee0c65322e5852f3f496 processing rate 404.562 is outside (0.0, 0.0)
[DEBUG] Applying parallelism overrides: {..., 20b73495bccfee0c65322e5852f3f496=18, ...}
[DEBUG] Applying parallelism overrides: {..., 20b73495bccfee0c65322e5852f3f496=10, ...}
{code}
 * *Even at 10 parallelism, the stage remains 100% busy:*

{code:java}
[ { "id": "busyTimeMsPerSecond", "min": 1000, "max": 1000, "avg": 1000, "sum": 10000 } ]
{code}
----
*Interpretation:*
Even though the operator is maxed out, the autoscaler determines a required 
target rate of 0 from the output ratio / backpropagated demand, scales 
parallelism down to the minimum, and leaves the vertex overloaded.
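
As a rough comparison, plugging the logged numbers into the two signals gives opposite decisions. The busy-time-only heuristic below is *hypothetical*, not an existing operator feature, and its formula is only an assumption for illustration:
{code:java}
// Hypothetical comparison for the logged vertex 20b73495bccfee0c65322e5852f3f496.
// Illustration only -- not operator code; the busy-time formula is an assumption.
public class BusyVsRatioSketch {
    public static void main(String[] args) {
        double busyTimeMsPerSecond = 1000.0;  // every subtask 100% busy (REST metric above)
        double processingRate = 404.562;      // observed rate from the autoscaler log
        double targetRateUpper = 0.0;         // band collapsed to (0.0, 0.0) by the output ratio
        int currentParallelism = 44;
        int minParallelism = 10;              // job.autoscaler.vertex.min-parallelism
        double targetUtilization = 0.4;       // job.autoscaler.utilization.target

        // Output-ratio-driven decision (what actually happened): the observed rate is
        // outside the collapsed (0.0, 0.0) band, so the vertex is pushed to the floor.
        int ratioDriven = processingRate > targetRateUpper ? minParallelism : currentParallelism;

        // Hypothetical busy-time-only heuristic: scale so the busy fraction lands at the
        // configured target utilization. With busy fraction 1.0 and a 0.4 target, this
        // asks for MORE parallelism, not less.
        double busyFraction = busyTimeMsPerSecond / 1000.0;
        int busyDriven = (int) Math.ceil(currentParallelism * busyFraction / targetUtilization);

        System.out.println("output-ratio-driven parallelism: " + ratioDriven); // 10
        System.out.println("busy-time-driven parallelism:    " + busyDriven);  // 110
    }
}
{code}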

*Request:*
Can we have a configuration or scaling policy/mode in the Flink autoscaler that 
allows scaling (up or down) {*}primarily or solely on the busy metric{*}? A 
purely hypothetical sketch of such an option is included below.
 * Optionally, allow users to disable or strongly de-emphasize the use of the 
output ratio and drive scaling strictly on busyTime, either for selected 
vertices or globally.
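
To make the request concrete, one possible shape for such an option is sketched below against Flink's {{ConfigOption}} API. The key name {{job.autoscaler.scaling-signal}}, the enum, and its values are purely hypothetical placeholders; the actual naming and any per-vertex granularity would be decided during implementation.
{code:java}
// Hypothetical shape of the requested option -- naming and placement are
// assumptions, not an existing flink-kubernetes-operator configuration.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class BusyTimeScalingOptions {

    /** Which signal should drive scaling decisions. */
    public enum ScalingSignal {
        TARGET_RATE,    // current behaviour: busy time combined with propagated output ratio
        BUSY_TIME_ONLY  // requested: ignore output ratio, follow busyTimeMsPerSecond
    }

    /** Hypothetical key; shown only to illustrate the requested switch. */
    public static final ConfigOption<ScalingSignal> SCALING_SIGNAL =
            ConfigOptions.key("job.autoscaler.scaling-signal")
                    .enumType(ScalingSignal.class)
                    .defaultValue(ScalingSignal.TARGET_RATE)
                    .withDescription(
                            "Which metric drives scaling decisions. BUSY_TIME_ONLY would keep a"
                                    + " fully busy vertex from being scaled down by a low"
                                    + " propagated output ratio.");
}
{code}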

> Flink Autoscaler: Need Option to Autoscale Based Primarily (or Solely) on 
> Busy Metric, Not Output Ratio
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38538
>                 URL: https://issues.apache.org/jira/browse/FLINK-38538
>             Project: Flink
>          Issue Type: Technical Debt
>          Components: Autoscaler
>    Affects Versions: 1.19.0
>            Reporter: Varun
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
