Repository: flink
Updated Branches:
  refs/heads/master efc344a4e -> ec6d97528
[FLINK-4079] YARN properties file used for per-job cluster

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec6d9752
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec6d9752
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec6d9752

Branch: refs/heads/master
Commit: ec6d97528e8b21f191b7922e4810fd60804c8365
Parents: f4ac852
Author: Maximilian Michels <m...@apache.org>
Authored: Thu Jun 16 12:03:04 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Fri Jun 17 10:37:58 2016 +0200

----------------------------------------------------------------------
 .../flink/yarn/cli/FlinkYarnSessionCli.java | 25 ++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ec6d9752/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5eca4f1..c355f0a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -105,7 +105,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	/**
 	 * Dynamic properties allow the user to specify additional configuration values with -D, such as
-	 * -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
+	 * -D fs.overwrite-files=true -D taskmanager.network.numberOfBuffers=16368
 	 */
 	private final Option DYNAMIC_PROPERTIES;
 
@@ -155,10 +155,27 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	/**
 	 * Resumes from a Flink Yarn properties file
+	 * @param cmdLine The command-line parameters
 	 * @param flinkConfiguration The flink configuration
 	 * @return True if the properties were loaded, false otherwise
 	 */
-	private boolean resumeFromYarnProperties(Configuration flinkConfiguration) {
+	private boolean resumeFromYarnProperties(CommandLine cmdLine, Configuration flinkConfiguration) {
+
+		String jobManagerOption = cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
+		if (jobManagerOption != null) {
+			// don't resume from properties file if a JobManager has been specified
+			return false;
+		}
+
+		for (Option option : cmdLine.getOptions()) {
+			if (ALL_OPTIONS.hasOption(option.getOpt())) {
+				if (!option.getOpt().equals(DETACHED.getOpt())) {
+					// don't resume from properties file if yarn options have been specified
+					return false;
+				}
+			}
+		}
+
 		// load the YARN properties
 		File propertiesFile = getYarnPropertiesLocation(flinkConfiguration);
 		if (!propertiesFile.exists()) {
@@ -439,7 +456,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	public boolean isActive(CommandLine commandLine, Configuration configuration) {
 		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
 		boolean yarnJobManager = ID.equals(jobManagerOption);
-		return yarnJobManager || resumeFromYarnProperties(configuration);
+		return yarnJobManager || resumeFromYarnProperties(commandLine, configuration);
 	}
 
 	@Override
@@ -471,7 +488,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			yarnDescriptor.setFlinkConfiguration(config);
 			return yarnDescriptor.retrieve(applicationID);
 		// then try to load from yarn properties
-		} else if (resumeFromYarnProperties(config)) {
+		} else if (resumeFromYarnProperties(cmdLine, config)) {
 			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
 			yarnDescriptor.setFlinkConfiguration(config);
 			return yarnDescriptor.retrieveFromConfig(config);