This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 83b5c29ed5be4e40592805ee377a4a15f92ce64b
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Tue Mar 3 18:12:03 2020 +0800

    [FLINK-16326] [core] Validate strictly required Flink configs in 
StatefulFunctionsConfig
---
 .../flink/core/StatefulFunctionsConfig.java        |  2 +
 .../core/StatefulFunctionsConfigValidator.java     | 76 ++++++++++++++++++++++
 .../StatefulFunctionsInvalidConfigException.java   | 29 +++++++++
 .../flink/core/StatefulFunctionsConfigTest.java    | 13 ++++
 4 files changed, 120 insertions(+)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index ce40be0..7486fa3 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -138,6 +138,8 @@ public class StatefulFunctionsConfig implements 
Serializable {
    * @param configuration a configuration to read the values from
    */
   public StatefulFunctionsConfig(Configuration configuration) {
+    StatefulFunctionsConfigValidator.validate(configuration);
+
     this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
     this.feedbackBufferSize = 
configuration.get(TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING);
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
new file mode 100644
index 0000000..9a7b0d1
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import 
org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+
+public final class StatefulFunctionsConfigValidator {
+
+  private StatefulFunctionsConfigValidator() {}
+
+  public static final List<String> PARENT_FIRST_CLASSLOADER_PATTERNS =
+      Collections.unmodifiableList(
+          Arrays.asList("org.apache.flink.statefun", "org.apache.kafka", 
"com.google.protobuf"));
+
+  public static final int MAX_CONCURRENT_CHECKPOINTS = 1;
+
+  static void validate(Configuration configuration) {
+    validateParentFirstClassloaderPatterns(configuration);
+    validateMaxConcurrentCheckpoints(configuration);
+  }
+
+  private static void validateParentFirstClassloaderPatterns(Configuration 
configuration) {
+    final Set<String> parentFirstClassloaderPatterns =
+        parentFirstClassloaderPatterns(configuration);
+    if 
(!parentFirstClassloaderPatterns.containsAll(PARENT_FIRST_CLASSLOADER_PATTERNS))
 {
+      throw new StatefulFunctionsInvalidConfigException(
+          CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+          "Must contain all of " + String.join(", ", 
PARENT_FIRST_CLASSLOADER_PATTERNS));
+    }
+  }
+
+  private static void validateMaxConcurrentCheckpoints(Configuration 
configuration) {
+    final int maxConcurrentCheckpoints =
+        
configuration.get(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS);
+    if (maxConcurrentCheckpoints != 1) {
+      throw new StatefulFunctionsInvalidConfigException(
+          ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
+          "Value must be 1, Stateful Functions does not support concurrent 
checkpoints.");
+    }
+  }
+
+  private static Set<String> parentFirstClassloaderPatterns(Configuration 
configuration) {
+    final String[] split =
+        
configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";");
+    final Set<String> parentFirstClassloaderPatterns = new 
HashSet<>(split.length);
+    for (String s : split) {
+      parentFirstClassloaderPatterns.add(s.trim().toLowerCase(Locale.ENGLISH));
+    }
+    return parentFirstClassloaderPatterns;
+  }
+}
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/exceptions/StatefulFunctionsInvalidConfigException.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/exceptions/StatefulFunctionsInvalidConfigException.java
new file mode 100644
index 0000000..13568cb
--- /dev/null
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/exceptions/StatefulFunctionsInvalidConfigException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.flink.core.exceptions;
+
+import org.apache.flink.configuration.ConfigOption;
+
+public final class StatefulFunctionsInvalidConfigException extends 
IllegalArgumentException {
+
+  private static final long serialVersionUID = 1L;
+
+  public StatefulFunctionsInvalidConfigException(ConfigOption<?> 
invalidConfig, String message) {
+    super(String.format("Invalid configuration: %s; %s", invalidConfig.key(), 
message));
+  }
+}
diff --git 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index 89629d3..90dad85 100644
--- 
a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ 
b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -18,8 +18,11 @@
 package org.apache.flink.statefun.flink.core;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.MemorySize;
+import 
org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
+import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,6 +41,10 @@ public class StatefulFunctionsConfigTest {
         StatefulFunctionsConfig.TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING,
         MemorySize.ofMebiBytes(100));
     configuration.set(StatefulFunctionsConfig.ASYNC_MAX_OPERATIONS_PER_TASK, 
100);
+    configuration.set(
+        CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+        "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
+    
configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
     configuration.setString("statefun.module.global-config.key1", "value1");
     configuration.setString("statefun.module.global-config.key2", "value2");
 
@@ -52,4 +59,10 @@ public class StatefulFunctionsConfigTest {
     Assert.assertThat(
         stateFunConfig.getGlobalConfigurations(), Matchers.hasEntry("key2", 
"value2"));
   }
+
+  @Test(expected = StatefulFunctionsInvalidConfigException.class)
+  public void invalidStrictFlinkConfigsThrows() {
+    Configuration configuration = new Configuration();
+    new StatefulFunctionsConfig(configuration);
+  }
 }

Reply via email to