[ 
https://issues.apache.org/jira/browse/FLINK-39306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-39306:
-------------------------------
    Fix Version/s:     (was: kubernetes-operator-1.15.0)

> Non-source vertices do not use per-second rate metrics, producing inaccurate 
> scaling decisions
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39306
>                 URL: https://issues.apache.org/jira/browse/FLINK-39306
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.14.0
>            Reporter: Dennis-Mircea Ciupitu
>            Priority: Major
>              Labels: autoscaling, operator, pull-request-available
>
> h1. 1. Summary
> The autoscaler's metric evaluation pipeline does not use Flink's native 
> *per-second rate* metrics (*numRecordsInPerSecond*, *numRecordsOutPerSecond*) 
> for non-source vertices, even though equivalent source-specific variants 
> (*Source__*.numRecordsInPerSecond*) are already used for sources. Instead, 
> non-source vertices rely exclusively on accumulated counters (*numRecordsIn*, 
> *numRecordsOut*) and compute rates via endpoint-based interpolation 
> (*getRate*), which only considers the first and last values in the metric 
> history window. This produces inaccurate rate estimates (particularly in the 
> presence of spikes or uneven throughput) and propagates inconsistencies 
> through the entire scaling decision chain.
> The same endpoint-based interpolation problem also affects the *LAG* metric 
> for source vertices. *getRate(LAG, ...)* computes *(last_lag - first_lag) / 
> timeDiff*, which dilutes sudden lag spikes across the entire window. For 
> example, a history of [0, 1M, 1M, ..., 1M] produces an artificially low lag 
> rate because the spike between sample 1 and 2 is spread over the full window 
> duration, rather than being properly weighted in its interval.
> h1. 2. Root cause
> 1. *Non-source vertices never collect numRecordsInPerSecond / 
> numRecordsOutPerSecond*: The *ScalingMetricCollector* only requests the 
> source-specific per-second metrics (*SOURCE_TASK_NUM_RECORDS_IN_PER_SEC*) for 
> source vertices. For non-source vertices, only accumulated counters are 
> collected, and rates are computed by *getRate()* (an endpoint-only 
> interpolation: *(last - first) / timeDiff*).
> 2. *Endpoint-based getRate on LAG*: *LAG* is a gauge that can jump abruptly 
> (e.g., 0 → 1M) and then plateau. *getRate* only looks at the first and last 
> samples, so a spike that occurred early in the window is diluted across the 
> entire window duration, producing an artificially low rate of change. The 
> intermediate samples, which show the plateau, are completely ignored.
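
The endpoint-only behaviour described above can be illustrated with a minimal sketch. It is written in Python for brevity and is not the operator's Java code: the function name endpoint_rate, the (timestamp, value) sample layout and both histories are assumptions made up for this illustration. It only shows that two windows with the same first and last sample yield the same rate, whatever happened in between.

```python
from typing import List, Tuple

Sample = Tuple[float, float]  # (timestamp in seconds, metric value); assumed layout


def endpoint_rate(history: List[Sample]) -> float:
    """Endpoint-only rate: (last - first) / timeDiff. Intermediate samples are unused."""
    if len(history) < 2:
        return float("nan")
    (t_first, v_first), (t_last, v_last) = history[0], history[-1]
    time_diff = t_last - t_first
    if time_diff <= 0:
        return float("nan")
    return (v_last - v_first) / time_diff


# Hypothetical LAG histories: same endpoints, very different shapes.
early_spike = [(0, 0), (30, 1_000_000), (60, 1_000_000), (90, 1_000_000), (120, 1_000_000)]
gradual_ramp = [(0, 0), (30, 250_000), (60, 500_000), (90, 750_000), (120, 1_000_000)]

# Both calls return the same value because only the endpoints are read.
print(endpoint_rate(early_spike), endpoint_rate(gradual_ramp))
```

In this sketch the early spike and the gradual ramp are indistinguishable to the rate calculation, which is the information loss the root cause refers to.
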
> h1. 3. Impact
> The inaccurate rate estimates cascade through the scaling formula, 
> compounding at each step:
> 1. *TARGET_DATA_RATE underestimated for all vertices*: inputRateAvg is 
> computed from accumulated counters via endpoint interpolation, which smooths 
> out spikes and underestimates bursty throughput, and lagRate for sources is 
> diluted by the same endpoint-only approach. As a result, the computed 
> *TARGET_DATA_RATE = inputRate + lagRate* is systematically smaller than the 
> true ingestion rate. *SCALE_UP_RATE_THRESHOLD* and 
> *SCALE_DOWN_RATE_THRESHOLD*, which are derived from *TARGET_DATA_RATE*, are 
> therefore too low as well.
> 2. *TRUE_PROCESSING_RATE overestimated for non-source vertices*: Non-source 
> vertices that cannot rely on OBSERVED_TPR fall back to computing true 
> processing rate from busyTimeAvg. When the underlying input rate is 
> underestimated (due to endpoint-only interpolation), the ratio inputRate / 
> busyTimeFraction produces an inflated TRUE_PROCESSING_RATE.
> 3. *scalingFactor is too small*: The scaling factor is derived from 
> *TARGET_DATA_RATE / TRUE_PROCESSING_RATE*. With the numerator underestimated 
> and the denominator overestimated, the resulting scaling factor is 
> significantly smaller than it should be.
> 4. End-user impact:
>    - *Excessive scaling iterations*: The autoscaler cannot reach the correct 
> target parallelism in a single scaling decision. Instead, it takes multiple 
> subsequent scalings to converge, each one undershooting because of the same 
> systematic bias.
>    - *Source-to-downstream parallelism inversion in FORWARD shipping 
> strategy*: Because source vertices use the more accurate per-second metrics 
> while downstream vertices use the less accurate accumulated counters, the 
> source can end up with a higher parallelism than its downstream 
> FORWARD-connected vertex, resulting in a topology violation that can cause 
> runtime failures or suboptimal data distribution.
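
To make the compounding in item 3 concrete, here is a small worked calculation in Python. It takes the two biases described above as given and only shows the arithmetic. All numbers (the rates and the 20% / 25% bias figures) are hypothetical and chosen for illustration; they are not measurements from the issue.

```python
def scaling_factor(target_data_rate: float, true_processing_rate: float) -> float:
    """Ratio the issue describes: TARGET_DATA_RATE / TRUE_PROCESSING_RATE."""
    return target_data_rate / true_processing_rate


# Hypothetical "true" values for one vertex (records per second).
true_target = 10_000.0
true_tpr = 2_000.0

# Hypothetical biases: target underestimated by 20%, TPR overestimated by 25%.
target_bias = 0.80
tpr_bias = 1.25

correct = scaling_factor(true_target, true_tpr)
biased = scaling_factor(true_target * target_bias, true_tpr * tpr_bias)

# The relative error is the product of the two biases: 0.80 / 1.25.
relative = biased / correct
print(f"correct={correct:.2f} biased={biased:.2f} relative={relative:.2f}")
```

Under these assumed numbers the biased factor is roughly two thirds of the correct one, so a single scaling decision would fall short of the target parallelism and further rounds would be needed.
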
> h1. 4. Proposed solution
> h2. 4.1. Collect per-second rate metrics for non-source vertices
> Add *FlinkMetric.NUM_RECORDS_IN_PER_SEC* (*numRecordsInPerSecond*) and 
> *FlinkMetric.NUM_RECORDS_OUT_PER_SEC* (*numRecordsOutPerSecond*) to the 
> metric collection pipeline for non-source vertices in 
> *ScalingMetricCollector*. Define corresponding 
> *ScalingMetric.NUM_RECORDS_IN_PER_SECOND* and 
> *ScalingMetric.NUM_RECORDS_OUT_PER_SECOND* entries, and store them in 
> *ScalingMetrics.computeDataRateMetrics*.
> h2. 4.2. Prefer per-second metrics with accumulated-counter fallback
> Introduce *getAverageWithRateFallback(perSecondMetric, accumulatedMetric, 
> ...)* in *ScalingMetricEvaluator*:
> - First tries *getAverage(perSecondMetric)*, which is the average of Flink's 
> native per-second rate gauge across the metric window. This uses all samples, 
> not just endpoints.
> - Falls back to *getRate(accumulatedMetric)* if the per-second metric is 
> unavailable (NaN or infinite), preserving backward compatibility with older 
> Flink versions or custom sources that don't expose per-second metrics.
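
The prefer-then-fall-back logic of 4.2 might look roughly like the following Python sketch. It is not the proposed Java implementation: the function names, the list-based inputs and the sample values are assumptions for illustration, and how the real *getAverage* treats missing samples is not specified in this issue.

```python
import math
from typing import List, Tuple

Sample = Tuple[float, float]  # (timestamp in seconds, counter value); assumed layout


def average(values: List[float]) -> float:
    """Plain mean over all samples in the window; NaN when there are none."""
    return sum(values) / len(values) if values else float("nan")


def endpoint_rate(history: List[Sample]) -> float:
    """Endpoint-only rate over an accumulated counter: (last - first) / timeDiff."""
    if len(history) < 2:
        return float("nan")
    (t_first, v_first), (t_last, v_last) = history[0], history[-1]
    return (v_last - v_first) / (t_last - t_first) if t_last > t_first else float("nan")


def average_with_rate_fallback(per_second: List[float], accumulated: List[Sample]) -> float:
    """Prefer the averaged per-second gauge; fall back to the counter-based rate."""
    avg = average(per_second)
    if math.isnan(avg) or math.isinf(avg):
        return endpoint_rate(accumulated)
    return avg


# Hypothetical window: the gauge saw a burst that the counter endpoints do not reflect.
counter_history = [(0, 0), (60, 6_000)]
print(average_with_rate_fallback([100.0, 500.0, 100.0], counter_history))  # gauge average
print(average_with_rate_fallback([], counter_history))                     # fallback path
```

The second call models a job whose vertices expose no per-second gauge, in which case the sketch behaves like the existing counter-based path.
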
> h2. 4.3. Use average rate of change for LAG
> Replace *getRate(LAG, ...)* with *getAverageRate(LAG, ...)*, which computes 
> the average of per-interval deltas across the full metric window, rather than 
> endpoint-only interpolation. This correctly weights sudden lag spikes in 
> their respective intervals instead of diluting them across the entire window.
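
One possible reading of "average of per-interval deltas" is sketched below in Python, next to the endpoint-only rate for comparison. This is an assumption about the formula (an unweighted mean of per-interval rates), not the actual *getAverageRate* code, and the sample history with uneven spacing is hypothetical.

```python
from typing import List, Tuple

Sample = Tuple[float, float]  # (timestamp in seconds, lag value); assumed layout


def endpoint_rate(history: List[Sample]) -> float:
    """Endpoint-only rate: (last - first) / timeDiff."""
    if len(history) < 2:
        return float("nan")
    (t_first, v_first), (t_last, v_last) = history[0], history[-1]
    return (v_last - v_first) / (t_last - t_first) if t_last > t_first else float("nan")


def average_interval_rate(history: List[Sample]) -> float:
    """Unweighted mean of the rate of change in each consecutive sample interval."""
    rates = []
    for (t0, v0), (t1, v1) in zip(history, history[1:]):
        dt = t1 - t0
        if dt > 0:  # skip duplicate or out-of-order timestamps
            rates.append((v1 - v0) / dt)
    return sum(rates) / len(rates) if rates else float("nan")


# Hypothetical lag history: a jump within the first 10 seconds, then a plateau.
lag_history = [(0, 0), (10, 1_000_000), (70, 1_000_000), (130, 1_000_000)]

print(endpoint_rate(lag_history))          # spike spread over all 130 seconds
print(average_interval_rate(lag_history))  # spike counted at its own interval's rate
```

With evenly spaced samples this particular formula reduces to the endpoint rate, so the sketch uses uneven spacing to show a difference; the proposed implementation may weight or window the intervals differently.
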



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
