This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 2529d272d29db12fe9835e3c8260a661ae011e28 Author: Igal Shilman <[email protected]> AuthorDate: Thu Apr 2 00:01:51 2020 +0200 [FLINK-16927][core] Add legacy scheduler property validation This commit requires that the value of the `jobmanager.scheduler` configuration parameter is set to `legacy`. This closes #91 --- .../core/StatefulFunctionsConfigValidator.java | 10 ++++++++ .../flink/core/StatefulFunctionsConfigTest.java | 28 ++++++++++++++++++++++ .../flink/statefun/flink/harness/Harness.java | 3 +++ 3 files changed, 41 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java index 9a7b0d1..88cc943 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java @@ -26,6 +26,7 @@ import java.util.Locale; import java.util.Set; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; @@ -42,6 +43,7 @@ public final class StatefulFunctionsConfigValidator { static void validate(Configuration configuration) { validateParentFirstClassloaderPatterns(configuration); validateMaxConcurrentCheckpoints(configuration); + validateLegacyScheduler(configuration); } private static void validateParentFirstClassloaderPatterns(Configuration configuration) { @@ -64,6 +66,14 @@ public final class StatefulFunctionsConfigValidator { } } + private static void validateLegacyScheduler(Configuration 
configuration) { + String configuredScheduler = configuration.get(JobManagerOptions.SCHEDULER); + if (!"legacy".equalsIgnoreCase(configuredScheduler)) { + throw new StatefulFunctionsInvalidConfigException( + JobManagerOptions.SCHEDULER, "Currently the only supported scheduler is 'legacy'"); + } + } + private static Set<String> parentFirstClassloaderPatterns(Configuration configuration) { final String[] split = configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";"); diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java index 90dad85..cc7f8f3 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java @@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException; import org.apache.flink.statefun.flink.core.message.MessageFactoryType; @@ -44,6 +45,7 @@ public class StatefulFunctionsConfigTest { configuration.set( CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"); + configuration.set(JobManagerOptions.SCHEDULER, "legacy"); configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1); configuration.setString("statefun.module.global-config.key1", "value1"); configuration.setString("statefun.module.global-config.key2", "value2"); @@ -61,8 +63,34 @@ public 
class StatefulFunctionsConfigTest { } @Test(expected = StatefulFunctionsInvalidConfigException.class) + public void testMissingScheduler() { + Configuration configuration = validConfiguration(); + + configuration.removeConfig(JobManagerOptions.SCHEDULER); + + new StatefulFunctionsConfig(configuration); + } + + @Test(expected = StatefulFunctionsInvalidConfigException.class) public void invalidStrictFlinkConfigsThrows() { Configuration configuration = new Configuration(); new StatefulFunctionsConfig(configuration); } + + private static Configuration validConfiguration() { + Configuration configuration = new Configuration(); + configuration.set(StatefulFunctionsConfig.FLINK_JOB_NAME, "name"); + configuration.set( + StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS); + configuration.set( + StatefulFunctionsConfig.TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING, + MemorySize.ofMebiBytes(100)); + configuration.set(StatefulFunctionsConfig.ASYNC_MAX_OPERATIONS_PER_TASK, 100); + configuration.set( + CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, + "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"); + configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1); + configuration.set(JobManagerOptions.SCHEDULER, "legacy"); + return configuration; + } } diff --git a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java index 6fe3856..bcab218 100644 --- a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java +++ b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.Objects; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import 
org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.SchedulerNGFactoryFactory; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator; import org.apache.flink.statefun.flink.core.StatefulFunctionsJob; @@ -167,5 +169,6 @@ public class Harness { flinkConfig.set( ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, StatefulFunctionsConfigValidator.MAX_CONCURRENT_CHECKPOINTS); + flinkConfig.set(JobManagerOptions.SCHEDULER, SchedulerNGFactoryFactory.SCHEDULER_TYPE_LEGACY); } }
