Repository: flink
Updated Branches:
  refs/heads/flink3712 [created] b368cb2d5


[FLINK-3712] Make all dynamic properties available to the CLI frontend

This closes #1863


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b368cb2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b368cb2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b368cb2d

Branch: refs/heads/flink3712
Commit: b368cb2d5c3bc67f1fa82f9c2b77b46ce1684962
Parents: a234719
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu Apr 7 16:44:48 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Mon Apr 11 11:41:12 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/client/CliFrontend.java |  6 +++++-
 .../apache/flink/client/FlinkYarnSessionCli.java  |  2 +-
 .../StandaloneLeaderRetrievalService.java         |  2 ++
 .../runtime/yarn/AbstractFlinkYarnClient.java     |  5 ++++-
 .../org/apache/flink/api/scala/FlinkShell.scala   |  2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java    |  2 +-
 .../apache/flink/yarn/YARNSessionFIFOITCase.java  |  2 +-
 .../apache/flink/yarn/FlinkYarnClientBase.java    | 18 ++++++++++++++----
 8 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index b5dfbe5..6d972bc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -1044,6 +1044,9 @@ public class CliFrontend {
 
                        jobManagerAddress = yarnCluster.getJobManagerAddress();
                        writeJobManagerAddressToConfig(jobManagerAddress);
+                       
+                       // overwrite the yarn client config (because the client 
parses the dynamic properties)
+                       
this.config.addAll(flinkYarnClient.getFlinkConfiguration());
 
                        logAndSysout("YARN cluster started");
                        logAndSysout("JobManager web interface address " + 
yarnCluster.getWebInterfaceURL());
@@ -1180,8 +1183,9 @@ public class CliFrontend {
                                        catch (Exception e) {
                                                return handleError(e);
                                        }
+                               } else {
+                                       return run(params);
                                }
-                               return run(params);
                        case ACTION_LIST:
                                return list(params);
                        case ACTION_INFO:

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 94de5c4..91f8df2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -142,7 +142,7 @@ public class FlinkYarnSessionCli {
                String confDirPath = 
CliFrontend.getConfigurationDirectoryFromEnv();
                GlobalConfiguration.loadConfiguration(confDirPath);
                Configuration flinkConfiguration = 
GlobalConfiguration.getConfiguration();
-               flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration);
+               flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
                flinkYarnClient.setConfigurationDirectory(confDirPath);
                File confFile = new File(confDirPath + File.separator + 
CONFIG_FILE_NAME);
                if (!confFile.exists()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index dbab41c..1be879c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -44,6 +44,7 @@ public class StandaloneLeaderRetrievalService implements 
LeaderRetrievalService
                this.jobManagerAddress = jobManagerAddress;
        }
 
+       @Override
        public void start(LeaderRetrievalListener listener) {
                Preconditions.checkNotNull(listener, "Listener must not be 
null.");
                Preconditions.checkState(leaderListener == null, 
"StandaloneLeaderRetrievalService can " +
@@ -55,5 +56,6 @@ public class StandaloneLeaderRetrievalService implements 
LeaderRetrievalService
                leaderListener.notifyLeaderAddress(jobManagerAddress, null);
        }
 
+       @Override
        public void stop() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
index 83a976d..c1498c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.runtime.yarn;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.Path;
 import java.io.File;
 import java.util.List;
@@ -43,7 +44,9 @@ public abstract class AbstractFlinkYarnClient {
        /**
         * Flink configuration
         */
-       public abstract void 
setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf);
+       public abstract void 
setFlinkConfiguration(org.apache.flink.configuration.Configuration conf);
+
+       public abstract Configuration getFlinkConfiguration();
 
        /**
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 6937e1b..2c2fbb3 100644
--- 
a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ 
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -223,7 +223,7 @@ object FlinkShell {
     val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
     val confPath = new Path(confFile.getAbsolutePath)
     GlobalConfiguration.loadConfiguration(confDirPath)
-    yarnClient.setFlinkConfigurationObject(flinkConfiguration)
+    yarnClient.setFlinkConfiguration(flinkConfiguration)
     yarnClient.setConfigurationDirectory(confDirPath)
     yarnClient.setConfigurationFilePath(confPath)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index f68b141..a93abf0 100644
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -111,7 +111,7 @@ public class YARNHighAvailabilityITCase extends 
YarnTestBase {
 
                String fsStateHandlePath = tmp.getRoot().getPath();
 
-               
flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+               
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
                
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
 +
                        zkServer.getConnectString() + 
"@@yarn.application-attempts=" + numberApplicationAttempts +
                        "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index d713b73..cb402a3 100644
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -227,7 +227,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
                
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
                String confDirPath = System.getenv("FLINK_CONF_DIR");
                flinkYarnClient.setConfigurationDirectory(confDirPath);
-               
flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+               
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
                flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + 
File.separator + "flink-conf.yaml"));
 
                // deploy

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
index ef02be3..6f81d09 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
@@ -124,7 +124,7 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
 
        private String dynamicPropertiesEncoded;
 
-       private List<File> shipFiles = new ArrayList<File>();
+       private List<File> shipFiles = new ArrayList<>();
        private org.apache.flink.configuration.Configuration flinkConfiguration;
 
        private boolean detached;
@@ -174,11 +174,16 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
        }
 
        @Override
-       public void 
setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
+       public void 
setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
                this.flinkConfiguration = conf;
        }
 
        @Override
+       public org.apache.flink.configuration.Configuration 
getFlinkConfiguration() {
+               return flinkConfiguration;
+       }
+
+       @Override
        public void setTaskManagerSlots(int slots) {
                if(slots <= 0) {
                        throw new IllegalArgumentException("Number of 
TaskManager slots must be positive");
@@ -209,6 +214,7 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
                flinkConfigurationPath = confPath;
        }
 
+       @Override
        public void setConfigurationDirectory(String configurationDirectory) {
                this.configurationDirectory = configurationDirectory;
        }
@@ -247,6 +253,7 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
                }
        }
 
+       @Override
        public void setDynamicPropertiesEncoded(String 
dynamicPropertiesEncoded) {
                this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
        }
@@ -303,6 +310,7 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
                return detached;
        }
 
+       @Override
        public AbstractFlinkYarnCluster deploy() throws Exception {
 
                UserGroupInformation.setConfiguration(conf);
@@ -542,7 +550,7 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
                LocalResource flinkConf = 
Records.newRecord(LocalResource.class);
                Path remotePathJar = Utils.setupLocalResource(fs, 
appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
                Path remotePathConf = Utils.setupLocalResource(fs, 
appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
-               Map<String, LocalResource> localResources = new HashMap<String, 
LocalResource>(2);
+               Map<String, LocalResource> localResources = new HashMap<>(2);
                localResources.put("flink.jar", appMasterJar);
                localResources.put("flink-conf.yaml", flinkConf);
 
@@ -578,7 +586,7 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
                fs.close();
 
                // Setup CLASSPATH for ApplicationMaster
-               Map<String, String> appMasterEnv = new HashMap<String, 
String>();
+               Map<String, String> appMasterEnv = new HashMap<>();
                // set user specified app master environment variables
                
appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
 flinkConfiguration));
                // set classpath from YARN configuration
@@ -728,6 +736,7 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
                return new ClusterResourceDescription(totalFreeMemory, 
containerLimit, nodeManagersFree);
        }
 
+       @Override
        public String getClusterDescription() throws Exception {
 
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -763,6 +772,7 @@ public abstract class FlinkYarnClientBase extends 
AbstractFlinkYarnClient {
                return baos.toString();
        }
 
+       @Override
        public String getSessionFilesDir() {
                return sessionFilesDir.toString();
        }

Reply via email to