[
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327533#comment-15327533
]
ASF GitHub Bot commented on FLINK-3937:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2085#discussion_r66805492
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[]
nodeManagers, int toAllocate) {
return false;
}
- @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
- @Override
- public boolean isDetached() {
+ public boolean isDetachedMode() {
return detached;
}
+
+ /**
+ * Gets a Hadoop Yarn client
+ * @return Returns a YarnClient which has to be shutdown manually
+ */
+ private static YarnClient getYarnClient(Configuration conf) {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+ return yarnClient;
+ }
+
+ /**
+ * Retrieves the Yarn application and cluster from the config
+ * @param config The config with entries to retrieve the cluster
+ * @return YarnClusterClient
+ * @deprecated This should be removed in the future
+ */
+ public YarnClusterClient
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws
Exception {
+ String jobManagerHost =
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+ int jobManagerPort =
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+ if (jobManagerHost != null && jobManagerPort != -1) {
+
+ YarnClient yarnClient = getYarnClient(conf);
+ List<ApplicationReport> applicationReports =
yarnClient.getApplications();
+ for (ApplicationReport report : applicationReports) {
+ if (report.getHost().equals(jobManagerHost) &&
report.getRpcPort() == jobManagerPort) {
+ LOG.info("Found application '{}' " +
+ "with JobManager host name '{}'
and port '{}' from Yarn properties file.",
+ report.getApplicationId(),
jobManagerHost, jobManagerPort);
+ return
retrieve(report.getApplicationId().toString());
+ }
+ }
+
--- End diff --
Good idea.
> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
> Issue Type: Improvement
> Reporter: Sebastian Klemke
> Assignee: Maximilian Michels
> Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop
> subcommands are hard to invoke if you only know the YARN application ID. As
> an improvement, I suggest adding a -yid <yarnApplicationId> option to the
> mentioned subcommands that can be used together with -m yarn-cluster. Flink
> cli would then retrieve JobManager RPC location from YARN ResourceManager.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)