[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15327440#comment-15327440 ]

ASF GitHub Bot commented on FLINK-3937:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66795439
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -108,24 +131,83 @@ public FlinkYarnSessionCli(String shortPrefix, String 
longPrefix, boolean accept
                NAME = new Option(shortPrefix + "nm", longPrefix + "name", 
true, "Set a custom name for the application on YARN");
        }
     
    +
        /**
    -    * Creates a new Yarn Client.
    -    * @param cmd the command line to parse options from
    -    * @return an instance of the client or null if there was an error
    +    * Resumes from a Flink Yarn properties file
    +    * @param flinkConfiguration The flink configuration
    +    * @return True if the properties were loaded, false otherwise
         */
    -   public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
    +   private boolean resumeFromYarnProperties(Configuration 
flinkConfiguration) {
    +           // load the YARN properties
    +           File propertiesFile = new 
File(getYarnPropertiesLocation(flinkConfiguration));
    +           if (!propertiesFile.exists()) {
    +                   return false;
    +           }
    +
    +           logAndSysout("Found YARN properties file " + 
propertiesFile.getAbsolutePath());
    +
    +           Properties yarnProperties = new Properties();
    +           try {
    +                   try (InputStream is = new 
FileInputStream(propertiesFile)) {
    +                           yarnProperties.load(is);
    +                   }
    +           }
    +           catch (IOException e) {
    +                   throw new RuntimeException("Cannot read the YARN 
properties file", e);
    +           }
    +
    +           // configure the default parallelism from YARN
    +           String propParallelism = 
yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
    +           if (propParallelism != null) { // maybe the property is not set
    +                   try {
    +                           int parallelism = 
Integer.parseInt(propParallelism);
    +                           
flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 
parallelism);
    +
    +                           logAndSysout("YARN properties set default 
parallelism to " + parallelism);
    +                   }
    +                   catch (NumberFormatException e) {
    +                           throw new RuntimeException("Error while parsing 
the YARN properties: " +
    +                                   "Property " + 
YARN_PROPERTIES_PARALLELISM + " is not an integer.");
    +                   }
    +           }
    +
    +           // get the JobManager address from the YARN properties
    +           String address = 
yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
    +           InetSocketAddress jobManagerAddress;
    +           if (address != null) {
    +                   try {
    +                           jobManagerAddress = 
ClientUtils.parseHostPortAddress(address);
    +                           // store address in config from where it is 
retrieved by the retrieval service
    +                           
CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, 
jobManagerAddress);
    +                   }
    +                   catch (Exception e) {
    +                           throw new RuntimeException("YARN properties 
contain an invalid entry for JobManager address.", e);
    +                   }
    +
    +                   logAndSysout("Using JobManager address from YARN 
properties " + jobManagerAddress);
    +           }
     
    -           AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
    -           if (flinkYarnClient == null) {
    -                   return null;
    +           // handle the YARN client's dynamic properties
    +           String dynamicPropertiesEncoded = 
yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
    +           Map<String, String> dynamicProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
    +           for (Map.Entry<String, String> dynamicProperty : 
dynamicProperties.entrySet()) {
    +                   flinkConfiguration.setString(dynamicProperty.getKey(), 
dynamicProperty.getValue());
                }
     
    +           return true;
    +   }
    +
    +   public YarnClusterDescriptor createDescriptor(String 
defaultApplicationName, CommandLine cmd) {
    +
    +
    +           YarnClusterDescriptor yarnClusterDescriptor = new 
YarnClusterDescriptor();
    --- End diff --
    
    There are a few too many blank lines here ;)


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3937
>                 URL: https://issues.apache.org/jira/browse/FLINK-3937
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Sebastian Klemke
>            Assignee: Maximilian Michels
>            Priority: Trivial
>         Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, the Flink CLI can't figure out the JobManager RPC location for
> Flink-on-YARN clusters. Therefore, the list, savepoint, cancel and stop
> subcommands are hard to invoke if you only know the YARN application ID. As
> an improvement, I suggest adding a -yid <yarnApplicationId> option to the
> mentioned subcommands that can be used together with -m yarn-cluster. The
> Flink CLI would then retrieve the JobManager RPC location from the YARN
> ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to