This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 34edcc45 [FLINK-39826][autoscaler] Strengthen autoscaler configuration validation (#1129)
34edcc45 is described below
commit 34edcc45851d523e97edb6f5ccc0ceb945a32dd7
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Thu Jun 4 12:25:28 2026 +0300
[FLINK-39826][autoscaler] Strengthen autoscaler configuration validation (#1129)
---
.../autoscaler/validation/AutoscalerValidator.java | 61 ++++++--
.../validation/AutoscalerValidatorTest.java | 159 +++++++++++++++++++++
.../operator/validation/DefaultValidator.java | 28 +++-
.../operator/validation/DefaultValidatorTest.java | 101 ++++++++-----
4 files changed, 300 insertions(+), 49 deletions(-)
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
index 185d6a20..265449f5 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
@@ -25,11 +25,6 @@ import org.apache.flink.configuration.Configuration;
import java.util.Optional;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
-
/** Validator for Autoscaler. */
public class AutoscalerValidator {
@@ -41,17 +36,46 @@ public class AutoscalerValidator {
*/
public Optional<String> validateAutoscalerOptions(Configuration flinkConf)
{
- if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
+ if (!flinkConf.get(AutoScalerOptions.AUTOSCALER_ENABLED)) {
return Optional.empty();
}
return firstPresent(
validateNumber(flinkConf,
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
validateNumber(flinkConf,
AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
- validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
+ validateNumber(flinkConf,
AutoScalerOptions.UTILIZATION_TARGET, 0.0d, 1.0d),
validateNumber(flinkConf,
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
- validateNumber(flinkConf, UTILIZATION_MAX,
flinkConf.get(UTILIZATION_TARGET), 1.0d),
- validateNumber(flinkConf, UTILIZATION_MIN, 0.0d,
flinkConf.get(UTILIZATION_TARGET)),
- validateNumber(flinkConf,
OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
+ validateNumber(
+ flinkConf,
+ AutoScalerOptions.UTILIZATION_MAX,
+ flinkConf.get(AutoScalerOptions.UTILIZATION_TARGET),
+ 1.0d),
+ validateNumber(
+ flinkConf,
+ AutoScalerOptions.UTILIZATION_MIN,
+ 0.0d,
+ flinkConf.get(AutoScalerOptions.UTILIZATION_TARGET)),
+ validateNumber(flinkConf,
AutoScalerOptions.GC_PRESSURE_THRESHOLD, 0.0d, 1.0d),
+ validateNumber(flinkConf,
AutoScalerOptions.HEAP_USAGE_THRESHOLD, 0.0d, 1.0d),
+ // The following options only take effect when their feature is enabled, so they are
+ // only validated in that case.
+ validateNumberIfEnabled(
+ flinkConf,
+ AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED,
+ AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN,
+ 0.01d,
+ 1d),
+ validateNumberIfEnabled(
+ flinkConf,
+
AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
+ AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD,
+ 0.0d,
+ 1.0d),
+ validateNumberIfEnabled(
+ flinkConf,
+ AutoScalerOptions.MEMORY_TUNING_ENABLED,
+ AutoScalerOptions.MEMORY_TUNING_OVERHEAD,
+ 0.0d,
+ 1.0d),
CalendarUtils.validateExcludedPeriods(flinkConf));
}
@@ -91,6 +115,23 @@ public class AutoscalerValidator {
}
}
+ /**
+ * Validates a numeric option only when the feature it belongs to is enabled. When the feature
+ * is disabled the option has no effect, so an out-of-range value is harmless and is not
+ * reported.
+ */
+ private static <T extends Number> Optional<String> validateNumberIfEnabled(
+ Configuration flinkConfiguration,
+ ConfigOption<Boolean> enabledConfig,
+ ConfigOption<T> autoScalerConfig,
+ Double min,
+ Double max) {
+ if (!flinkConfiguration.get(enabledConfig)) {
+ return Optional.empty();
+ }
+ return validateNumber(flinkConfiguration, autoScalerConfig, min, max);
+ }
+
private static <T extends Number> Optional<String> validateNumber(
Configuration flinkConfiguration, ConfigOption<T>
autoScalerConfig, Double min) {
return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/validation/AutoscalerValidatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/validation/AutoscalerValidatorTest.java
new file mode 100644
index 00000000..c5c3a328
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/validation/AutoscalerValidatorTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.validation;
+
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import javax.annotation.Nullable;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AutoscalerValidator}. */
+class AutoscalerValidatorTest {
+
+ private final AutoscalerValidator validator = new AutoscalerValidator();
+
+ private static Configuration enabledConf() {
+ var conf = new Configuration();
+ conf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+ return conf;
+ }
+
+ @Test
+ void testDisabledAutoscalerSkipsValidation() {
+ // Out-of-range value is ignored when the autoscaler is disabled.
+ var conf = new Configuration();
+ conf.set(AutoScalerOptions.AUTOSCALER_ENABLED, false);
+ conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD, 2.0d);
+
+ assertThat(validator.validateAutoscalerOptions(conf)).isEmpty();
+ }
+
+ @Test
+ void testDefaultsAreValid() {
+
assertThat(validator.validateAutoscalerOptions(enabledConf())).isEmpty();
+ }
+
+ /**
+ * Numeric options together with the lower/upper bound enforced and, where applicable, the
+ * feature flag that must be enabled for the option to be validated.
+ */
+ static Stream<Arguments> boundedOptions() {
+ return Stream.of(
+ // Always validated when the autoscaler is enabled.
+ Arguments.of(AutoScalerOptions.GC_PRESSURE_THRESHOLD, null,
0.0d, 1.0d),
+ Arguments.of(AutoScalerOptions.HEAP_USAGE_THRESHOLD, null,
0.0d, 1.0d),
+ // Only validated when their feature is enabled.
+ Arguments.of(
+ AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD,
+
AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
+ 0.0d,
+ 1.0d),
+ Arguments.of(
+ AutoScalerOptions.MEMORY_TUNING_OVERHEAD,
+ AutoScalerOptions.MEMORY_TUNING_ENABLED,
+ 0.0d,
+ 1.0d),
+ Arguments.of(
+ AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN,
+ AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED,
+ 0.01d,
+ 1.0d));
+ }
+
+ private static Configuration confWithFeatureEnabled(
+ @Nullable ConfigOption<Boolean> enableFlag) {
+ var conf = enabledConf();
+ if (enableFlag != null) {
+ conf.set(enableFlag, true);
+ }
+ return conf;
+ }
+
+ @ParameterizedTest
+ @MethodSource("boundedOptions")
+ void testValueAboveMaxIsRejected(
+ ConfigOption<Double> option,
+ @Nullable ConfigOption<Boolean> enableFlag,
+ double min,
+ double max) {
+ var conf = confWithFeatureEnabled(enableFlag);
+ conf.set(option, max + 0.0001d);
+
+ assertThat(validator.validateAutoscalerOptions(conf))
+ .hasValueSatisfying(error ->
assertThat(error).contains(option.key()));
+ }
+
+ @ParameterizedTest
+ @MethodSource("boundedOptions")
+ void testValueBelowMinIsRejected(
+ ConfigOption<Double> option,
+ @Nullable ConfigOption<Boolean> enableFlag,
+ double min,
+ double max) {
+ var conf = confWithFeatureEnabled(enableFlag);
+ conf.set(option, min - 0.0001d);
+
+ assertThat(validator.validateAutoscalerOptions(conf))
+ .hasValueSatisfying(error ->
assertThat(error).contains(option.key()));
+ }
+
+ @ParameterizedTest
+ @MethodSource("boundedOptions")
+ void testBoundaryValuesAreAccepted(
+ ConfigOption<Double> option,
+ @Nullable ConfigOption<Boolean> enableFlag,
+ double min,
+ double max) {
+ var lower = confWithFeatureEnabled(enableFlag);
+ lower.set(option, min);
+ assertThat(validator.validateAutoscalerOptions(lower)).isEmpty();
+
+ var upper = confWithFeatureEnabled(enableFlag);
+ upper.set(option, max);
+ assertThat(validator.validateAutoscalerOptions(upper)).isEmpty();
+ }
+
+ @ParameterizedTest
+ @MethodSource("boundedOptions")
+ void testOutOfRangeIgnoredWhenFeatureDisabled(
+ ConfigOption<Double> option,
+ @Nullable ConfigOption<Boolean> enableFlag,
+ double min,
+ double max) {
+ if (enableFlag == null) {
+ // Not a feature-gated option, nothing to assert here.
+ return;
+ }
+ var conf = enabledConf();
+ // Feature flag left disabled on purpose.
+ conf.set(option, max + 1.0d);
+
+ assertThat(validator.validateAutoscalerOptions(conf)).isEmpty();
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index d0cef03a..046bdac2 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.validation;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.validation.AutoscalerValidator;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
@@ -675,6 +676,31 @@ public class DefaultValidator implements
FlinkResourceValidator {
return Optional.empty();
}
Configuration flinkConfiguration =
Configuration.fromMap(effectiveConfig);
- return
autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
+ return firstPresent(
+
autoscalerValidator.validateAutoscalerOptions(flinkConfiguration),
+ validateMetricsWindow(flinkConfiguration));
+ }
+
+ private Optional<String> validateMetricsWindow(Configuration conf) {
+ if (!conf.get(AutoScalerOptions.AUTOSCALER_ENABLED)) {
+ return Optional.empty();
+ }
+ // The autoscaler collects one metric sample per reconcile loop and requires at least two
+ // samples within the metric window to evaluate scaling. If the window is smaller than the
+ // reconcile interval, the window is trimmed down to a single sample on every loop and
+ // autoscaling is never applied.
+ var metricsWindow = conf.get(AutoScalerOptions.METRICS_WINDOW);
+ var reconcileInterval =
+
conf.get(KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL);
+ if (metricsWindow.compareTo(reconcileInterval) < 0) {
+ return Optional.of(
+ String.format(
+ "The autoscaler metric window (%s=%s) must not be
smaller than the operator reconcile interval (%s=%s), otherwise fewer than two
metric samples are retained and autoscaling is never applied.",
+ AutoScalerOptions.METRICS_WINDOW.key(),
+ metricsWindow,
+
KubernetesOperatorConfigOptions.OPERATOR_RECONCILE_INTERVAL.key(),
+ reconcileInterval));
+ }
+ return Optional.empty();
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 25082625..77d073b1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -487,15 +487,11 @@ public class DefaultValidatorTest {
});
testError(
- dep -> {
-
dep.getSpec().getJobManager().getResource().setEphemeralStorage("abc");
- },
+ dep ->
dep.getSpec().getJobManager().getResource().setEphemeralStorage("abc"),
"JobManager resource ephemeral storage parse error: Character
a is neither a decimal digit number, decimal point, nor \"e\" notation
exponential mark.");
testError(
- dep -> {
-
dep.getSpec().getTaskManager().getResource().setEphemeralStorage("abc");
- },
+ dep ->
dep.getSpec().getTaskManager().getResource().setEphemeralStorage("abc"),
"TaskManager resource ephemeral storage parse error: Character
a is neither a decimal digit number, decimal point, nor \"e\" notation
exponential mark.");
}
@@ -854,9 +850,7 @@ public class DefaultValidatorTest {
CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(),
"test-checkpoint-dir"));
},
- flinkDeployment -> {
- flinkDeployment.getSpec().setFlinkConfiguration(Map.of());
- },
+ flinkDeployment ->
flinkDeployment.getSpec().setFlinkConfiguration(Map.of()),
null);
}
@@ -963,17 +957,34 @@ public class DefaultValidatorTest {
public void testAutoScalerDeploymentWithInvalidScalingCoefficientMin() {
var result =
testAutoScalerConfiguration(
- flinkConf ->
- flinkConf.put(
-
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN
- .key(),
- "1.2"));
+ flinkConf -> {
+ // The coefficient is only validated when observed scalability is on.
+ flinkConf.put(
+
AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED.key(), "true");
+ flinkConf.put(
+
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN.key(),
+ "1.2");
+ });
assertErrorContains(
result,
getFormattedErrorMessage(
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d));
}
+ @Test
+ public void
testInvalidScalingCoefficientMinIgnoredWhenObservedScalabilityDisabled() {
+ var result =
+ testAutoScalerConfiguration(
+ flinkConf -> {
+ flinkConf.put(
+
AutoScalerOptions.OBSERVED_SCALABILITY_ENABLED.key(), "false");
+ flinkConf.put(
+
AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN.key(),
+ "1.2");
+ });
+ assertErrorNotContains(result);
+ }
+
@Test
public void testNonEnabledAutoScalerDeploymentJob() {
var result =
@@ -1009,6 +1020,37 @@ public class DefaultValidatorTest {
testSessionJobAutoScalerConfiguration(flinkConf ->
{}).ifPresent(Assertions::fail);
}
+ @Test
+ public void testMetricsWindowSmallerThanReconcileIntervalIsRejected() {
+ // The default reconcile interval is 60s. A smaller metric window means fewer than two
+ // samples are retained per loop and autoscaling never runs, so it must be rejected.
+ var result =
+ testAutoScalerConfiguration(
+ flinkConf ->
flinkConf.put(AutoScalerOptions.METRICS_WINDOW.key(), "30 s"));
+ Assertions.assertTrue(result.isPresent());
+
Assertions.assertTrue(result.get().contains(AutoScalerOptions.METRICS_WINDOW.key()));
+ }
+
+ @Test
+ public void testMetricsWindowLargerThanReconcileIntervalIsAccepted() {
+ var result =
+ testAutoScalerConfiguration(
+ flinkConf ->
+
flinkConf.put(AutoScalerOptions.METRICS_WINDOW.key(), "5 min"));
+ assertErrorNotContains(result);
+ }
+
+ @Test
+ public void testSmallMetricsWindowIgnoredWhenAutoscalerDisabled() {
+ var result =
+ testAutoScalerConfiguration(
+ flinkConf -> {
+
flinkConf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
+
flinkConf.put(AutoScalerOptions.METRICS_WINDOW.key(), "1 s");
+ });
+ assertErrorNotContains(result);
+ }
+
@Test
public void testValidateSessionJobWithInvalidNegativeScaleDownFactor() {
var result =
@@ -1130,9 +1172,7 @@ public class DefaultValidatorTest {
deploymentResult =
testAutoScalerConfiguration(
- flinkConf -> {
-
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.8");
- });
+ flinkConf ->
flinkConf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.8"));
assertErrorContains(
deploymentResult,
getFormattedErrorMessage(
@@ -1142,9 +1182,8 @@ public class DefaultValidatorTest {
deploymentResult =
testAutoScalerConfiguration(
- flinkConf -> {
-
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5");
- });
+ flinkConf ->
+
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5"));
assertErrorContains(
deploymentResult,
@@ -1172,9 +1211,7 @@ public class DefaultValidatorTest {
sessionResult =
testSessionJobAutoScalerConfiguration(
- flinkConf -> {
-
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6");
- });
+ flinkConf ->
flinkConf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "0.6"));
assertErrorContains(
sessionResult,
getFormattedErrorMessage(
@@ -1184,9 +1221,8 @@ public class DefaultValidatorTest {
sessionResult =
testSessionJobAutoScalerConfiguration(
- flinkConf -> {
-
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5");
- });
+ flinkConf ->
+
flinkConf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "1.5"));
assertErrorContains(
sessionResult,
@@ -1226,7 +1262,7 @@ public class DefaultValidatorTest {
conf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "100000.0");
conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "0.6");
-
conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED.key(),
"0.1");
+ conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD.key(),
"0.1");
conf.put(AutoScalerOptions.UTILIZATION_TARGET.key(), "0.7");
conf.put(AutoScalerOptions.UTILIZATION_MAX.key(), "1.0");
conf.put(AutoScalerOptions.UTILIZATION_MIN.key(), "0.3");
@@ -1242,17 +1278,6 @@ public class DefaultValidatorTest {
max != null ? max.toString() : "+Infinity");
}
- private static String getFormattedNumberOrderErrorMessage(
- ConfigOption<Double> configValueLeft, ConfigOption<Double>
configValueRight) {
- return String.format(
- "The AutoScalerOption %s or %s is invalid, %s must be less
than or equal to the value of "
- + "%s",
- configValueLeft.key(),
- configValueRight.key(),
- configValueLeft.key(),
- configValueRight.key());
- }
-
private static String getFormattedErrorMessage(ConfigOption<Double>
configValue, Double min) {
return getFormattedErrorMessage(configValue, min, null);
}