This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94ea5e6ca9a1a0c6dda6b832473e40578207e78b
Author: godfreyhe <godfre...@163.com>
AuthorDate: Sun Jun 14 10:51:05 2020 +0800

    [FLINK-18161][sql-client] Fix configurations from flink-conf.yaml overwriting sql-client's properties
    
    This closes #12643
---
 .../client/gateway/local/ExecutionContext.java     | 64 +++++++++++++++++-----
 .../client/gateway/local/ExecutionContextTest.java | 37 ++++++++-----
 .../client/gateway/local/LocalExecutorITCase.java  |  4 +-
 3 files changed, 76 insertions(+), 29 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index 6198440..0a444eb2 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -19,6 +19,11 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.FailureRateRestartStrategyConfiguration;
+import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.FallbackRestartStrategyConfiguration;
+import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.FixedDelayRestartStrategyConfiguration;
+import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.NoRestartStrategyConfiguration;
+import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -32,8 +37,12 @@ import 
org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamPipelineOptions;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
@@ -101,6 +110,7 @@ import java.lang.reflect.Method;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -524,15 +534,52 @@ public class ExecutionContext<ClusterID> {
        private TableConfig createTableConfig() {
                final TableConfig config = new TableConfig();
                config.addConfiguration(flinkConfig);
-               environment.getConfiguration().asMap().forEach((k, v) ->
-                               config.getConfiguration().setString(k, v));
+               Configuration conf = config.getConfiguration();
+               environment.getConfiguration().asMap().forEach(conf::setString);
                ExecutionEntry execution = environment.getExecution();
                config.setIdleStateRetentionTime(
                                
Time.milliseconds(execution.getMinStateRetention()),
                                
Time.milliseconds(execution.getMaxStateRetention()));
+
+               conf.set(CoreOptions.DEFAULT_PARALLELISM, 
execution.getParallelism());
+               conf.set(PipelineOptions.MAX_PARALLELISM, 
execution.getMaxParallelism());
+               conf.set(StreamPipelineOptions.TIME_CHARACTERISTIC, 
execution.getTimeCharacteristic());
+               if (execution.getTimeCharacteristic() == 
TimeCharacteristic.EventTime) {
+                       conf.set(PipelineOptions.AUTO_WATERMARK_INTERVAL,
+                                       
Duration.ofMillis(execution.getPeriodicWatermarksInterval()));
+               }
+
+               setRestartStrategy(conf);
                return config;
        }
 
+       private void setRestartStrategy(Configuration conf) {
+               RestartStrategyConfiguration restartStrategy = 
environment.getExecution().getRestartStrategy();
+               if (restartStrategy instanceof NoRestartStrategyConfiguration) {
+                       conf.set(RestartStrategyOptions.RESTART_STRATEGY, 
"none");
+               } else if (restartStrategy instanceof 
FixedDelayRestartStrategyConfiguration) {
+                       conf.set(RestartStrategyOptions.RESTART_STRATEGY, 
"fixed-delay");
+                       FixedDelayRestartStrategyConfiguration fixedDelay = 
((FixedDelayRestartStrategyConfiguration) restartStrategy);
+                       
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+                                       fixedDelay.getRestartAttempts());
+                       
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+                                       
Duration.ofMillis(fixedDelay.getDelayBetweenAttemptsInterval().toMilliseconds()));
+               } else if (restartStrategy instanceof 
FailureRateRestartStrategyConfiguration) {
+                       conf.set(RestartStrategyOptions.RESTART_STRATEGY, 
"failure-rate");
+                       FailureRateRestartStrategyConfiguration failureRate = 
(FailureRateRestartStrategyConfiguration) restartStrategy;
+                       
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
+                                       failureRate.getMaxFailureRate());
+                       
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
+                                       
Duration.ofMillis(failureRate.getFailureInterval().toMilliseconds()));
+                       
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
+                                       
Duration.ofMillis(failureRate.getDelayBetweenAttemptsInterval().toMilliseconds()));
+               } else if (restartStrategy instanceof 
FallbackRestartStrategyConfiguration) {
+                       // default is FallbackRestartStrategyConfiguration
+                       // see ExecutionConfig.restartStrategyConfiguration
+                       
conf.removeConfig(RestartStrategyOptions.RESTART_STRATEGY);
+               }
+       }
+
        private void createTableEnvironment(
                        EnvironmentSettings settings,
                        TableConfig config,
@@ -555,7 +602,7 @@ public class ExecutionContext<ClusterID> {
                                        functionCatalog);
                } else if (environment.getExecution().isBatchPlanner()) {
                        streamExecEnv = null;
-                       execEnv = createExecutionEnvironment();
+                       execEnv = 
ExecutionEnvironment.getExecutionEnvironment();
                        executor = null;
                        tableEnv = new BatchTableEnvironmentImpl(
                                        execEnv,
@@ -630,18 +677,9 @@ public class ExecutionContext<ClusterID> {
                database.ifPresent(tableEnv::useDatabase);
        }
 
-       private ExecutionEnvironment createExecutionEnvironment() {
-               final ExecutionEnvironment execEnv = 
ExecutionEnvironment.getExecutionEnvironment();
-               
execEnv.setRestartStrategy(environment.getExecution().getRestartStrategy());
-               
execEnv.setParallelism(environment.getExecution().getParallelism());
-               return execEnv;
-       }
-
        private StreamExecutionEnvironment createStreamExecutionEnvironment() {
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               
env.setRestartStrategy(environment.getExecution().getRestartStrategy());
-               env.setParallelism(environment.getExecution().getParallelism());
-               
env.setMaxParallelism(environment.getExecution().getMaxParallelism());
+               // for TimeCharacteristic validation in 
StreamTableEnvironmentImpl
                
env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
                if (env.getStreamTimeCharacteristic() == 
TimeCharacteristic.EventTime) {
                        
env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index 7beee2a..d737746 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.table.client.gateway.local;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.python.PythonFunctionFactory;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamPipelineOptions;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -49,6 +53,7 @@ import org.apache.commons.cli.Options;
 import org.junit.Test;
 
 import java.net.URL;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -79,21 +84,25 @@ public class ExecutionContextTest {
        @Test
        public void testExecutionConfig() throws Exception {
                final ExecutionContext<?> context = 
createDefaultExecutionContext();
-               final ExecutionConfig config = context.getExecutionConfig();
+               final TableEnvironment tableEnv = context.getTableEnvironment();
+               final TableConfig tableConfig = tableEnv.getConfig();
 
-               assertEquals(99, config.getAutoWatermarkInterval());
+               assertEquals(1_000, tableConfig.getMinIdleStateRetentionTime());
+               assertEquals(600_000, 
tableConfig.getMaxIdleStateRetentionTime());
+               Configuration conf = tableConfig.getConfiguration();
 
-               final RestartStrategies.RestartStrategyConfiguration 
restartConfig = config.getRestartStrategy();
-               assertTrue(restartConfig instanceof 
RestartStrategies.FailureRateRestartStrategyConfiguration);
-               final RestartStrategies.FailureRateRestartStrategyConfiguration 
failureRateStrategy =
-                       
(RestartStrategies.FailureRateRestartStrategyConfiguration) restartConfig;
-               assertEquals(10, failureRateStrategy.getMaxFailureRate());
-               assertEquals(99_000, 
failureRateStrategy.getFailureInterval().toMilliseconds());
-               assertEquals(1_000, 
failureRateStrategy.getDelayBetweenAttemptsInterval().toMilliseconds());
+               assertEquals(1, 
conf.getInteger(CoreOptions.DEFAULT_PARALLELISM));
+               assertEquals(16, 
conf.getInteger(PipelineOptions.MAX_PARALLELISM));
 
-               final TableEnvironment tableEnv = context.getTableEnvironment();
-               assertEquals(1_000, 
tableEnv.getConfig().getMinIdleStateRetentionTime());
-               assertEquals(600_000, 
tableEnv.getConfig().getMaxIdleStateRetentionTime());
+               assertEquals(TimeCharacteristic.EventTime, 
conf.get(StreamPipelineOptions.TIME_CHARACTERISTIC));
+               assertEquals(Duration.ofMillis(99), 
conf.get(PipelineOptions.AUTO_WATERMARK_INTERVAL));
+
+               assertEquals("failure-rate", 
conf.getString(RestartStrategyOptions.RESTART_STRATEGY));
+               assertEquals(10, conf.getInteger(
+                               
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL));
+               assertEquals(Duration.ofMillis(99_000), conf.get(
+                               
RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL));
+               assertEquals(Duration.ofMillis(1_000), 
conf.get(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY));
        }
 
        @Test
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index d4c7fac..6a7f9ca 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -404,8 +404,8 @@ public class LocalExecutorITCase extends TestLogger {
                        
expectedProperties.put("execution.periodic-watermarks-interval", "99");
                        expectedProperties.put("execution.parallelism", "1");
                        expectedProperties.put("execution.max-parallelism", 
"16");
-                       
expectedProperties.put("execution.max-idle-state-retention", "0");
-                       
expectedProperties.put("execution.min-idle-state-retention", "0");
+                       
expectedProperties.put("execution.max-idle-state-retention", "600000");
+                       
expectedProperties.put("execution.min-idle-state-retention", "1000");
                        expectedProperties.put("execution.result-mode", 
"table");
                        
expectedProperties.put("execution.max-table-result-rows", "100");
                        
expectedProperties.put("execution.restart-strategy.type", "failure-rate");

Reply via email to