This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 83b5c29ed5be4e40592805ee377a4a15f92ce64b Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Tue Mar 3 18:12:03 2020 +0800 [FLINK-16326] [core] Validate strictly required Flink configs in StatefulFunctionsConfig --- .../flink/core/StatefulFunctionsConfig.java | 2 + .../core/StatefulFunctionsConfigValidator.java | 76 ++++++++++++++++++++++ .../StatefulFunctionsInvalidConfigException.java | 29 +++++++++ .../flink/core/StatefulFunctionsConfigTest.java | 13 ++++ 4 files changed, 120 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java index ce40be0..7486fa3 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java @@ -138,6 +138,8 @@ public class StatefulFunctionsConfig implements Serializable { * @param configuration a configuration to read the values from */ public StatefulFunctionsConfig(Configuration configuration) { + StatefulFunctionsConfigValidator.validate(configuration); + this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER); this.flinkJobName = configuration.get(FLINK_JOB_NAME); this.feedbackBufferSize = configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING); diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java new file mode 100644 index 0000000..9a7b0d1 --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.flink.core; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; + +public final class StatefulFunctionsConfigValidator { + + private StatefulFunctionsConfigValidator() {} + + public static final List<String> PARENT_FIRST_CLASSLOADER_PATTERNS = + Collections.unmodifiableList( + Arrays.asList("org.apache.flink.statefun", "org.apache.kafka", "com.google.protobuf")); + + public static final int MAX_CONCURRENT_CHECKPOINTS = 1; + + static void validate(Configuration configuration) { + validateParentFirstClassloaderPatterns(configuration); + validateMaxConcurrentCheckpoints(configuration); + } + + private static void validateParentFirstClassloaderPatterns(Configuration configuration) { + final Set<String> parentFirstClassloaderPatterns = + parentFirstClassloaderPatterns(configuration); + if (!parentFirstClassloaderPatterns.containsAll(PARENT_FIRST_CLASSLOADER_PATTERNS)) { + throw new StatefulFunctionsInvalidConfigException( + CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, + "Must contain all of " + String.join(", ", PARENT_FIRST_CLASSLOADER_PATTERNS)); + } + } + + private static void validateMaxConcurrentCheckpoints(Configuration configuration) { + final int maxConcurrentCheckpoints = + configuration.get(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS); + if (maxConcurrentCheckpoints != 1) { + throw new StatefulFunctionsInvalidConfigException( + ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, + "Value must be 1, Stateful Functions does not support concurrent checkpoints."); + } + } + + private static Set<String> parentFirstClassloaderPatterns(Configuration configuration) { + final String[] split = + configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";"); + final Set<String> parentFirstClassloaderPatterns = new HashSet<>(split.length); + for (String s : split) { + parentFirstClassloaderPatterns.add(s.trim().toLowerCase(Locale.ENGLISH)); + } + return parentFirstClassloaderPatterns; + } +} diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/exceptions/StatefulFunctionsInvalidConfigException.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/exceptions/StatefulFunctionsInvalidConfigException.java new file mode 100644 index 0000000..13568cb --- /dev/null +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/exceptions/StatefulFunctionsInvalidConfigException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.flink.core.exceptions; + +import org.apache.flink.configuration.ConfigOption; + +public final class StatefulFunctionsInvalidConfigException extends IllegalArgumentException { + + private static final long serialVersionUID = 1L; + + public StatefulFunctionsInvalidConfigException(ConfigOption<?> invalidConfig, String message) { + super(String.format("Invalid configuration: %s; %s", invalidConfig.key(), message)); + } +} diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java index 89629d3..90dad85 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java @@ -18,8 +18,11 @@ package org.apache.flink.statefun.flink.core; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException; import org.apache.flink.statefun.flink.core.message.MessageFactoryType; +import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -38,6 +41,10 @@ public class StatefulFunctionsConfigTest { StatefulFunctionsConfig.TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING, MemorySize.ofMebiBytes(100)); configuration.set(StatefulFunctionsConfig.ASYNC_MAX_OPERATIONS_PER_TASK, 100); + configuration.set( + CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, + "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"); + configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1); configuration.setString("statefun.module.global-config.key1", "value1"); configuration.setString("statefun.module.global-config.key2", "value2"); @@ -52,4 +59,10 @@ public class StatefulFunctionsConfigTest { Assert.assertThat( stateFunConfig.getGlobalConfigurations(), Matchers.hasEntry("key2", "value2")); } + + @Test(expected = StatefulFunctionsInvalidConfigException.class) + public void invalidStrictFlinkConfigsThrows() { + Configuration configuration = new Configuration(); + new StatefulFunctionsConfig(configuration); + } }