Repository: flink
Updated Branches:
  refs/heads/master efc344a4e -> ec6d97528


[FLINK-4079] YARN properties file used for per-job cluster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec6d9752
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec6d9752
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec6d9752

Branch: refs/heads/master
Commit: ec6d97528e8b21f191b7922e4810fd60804c8365
Parents: f4ac852
Author: Maximilian Michels <m...@apache.org>
Authored: Thu Jun 16 12:03:04 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Fri Jun 17 10:37:58 2016 +0200

----------------------------------------------------------------------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 25 ++++++++++++++++----
 1 file changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec6d9752/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5eca4f1..c355f0a 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -105,7 +105,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
 
        /**
         * Dynamic properties allow the user to specify additional 
configuration values with -D, such as
-        *  -Dfs.overwrite-files=true  
-Dtaskmanager.network.numberOfBuffers=16368
+        *  -D fs.overwrite-files=true  -D 
taskmanager.network.numberOfBuffers=16368
         */
        private final Option DYNAMIC_PROPERTIES;
 
@@ -155,10 +155,27 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
 
        /**
         * Resumes from a Flink Yarn properties file
+        * @param cmdLine The command-line parameters
         * @param flinkConfiguration The flink configuration
         * @return True if the properties were loaded, false otherwise
         */
-       private boolean resumeFromYarnProperties(Configuration 
flinkConfiguration) {
+       private boolean resumeFromYarnProperties(CommandLine cmdLine, 
Configuration flinkConfiguration) {
+
+               String jobManagerOption = 
cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
+               if (jobManagerOption != null) {
+                       // don't resume from properties file if a JobManager 
has been specified
+                       return false;
+               }
+
+               for (Option option : cmdLine.getOptions()) {
+                       if (ALL_OPTIONS.hasOption(option.getOpt())) {
+                               if (!option.getOpt().equals(DETACHED.getOpt())) 
{
+                                       // don't resume from properties file if 
yarn options have been specified
+                                       return false;
+                               }
+                       }
+               }
+
                // load the YARN properties
                File propertiesFile = 
getYarnPropertiesLocation(flinkConfiguration);
                if (!propertiesFile.exists()) {
@@ -439,7 +456,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
        public boolean isActive(CommandLine commandLine, Configuration 
configuration) {
                String jobManagerOption = 
commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
                boolean yarnJobManager = ID.equals(jobManagerOption);
-               return yarnJobManager || 
resumeFromYarnProperties(configuration);
+               return yarnJobManager || resumeFromYarnProperties(commandLine, 
configuration);
        }
 
        @Override
@@ -471,7 +488,7 @@ public class FlinkYarnSessionCli implements 
CustomCommandLine<YarnClusterClient>
                        yarnDescriptor.setFlinkConfiguration(config);
                        return yarnDescriptor.retrieve(applicationID);
                // then try to load from yarn properties
-               } else if (resumeFromYarnProperties(config)) {
+               } else if (resumeFromYarnProperties(cmdLine, config)) {
                        AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor();
                        yarnDescriptor.setFlinkConfiguration(config);
                        return yarnDescriptor.retrieveFromConfig(config);

Reply via email to