dawidwys commented on a change in pull request #12643:
URL: https://github.com/apache/flink/pull/12643#discussion_r440643939



##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
##########
@@ -522,6 +531,52 @@ private void initializeTableEnvironment(@Nullable 
SessionState sessionState) {
                }
        }
 
+       private TableConfig createTableConfig() {
+               final TableConfig config = new TableConfig();
+               config.addConfiguration(flinkConfig);
+               Configuration conf = config.getConfiguration();
+               environment.getConfiguration().asMap().forEach(conf::setString);
+               ExecutionEntry execution = environment.getExecution();
+               config.setIdleStateRetentionTime(
+                               
Time.milliseconds(execution.getMinStateRetention()),
+                               
Time.milliseconds(execution.getMaxStateRetention()));
+
+               conf.setInteger(CoreOptions.DEFAULT_PARALLELISM, 
execution.getParallelism());

Review comment:
       Please use the generic `set` method instead of the type-specific `setInteger`/`setString` etc.

##########
File path: 
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
##########
@@ -522,6 +531,52 @@ private void initializeTableEnvironment(@Nullable 
SessionState sessionState) {
                }
        }
 
+       private TableConfig createTableConfig() {
+               final TableConfig config = new TableConfig();
+               config.addConfiguration(flinkConfig);
+               Configuration conf = config.getConfiguration();
+               environment.getConfiguration().asMap().forEach(conf::setString);
+               ExecutionEntry execution = environment.getExecution();
+               config.setIdleStateRetentionTime(
+                               
Time.milliseconds(execution.getMinStateRetention()),
+                               
Time.milliseconds(execution.getMaxStateRetention()));
+
+               conf.setInteger(CoreOptions.DEFAULT_PARALLELISM, 
execution.getParallelism());
+               conf.setInteger(PipelineOptions.MAX_PARALLELISM, 
execution.getMaxParallelism());
+               conf.set(StreamPipelineOptions.TIME_CHARACTERISTIC, 
execution.getTimeCharacteristic());
+               if (execution.getTimeCharacteristic() == 
TimeCharacteristic.EventTime) {
+                       conf.set(PipelineOptions.AUTO_WATERMARK_INTERVAL,
+                                       
Duration.ofMillis(execution.getPeriodicWatermarksInterval()));
+               }
+
+               RestartStrategyConfiguration restartStrategy = 
environment.getExecution().getRestartStrategy();
+               if (restartStrategy instanceof NoRestartStrategyConfiguration) {
+                       conf.setString(RestartStrategyOptions.RESTART_STRATEGY, 
"none");
+               } else if (restartStrategy instanceof 
FixedDelayRestartStrategyConfiguration) {
+                       conf.setString(RestartStrategyOptions.RESTART_STRATEGY, 
"fixed-delay");
+                       FixedDelayRestartStrategyConfiguration fixedDelay = 
((FixedDelayRestartStrategyConfiguration) restartStrategy);
+                       
conf.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
+                                       fixedDelay.getRestartAttempts());
+                       
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
+                                       
Duration.ofMillis(fixedDelay.getDelayBetweenAttemptsInterval().toMilliseconds()));
+               } else if (restartStrategy instanceof 
FailureRateRestartStrategyConfiguration) {
+                       conf.setString(RestartStrategyOptions.RESTART_STRATEGY, 
"failure-rate");
+                       FailureRateRestartStrategyConfiguration failureRate = 
(FailureRateRestartStrategyConfiguration) restartStrategy;
+                       
conf.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL,
+                                       failureRate.getMaxFailureRate());
+                       
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL,
+                                       
Duration.ofMillis(failureRate.getFailureInterval().toMilliseconds()));
+                       
conf.set(RestartStrategyOptions.RESTART_STRATEGY_FAILURE_RATE_DELAY,
+                                       
Duration.ofMillis(failureRate.getDelayBetweenAttemptsInterval().toMilliseconds()));
+               } else if (restartStrategy instanceof 
FallbackRestartStrategyConfiguration) {
+                       // default is FallbackRestartStrategyConfiguration
+                       // see ExecutionConfig.restartStrategyConfiguration
+                       
conf.removeConfig(RestartStrategyOptions.RESTART_STRATEGY);
+               }

Review comment:
       Please extract this restart-strategy configuration into a separate method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to