[ 
https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316528#comment-16316528
 ] 

ASF GitHub Bot commented on FLINK-8343:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5229#discussion_r160166040
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -534,215 +557,235 @@ private Configuration 
applyYarnProperties(Configuration configuration) throws Fl
                return effectiveConfiguration;
        }
     
    -   public int run(
    -                   String[] args,
    -                   Configuration configuration,
    -                   String configurationDirectory) {
    +   public int run(String[] args) throws CliArgsException, FlinkException {
                //
                //      Command Line Options
                //
    -           Options options = new Options();
    -           addGeneralOptions(options);
    -           addRunOptions(options);
    +           final CommandLine cmd = parseCommandLineOptions(args, true);
     
    -           CommandLineParser parser = new PosixParser();
    -           CommandLine cmd;
    -           try {
    -                   cmd = parser.parse(options, args);
    -           } catch (Exception e) {
    -                   System.out.println(e.getMessage());
    -                   printUsage();
    -                   return 1;
    -           }
    +           final AbstractYarnClusterDescriptor yarnClusterDescriptor = 
createClusterDescriptor(cmd);
     
    -           // Query cluster for metrics
    -           if (cmd.hasOption(query.getOpt())) {
    -                   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
    -                           configuration,
    -                           configurationDirectory,
    -                           cmd.hasOption(flip6.getOpt()));
    -                   String description;
    -                   try {
    -                           description = 
yarnDescriptor.getClusterDescription();
    -                   } catch (Exception e) {
    -                           System.err.println("Error while querying the 
YARN cluster for available resources: " + e.getMessage());
    -                           e.printStackTrace(System.err);
    -                           return 1;
    -                   }
    -                   System.out.println(description);
    -                   return 0;
    -           } else if (cmd.hasOption(applicationId.getOpt())) {
    +           try {
    +                   // Query cluster for metrics
    +                   if (cmd.hasOption(query.getOpt())) {
    +                           final String description = 
yarnClusterDescriptor.getClusterDescription();
    +                           System.out.println(description);
    +                           return 0;
    +                   } else {
    +                           final ClusterClient clusterClient;
    +                           final ApplicationId yarnApplicationId;
     
    -                   AbstractYarnClusterDescriptor yarnDescriptor = 
getClusterDescriptor(
    -                           configuration,
    -                           configurationDirectory,
    -                           cmd.hasOption(flip6.getOpt()));
    +                           if (cmd.hasOption(applicationId.getOpt())) {
    +                                   yarnApplicationId = 
ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt()));
     
    -                   //configure ZK namespace depending on the value passed
    -                   String zkNamespace = 
cmd.hasOption(zookeeperNamespace.getOpt()) ?
    -                                                                   
cmd.getOptionValue(zookeeperNamespace.getOpt())
    -                                                                   : 
yarnDescriptor.getFlinkConfiguration()
    -                                                                   
.getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt()));
    -                   LOG.info("Going to use the ZK namespace: {}", 
zkNamespace);
    -                   
yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace);
    +                                   clusterClient = 
yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
    +                           } else {
    +                                   final ClusterSpecification 
clusterSpecification = getClusterSpecification(cmd);
     
    -                   try {
    -                           yarnCluster = 
yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt()));
    -                   } catch (Exception e) {
    -                           throw new RuntimeException("Could not retrieve 
existing Yarn application", e);
    -                   }
    +                                   clusterClient = 
yarnClusterDescriptor.deploySessionCluster(clusterSpecification);
     
    -                   if (detachedMode) {
    -                           LOG.info("The Flink YARN client has been 
started in detached mode. In order to stop " +
    -                                   "Flink on YARN, use the following 
command or a YARN web interface to stop it:\n" +
    -                                   "yarn application -kill " + 
applicationId.getOpt());
    -                           yarnCluster.disconnect();
    -                   } else {
    -                           ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    -
    -                           try (YarnApplicationStatusMonitor 
yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
    -                                           yarnDescriptor.getYarnClient(),
    -                                           yarnCluster.getApplicationId(),
    -                                           new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
    -                                   runInteractiveCli(
    -                                           yarnCluster,
    -                                           yarnApplicationStatusMonitor,
    -                                           true);
    -                           } catch (Exception e) {
    -                                   LOG.info("Could not properly close the 
Yarn application status monitor.", e);
    -                           } finally {
    -                                   // shut down the scheduled executor 
service
    -                                   ExecutorUtils.gracefulShutdown(
    -                                           1000L,
    -                                           TimeUnit.MILLISECONDS,
    -                                           scheduledExecutorService);
    -                           }
    -                   }
    -           } else {
    +                                   //------------------ ClusterClient 
deployed, handle connection details
    +                                   yarnApplicationId = 
ConverterUtils.toApplicationId(clusterClient.getClusterIdentifier());
     
    -                   try (AbstractYarnClusterDescriptor yarnDescriptor = 
createClusterDescriptor(cmd)){
    -                           final ClusterSpecification clusterSpecification;
    +                                   String jobManagerAddress =
    +                                           
clusterClient.getJobManagerAddress().getAddress().getHostName() +
    +                                                   ':' + 
clusterClient.getJobManagerAddress().getPort();
     
    -                           try {
    -                                   clusterSpecification = 
getClusterSpecification(cmd);
    -                           } catch (FlinkException e) {
    -                                   System.err.println("Error while 
creating the cluster specification: " + e.getMessage());
    -                                   e.printStackTrace();
    -                                   return 1;
    -                           }
    +                                   System.out.println("Flink JobManager is 
now running on " + jobManagerAddress);
    +                                   System.out.println("JobManager Web 
Interface: " + clusterClient.getWebInterfaceURL());
     
    -                           try {
    -                                   yarnCluster = 
yarnDescriptor.deploySessionCluster(clusterSpecification);
    -                           } catch (Exception e) {
    -                                   System.err.println("Error while 
deploying YARN cluster: " + e.getMessage());
    -                                   e.printStackTrace(System.err);
    -                                   return 1;
    -                           }
    -                           //------------------ ClusterClient deployed, 
handle connection details
    -                           String jobManagerAddress =
    -                                   
yarnCluster.getJobManagerAddress().getAddress().getHostName() +
    -                                           ":" + 
yarnCluster.getJobManagerAddress().getPort();
    -
    -                           System.out.println("Flink JobManager is now 
running on " + jobManagerAddress);
    -                           System.out.println("JobManager Web Interface: " 
+ yarnCluster.getWebInterfaceURL());
    -
    -                           // file that we write into the conf/ dir 
containing the jobManager address and the dop.
    -                           File yarnPropertiesFile = 
getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION));
    -
    -                           Properties yarnProps = new Properties();
    -                           yarnProps.setProperty(YARN_APPLICATION_ID_KEY, 
yarnCluster.getApplicationId().toString());
    -                           if 
(clusterSpecification.getSlotsPerTaskManager() != -1) {
    -                                   String parallelism =
    -                                           
Integer.toString(clusterSpecification.getSlotsPerTaskManager() * 
clusterSpecification.getNumberTaskManagers());
    -                                   
yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism);
    +                                   writeYarnPropertiesFile(
    +                                           yarnApplicationId,
    +                                           
clusterSpecification.getNumberTaskManagers() * 
clusterSpecification.getSlotsPerTaskManager(),
    +                                           
yarnClusterDescriptor.getDynamicPropertiesEncoded());
                                }
    -                           // add dynamic properties
    -                           if 
(yarnDescriptor.getDynamicPropertiesEncoded() != null) {
    -                                   
yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
    -                                           
yarnDescriptor.getDynamicPropertiesEncoded());
    -                           }
    -                           writeYarnProperties(yarnProps, 
yarnPropertiesFile);
    -
    -                           //------------------ ClusterClient running, let 
user control it ------------
     
                                if (detachedMode) {
    -                                   // print info and quit:
                                        LOG.info("The Flink YARN client has 
been started in detached mode. In order to stop " +
                                                "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
    -                                           "yarn application -kill " + 
yarnCluster.getApplicationId());
    -                                   yarnCluster.waitForClusterToBeReady();
    -                                   yarnCluster.disconnect();
    +                                           "yarn application -kill " + 
applicationId.getOpt());
                                } else {
    -
                                        ScheduledThreadPoolExecutor 
scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
    --- End diff --
    
    I would prefer `Executors.newSingleThreadScheduledExecutor()` because the 
`corePoolSize` will always be `1`.


> Add support for job cluster deployment
> --------------------------------------
>
>                 Key: FLINK-8343
>                 URL: https://issues.apache.org/jira/browse/FLINK-8343
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> For Flip-6 we have to enable a different job cluster deployment. The 
> difference is that we directly submit the job when we deploy the Flink 
> cluster instead of following a two-step approach (first deployment, then 
> submission).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to