This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 84c76839 [FLINK-37126] Add Validator for Autoscaler
84c76839 is described below
commit 84c76839591d7b49ed17a85cbc784d850093b407
Author: big face cat <[email protected]>
AuthorDate: Mon Nov 3 20:45:37 2025 +0800
[FLINK-37126] Add Validator for Autoscaler
---
.../standalone/StandaloneAutoscalerExecutor.java | 19 +++-
.../StandaloneAutoscalerValidatorTest.java | 109 +++++++++++++++++++++
.../autoscaler/validation/AutoscalerValidator.java | 98 ++++++++++++++++++
.../operator/validation/DefaultValidator.java | 66 +------------
4 files changed, 230 insertions(+), 62 deletions(-)
diff --git
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index 4a837438..7e13be92 100644
---
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.validation.AutoscalerValidator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
import static
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
import static
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;
@@ -68,6 +71,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context
extends JobAutoScalerCont
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService scalingThreadPool;
private final UnmodifiableConfiguration baseConf;
+ private final AutoscalerValidator autoscalerValidator;
/**
* Maintain a set of job keys that during scaling, it should be accessed
at {@link
@@ -103,6 +107,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context
extends JobAutoScalerCont
parallelism, new
ExecutorThreadFactory("autoscaler-standalone-scaling"));
this.scalingJobKeys = new HashSet<>();
this.baseConf = new UnmodifiableConfiguration(conf);
+ this.autoscalerValidator = new AutoscalerValidator();
}
public void start() {
@@ -188,7 +193,19 @@ public class StandaloneAutoscalerExecutor<KEY, Context
extends JobAutoScalerCont
protected void scalingSingleJob(Context jobContext) {
try {
MDC.put("job.key", jobContext.getJobKey().toString());
- autoScaler.scale(jobContext);
+ Optional<String> validationError =
+
autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration());
+ if (validationError.isPresent()) {
+ eventHandler.handleEvent(
+ jobContext,
+ AutoScalerEventHandler.Type.Warning,
+ "AutoScaler Options Validation",
+ validationError.get(),
+ null,
+ baseConf.get(SCALING_EVENT_INTERVAL));
+ } else {
+ autoScaler.scale(jobContext);
+ }
} catch (Throwable e) {
LOG.error("Error while scaling job", e);
eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e);
diff --git
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java
new file mode 100644
index 00000000..7ff6d4a2
--- /dev/null
+++
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class StandaloneAutoscalerValidatorTest {
+ private List<JobAutoScalerContext<JobID>> jobList;
+ private TestingEventCollector<JobID, JobAutoScalerContext<JobID>>
eventCollector;
+ private ConcurrentHashMap<JobID, Integer> scaleCounter;
+ private Configuration correctConfiguration;
+ private Configuration invalidConfiguration;
+
+ @BeforeEach
+ void setUp() {
+ jobList = new ArrayList<>();
+ eventCollector = new TestingEventCollector<>();
+ scaleCounter = new ConcurrentHashMap<>();
+
+ correctConfiguration = new Configuration();
+ correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+
+ invalidConfiguration = new Configuration();
+ invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+ invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR,
-1.0);
+ }
+
+ @Test
+ void testAutoScalerWithInvalidConfig() throws Exception {
+ JobAutoScalerContext<JobID> validJob =
createJobAutoScalerContext(correctConfiguration);
+ JobAutoScalerContext<JobID> invalidJob =
createJobAutoScalerContext(invalidConfiguration);
+
+ jobList.add(validJob);
+ jobList.add(invalidJob);
+
+ final var jobAutoScaler =
+ new JobAutoScaler<JobID, JobAutoScalerContext<JobID>>() {
+ @Override
+ public void scale(JobAutoScalerContext<JobID> context)
+ throws InterruptedException {
+ scaleCounter.merge(context.getJobKey(), 1,
Integer::sum);
+ }
+
+ @Override
+ public void cleanup(JobAutoScalerContext<JobID> context) {
+ // No cleanup required for the test
+ }
+ };
+
+ try (var autoscalerExecutor =
+ new StandaloneAutoscalerExecutor<>(
+ new Configuration(), baseConf -> jobList,
eventCollector, jobAutoScaler)) {
+
+ List<CompletableFuture<Void>> scaledFutures =
autoscalerExecutor.scaling();
+
+ for (CompletableFuture<Void> scaledFuture : scaledFutures) {
+ scaledFuture.get();
+ }
+
+ // Verify that two scaling tasks were triggered
+ assertThat(scaledFutures).hasSize(2);
+
+ // Only the job with a valid configuration is scaled
+
assertThat(scaleCounter).hasSize(1).containsKey(validJob.getJobKey());
+
+ // Verify that the event collector captured exactly one event, and that it belongs to the invalid job
+ assertThat(eventCollector.events).hasSize(1);
+ assertThat(eventCollector.events)
+ .allMatch(event -> event.getContext().equals(invalidJob));
+ }
+ }
+
+ private JobAutoScalerContext<JobID>
createJobAutoScalerContext(Configuration configuration) {
+ JobID jobID = new JobID();
+ return new JobAutoScalerContext<>(
+ jobID, jobID, JobStatus.RUNNING, configuration, null, null);
+ }
+}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
new file mode 100644
index 00000000..185d6a20
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.validation;
+
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.utils.CalendarUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Optional;
+
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
+import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
+
+/** Validator for autoscaler configuration options. */
+public class AutoscalerValidator {
+
+ /**
+ * Validate the autoscaler config and return the first error found, if any; validation is skipped when the autoscaler is disabled.
+ *
+ * @param flinkConf autoscaler config
+ * @return Optional error string, should be present iff validation
resulted in an error
+ */
+ public Optional<String> validateAutoscalerOptions(Configuration flinkConf)
{
+
+ if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
+ return Optional.empty();
+ }
+ return firstPresent(
+ validateNumber(flinkConf,
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
+ validateNumber(flinkConf,
AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
+ validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
+ validateNumber(flinkConf,
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
+ validateNumber(flinkConf, UTILIZATION_MAX,
flinkConf.get(UTILIZATION_TARGET), 1.0d),
+ validateNumber(flinkConf, UTILIZATION_MIN, 0.0d,
flinkConf.get(UTILIZATION_TARGET)),
+ validateNumber(flinkConf,
OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
+ CalendarUtils.validateExcludedPeriods(flinkConf));
+ }
+
+ @SafeVarargs
+ private static Optional<String> firstPresent(Optional<String>... errOpts) {
+ for (Optional<String> opt : errOpts) {
+ if (opt.isPresent()) {
+ return opt;
+ }
+ }
+ return Optional.empty();
+ }
+
+ private static <T extends Number> Optional<String> validateNumber(
+ Configuration flinkConfiguration,
+ ConfigOption<T> autoScalerConfig,
+ Double min,
+ Double max) {
+ try {
+ var configValue = flinkConfiguration.get(autoScalerConfig);
+ if (configValue != null) {
+ double value = configValue.doubleValue();
+ if ((min != null && value < min) || (max != null && value >
max)) {
+ return Optional.of(
+ String.format(
+ "The AutoScalerOption %s is invalid, it
should be a value within the range [%s, %s]",
+ autoScalerConfig.key(),
+ min != null ? min.toString() : "-Infinity",
+ max != null ? max.toString() :
"+Infinity"));
+ }
+ }
+ return Optional.empty();
+ } catch (IllegalArgumentException e) {
+ return Optional.of(
+ String.format(
+ "Invalid value in the autoscaler config %s",
autoScalerConfig.key()));
+ }
+ }
+
+ private static <T extends Number> Optional<String> validateNumber(
+ Configuration flinkConfiguration, ConfigOption<T>
autoScalerConfig, Double min) {
+ return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
+ }
+}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index f05f6803..225bc658 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -17,10 +17,8 @@
package org.apache.flink.kubernetes.operator.validation;
-import org.apache.flink.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.autoscaler.utils.CalendarUtils;
+import org.apache.flink.autoscaler.validation.AutoscalerValidator;
import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -65,11 +63,6 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
-import static
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
-
/** Default validator implementation for {@link FlinkDeployment}. */
public class DefaultValidator implements FlinkResourceValidator {
@@ -88,9 +81,11 @@ public class DefaultValidator implements
FlinkResourceValidator {
Set.of(Constants.CONFIG_FILE_LOG4J_NAME,
Constants.CONFIG_FILE_LOGBACK_NAME);
private final FlinkConfigManager configManager;
+ private final AutoscalerValidator autoscalerValidator;
public DefaultValidator(FlinkConfigManager configManager) {
this.configManager = configManager;
+ this.autoscalerValidator = new AutoscalerValidator();
}
@Override
@@ -598,63 +593,12 @@ public class DefaultValidator implements
FlinkResourceValidator {
return Optional.empty();
}
- public static Optional<String> validateAutoScalerFlinkConfiguration(
+ public Optional<String> validateAutoScalerFlinkConfiguration(
Map<String, String> effectiveConfig) {
if (effectiveConfig == null) {
return Optional.empty();
}
Configuration flinkConfiguration =
Configuration.fromMap(effectiveConfig);
- if
(!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
- return Optional.empty();
- }
- return firstPresent(
- validateNumber(flinkConfiguration,
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
- validateNumber(flinkConfiguration,
AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
- validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d,
1.0d),
- validateNumber(
- flinkConfiguration,
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
- validateNumber(
- flinkConfiguration,
- UTILIZATION_MAX,
- flinkConfiguration.get(UTILIZATION_TARGET),
- 1.0d),
- validateNumber(
- flinkConfiguration,
- UTILIZATION_MIN,
- 0.0d,
- flinkConfiguration.get(UTILIZATION_TARGET)),
- validateNumber(flinkConfiguration,
OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
- CalendarUtils.validateExcludedPeriods(flinkConfiguration));
- }
-
- private static <T extends Number> Optional<String> validateNumber(
- Configuration flinkConfiguration,
- ConfigOption<T> autoScalerConfig,
- Double min,
- Double max) {
- try {
- var configValue = flinkConfiguration.get(autoScalerConfig);
- if (configValue != null) {
- double value = configValue.doubleValue();
- if ((min != null && value < min) || (max != null && value >
max)) {
- return Optional.of(
- String.format(
- "The AutoScalerOption %s is invalid, it
should be a value within the range [%s, %s]",
- autoScalerConfig.key(),
- min != null ? min.toString() : "-Infinity",
- max != null ? max.toString() :
"+Infinity"));
- }
- }
- return Optional.empty();
- } catch (IllegalArgumentException e) {
- return Optional.of(
- String.format(
- "Invalid value in the autoscaler config %s",
autoScalerConfig.key()));
- }
- }
-
- private static <T extends Number> Optional<String> validateNumber(
- Configuration flinkConfiguration, ConfigOption<T>
autoScalerConfig, Double min) {
- return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
+ return
autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
}
}