[ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324559#comment-15324559
 ] 

ASF GitHub Bot commented on FLINK-3937:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66626302
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
        }
     
        /**
    -    * Retrieves a {@link Client} object from the given command line options and other parameters.
    +    * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
         *
         * @param options Command line options which contain JobManager address
         * @param programName Program name
    -    * @param userParallelism Given user parallelism
         * @throws Exception
         */
    -   protected Client getClient(
    +   protected ClusterClient getClient(
                        CommandLineOptions options,
    -                   String programName,
    -                   int userParallelism,
    -                   boolean detachedMode)
    -           throws Exception {
    -           InetSocketAddress jobManagerAddress;
    -           int maxSlots = -1;
    +                   String programName) throws Exception {
     
    -           if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
    -                   logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
    -
    -                   // Default yarn application name to use, if nothing is specified on the command line
    -                   String applicationName = "Flink Application: " + programName;
    -
    -                   // user wants to run Flink in YARN cluster.
    -                   CommandLine commandLine = options.getCommandLine();
    -                   AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser
    -                                                           .getFlinkYarnSessionCli()
    -                                                           .withDefaultApplicationName(applicationName)
    -                                                           .createFlinkYarnClient(commandLine);
    -
    -                   if (flinkYarnClient == null) {
    -                           throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
    -                   }
    -
    -                   // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
    -                   // from yarn options.
    -                   if (detachedMode) {
    -                           flinkYarnClient.setDetachedMode(true);
    -                   }
    -
    -                   // the number of slots available from YARN:
    -                   int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
    -                   if (yarnTmSlots == -1) {
    -                           yarnTmSlots = 1;
    -                   }
    -                   maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
    -                   if (userParallelism != -1) {
    -                           int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
    -                           logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
    -                                           "but the user requested a parallelism of " + userParallelism + " on YARN. " +
    -                                           "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
    -                                           "will get "+slotsPerTM+" slots.");
    -                           flinkYarnClient.setTaskManagerSlots(slotsPerTM);
    -                   }
    -
    -                   try {
    -                           yarnCluster = flinkYarnClient.deploy();
    -                           yarnCluster.connectToCluster();
    -                   }
    -                   catch (Exception e) {
    -                           throw new RuntimeException("Error deploying the YARN cluster", e);
    -                   }
    +           // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
    +           CustomCommandLine<?> activeCommandLine =
    +                   CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
     
    -                   jobManagerAddress = yarnCluster.getJobManagerAddress();
    -                   writeJobManagerAddressToConfig(jobManagerAddress);
    -                   
    -                   // overwrite the yarn client config (because the client parses the dynamic properties)
    -                   this.config.addAll(flinkYarnClient.getFlinkConfiguration());
    -
    -                   logAndSysout("YARN cluster started");
    -                   logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
    -                   logAndSysout("Waiting until all TaskManagers have connected");
    -
    -                   while(true) {
    -                           GetClusterStatusResponse status = yarnCluster.getClusterStatus();
    -                           if (status != null) {
    -                                   if (status.numRegisteredTaskManagers() < flinkYarnClient.getTaskManagerCount()) {
    -                                           logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
    -                                                   + flinkYarnClient.getTaskManagerCount() + ")");
    -                                   } else {
    -                                           logAndSysout("All TaskManagers are connected");
    -                                           break;
    -                                   }
    -                           } else {
    -                                   logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
    -                           }
    +           ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
     
    -                           try {
    -                                   Thread.sleep(500);
    -                           }
    -                           catch (InterruptedException e) {
    -                                   LOG.error("Interrupted while waiting for TaskManagers");
    -                                   System.err.println("Thread is interrupted");
    -                                   Thread.currentThread().interrupt();
    -                           }
    -                   }
    -           }
    -           else {
    -                   if(options.getJobManagerAddress() != null) {
    -                           jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
    -                           writeJobManagerAddressToConfig(jobManagerAddress);
    +           if (client != null) {
    +                   logAndSysout("Cluster retrieved");
    +           } else {
    --- End diff --
    
    Yes, I have thought about letting the command line decide whether a cluster can be created or retrieved (it would return an enum based on the options supplied). I decided to address this in a follow-up.
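
    A minimal sketch of the enum idea mentioned in the comment above, for illustration only. The names `ClusterModeSketch`, `ClusterMode`, and `decide` are hypothetical and are not part of the pull request or of Flink; the option map stands in for parsed command-line options, using the `-m yarn-cluster` and `-yid` options named in the issue description.

```java
import java.util.Map;

public class ClusterModeSketch {

    /** Hypothetical result type: what the command line thinks it can do. */
    public enum ClusterMode { CREATE, RETRIEVE, UNSUPPORTED }

    /**
     * Hypothetical decision logic. The map is an assumed stand-in for parsed
     * options: key "m" for the JobManager address and key "yid" for the
     * YARN application ID proposed in the issue.
     */
    public static ClusterMode decide(Map<String, String> options) {
        // A known application ID means a cluster already exists: retrieve it.
        if (options.containsKey("yid")) {
            return ClusterMode.RETRIEVE;
        }
        // "-m yarn-cluster" without an application ID: deploy a new cluster.
        if ("yarn-cluster".equals(options.get("m"))) {
            return ClusterMode.CREATE;
        }
        // Any other explicit address is treated as an existing cluster.
        if (options.containsKey("m")) {
            return ClusterMode.RETRIEVE;
        }
        // Nothing to go on; the caller has to pick a fallback.
        return ClusterMode.UNSUPPORTED;
    }

    public static void main(String[] args) {
        System.out.println(decide(Map.of("m", "yarn-cluster")));
        System.out.println(decide(Map.of("m", "yarn-cluster", "yid", "application_1_0001")));
        System.out.println(decide(Map.of()));
    }
}
```

    With a return value like this, the caller could branch on the enum instead of checking whether the retrieved client is null.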


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3937
>                 URL: https://issues.apache.org/jira/browse/FLINK-3937
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Sebastian Klemke
>            Assignee: Maximilian Michels
>            Priority: Trivial
>         Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, the Flink CLI can't determine the JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, the list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid <yarnApplicationId> option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. The 
> Flink CLI would then retrieve the JobManager RPC location from the YARN 
> ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
