Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66624415 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E } /** - * Retrieves a {@link Client} object from the given command line options and other parameters. + * Retrieves a {@link ClusterClient} object from the given command line options and other parameters. * * @param options Command line options which contain JobManager address * @param programName Program name - * @param userParallelism Given user parallelism * @throws Exception */ - protected Client getClient( + protected ClusterClient getClient( CommandLineOptions options, - String programName, - int userParallelism, - boolean detachedMode) - throws Exception { - InetSocketAddress jobManagerAddress; - int maxSlots = -1; + String programName) throws Exception { - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); - - // Default yarn application name to use, if nothing is specified on the command line - String applicationName = "Flink Application: " + programName; - - // user wants to run Flink in YARN cluster. - CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser - .getFlinkYarnSessionCli() - .withDefaultApplicationName(applicationName) - .createFlinkYarnClient(commandLine); - - if (flinkYarnClient == null) { - throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); - } - - // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded - // from yarn options. - if (detachedMode) { - flinkYarnClient.setDetachedMode(true); - } - - // the number of slots available from YARN: - int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if (yarnTmSlots == -1) { - yarnTmSlots = 1; - } - maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if (userParallelism != -1) { - int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + - "but the user requested a parallelism of " + userParallelism + " on YARN. " + - "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."); - flinkYarnClient.setTaskManagerSlots(slotsPerTM); - } - - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } - catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } + // Get the custom command-line (e.g. Standalone/Yarn/Mesos) + CustomCommandLine<?> activeCommandLine = + CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); --- End diff -- Why is this method located in the Parser? I thought the parser is only responsible for parsing the command line arguments? Maybe it makes more sense to rename this ``` loadCustomCommandLine("org.apache.flink.client.cli.DefaultCLI"); loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn"); ``` to parser (loadCustomParser(), and DefaultCLIParser, YarnCliParser?)
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---