GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66765910
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
        }
     
        /**
    -    * Retrieves a {@link Client} object from the given command line options and other parameters.
    +    * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
         *
         * @param options Command line options which contain JobManager address
         * @param programName Program name
    -    * @param userParallelism Given user parallelism
         * @throws Exception
         */
    -   protected Client getClient(
    +   protected ClusterClient getClient(
                        CommandLineOptions options,
    -                   String programName,
    -                   int userParallelism,
    -                   boolean detachedMode)
    -           throws Exception {
    -           InetSocketAddress jobManagerAddress;
    -           int maxSlots = -1;
    +                   String programName) throws Exception {
     
    -           if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
    -                   logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
    -
    -                   // Default yarn application name to use, if nothing is specified on the command line
    -                   String applicationName = "Flink Application: " + programName;
    -
    -                   // user wants to run Flink in YARN cluster.
    -                   CommandLine commandLine = options.getCommandLine();
    -                   AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser
    -                                   .getFlinkYarnSessionCli()
    -                                   .withDefaultApplicationName(applicationName)
    -                                   .createFlinkYarnClient(commandLine);
    -
    -                   if (flinkYarnClient == null) {
    -                           throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
    -                   }
    -
    -                   // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
    -                   // from yarn options.
    -                   if (detachedMode) {
    -                           flinkYarnClient.setDetachedMode(true);
    -                   }
    -
    -                   // the number of slots available from YARN:
    -                   int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
    -                   if (yarnTmSlots == -1) {
    -                           yarnTmSlots = 1;
    -                   }
    -                   maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
    -                   if (userParallelism != -1) {
    -                           int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
    -                           logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
    -                                           "but the user requested a parallelism of " + userParallelism + " on YARN. " +
    -                                           "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
    -                                           "will get "+slotsPerTM+" slots.");
    -                           flinkYarnClient.setTaskManagerSlots(slotsPerTM);
    -                   }
    -
    -                   try {
    -                           yarnCluster = flinkYarnClient.deploy();
    -                           yarnCluster.connectToCluster();
    -                   }
    -                   catch (Exception e) {
    -                           throw new RuntimeException("Error deploying the YARN cluster", e);
    -                   }
    +           // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
    +           CustomCommandLine<?> activeCommandLine =
    +                   CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
    --- End diff --
    
    Okay, moving the logic addresses my concerns.
    
    I checked the classes again, and my renaming suggestions don't make much sense ;)


