[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324559#comment-15324559 ]
ASF GitHub Bot commented on FLINK-3937: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66626302 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E } /** - * Retrieves a {@link Client} object from the given command line options and other parameters. + * Retrieves a {@link ClusterClient} object from the given command line options and other parameters. * * @param options Command line options which contain JobManager address * @param programName Program name - * @param userParallelism Given user parallelism * @throws Exception */ - protected Client getClient( + protected ClusterClient getClient( CommandLineOptions options, - String programName, - int userParallelism, - boolean detachedMode) - throws Exception { - InetSocketAddress jobManagerAddress; - int maxSlots = -1; + String programName) throws Exception { - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); - - // Default yarn application name to use, if nothing is specified on the command line - String applicationName = "Flink Application: " + programName; - - // user wants to run Flink in YARN cluster. - CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser - .getFlinkYarnSessionCli() - .withDefaultApplicationName(applicationName) - .createFlinkYarnClient(commandLine); - - if (flinkYarnClient == null) { - throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); - } - - // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded - // from yarn options. 
- if (detachedMode) { - flinkYarnClient.setDetachedMode(true); - } - - // the number of slots available from YARN: - int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if (yarnTmSlots == -1) { - yarnTmSlots = 1; - } - maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if (userParallelism != -1) { - int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + - "but the user requested a parallelism of " + userParallelism + " on YARN. " + - "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."); - flinkYarnClient.setTaskManagerSlots(slotsPerTM); - } - - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } - catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } + // Get the custom command-line (e.g. Standalone/Yarn/Mesos) + CustomCommandLine<?> activeCommandLine = + CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); - jobManagerAddress = yarnCluster.getJobManagerAddress(); - writeJobManagerAddressToConfig(jobManagerAddress); - - // overwrite the yarn client config (because the client parses the dynamic properties) - this.config.addAll(flinkYarnClient.getFlinkConfiguration()); - - logAndSysout("YARN cluster started"); - logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL()); - logAndSysout("Waiting until all TaskManagers have connected"); - - while(true) { - GetClusterStatusResponse status = yarnCluster.getClusterStatus(); - if (status != null) { - if (status.numRegisteredTaskManagers() < flinkYarnClient.getTaskManagerCount()) { - logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/" - + flinkYarnClient.getTaskManagerCount() + ")"); - } else { - logAndSysout("All TaskManagers are connected"); - break; - } - } else { - logAndSysout("No status updates 
from the YARN cluster received so far. Waiting ..."); - } + ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config); - try { - Thread.sleep(500); - } - catch (InterruptedException e) { - LOG.error("Interrupted while waiting for TaskManagers"); - System.err.println("Thread is interrupted"); - Thread.currentThread().interrupt(); - } - } - } - else { - if(options.getJobManagerAddress() != null) { - jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress()); - writeJobManagerAddressToConfig(jobManagerAddress); + if (client != null) { + logAndSysout("Cluster retrieved"); + } else { --- End diff -- Yes, I have thought about letting the command line decide whether a cluster can be created or retrieved (it would return an enum based on the supplied options). I decided to address this in a follow-up. > Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > ------------------------------------------------------------------------------ > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement > Reporter: Sebastian Klemke > Assignee: Maximilian Michels > Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, the Flink CLI can't determine the JobManager RPC location for > Flink-on-YARN clusters. Therefore, the list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid <yarnApplicationId> option to the > mentioned subcommands that can be used together with -m yarn-cluster. The Flink > CLI would then retrieve the JobManager RPC location from the YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)