Repository: flink Updated Branches: refs/heads/flink3712 [created] b368cb2d5
[FLINK-3712] Make all dynamic properties available to the CLI frontend This closes #1863 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b368cb2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b368cb2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b368cb2d Branch: refs/heads/flink3712 Commit: b368cb2d5c3bc67f1fa82f9c2b77b46ce1684962 Parents: a234719 Author: Robert Metzger <rmetz...@apache.org> Authored: Thu Apr 7 16:44:48 2016 +0200 Committer: Robert Metzger <rmetz...@apache.org> Committed: Mon Apr 11 11:41:12 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/flink/client/CliFrontend.java | 6 +++++- .../apache/flink/client/FlinkYarnSessionCli.java | 2 +- .../StandaloneLeaderRetrievalService.java | 2 ++ .../runtime/yarn/AbstractFlinkYarnClient.java | 5 ++++- .../org/apache/flink/api/scala/FlinkShell.scala | 2 +- .../flink/yarn/YARNHighAvailabilityITCase.java | 2 +- .../apache/flink/yarn/YARNSessionFIFOITCase.java | 2 +- .../apache/flink/yarn/FlinkYarnClientBase.java | 18 ++++++++++++++---- 8 files changed, 29 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index b5dfbe5..6d972bc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -1044,6 +1044,9 @@ public class CliFrontend { jobManagerAddress = yarnCluster.getJobManagerAddress(); writeJobManagerAddressToConfig(jobManagerAddress); + + // overwrite the yarn client config (because the client parses the dynamic properties) + this.config.addAll(flinkYarnClient.getFlinkConfiguration()); logAndSysout("YARN cluster started"); logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL()); @@ -1180,8 +1183,9 @@ public class CliFrontend { catch (Exception e) { return handleError(e); } + } else { + return run(params); } - return run(params); case ACTION_LIST: return list(params); case ACTION_INFO: http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 94de5c4..91f8df2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -142,7 +142,7 @@ public class FlinkYarnSessionCli { String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); GlobalConfiguration.loadConfiguration(confDirPath); Configuration flinkConfiguration = GlobalConfiguration.getConfiguration(); - flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration); + flinkYarnClient.setFlinkConfiguration(flinkConfiguration); flinkYarnClient.setConfigurationDirectory(confDirPath); File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME); if (!confFile.exists()) { http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java index dbab41c..1be879c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java @@ -44,6 +44,7 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService this.jobManagerAddress = jobManagerAddress; } + @Override public void start(LeaderRetrievalListener listener) { Preconditions.checkNotNull(listener, "Listener must not be null."); Preconditions.checkState(leaderListener == null, "StandaloneLeaderRetrievalService can " + @@ -55,5 +56,6 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService leaderListener.notifyLeaderAddress(jobManagerAddress, null); } + @Override public void stop() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java index 83a976d..c1498c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java @@ -17,6 +17,7 @@ */ package org.apache.flink.runtime.yarn; +import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.Path; import java.io.File; import java.util.List; @@ -43,7 +44,9 @@ public abstract class AbstractFlinkYarnClient { /** * Flink configuration */ - public abstract void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf); + public abstract void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf); + + public abstract Configuration getFlinkConfiguration(); /** * http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---------------------------------------------------------------------- diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 6937e1b..2c2fbb3 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -223,7 +223,7 @@ object FlinkShell { val confFile = new File(confDirPath + File.separator + "flink-conf.yaml") val confPath = new Path(confFile.getAbsolutePath) GlobalConfiguration.loadConfiguration(confDirPath) - yarnClient.setFlinkConfigurationObject(flinkConfiguration) + yarnClient.setFlinkConfiguration(flinkConfiguration) yarnClient.setConfigurationDirectory(confDirPath) yarnClient.setConfigurationFilePath(confPath) http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index f68b141..a93abf0 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -111,7 +111,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase { String fsStateHandlePath = tmp.getRoot().getPath(); - flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration()); + flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index d713b73..cb402a3 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -227,7 +227,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles())); String confDirPath = System.getenv("FLINK_CONF_DIR"); flinkYarnClient.setConfigurationDirectory(confDirPath); - flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration()); + flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); // deploy http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java index ef02be3..6f81d09 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java @@ -124,7 +124,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { private String dynamicPropertiesEncoded; - private List<File> shipFiles = new ArrayList<File>(); + private List<File> shipFiles = new ArrayList<>(); private org.apache.flink.configuration.Configuration flinkConfiguration; private boolean detached; @@ -174,11 +174,16 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { } @Override - public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) { + public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) { this.flinkConfiguration = conf; } @Override + public org.apache.flink.configuration.Configuration getFlinkConfiguration() { + return flinkConfiguration; + } + + @Override public void setTaskManagerSlots(int slots) { if(slots <= 0) { throw new IllegalArgumentException("Number of TaskManager slots must be positive"); @@ -209,6 +214,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { flinkConfigurationPath = confPath; } + @Override public void setConfigurationDirectory(String configurationDirectory) { this.configurationDirectory = configurationDirectory; } @@ -247,6 +253,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { } } + @Override public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) { this.dynamicPropertiesEncoded = dynamicPropertiesEncoded; } @@ -303,6 +310,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { return detached; } + @Override public AbstractFlinkYarnCluster deploy() throws Exception { UserGroupInformation.setConfiguration(conf); @@ -542,7 +550,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { LocalResource flinkConf = Records.newRecord(LocalResource.class); Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory()); Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory()); - Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2); + Map<String, LocalResource> localResources = new HashMap<>(2); localResources.put("flink.jar", appMasterJar); localResources.put("flink-conf.yaml", flinkConf); @@ -578,7 +586,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { fs.close(); // Setup CLASSPATH for ApplicationMaster - Map<String, String> appMasterEnv = new HashMap<String, String>(); + Map<String, String> appMasterEnv = new HashMap<>(); // set user specified app master environment variables appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration)); // set classpath from YARN configuration @@ -728,6 +736,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree); } + @Override public String getClusterDescription() throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -763,6 +772,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { return baos.toString(); } + @Override public String getSessionFilesDir() { return sessionFilesDir.toString(); }