gyfora commented on code in PR #966:
URL: https://github.com/apache/flink-kubernetes-operator/pull/966#discussion_r2065705970
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using a weighted least squares approach, where more
+     * recent data points and those with higher parallelism are given higher weights. If there are
+     * not enough observations, or if the computed coefficient is invalid, a default value of {@code
+     * 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
+     * @param minObservations The minimum number of observations required to compute the scaling
+     *     coefficient. If the number of historical entries is less than this threshold, a default
+     *     coefficient of {@code 1.0} is returned.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, int minObservations) {
+        /*
+         * The scaling coefficient is computed using a **weighted least squares** approach
+         * to fit a linear model:
+         *
+         *     R_i = β * P_i * α
+         *
+         * where:
+         *   - R_i = observed processing rate
+         *   - P_i = parallelism
+         *   - β = baseline processing rate
+         *   - α = scaling coefficient to optimize
+         *
+         * The optimization minimizes the **weighted sum of squared errors**:
+         *
+         *     Loss = ∑ w_i * (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *     α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β)
+         *
+         * We keep the system conservative in scenarios with higher-than-linear returns by
+         * clamping the computed α at 1.0.
+         */
+
+        // Not enough data to compute the scaling coefficient; assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate =
+                AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        Instant latestTimestamp = history.lastKey();
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+        List<Double> weightList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            Instant timestamp = entry.getKey();
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate =
+                    summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            // Compute weight based on recency & parallelism
+            double timeDiff =
+                    Duration.between(timestamp, latestTimestamp).getSeconds()
+                            + 1; // Avoid division by zero
+            double weight = parallelism / timeDiff;
Review Comment:
We could also add an enum configuration with some strategies here if we feel
that would be required, but that may be overkill initially.
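
A rough sketch of what such an enum-typed option could look like, following the ConfigOptions pattern used for the other autoscaler settings; the class, enum, and option key names here are illustrative placeholders, not part of this PR:

```java
// Hypothetical sketch, not part of the PR: shows how the weighting scheme could be
// made pluggable through an enum-typed option.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ObservedScalabilityOptions {

    /** Candidate weighting strategies; names are illustrative only. */
    public enum WeightingStrategy {
        NONE,                  // w_i = 1 for every observation
        PARALLELISM_OVER_TIME, // w_i = P_i / (Δt_i + 1), the scheme used in this PR
        PARALLELISM_DISTANCE   // w_i decays with |P_i - currentParallelism| (locally weighted)
    }

    public static final ConfigOption<WeightingStrategy> WEIGHTING_STRATEGY =
            ConfigOptions.key("job.autoscaler.observed-scalability.weighting-strategy")
                    .enumType(WeightingStrategy.class)
                    .defaultValue(WeightingStrategy.PARALLELISM_OVER_TIME)
                    .withDescription(
                            "Strategy used to weight historical observations when fitting "
                                    + "the observed scaling coefficient.");
}
```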
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -236,6 +248,97 @@ public ParallelismChange computeScaleTargetParallelism(
                 delayedScaleDown);
     }
+    /**
+     * Calculates the scaling coefficient based on historical scaling data.
+     *
+     * <p>The scaling coefficient is computed using a weighted least squares approach, where more
+     * recent data points and those with higher parallelism are given higher weights. If there are
+     * not enough observations, or if the computed coefficient is invalid, a default value of {@code
+     * 1.0} is returned, assuming linear scaling.
+     *
+     * @param history A {@code SortedMap} of {@code Instant} timestamps to {@code ScalingSummary}
+     * @param minObservations The minimum number of observations required to compute the scaling
+     *     coefficient. If the number of historical entries is less than this threshold, a default
+     *     coefficient of {@code 1.0} is returned.
+     * @return The computed scaling coefficient.
+     */
+    @VisibleForTesting
+    protected static double calculateObservedScalingCoefficient(
+            SortedMap<Instant, ScalingSummary> history, int minObservations) {
+        /*
+         * The scaling coefficient is computed using a **weighted least squares** approach
+         * to fit a linear model:
+         *
+         *     R_i = β * P_i * α
+         *
+         * where:
+         *   - R_i = observed processing rate
+         *   - P_i = parallelism
+         *   - β = baseline processing rate
+         *   - α = scaling coefficient to optimize
+         *
+         * The optimization minimizes the **weighted sum of squared errors**:
+         *
+         *     Loss = ∑ w_i * (R_i - β * α * P_i)^2
+         *
+         * Differentiating w.r.t. α and solving for α:
+         *
+         *     α = ∑ (w_i * P_i * R_i) / (∑ (w_i * P_i^2) * β)
+         *
+         * We keep the system conservative in scenarios with higher-than-linear returns by
+         * clamping the computed α at 1.0.
+         */
+
+        // Not enough data to compute the scaling coefficient; assume linear scaling.
+        if (history.isEmpty() || history.size() < minObservations) {
+            return 1.0;
+        }
+
+        var baselineProcessingRate =
+                AutoScalerUtils.computeBaselineProcessingRate(history);
+
+        if (Double.isNaN(baselineProcessingRate)) {
+            return 1.0;
+        }
+
+        Instant latestTimestamp = history.lastKey();
+
+        List<Double> parallelismList = new ArrayList<>();
+        List<Double> processingRateList = new ArrayList<>();
+        List<Double> weightList = new ArrayList<>();
+
+        for (Map.Entry<Instant, ScalingSummary> entry : history.entrySet()) {
+            Instant timestamp = entry.getKey();
+            ScalingSummary summary = entry.getValue();
+            double parallelism = summary.getCurrentParallelism();
+            double processingRate =
+                    summary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage();
+
+            if (Double.isNaN(processingRate)) {
+                LOG.warn(
+                        "True processing rate is not available in scaling history. Cannot compute scaling coefficient.");
+                return 1.0;
+            }
+
+            // Compute weight based on recency & parallelism
+            double timeDiff =
+                    Duration.between(timestamp, latestTimestamp).getSeconds()
+                            + 1; // Avoid division by zero
+            double weight = parallelism / timeDiff;
Review Comment:
Why did you decide on this particular weighting approach? To be specific,
what's the benefit compared to:
- Not weighting at all
- Using weights based on the difference from the current parallelism
(locally weighted regression)

I think weighting makes sense overall, but weighting based on the parallelism
difference (and time) may make more sense than weighting by parallelism alone.
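
For illustration, a minimal sketch of the locally weighted alternative suggested above, where observations close to the current parallelism and close in time dominate the fit; the Gaussian kernel, bandwidth, and method names are assumptions for the sketch, not code from the PR:

```java
import java.time.Duration;
import java.time.Instant;

public class LocallyWeightedWeightSketch {

    /**
     * Hypothetical weight combining parallelism distance and recency: observations near the
     * current parallelism and close to the latest timestamp dominate the fit, instead of
     * simply favouring high-parallelism points.
     */
    static double weight(
            double observedParallelism,
            double currentParallelism,
            Instant timestamp,
            Instant latestTimestamp,
            double bandwidth) {
        // Gaussian kernel on the parallelism distance (locally weighted regression style).
        double distance = observedParallelism - currentParallelism;
        double parallelismWeight = Math.exp(-(distance * distance) / (2 * bandwidth * bandwidth));

        // Same recency term as in the PR: newer observations weigh more, +1 avoids division by zero.
        double ageSeconds = Duration.between(timestamp, latestTimestamp).getSeconds() + 1;

        return parallelismWeight / ageSeconds;
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        // An observation at parallelism 8 while the vertex currently runs at 10, seen 5 minutes ago.
        System.out.println(weight(8, 10, now.minusSeconds(300), now, 4.0));
    }
}
```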
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -178,6 +184,12 @@ public ParallelismChange computeScaleTargetParallelism(
         LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
         double scaleFactor = targetCapacity / averageTrueProcessingRate;
+        if (conf.get(OBSERVED_SCALABILITY_ENABLED)) {
+            double scalingCoefficient =
+                    JobVertexScaler.calculateObservedScalingCoefficient(
+                            history, conf.get(OBSERVED_SCALABILITY_MIN_OBSERVATIONS));
+            scaleFactor = scaleFactor / scalingCoefficient;
Review Comment:
We should probably set some limit here: if the scaling coefficient is very
small, there is not really a point in scaling at all (similar to the
ineffective scaling detection logic; we could probably reuse the threshold
configured there).
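
A minimal sketch of such a lower bound on the coefficient before it is applied to the scale factor; the threshold constant and class name are placeholders, and reusing the existing ineffective-scaling threshold option, as suggested above, would replace the literal:

```java
public class ScalingCoefficientClampSketch {

    // Placeholder lower bound; the review suggests reusing the ineffective-scaling
    // detection threshold from the existing autoscaler configuration instead of a literal.
    private static final double MIN_SCALING_COEFFICIENT = 0.1;

    /**
     * Clamps the observed scaling coefficient before it divides the scale factor, so that a
     * near-zero coefficient does not blow up the target parallelism for a vertex that
     * effectively does not scale.
     */
    static double clampScalingCoefficient(double coefficient) {
        // Upper bound 1.0 matches the conservative clamp already described in the PR's comment.
        return Math.min(1.0, Math.max(MIN_SCALING_COEFFICIENT, coefficient));
    }

    public static void main(String[] args) {
        System.out.println(clampScalingCoefficient(0.02)); // -> 0.1, avoids a huge scale factor
        System.out.println(clampScalingCoefficient(0.7));  // -> 0.7
        System.out.println(clampScalingCoefficient(1.4));  // -> 1.0
    }
}
```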
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]