[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324585#comment-15324585 ]

ASF GitHub Bot commented on FLINK-3937:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66628410
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
        }
     
        /**
    -    * Retrieves a {@link Client} object from the given command line options and other parameters.
    +    * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
         *
         * @param options Command line options which contain JobManager address
         * @param programName Program name
    -    * @param userParallelism Given user parallelism
         * @throws Exception
         */
    -   protected Client getClient(
    +   protected ClusterClient getClient(
                        CommandLineOptions options,
    -                   String programName,
    -                   int userParallelism,
    -                   boolean detachedMode)
    -           throws Exception {
    -           InetSocketAddress jobManagerAddress;
    -           int maxSlots = -1;
    +                   String programName) throws Exception {
     
    -           if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
    -                   logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
    -
    -                   // Default yarn application name to use, if nothing is specified on the command line
    -                   String applicationName = "Flink Application: " + programName;
    -
    -                   // user wants to run Flink in YARN cluster.
    -                   CommandLine commandLine = options.getCommandLine();
    -                   AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser
    -                                   .getFlinkYarnSessionCli()
    -                                   .withDefaultApplicationName(applicationName)
    -                                   .createFlinkYarnClient(commandLine);
    -
    -                   if (flinkYarnClient == null) {
    -                           throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
    -                   }
    -
    -                   // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
    -                   // from yarn options.
    -                   if (detachedMode) {
    -                           flinkYarnClient.setDetachedMode(true);
    -                   }
    -
    -                   // the number of slots available from YARN:
    -                   int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
    -                   if (yarnTmSlots == -1) {
    -                           yarnTmSlots = 1;
    -                   }
    -                   maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
    -                   if (userParallelism != -1) {
    -                           int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
    -                           logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
    -                                           "but the user requested a parallelism of " + userParallelism + " on YARN. " +
    -                                           "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
    -                                           "will get "+slotsPerTM+" slots.");
    -                           flinkYarnClient.setTaskManagerSlots(slotsPerTM);
    -                   }
    -
    -                   try {
    -                           yarnCluster = flinkYarnClient.deploy();
    -                           yarnCluster.connectToCluster();
    -                   }
    -                   catch (Exception e) {
    -                           throw new RuntimeException("Error deploying the YARN cluster", e);
    -                   }
    +           // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
    +           CustomCommandLine<?> activeCommandLine =
    +                   CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
    --- End diff --
    
    Well spotted. I think I'll move this logic into the implementation of the 
    CustomCommandLine. 
    
    I don't quite understand your renaming suggestion. Are you suggesting that we 
    break up the CustomCommandLine into CustomParser and CustomCLI?
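
As a rough illustration of the dispatch the new lines in the diff rely on, the sketch below picks a command line by JobManager address. It is not Flink's implementation: the class name `ActiveCommandLineSketch`, the interface methods `isActive` and `getId`, and the two implementing classes are hypothetical. Only the method name `getActiveCustomCommandLine` and the `yarn-cluster` address are taken from this thread.

```java
import java.util.Arrays;
import java.util.List;

public class ActiveCommandLineSketch {

    /** Hypothetical stand-in for the CustomCommandLine discussed in the review. */
    interface CustomCommandLine {
        /** Returns true if this command line handles the given JobManager address. */
        boolean isActive(String jobManagerAddress);

        /** Short identifier, used here only for demonstration. */
        String getId();
    }

    /** Hypothetical YARN variant: active when the address is the "yarn-cluster" marker. */
    static final class YarnCommandLine implements CustomCommandLine {
        @Override
        public boolean isActive(String jobManagerAddress) {
            return "yarn-cluster".equals(jobManagerAddress);
        }

        @Override
        public String getId() {
            return "yarn-cluster";
        }
    }

    /** Hypothetical fallback: accepts any address, so it must be registered last. */
    static final class StandaloneCommandLine implements CustomCommandLine {
        @Override
        public boolean isActive(String jobManagerAddress) {
            return true;
        }

        @Override
        public String getId() {
            return "standalone";
        }
    }

    // Order matters: specific command lines first, the catch-all fallback last.
    private static final List<CustomCommandLine> COMMAND_LINES =
            Arrays.asList(new YarnCommandLine(), new StandaloneCommandLine());

    /** Returns the first registered command line that claims the given address. */
    static CustomCommandLine getActiveCustomCommandLine(String jobManagerAddress) {
        for (CustomCommandLine cli : COMMAND_LINES) {
            if (cli.isActive(jobManagerAddress)) {
                return cli;
            }
        }
        throw new IllegalStateException("No command line available for: " + jobManagerAddress);
    }

    public static void main(String[] args) {
        System.out.println(getActiveCustomCommandLine("yarn-cluster").getId());
        System.out.println(getActiveCustomCommandLine("localhost:6123").getId());
    }
}
```

With this shape, cluster-specific setup (such as the YARN deployment code removed in the diff) would live inside the matching implementation rather than in the frontend; whether that matches the final design of the pull request is an assumption.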


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3937
>                 URL: https://issues.apache.org/jira/browse/FLINK-3937
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Sebastian Klemke
>            Assignee: Maximilian Michels
>            Priority: Trivial
>         Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, the Flink CLI can't determine the JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, the list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid <yarnApplicationId> option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. The Flink 
> CLI would then retrieve the JobManager RPC location from the YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
