This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 84c76839 [FLINK-37126] Add Validator for Autoscaler
84c76839 is described below

commit 84c76839591d7b49ed17a85cbc784d850093b407
Author: big face cat <[email protected]>
AuthorDate: Mon Nov 3 20:45:37 2025 +0800

    [FLINK-37126] Add Validator for Autoscaler
---
 .../standalone/StandaloneAutoscalerExecutor.java   |  19 +++-
 .../StandaloneAutoscalerValidatorTest.java         | 109 +++++++++++++++++++++
 .../autoscaler/validation/AutoscalerValidator.java |  98 ++++++++++++++++++
 .../operator/validation/DefaultValidator.java      |  66 +------------
 4 files changed, 230 insertions(+), 62 deletions(-)

diff --git 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
index 4a837438..7e13be92 100644
--- 
a/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
+++ 
b/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerExecutor.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.validation.AutoscalerValidator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -40,6 +41,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,6 +52,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
 import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_INTERVAL;
 import static 
org.apache.flink.autoscaler.standalone.config.AutoscalerStandaloneOptions.CONTROL_LOOP_PARALLELISM;
 
@@ -68,6 +71,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context 
extends JobAutoScalerCont
     private final ScheduledExecutorService scheduledExecutorService;
     private final ExecutorService scalingThreadPool;
     private final UnmodifiableConfiguration baseConf;
+    private final AutoscalerValidator autoscalerValidator;
 
     /**
      * Maintain a set of job keys that during scaling, it should be accessed 
at {@link
@@ -103,6 +107,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context 
extends JobAutoScalerCont
                         parallelism, new 
ExecutorThreadFactory("autoscaler-standalone-scaling"));
         this.scalingJobKeys = new HashSet<>();
         this.baseConf = new UnmodifiableConfiguration(conf);
+        this.autoscalerValidator = new AutoscalerValidator();
     }
 
     public void start() {
@@ -188,7 +193,19 @@ public class StandaloneAutoscalerExecutor<KEY, Context 
extends JobAutoScalerCont
     protected void scalingSingleJob(Context jobContext) {
         try {
             MDC.put("job.key", jobContext.getJobKey().toString());
-            autoScaler.scale(jobContext);
+            Optional<String> validationError =
+                    
autoscalerValidator.validateAutoscalerOptions(jobContext.getConfiguration());
+            if (validationError.isPresent()) {
+                eventHandler.handleEvent(
+                        jobContext,
+                        AutoScalerEventHandler.Type.Warning,
+                        "AutoScaler Options Validation",
+                        validationError.get(),
+                        null,
+                        baseConf.get(SCALING_EVENT_INTERVAL));
+            } else {
+                autoScaler.scale(jobContext);
+            }
         } catch (Throwable e) {
             LOG.error("Error while scaling job", e);
             eventHandler.handleException(jobContext, AUTOSCALER_ERROR, e);
diff --git 
a/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java
 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java
new file mode 100644
index 00000000..7ff6d4a2
--- /dev/null
+++ 
b/flink-autoscaler-standalone/src/test/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerValidatorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.standalone;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.JobAutoScaler;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class StandaloneAutoscalerValidatorTest {
+    private List<JobAutoScalerContext<JobID>> jobList;
+    private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector;
+    private ConcurrentHashMap<JobID, Integer> scaleCounter;
+    private Configuration correctConfiguration;
+    private Configuration invalidConfiguration;
+
+    @BeforeEach
+    void setUp() {
+        jobList = new ArrayList<>();
+        eventCollector = new TestingEventCollector<>();
+        scaleCounter = new ConcurrentHashMap<>();
+
+        correctConfiguration = new Configuration();
+        correctConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+
+        invalidConfiguration = new Configuration();
+        invalidConfiguration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
+        invalidConfiguration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 
-1.0);
+    }
+
+    @Test
+    void testAutoScalerWithInvalidConfig() throws Exception {
+        JobAutoScalerContext<JobID> validJob = 
createJobAutoScalerContext(correctConfiguration);
+        JobAutoScalerContext<JobID> invalidJob = 
createJobAutoScalerContext(invalidConfiguration);
+
+        jobList.add(validJob);
+        jobList.add(invalidJob);
+
+        final var jobAutoScaler =
+                new JobAutoScaler<JobID, JobAutoScalerContext<JobID>>() {
+                    @Override
+                    public void scale(JobAutoScalerContext<JobID> context)
+                            throws InterruptedException {
+                        scaleCounter.merge(context.getJobKey(), 1, 
Integer::sum);
+                    }
+
+                    @Override
+                    public void cleanup(JobAutoScalerContext<JobID> context) {
+                        // No cleanup required for the test
+                    }
+                };
+
+        try (var autoscalerExecutor =
+                new StandaloneAutoscalerExecutor<>(
+                        new Configuration(), baseConf -> jobList, 
eventCollector, jobAutoScaler)) {
+
+            List<CompletableFuture<Void>> scaledFutures = 
autoscalerExecutor.scaling();
+
+            for (CompletableFuture<Void> scaledFuture : scaledFutures) {
+                scaledFuture.get();
+            }
+
+            // Verification triggers two scaling tasks
+            assertThat(scaledFutures).hasSize(2);
+
+            // Only legally configured tasks are scaled
+            
assertThat(scaleCounter).hasSize(1).containsKey(validJob.getJobKey());
+
+            // Verification Event Collector captures an event
+            assertThat(eventCollector.events).hasSize(1);
+            assertThat(eventCollector.events)
+                    .allMatch(event -> event.getContext().equals(invalidJob));
+        }
+    }
+
+    private JobAutoScalerContext<JobID> 
createJobAutoScalerContext(Configuration configuration) {
+        JobID jobID = new JobID();
+        return new JobAutoScalerContext<>(
+                jobID, jobID, JobStatus.RUNNING, configuration, null, null);
+    }
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
new file mode 100644
index 00000000..185d6a20
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/validation/AutoscalerValidator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.validation;
+
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.utils.CalendarUtils;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
+import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
+
+/** Validator for Autoscaler. */
+public class AutoscalerValidator {
+
+    /**
+     * Validate autoscaler config and return optional error.
+     *
+     * @param flinkConf autoscaler config
+     * @return Optional error string, should be present iff validation 
resulted in an error
+     */
+    public Optional<String> validateAutoscalerOptions(Configuration flinkConf) 
{
+
+        if (!flinkConf.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
+            return Optional.empty();
+        }
+        return firstPresent(
+                validateNumber(flinkConf, 
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
+                validateNumber(flinkConf, 
AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
+                validateNumber(flinkConf, UTILIZATION_TARGET, 0.0d, 1.0d),
+                validateNumber(flinkConf, 
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
+                validateNumber(flinkConf, UTILIZATION_MAX, 
flinkConf.get(UTILIZATION_TARGET), 1.0d),
+                validateNumber(flinkConf, UTILIZATION_MIN, 0.0d, 
flinkConf.get(UTILIZATION_TARGET)),
+                validateNumber(flinkConf, 
OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
+                CalendarUtils.validateExcludedPeriods(flinkConf));
+    }
+
+    @SafeVarargs
+    private static Optional<String> firstPresent(Optional<String>... errOpts) {
+        for (Optional<String> opt : errOpts) {
+            if (opt.isPresent()) {
+                return opt;
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static <T extends Number> Optional<String> validateNumber(
+            Configuration flinkConfiguration,
+            ConfigOption<T> autoScalerConfig,
+            Double min,
+            Double max) {
+        try {
+            var configValue = flinkConfiguration.get(autoScalerConfig);
+            if (configValue != null) {
+                double value = configValue.doubleValue();
+                if ((min != null && value < min) || (max != null && value > 
max)) {
+                    return Optional.of(
+                            String.format(
+                                    "The AutoScalerOption %s is invalid, it 
should be a value within the range [%s, %s]",
+                                    autoScalerConfig.key(),
+                                    min != null ? min.toString() : "-Infinity",
+                                    max != null ? max.toString() : 
"+Infinity"));
+                }
+            }
+            return Optional.empty();
+        } catch (IllegalArgumentException e) {
+            return Optional.of(
+                    String.format(
+                            "Invalid value in the autoscaler config %s", 
autoScalerConfig.key()));
+        }
+    }
+
+    private static <T extends Number> Optional<String> validateNumber(
+            Configuration flinkConfiguration, ConfigOption<T> 
autoScalerConfig, Double min) {
+        return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index f05f6803..225bc658 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -17,10 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
-import org.apache.flink.autoscaler.config.AutoScalerOptions;
-import org.apache.flink.autoscaler.utils.CalendarUtils;
+import org.apache.flink.autoscaler.validation.AutoscalerValidator;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -65,11 +63,6 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.OBSERVED_SCALABILITY_COEFFICIENT_MIN;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MAX;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_MIN;
-import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.UTILIZATION_TARGET;
-
 /** Default validator implementation for {@link FlinkDeployment}. */
 public class DefaultValidator implements FlinkResourceValidator {
 
@@ -88,9 +81,11 @@ public class DefaultValidator implements 
FlinkResourceValidator {
             Set.of(Constants.CONFIG_FILE_LOG4J_NAME, 
Constants.CONFIG_FILE_LOGBACK_NAME);
 
     private final FlinkConfigManager configManager;
+    private final AutoscalerValidator autoscalerValidator;
 
     public DefaultValidator(FlinkConfigManager configManager) {
         this.configManager = configManager;
+        this.autoscalerValidator = new AutoscalerValidator();
     }
 
     @Override
@@ -598,63 +593,12 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         return Optional.empty();
     }
 
-    public static Optional<String> validateAutoScalerFlinkConfiguration(
+    public Optional<String> validateAutoScalerFlinkConfiguration(
             Map<String, String> effectiveConfig) {
         if (effectiveConfig == null) {
             return Optional.empty();
         }
         Configuration flinkConfiguration = 
Configuration.fromMap(effectiveConfig);
-        if 
(!flinkConfiguration.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
-            return Optional.empty();
-        }
-        return firstPresent(
-                validateNumber(flinkConfiguration, 
AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d),
-                validateNumber(flinkConfiguration, 
AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d),
-                validateNumber(flinkConfiguration, UTILIZATION_TARGET, 0.0d, 
1.0d),
-                validateNumber(
-                        flinkConfiguration, 
AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d),
-                validateNumber(
-                        flinkConfiguration,
-                        UTILIZATION_MAX,
-                        flinkConfiguration.get(UTILIZATION_TARGET),
-                        1.0d),
-                validateNumber(
-                        flinkConfiguration,
-                        UTILIZATION_MIN,
-                        0.0d,
-                        flinkConfiguration.get(UTILIZATION_TARGET)),
-                validateNumber(flinkConfiguration, 
OBSERVED_SCALABILITY_COEFFICIENT_MIN, 0.01d, 1d),
-                CalendarUtils.validateExcludedPeriods(flinkConfiguration));
-    }
-
-    private static <T extends Number> Optional<String> validateNumber(
-            Configuration flinkConfiguration,
-            ConfigOption<T> autoScalerConfig,
-            Double min,
-            Double max) {
-        try {
-            var configValue = flinkConfiguration.get(autoScalerConfig);
-            if (configValue != null) {
-                double value = configValue.doubleValue();
-                if ((min != null && value < min) || (max != null && value > 
max)) {
-                    return Optional.of(
-                            String.format(
-                                    "The AutoScalerOption %s is invalid, it 
should be a value within the range [%s, %s]",
-                                    autoScalerConfig.key(),
-                                    min != null ? min.toString() : "-Infinity",
-                                    max != null ? max.toString() : 
"+Infinity"));
-                }
-            }
-            return Optional.empty();
-        } catch (IllegalArgumentException e) {
-            return Optional.of(
-                    String.format(
-                            "Invalid value in the autoscaler config %s", 
autoScalerConfig.key()));
-        }
-    }
-
-    private static <T extends Number> Optional<String> validateNumber(
-            Configuration flinkConfiguration, ConfigOption<T> 
autoScalerConfig, Double min) {
-        return validateNumber(flinkConfiguration, autoScalerConfig, min, null);
+        return 
autoscalerValidator.validateAutoscalerOptions(flinkConfiguration);
     }
 }

Reply via email to