[ https://issues.apache.org/jira/browse/FLINK-39306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dennis-Mircea Ciupitu updated FLINK-39306:
------------------------------------------
Description:
h1. 1. Summary
The autoscaler's metric evaluation pipeline does not use Flink's native
per-second rate metrics ({{numRecordsInPerSecond}},
{{numRecordsOutPerSecond}}) for non-source vertices, even though equivalent
source-specific variants ({{Source__*.numRecordsInPerSecond}}) are already
used for sources. Instead, non-source vertices rely exclusively on accumulated
counters ({{numRecordsIn}}, {{numRecordsOut}}) and compute rates via
endpoint-based interpolation ({{getRate}}), which only considers the first
and last values in the metric history window. This produces inaccurate rate
estimates (particularly in the presence of spikes or uneven throughput) and
propagates inconsistencies through the entire scaling decision chain.
The same endpoint-based interpolation problem also affects the {{LAG}} metric
for source vertices. {{getRate(LAG, ...)}} computes
{{(last_lag - first_lag) / timeDiff}}, which dilutes sudden lag spikes across
the entire window. For example, a history of [0, 1M, 1M, ..., 1M] produces an
artificially low lag rate because the spike between the first and second
samples is spread over the full window duration, rather than being properly
weighted in its interval.
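The dilution described above can be made concrete with a small Java sketch. It is illustrative only and is not the autoscaler's code: the class name, the {{endpointRate}} helper, the array-based history, and the 60-second sampling interval are all assumptions chosen for the example.

```java
/** Illustrative sketch only; names and sampling interval are hypothetical. */
final class EndpointRateSketch {

    /** Endpoint-only rate: (last - first) / timeDiff. Intermediate samples are ignored. */
    static double endpointRate(double[] timesSec, double[] values) {
        int n = values.length;
        if (n < 2 || timesSec.length != n) {
            return Double.NaN;
        }
        double timeDiff = timesSec[n - 1] - timesSec[0];
        if (timeDiff <= 0) {
            return Double.NaN;
        }
        return (values[n - 1] - values[0]) / timeDiff;
    }

    public static void main(String[] args) {
        // Lag history [0, 1M, 1M, ..., 1M]: 11 samples, assumed to be 60 s apart.
        int n = 11;
        double[] times = new double[n];
        double[] lag = new double[n];
        for (int i = 0; i < n; i++) {
            times[i] = i * 60.0;
            lag[i] = (i == 0) ? 0.0 : 1_000_000.0;
        }

        // Rate seen by the endpoint-only computation: 1M spread over 600 s.
        double overWindow = endpointRate(times, lag);
        // Rate inside the single interval where the jump actually happened: 1M over 60 s.
        double inSpikeInterval = (lag[1] - lag[0]) / (times[1] - times[0]);

        System.out.printf("endpoint rate:       %.1f records/s%n", overWindow);
        System.out.printf("spike interval rate: %.1f records/s%n", inSpikeInterval);
    }
}
```

Under these assumed numbers the endpoint rate is one tenth of the rate inside the interval that contains the jump, because the same delta is divided by the whole window instead of by one sampling interval.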
h1. 2. Root cause
Non-source vertices never collect {{numRecordsInPerSecond}} /
{{numRecordsOutPerSecond}}: The {{ScalingMetricCollector}} only requests
the source-specific per-second metrics
({{SOURCE_TASK_NUM_RECORDS_IN_PER_SEC}}) for source vertices. For
non-source vertices, only accumulated counters are collected, and rates are
computed by {{getRate()}} (an endpoint-only interpolation:
{{(last - first) / timeDiff}}).
Endpoint-based {{getRate}} on {{LAG}}: {{LAG}} is a gauge that can jump
abruptly (e.g., 0 → 1M) and then plateau. {{getRate}} only looks at the first
and last samples, so a spike that occurred early in the window is diluted
across the entire window duration, producing an artificially low rate of
change. The intermediate samples, which show the plateau, are completely
ignored.
h1. 3. Impact
The inaccurate rate estimates cascade through the scaling formula, compounding
at each step:
1. *{{TARGET_DATA_RATE}} underestimated for all vertices*: Since
{{inputRateAvg}} is computed from accumulated counters via endpoint
interpolation (which smooths out spikes and underestimates bursty throughput),
and {{lagRate}} for sources is diluted by the same endpoint-only approach, the
computed {{TARGET_DATA_RATE = inputRate + lagRate}} is systematically smaller
than the true ingestion rate. This also means {{SCALE_UP_RATE_THRESHOLD}} and
{{SCALE_DOWN_RATE_THRESHOLD}}, which are derived from
{{TARGET_DATA_RATE}}, are too low.
2. *{{TRUE_PROCESSING_RATE}} overestimated for non-source vertices*:
Non-source vertices that cannot rely on {{OBSERVED_TPR}} fall back to computing
the true processing rate from {{busyTimeAvg}}. When the underlying input rate
is underestimated (due to endpoint-only interpolation), the ratio
{{inputRate / busyTimeFraction}} produces an inflated {{TRUE_PROCESSING_RATE}}.
3. *scalingFactor is too small*: The scaling factor is derived from
{{TARGET_DATA_RATE / TRUE_PROCESSING_RATE}}. With the numerator
underestimated and the denominator overestimated, the resulting scaling factor
is significantly smaller than it should be.
4. End-user impact:
- *Excessive scaling iterations*: The autoscaler cannot reach the correct
target parallelism in a single scaling decision. Instead, it takes multiple
subsequent scalings to converge, each one undershooting because of the same
systematic bias.
- *Source-to-downstream parallelism inversion in FORWARD shipping
strategy*: Because source vertices use the more accurate per-second metrics
while downstream vertices use the less accurate accumulated counters, the
source can end up with a higher parallelism than its downstream
FORWARD-connected vertex, resulting in a topology violation that can cause
runtime failures or suboptimal data distribution.
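As a rough illustration of step 3, the Java sketch below evaluates the ratio {{TARGET_DATA_RATE / TRUE_PROCESSING_RATE}} with two sets of made-up inputs. The class name, the helper, and every number are hypothetical. The direction of each bias (numerator too low, denominator too high) is taken from the description above as given and is not derived here.

```java
/** Illustrative sketch only; all values are hypothetical. */
final class ScalingFactorSketch {

    /** scalingFactor = TARGET_DATA_RATE / TRUE_PROCESSING_RATE, as stated in the description. */
    static double scalingFactor(double targetDataRate, double trueProcessingRate) {
        return targetDataRate / trueProcessingRate;
    }

    public static void main(String[] args) {
        // Assumed "accurate" inputs (records per second).
        double accurate = scalingFactor(12_000.0, 4_000.0);
        // Assumed biased inputs: target rate too low, processing rate too high.
        double biased = scalingFactor(10_000.0, 5_000.0);

        System.out.println("accurate scaling factor: " + accurate); // 3.0
        System.out.println("biased scaling factor:   " + biased);   // 2.0
    }
}
```

With these assumed inputs, a moderate error on each side of the ratio is enough to move the factor from 3.0 to 2.0, which is the kind of undershoot that would then need another scaling round to correct.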
h1. 4. Proposed solution
h2. 4.1. Collect per-second rate metrics for non-source vertices
Add {{FlinkMetric.NUM_RECORDS_IN_PER_SEC}} ({{numRecordsInPerSecond}}) and
{{FlinkMetric.NUM_RECORDS_OUT_PER_SEC}} ({{numRecordsOutPerSecond}}) to the
metric collection pipeline for non-source vertices in
{{ScalingMetricCollector}}. Define corresponding
{{ScalingMetric.NUM_RECORDS_IN_PER_SECOND}} and
{{ScalingMetric.NUM_RECORDS_OUT_PER_SECOND}} entries, and store them in
{{ScalingMetrics.computeDataRateMetrics}}.
h2. 4.2. Prefer per-second metrics with accumulated-counter fallback
Introduce {{getAverageWithRateFallback(perSecondMetric, accumulatedMetric,
...)}} in {{ScalingMetricEvaluator}}:
- First tries {{getAverage(perSecondMetric)}}, which is the average of
Flink's native per-second rate gauge across the metric window. This uses all
samples, not just endpoints.
- Falls back to {{getRate(accumulatedMetric)}} if the per-second metric is
unavailable (NaN or infinite), preserving backward compatibility with older
Flink versions or custom sources that don't expose per-second metrics.
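The two bullets above could be sketched in Java roughly as follows. This is a simplified stand-in, not the proposed {{ScalingMetricEvaluator}} change: the method names follow the description, but the array-based signatures, the class name, and the sample data are assumptions made so the example runs on its own.

```java
/** Illustrative sketch only; signatures and data layout are hypothetical. */
final class RateFallbackSketch {

    /** Mean of all per-second gauge samples; NaN when no samples exist. */
    static double getAverage(double[] perSecondSamples) {
        if (perSecondSamples.length == 0) {
            return Double.NaN;
        }
        double sum = 0.0;
        for (double sample : perSecondSamples) {
            sum += sample; // a NaN sample makes the whole average NaN
        }
        return sum / perSecondSamples.length;
    }

    /** Endpoint-only rate over an accumulated counter: (last - first) / timeDiff. */
    static double getRate(double[] timesSec, double[] counter) {
        int n = counter.length;
        if (n < 2 || timesSec.length != n || timesSec[n - 1] <= timesSec[0]) {
            return Double.NaN;
        }
        return (counter[n - 1] - counter[0]) / (timesSec[n - 1] - timesSec[0]);
    }

    /** Prefer the per-second average; use the counter-based rate when it is NaN or infinite. */
    static double getAverageWithRateFallback(
            double[] perSecondSamples, double[] timesSec, double[] counter) {
        double average = getAverage(perSecondSamples);
        if (Double.isNaN(average) || Double.isInfinite(average)) {
            return getRate(timesSec, counter);
        }
        return average;
    }

    public static void main(String[] args) {
        double[] times = {0, 60, 120, 180};
        double[] counter = {0, 6_000, 60_000, 66_000};
        double[] perSecond = {100, 900, 100, 100};

        // Per-second gauge available: all four samples contribute to the result.
        System.out.println(getAverageWithRateFallback(perSecond, times, counter));
        // Per-second gauge missing: the endpoint-based counter rate is used instead.
        System.out.println(getAverageWithRateFallback(new double[0], times, counter));
    }
}
```

Treating an empty or NaN-containing sample set as "unavailable" is a choice made for this sketch; the actual availability check in the autoscaler may differ.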
h2. 4.3. Use average rate of change for LAG
Replace {{getRate(LAG, ...)}} with {{getAverageRate(LAG, ...)}}, which computes
the average of per-interval deltas across the full metric window, rather than
endpoint-only interpolation. This correctly weights sudden lag spikes in their
respective intervals instead of diluting them across the entire window.
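One possible reading of "average of per-interval deltas" is sketched below in Java. It assumes each delta is divided by the length of its own interval and that the resulting per-interval rates are averaged without weighting. The class name, the array-based signature, and the handling of non-increasing timestamps are assumptions and may not match the actual {{getAverageRate}} implementation.

```java
/** Illustrative sketch only; the averaging scheme is an assumption. */
final class AverageRateSketch {

    /** Unweighted mean of per-interval rates (delta / interval length). */
    static double getAverageRate(double[] timesSec, double[] values) {
        int n = values.length;
        if (n < 2 || timesSec.length != n) {
            return Double.NaN;
        }
        double sum = 0.0;
        int intervals = 0;
        for (int i = 1; i < n; i++) {
            double dt = timesSec[i] - timesSec[i - 1];
            if (dt <= 0) {
                continue; // skip duplicate or out-of-order timestamps
            }
            sum += (values[i] - values[i - 1]) / dt;
            intervals++;
        }
        return intervals == 0 ? Double.NaN : sum / intervals;
    }

    public static void main(String[] args) {
        // Unevenly spaced samples: the lag jumps within the first, short 10 s interval.
        double[] times = {0, 10, 70, 130};
        double[] lag = {0, 1_000_000, 1_000_000, 1_000_000};

        double averageRate = getAverageRate(times, lag);                  // (100000 + 0 + 0) / 3
        double endpointRate = (lag[3] - lag[0]) / (times[3] - times[0]); // 1M / 130 s

        System.out.printf("average of interval rates: %.1f%n", averageRate);
        System.out.printf("endpoint rate:             %.1f%n", endpointRate);
    }
}
```

In this sketch the two results differ because the sampling intervals have different lengths. With evenly spaced samples the per-interval deltas telescope, and an unweighted mean of interval rates equals the endpoint rate, so any benefit under this reading depends on how intervals are spaced or weighted.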
was:
h1. 1. Summary
The autoscaler's metric evaluation pipeline does not use Flink's native
*per-second rate* metrics (*numRecordsInPerSecond*, *numRecordsOutPerSecond*)
for non-source vertices, even though equivalent source-specific variants
(*Source__*.numRecordsInPerSecond*) are already used for sources. Instead,
non-source vertices rely exclusively on accumulated counters (*numRecordsIn*,
*numRecordsOut*) and compute rates via endpoint-based interpolation
(*getRate*), which only considers the first and last values in the metric
history window. This produces inaccurate rate estimates (particularly in the
presence of spikes or uneven throughput) and propagates inconsistencies through
the entire scaling decision chain.
The same endpoint-based interpolation problem also affects the *LAG* metric for
source vertices. *getRate(LAG, ...)* computes *(last_lag - first_lag) /
timeDiff*, which dilutes sudden lag spikes across the entire window. For
example, a history of [0, 1M, 1M, ..., 1M] produces an artificially low lag
rate because the spike between sample 1 and 2 is spread over the full window
duration, rather than being properly weighted in its interval.
h1. 2. Root cause
Non-source vertices never collect *numRecordsInPerSecond* /
*numRecordsOutPerSecond*: The *ScalingMetricCollector* only requests the
source-specific per-second metrics (*SOURCE_TASK_NUM_RECORDS_IN_PER_SEC*) for
source vertices. For non-source vertices, only accumulated counters are
collected, and rates are computed by *getRate()* (an endpoint-only
interpolation: *(last - first) / timeDiff*).
Endpoint-based *getRate* on *LAG*: *LAG* is a gauge that can jump abruptly
(e.g., 0 → 1M) and then plateau. *getRate* only looks at the first and last
samples, so a spike that occurred early in the window is diluted across the
entire window duration, producing an artificially low rate of change. The
intermediate samples, which show the plateau, are completely ignored.
h1. 3. Impact
The inaccurate rate estimates cascade through the scaling formula, compounding
at each step:
1. *TARGET_DATA_RATE underestimated for all vertices*: Since inputRateAvg is
computed from accumulated counters via endpoint interpolation (which smooths
out spikes and underestimates bursty throughput), and lagRate for sources is
diluted by the same endpoint-only approach, the computed *TARGET_DATA_RATE =
inputRate + lagRate* is systematically smaller than the true ingestion rate.
This also means *SCALE_UP_RATE_THRESHOLD* and *SCALE_DOWN_RATE_THRESHOLD*,
which are derived from *TARGET_DATA_RATE*, are too low.
2. *TRUE_PROCESSING_RATE overestimated for non-source vertices*: Non-source
vertices that cannot rely on OBSERVED_TPR fall back to computing true
processing rate from busyTimeAvg. When the underlying input rate is
underestimated (due to endpoint-only interpolation), the ratio inputRate /
busyTimeFraction produces an inflated TRUE_PROCESSING_RATE.
3. *scalingFactor is too small*: The scaling factor is derived from
*TARGET_DATA_RATE / TRUE_PROCESSING_RATE*. With the numerator underestimated
and the denominator overestimated, the resulting scaling factor is
significantly smaller than it should be.
4. End-user impact:
- *Excessive scaling iterations*: The autoscaler cannot reach the correct
target parallelism in a single scaling decision. Instead, it takes multiple
subsequent scalings to converge, each one undershooting because of the same
systematic bias.
- *Source-to-downstream parallelism inversion in FORWARD shipping strategy*:
Because source vertices use the more accurate per-second metrics while
downstream vertices use the less accurate accumulated counters, the source can
end up with a higher parallelism than its downstream FORWARD-connected vertex,
resulting in a topology violation that can cause runtime failures or suboptimal
data distribution.
h1. 4. Proposed solution
h2. 4.1. Collect per-second rate metrics for non-source vertices
Add *FlinkMetric.NUM_RECORDS_IN_PER_SEC* (*numRecordsInPerSecond*) and
*FlinkMetric.NUM_RECORDS_OUT_PER_SEC* (*numRecordsOutPerSecond*) to the metric
collection pipeline for non-source vertices in *ScalingMetricCollector*. Define
corresponding *ScalingMetric.NUM_RECORDS_IN_PER_SECOND* and
*ScalingMetric.NUM_RECORDS_OUT_PER_SECOND* entries, and store them in
*ScalingMetrics.computeDataRateMetrics*.
h2. 4.2. Prefer per-second metrics with accumulated-counter fallback
Introduce *getAverageWithRateFallback(perSecondMetric, accumulatedMetric, ...)*
in *ScalingMetricEvaluator*:
- First tries *getAverage(perSecondMetric)*, which is the average of Flink's
native per-second rate gauge across the metric window. This uses all samples,
not just endpoints.
- Falls back to *getRate(accumulatedMetric)* if the per-second metric is
unavailable (NaN or Infinite) preserving backward compatibility with older
Flink versions or custom sources that don't expose per-second metrics.
h2. 4.3. Use average rate of change for LAG
Replace *getRate(LAG, ...)* with *getAverageRate(LAG, ...)* which computes the
average of per-interval deltas across the full metric window, rather than
endpoint-only interpolation. This correctly weights sudden lag spikes in their
respective intervals instead of diluting them across the entire window.
> Non-source vertices do not use per-second rate metrics, producing inaccurate
> scaling decisions
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-39306
> URL: https://issues.apache.org/jira/browse/FLINK-39306
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Kubernetes Operator
> Affects Versions: kubernetes-operator-1.14.0
> Reporter: Dennis-Mircea Ciupitu
> Priority: Major
> Labels: autoscaling, operator, pull-request-available
> Fix For: kubernetes-operator-1.16.0
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)