This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch executor-impl in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e90d6ae6989143855d56ed80462678142f56bae Author: Kostas Kloudas <[email protected]> AuthorDate: Thu Nov 14 15:08:25 2019 +0100 [hotfix] Merge configurations in arguemnt list of runProgram() in CLIFrontend --- .../org/apache/flink/client/cli/CliFrontend.java | 44 ++++++++++++++-------- .../flink/client/cli/ExecutionConfigAccessor.java | 5 ++- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 38243fc..5ed2901 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -208,33 +208,45 @@ public class CliFrontend { throw new CliArgsException("Could not build the program from JAR file.", e); } - final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine); - final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine); - - final List<URL> jobJars = program.getJobJarAndDependencies(); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars); - final Configuration executionConfig = executionParameters.getConfiguration(); + final Configuration effectiveConfiguration = getEffectiveConfiguration( + commandLine, + programOptions, + program.getJobJarAndDependencies()); try { - runProgram(executorConfig, executionConfig, program); + runProgram(effectiveConfiguration, program); } finally { program.deleteExtractedLibraries(); } } + private Configuration getEffectiveConfiguration( + final CommandLine commandLine, + final ProgramOptions programOptions, + final List<URL> jobJars) throws FlinkException { + + final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine)); + final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions( + checkNotNull(programOptions), + checkNotNull(jobJars)); + + final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine); + final Configuration effectiveConfiguration = new Configuration(executorConfig); + return executionParameters.applyToConfiguration(effectiveConfiguration); + } + private <ClusterID> void runProgram( - Configuration executorConfig, - Configuration executionConfig, + Configuration configuration, PackagedProgram program) throws ProgramInvocationException, FlinkException { - final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig); + final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration); checkNotNull(clusterClientFactory); - final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig); + final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration); try { - final ClusterID clusterId = clusterClientFactory.getClusterId(executorConfig); - final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(executionConfig); + final ClusterID clusterId = clusterClientFactory.getClusterId(configuration); + final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration); final ClusterClient<ClusterID> client; // directly deploy the job if the cluster is started in job mode and detached @@ -243,7 +255,7 @@ public class CliFrontend { final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism); - final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig); + final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); client = clusterDescriptor.deployJobCluster( clusterSpecification, jobGraph, @@ -264,7 +276,7 @@ public class CliFrontend { } else { // also in job mode we have to deploy a session cluster because the job // might consist of multiple parts (e.g. when using collect) - final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig); + final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration); client = clusterDescriptor.deploySessionCluster(clusterSpecification); // if not running in detached mode, add a shutdown hook to shut down cluster if client exits // there's a race-condition here if cli is killed before shutdown hook is installed @@ -279,7 +291,7 @@ public class CliFrontend { int userParallelism = executionParameters.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); - executeProgram(executionConfig, program, client); + executeProgram(configuration, program, client); } finally { if (clusterId == null && !executionParameters.getDetachedMode()) { // terminate the cluster only if we have started it before and if it's not detached diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java index 9e570e1..f55560b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java @@ -77,8 +77,9 @@ public class ExecutionConfigAccessor { return new ExecutionConfigAccessor(configuration); } - public Configuration getConfiguration() { - return configuration; + Configuration applyToConfiguration(final Configuration baseConfiguration) { + baseConfiguration.addAll(configuration); + return baseConfiguration; } public List<URL> getJars() {
