[
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327440#comment-15327440
]
ASF GitHub Bot commented on FLINK-3937:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2085#discussion_r66795439
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -108,24 +131,83 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean accept
         NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
     }
+
     /**
-     * Creates a new Yarn Client.
-     * @param cmd the command line to parse options from
-     * @return an instance of the client or null if there was an error
+     * Resumes from a Flink Yarn properties file
+     * @param flinkConfiguration The flink configuration
+     * @return True if the properties were loaded, false otherwise
      */
-    public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
+    private boolean resumeFromYarnProperties(Configuration flinkConfiguration) {
+        // load the YARN properties
+        File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration));
+        if (!propertiesFile.exists()) {
+            return false;
+        }
+
+        logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
+
+        Properties yarnProperties = new Properties();
+        try {
+            try (InputStream is = new FileInputStream(propertiesFile)) {
+                yarnProperties.load(is);
+            }
+        }
+        catch (IOException e) {
+            throw new RuntimeException("Cannot read the YARN properties file", e);
+        }
+
+        // configure the default parallelism from YARN
+        String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
+        if (propParallelism != null) { // maybe the property is not set
+            try {
+                int parallelism = Integer.parseInt(propParallelism);
+                flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
+
+                logAndSysout("YARN properties set default parallelism to " + parallelism);
+            }
+            catch (NumberFormatException e) {
+                throw new RuntimeException("Error while parsing the YARN properties: " +
+                    "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
+            }
+        }
+
+        // get the JobManager address from the YARN properties
+        String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+        InetSocketAddress jobManagerAddress;
+        if (address != null) {
+            try {
+                jobManagerAddress = ClientUtils.parseHostPortAddress(address);
+                // store address in config from where it is retrieved by the retrieval service
+                CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress);
+            }
+            catch (Exception e) {
+                throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e);
+            }
+
+            logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
+        }
-        AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-        if (flinkYarnClient == null) {
-            return null;
+        // handle the YARN client's dynamic properties
+        String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+        Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
+        for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) {
+            flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
         }
+        return true;
+    }
+
+    public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
+
+
+        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor();
--- End diff --
A bit too many blank lines ;)
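
For readers following the diff, the properties handling in resumeFromYarnProperties can be approximated in a standalone sketch using only the JDK. This is not the code from the pull request: the class name, the key names ("parallelism", "jobManager") and the helper methods are hypothetical, and the host:port parsing is a simplified stand-in for ClientUtils.parseHostPortAddress, whose exact behavior is not shown in the diff.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.Properties;

final class YarnPropertiesSketch {

    // Hypothetical key names, chosen for this sketch only.
    static final String PARALLELISM_KEY = "parallelism";
    static final String JOBMANAGER_KEY = "jobManager";

    /** Loads properties from text; a real caller would read the properties file instead. */
    static Properties load(String content) {
        Properties props = new Properties();
        try (StringReader reader = new StringReader(content)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read the YARN properties", e);
        }
        return props;
    }

    /** Returns the configured parallelism, or the fallback if the key is absent. */
    static int parallelism(Properties props, int fallback) {
        String value = props.getProperty(PARALLELISM_KEY);
        if (value == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            // Pass the cause along so the offending value appears in the stack trace.
            throw new IllegalArgumentException(
                "Property " + PARALLELISM_KEY + " is not an integer: " + value, e);
        }
    }

    /** Parses "host:port" without resolving the host; returns null if the key is absent. */
    static InetSocketAddress jobManagerAddress(Properties props) {
        String address = props.getProperty(JOBMANAGER_KEY);
        if (address == null) {
            return null;
        }
        int idx = address.lastIndexOf(':');
        if (idx <= 0 || idx == address.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + address);
        }
        String host = address.substring(0, idx);
        int port = Integer.parseInt(address.substring(idx + 1).trim());
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        Properties props = load("parallelism=4\njobManager=localhost:6123\n");
        System.out.println("parallelism: " + parallelism(props, 1));
        System.out.println("JobManager: " + jobManagerAddress(props));
    }
}
```

Unlike the diff, this sketch passes the NumberFormatException along as the cause of the rethrown exception, which may make a malformed properties file easier to diagnose.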
> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
> Issue Type: Improvement
> Reporter: Sebastian Klemke
> Assignee: Maximilian Michels
> Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, the Flink CLI cannot determine the JobManager RPC location for
> Flink-on-YARN clusters. As a result, the list, savepoint, cancel and stop
> subcommands are hard to invoke if you only know the YARN application ID. As
> an improvement, I suggest adding a -yid <yarnApplicationId> option to these
> subcommands that can be used together with -m yarn-cluster. The Flink CLI
> would then retrieve the JobManager RPC location from the YARN ResourceManager.