[
https://issues.apache.org/jira/browse/FLINK-39306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gyula Fora updated FLINK-39306:
-------------------------------
Fix Version/s: (was: kubernetes-operator-1.15.0)
> Non-source vertices do not use per-second rate metrics, producing inaccurate
> scaling decisions
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-39306
> URL: https://issues.apache.org/jira/browse/FLINK-39306
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Kubernetes Operator
> Affects Versions: kubernetes-operator-1.14.0
> Reporter: Dennis-Mircea Ciupitu
> Priority: Major
> Labels: autoscaling, operator, pull-request-available
>
> h1. 1. Summary
> The autoscaler's metric evaluation pipeline does not use Flink's native
> *per-second rate* metrics (*numRecordsInPerSecond*, *numRecordsOutPerSecond*)
> for non-source vertices, even though equivalent source-specific variants
> (*Source__*.numRecordsInPerSecond*) are already used for sources. Instead,
> non-source vertices rely exclusively on accumulated counters (*numRecordsIn*,
> *numRecordsOut*) and compute rates via endpoint-based interpolation
> (*getRate*), which only considers the first and last values in the metric
> history window. This produces inaccurate rate estimates (particularly in the
> presence of spikes or uneven throughput) and propagates inconsistencies
> through the entire scaling decision chain.
> The same endpoint-based interpolation problem also affects the *LAG* metric
> for source vertices. *getRate(LAG, ...)* computes *(last_lag - first_lag) /
> timeDiff*, which dilutes sudden lag spikes across the entire window. For
> example, a history of [0, 1M, 1M, ..., 1M] produces an artificially low lag
> rate because the spike between the first and second samples is spread over
> the full window duration, rather than being weighted within its own interval.
> h1. 2. Root cause
> - Non-source vertices never collect *numRecordsInPerSecond* /
> *numRecordsOutPerSecond*: the *ScalingMetricCollector* only requests the
> source-specific per-second metrics (*SOURCE_TASK_NUM_RECORDS_IN_PER_SEC*) for
> source vertices. For non-source vertices, only accumulated counters are
> collected, and rates are computed by *getRate()* (an endpoint-only
> interpolation: *(last - first) / timeDiff*).
> - Endpoint-based *getRate* on *LAG*: *LAG* is a gauge that can jump abruptly
> (e.g., 0 → 1M) and then plateau. *getRate* only looks at the first and last
> samples, so a spike that occurred early in the window is diluted across the
> entire window duration, producing an artificially low rate of change. The
> intermediate samples, which show the plateau, are completely ignored.
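The endpoint-only behaviour described above can be illustrated with a minimal sketch. This is not the operator's actual *getRate* code: the class name, the method name, and the array-based history are assumptions made for illustration. Only the formula *(last - first) / timeDiff* comes from the issue text.

```java
final class EndpointRateSketch {

    /**
     * Hypothetical stand-in for an endpoint-only rate: (last - first) / timeDiff,
     * in units per second. Samples between the first and last are never read.
     */
    static double endpointRate(long[] timestampsMs, double[] values) {
        int n = values.length;
        if (n < 2 || timestampsMs.length != n) {
            return Double.NaN;
        }
        double seconds = (timestampsMs[n - 1] - timestampsMs[0]) / 1000.0;
        if (seconds <= 0) {
            return Double.NaN;
        }
        return (values[n - 1] - values[0]) / seconds;
    }

    public static void main(String[] args) {
        // Lag jumps to 1M within the first 10 seconds, then plateaus.
        long[] spikeTs = {0L, 10_000L, 60_000L};
        double[] spikeLag = {0, 1_000_000, 1_000_000};

        // Lag grows steadily over the same 60 second window.
        long[] steadyTs = {0L, 30_000L, 60_000L};
        double[] steadyLag = {0, 500_000, 1_000_000};

        // Both histories share endpoints, so the two printed rates are identical.
        System.out.println(endpointRate(spikeTs, spikeLag));
        System.out.println(endpointRate(steadyTs, steadyLag));
    }
}
```

Because only the endpoints are read, an early spike followed by a plateau is indistinguishable from steady growth over the whole window.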
> h1. 3. Impact
> The inaccurate rate estimates cascade through the scaling formula,
> compounding at each step:
> 1. *TARGET_DATA_RATE underestimated for all vertices*: Since inputRateAvg is
> computed from accumulated counters via endpoint interpolation (which smooths
> out spikes and underestimates bursty throughput), and lagRate for sources is
> diluted by the same endpoint-only approach, the computed *TARGET_DATA_RATE =
> inputRate + lagRate* is systematically smaller than the true ingestion rate.
> This also means *SCALE_UP_RATE_THRESHOLD* and *SCALE_DOWN_RATE_THRESHOLD*,
> which are derived from *TARGET_DATA_RATE*, are too low.
> 2. *TRUE_PROCESSING_RATE overestimated for non-source vertices*: Non-source
> vertices that cannot rely on OBSERVED_TPR fall back to computing true
> processing rate from busyTimeAvg. When the underlying input rate is
> underestimated (due to endpoint-only interpolation), the ratio inputRate /
> busyTimeFraction produces an inflated TRUE_PROCESSING_RATE.
> 3. *scalingFactor is too small*: The scaling factor is derived from
> *TARGET_DATA_RATE / TRUE_PROCESSING_RATE*. With the numerator underestimated
> and the denominator overestimated, the resulting scaling factor is
> significantly smaller than it should be.
> 4. End-user impact:
> - *Excessive scaling iterations*: The autoscaler cannot reach the correct
> target parallelism in a single scaling decision. Instead, it takes multiple
> subsequent scalings to converge, each one undershooting because of the same
> systematic bias.
> - *Source-to-downstream parallelism inversion in FORWARD shipping
> strategy*: Because source vertices use the more accurate per-second metrics
> while downstream vertices use the less accurate accumulated counters, the
> source can end up with a higher parallelism than its downstream
> FORWARD-connected vertex, resulting in a topology violation that can cause
> runtime failures or suboptimal data distribution.
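The compounding effect in items 1 to 3 can be made concrete with a small arithmetic sketch. All numbers are invented for illustration. The rule *newParallelism = ceil(currentParallelism * scalingFactor)* is an assumption about how the factor is applied, not something stated in this issue; only the ratio *TARGET_DATA_RATE / TRUE_PROCESSING_RATE* comes from the text above.

```java
final class ScalingFactorSketch {

    /** Scaling factor as described in the issue: TARGET_DATA_RATE / TRUE_PROCESSING_RATE. */
    static double scalingFactor(double targetDataRate, double trueProcessingRate) {
        return targetDataRate / trueProcessingRate;
    }

    /** Assumed (hypothetical) application of the factor to the current parallelism. */
    static int newParallelism(int currentParallelism, double scalingFactor) {
        return (int) Math.ceil(currentParallelism * scalingFactor);
    }

    public static void main(String[] args) {
        int current = 4;

        // Hypothetical accurate rates: 1000 rec/s target, 500 rec/s processing capacity.
        int accurate = newParallelism(current, scalingFactor(1000.0, 500.0));

        // Hypothetical biased rates: numerator underestimated, denominator overestimated.
        int biased = newParallelism(current, scalingFactor(800.0, 600.0));

        System.out.println("accurate target parallelism: " + accurate); // 8
        System.out.println("biased target parallelism:   " + biased);   // 6
    }
}
```

With these made-up inputs the biased estimate stops at 6 instead of 8, so at least one further scaling round would be needed to reach the same target.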
> h1. 4. Proposed solution
> h2. 4.1. Collect per-second rate metrics for non-source vertices
> Add *FlinkMetric.NUM_RECORDS_IN_PER_SEC* (*numRecordsInPerSecond*) and
> *FlinkMetric.NUM_RECORDS_OUT_PER_SEC* (*numRecordsOutPerSecond*) to the
> metric collection pipeline for non-source vertices in
> *ScalingMetricCollector*. Define corresponding
> *ScalingMetric.NUM_RECORDS_IN_PER_SECOND* and
> *ScalingMetric.NUM_RECORDS_OUT_PER_SECOND* entries, and store them in
> *ScalingMetrics.computeDataRateMetrics*.
> h2. 4.2. Prefer per-second metrics with accumulated-counter fallback
> Introduce *getAverageWithRateFallback(perSecondMetric, accumulatedMetric,
> ...)* in *ScalingMetricEvaluator*:
> - First tries *getAverage(perSecondMetric)*, which is the average of Flink's
> native per-second rate gauge across the metric window. This uses all samples,
> not just endpoints.
> - Falls back to *getRate(accumulatedMetric)* if the per-second metric is
> unavailable (NaN or infinite), preserving backward compatibility with older
> Flink versions or custom sources that don't expose per-second metrics.
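The two-step lookup in 4.2 might look roughly like the following. This is a sketch under stated assumptions, not the proposed *ScalingMetricEvaluator* code: the real method works on the autoscaler's metric history types, whereas this version takes plain arrays, and every name and signature here is hypothetical.

```java
final class RateFallbackSketch {

    /** Mean of all non-NaN samples in the window; NaN if there are none. */
    static double average(double[] samples) {
        double sum = 0;
        int count = 0;
        for (double s : samples) {
            if (!Double.isNaN(s)) {
                sum += s;
                count++;
            }
        }
        return count == 0 ? Double.NaN : sum / count;
    }

    /** Endpoint-only rate on an accumulated counter: (last - first) / timeDiff. */
    static double endpointRate(long[] timestampsMs, double[] counter) {
        int n = counter.length;
        if (n < 2 || timestampsMs.length != n) {
            return Double.NaN;
        }
        double seconds = (timestampsMs[n - 1] - timestampsMs[0]) / 1000.0;
        return seconds <= 0 ? Double.NaN : (counter[n - 1] - counter[0]) / seconds;
    }

    /** Prefer the averaged per-second gauge; fall back to the counter-derived rate. */
    static double averageWithRateFallback(
            double[] perSecondSamples, long[] timestampsMs, double[] counter) {
        double avg = average(perSecondSamples);
        if (Double.isNaN(avg) || Double.isInfinite(avg)) {
            return endpointRate(timestampsMs, counter);
        }
        return avg;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 60_000L};
        double[] counter = {0, 6_000};

        // Per-second gauge available: its window average (200.0) is used.
        System.out.println(averageWithRateFallback(new double[] {100, 300, 200}, ts, counter));

        // Gauge missing: the counter-based rate (100.0) is used instead.
        System.out.println(averageWithRateFallback(new double[] {}, ts, counter));
    }
}
```

Keeping the fallback inside a single helper means callers do not need to know whether a given vertex exposes the per-second gauge.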
> h2. 4.3. Use average rate of change for LAG
> Replace *getRate(LAG, ...)* with *getAverageRate(LAG, ...)*, which computes
> the average of per-interval deltas across the full metric window rather than
> an endpoint-only interpolation. This weights sudden lag spikes within
> their respective intervals instead of diluting them across the entire window.
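One possible reading of "average of per-interval deltas" is an unweighted mean of the rate in each sample-to-sample interval. The sketch below assumes that reading. The class name, method signature, and array-based history are hypothetical, and the actual *getAverageRate* may weight or filter intervals differently.

```java
final class AverageRateSketch {

    /**
     * Unweighted mean of per-interval rates: for each consecutive pair of samples,
     * compute (value[i] - value[i-1]) / seconds, then average over all intervals.
     */
    static double averageRate(long[] timestampsMs, double[] values) {
        double sum = 0;
        int intervals = 0;
        for (int i = 1; i < values.length && i < timestampsMs.length; i++) {
            double seconds = (timestampsMs[i] - timestampsMs[i - 1]) / 1000.0;
            if (seconds <= 0) {
                continue; // skip duplicate or out-of-order samples
            }
            sum += (values[i] - values[i - 1]) / seconds;
            intervals++;
        }
        return intervals == 0 ? Double.NaN : sum / intervals;
    }

    public static void main(String[] args) {
        // Lag jumps to 1M in the first 10 seconds, then stays flat for 50 seconds.
        long[] ts = {0L, 10_000L, 60_000L};
        double[] lag = {0, 1_000_000, 1_000_000};

        // Per-interval rates are 100000/s and 0/s, so the mean is 50000.0,
        // versus roughly 16667/s from (last - first) / timeDiff.
        System.out.println(averageRate(ts, lag));
    }
}
```

In this sketch the two estimates differ because the sample intervals are uneven. With perfectly even spacing, an unweighted mean of per-interval rates reduces algebraically to the endpoint rate, so the benefit depends on how the real implementation handles interval lengths.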