This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 94ea5e6ca9a1a0c6dda6b832473e40578207e78b Author: godfreyhe <godfre...@163.com> AuthorDate: Sun Jun 14 10:51:05 2020 +0800 [FLINK-18161][sql-client] Fix configurations from flink-conf.yaml overwrite sql-client's properties This closes #12643 --- .../client/gateway/local/ExecutionContext.java | 64 +++++++++++++++++----- .../client/gateway/local/ExecutionContextTest.java | 37 ++++++++----- .../client/gateway/local/LocalExecutorITCase.java | 4 +- 3 files changed, 76 insertions(+), 29 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index 6198440..0a444eb2 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -19,6 +19,11 @@ package org.apache.flink.table.client.gateway.local; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.restartstrategy.RestartStrategies.FailureRateRestartStrategyConfiguration; +import org.apache.flink.api.common.restartstrategy.RestartStrategies.FallbackRestartStrategyConfiguration; +import org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration; +import org.apache.flink.api.common.restartstrategy.RestartStrategies.NoRestartStrategyConfiguration; +import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.dag.Pipeline; import org.apache.flink.api.java.ExecutionEnvironment; @@ -32,8 +37,12 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamPipelineOptions; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableConfig; @@ -101,6 +110,7 @@ import java.lang.reflect.Method; import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Paths; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -524,15 +534,52 @@ public class ExecutionContext<ClusterID> { private TableConfig createTableConfig() { final TableConfig config = new TableConfig(); config.addConfiguration(flinkConfig); - environment.getConfiguration().asMap().forEach((k, v) -> - config.getConfiguration().setString(k, v)); + Configuration conf = config.getConfiguration(); + environment.getConfiguration().asMap().forEach(conf::setString); ExecutionEntry execution = environment.getExecution(); config.setIdleStateRetentionTime( Time.milliseconds(execution.getMinStateRetention()), Time.milliseconds(execution.getMaxStateRetention())); + + conf.set(CoreOptions.DEFAULT_PARALLELISM, execution.getParallelism()); + conf.set(PipelineOptions.MAX_PARALLELISM, execution.getMaxParallelism()); + conf.set(StreamPipelineOptions.TIME_CHARACTERISTIC, execution.getTimeCharacteristic()); + if (execution.getTimeCharacteristic() == TimeCharacteristic.EventTime) { + conf.set(PipelineOptions.AUTO_WATERMARK_INTERVAL, + Duration.ofMillis(execution.getPeriodicWatermarksInterval())); + } + + setRestartStrategy(conf); return config; } + private void setRestartStrategy(Configuration conf) { + RestartStrategyConfiguration restartStrategy = environment.getExecution().getRestartStrategy(); + if (restartStrategy instanceof NoRestartStrategyConfiguration) { + conf.set(RestartStrategyOptions.RESTART_STRATEGY, "none"); + } else if (restartStrategy instanceof FixedDelayRestartStrategyConfiguration) { + conf.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + FixedDelayRestartStrategyConfiguration fixedDelay = ((FixedDelayRestartStrategyConfiguration) restartStrategy); + conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, + fixedDelay.getRestartAttempts()); + conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, + Duration.ofMillis(fixedDelay.getDelayBetweenAttemptsInterval().toMilliseconds())); + } else if (restartStrategy instanceof FailureRateRestartStrategyConfiguration) { + conf.set(RestartStrategyOptions.RESTART_STRATEGY, "failure-rate"); + FailureRateRestartStrategyConfiguration failureRate = (FailureRateRestartStrategyConfiguration) restartStrategy; + conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, + failureRate.getMaxFailureRate()); + conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, + Duration.ofMillis(failureRate.getFailureInterval().toMilliseconds())); + conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY, + Duration.ofMillis(failureRate.getDelayBetweenAttemptsInterval().toMilliseconds())); + } else if (restartStrategy instanceof FallbackRestartStrategyConfiguration) { + // default is FallbackRestartStrategyConfiguration + // see ExecutionConfig.restartStrategyConfiguration + conf.removeConfig(RestartStrategyOptions.RESTART_STRATEGY); + } + } + private void createTableEnvironment( EnvironmentSettings settings, TableConfig config, @@ -555,7 +602,7 @@ public class ExecutionContext<ClusterID> { functionCatalog); } else if (environment.getExecution().isBatchPlanner()) { streamExecEnv = null; - execEnv = createExecutionEnvironment(); + execEnv = ExecutionEnvironment.getExecutionEnvironment(); executor = null; tableEnv = new BatchTableEnvironmentImpl( execEnv, @@ -630,18 +677,9 @@ public class ExecutionContext<ClusterID> { database.ifPresent(tableEnv::useDatabase); } - private ExecutionEnvironment createExecutionEnvironment() { - final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment(); - execEnv.setRestartStrategy(environment.getExecution().getRestartStrategy()); - execEnv.setParallelism(environment.getExecution().getParallelism()); - return execEnv; - } - private StreamExecutionEnvironment createStreamExecutionEnvironment() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setRestartStrategy(environment.getExecution().getRestartStrategy()); - env.setParallelism(environment.getExecution().getParallelism()); - env.setMaxParallelism(environment.getExecution().getMaxParallelism()); + // for TimeCharacteristic validation in StreamTableEnvironmentImpl env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic()); if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) { env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval()); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java index 7beee2a..d737746 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java @@ -18,13 +18,17 @@ package org.apache.flink.table.client.gateway.local; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.client.cli.DefaultCLI; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.python.PythonFunctionFactory; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamPipelineOptions; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -49,6 +53,7 @@ import org.apache.commons.cli.Options; import org.junit.Test; import java.net.URL; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -79,21 +84,25 @@ public class ExecutionContextTest { @Test public void testExecutionConfig() throws Exception { final ExecutionContext<?> context = createDefaultExecutionContext(); - final ExecutionConfig config = context.getExecutionConfig(); + final TableEnvironment tableEnv = context.getTableEnvironment(); + final TableConfig tableConfig = tableEnv.getConfig(); - assertEquals(99, config.getAutoWatermarkInterval()); + assertEquals(1_000, tableConfig.getMinIdleStateRetentionTime()); + assertEquals(600_000, tableConfig.getMaxIdleStateRetentionTime()); + Configuration conf = tableConfig.getConfiguration(); - final RestartStrategies.RestartStrategyConfiguration restartConfig = config.getRestartStrategy(); - assertTrue(restartConfig instanceof RestartStrategies.FailureRateRestartStrategyConfiguration); - final RestartStrategies.FailureRateRestartStrategyConfiguration failureRateStrategy = - (RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig; - assertEquals(10, failureRateStrategy.getMaxFailureRate()); - assertEquals(99_000, failureRateStrategy.getFailureInterval().toMilliseconds()); - assertEquals(1_000, failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds()); + assertEquals(1, conf.getInteger(CoreOptions.DEFAULT_PARALLELISM)); + assertEquals(16, conf.getInteger(PipelineOptions.MAX_PARALLELISM)); - final TableEnvironment tableEnv = context.getTableEnvironment(); - assertEquals(1_000, tableEnv.getConfig().getMinIdleStateRetentionTime()); - assertEquals(600_000, tableEnv.getConfig().getMaxIdleStateRetentionTime()); + assertEquals(TimeCharacteristic.EventTime, conf.get(StreamPipelineOptions.TIME_CHARACTERISTIC)); + assertEquals(Duration.ofMillis(99), conf.get(PipelineOptions.AUTO_WATERMARK_INTERVAL)); + + assertEquals("failure-rate", conf.getString(RestartStrategyOptions.RESTART_STRATEGY)); + assertEquals(10, conf.getInteger( + RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL)); + assertEquals(Duration.ofMillis(99_000), conf.get( + RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL)); + assertEquals(Duration.ofMillis(1_000), conf.get(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY)); } @Test diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index d4c7fac..6a7f9ca 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -404,8 +404,8 @@ public class LocalExecutorITCase extends TestLogger { expectedProperties.put("execution.periodic-watermarks-interval", "99"); expectedProperties.put("execution.parallelism", "1"); expectedProperties.put("execution.max-parallelism", "16"); - expectedProperties.put("execution.max-idle-state-retention", "0"); - expectedProperties.put("execution.min-idle-state-retention", "0"); + expectedProperties.put("execution.max-idle-state-retention", "600000"); + expectedProperties.put("execution.min-idle-state-retention", "1000"); expectedProperties.put("execution.result-mode", "table"); expectedProperties.put("execution.max-table-result-rows", "100"); expectedProperties.put("execution.restart-strategy.type", "failure-rate");