[ https://issues.apache.org/jira/browse/FLINK-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316528#comment-16316528 ]
ASF GitHub Bot commented on FLINK-8343: --------------------------------------- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5229#discussion_r160166040 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -534,215 +557,235 @@ private Configuration applyYarnProperties(Configuration configuration) throws Fl return effectiveConfiguration; } - public int run( - String[] args, - Configuration configuration, - String configurationDirectory) { + public int run(String[] args) throws CliArgsException, FlinkException { // // Command Line Options // - Options options = new Options(); - addGeneralOptions(options); - addRunOptions(options); + final CommandLine cmd = parseCommandLineOptions(args, true); - CommandLineParser parser = new PosixParser(); - CommandLine cmd; - try { - cmd = parser.parse(options, args); - } catch (Exception e) { - System.out.println(e.getMessage()); - printUsage(); - return 1; - } + final AbstractYarnClusterDescriptor yarnClusterDescriptor = createClusterDescriptor(cmd); - // Query cluster for metrics - if (cmd.hasOption(query.getOpt())) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); - String description; - try { - description = yarnDescriptor.getClusterDescription(); - } catch (Exception e) { - System.err.println("Error while querying the YARN cluster for available resources: " + e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - System.out.println(description); - return 0; - } else if (cmd.hasOption(applicationId.getOpt())) { + try { + // Query cluster for metrics + if (cmd.hasOption(query.getOpt())) { + final String description = yarnClusterDescriptor.getClusterDescription(); + System.out.println(description); + return 0; + } else { + final ClusterClient clusterClient; + final ApplicationId yarnApplicationId; - AbstractYarnClusterDescriptor 
yarnDescriptor = getClusterDescriptor( - configuration, - configurationDirectory, - cmd.hasOption(flip6.getOpt())); + if (cmd.hasOption(applicationId.getOpt())) { + yarnApplicationId = ConverterUtils.toApplicationId(cmd.getOptionValue(applicationId.getOpt())); - //configure ZK namespace depending on the value passed - String zkNamespace = cmd.hasOption(zookeeperNamespace.getOpt()) ? - cmd.getOptionValue(zookeeperNamespace.getOpt()) - : yarnDescriptor.getFlinkConfiguration() - .getString(HA_CLUSTER_ID, cmd.getOptionValue(applicationId.getOpt())); - LOG.info("Going to use the ZK namespace: {}", zkNamespace); - yarnDescriptor.getFlinkConfiguration().setString(HA_CLUSTER_ID, zkNamespace); + clusterClient = yarnClusterDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt())); + } else { + final ClusterSpecification clusterSpecification = getClusterSpecification(cmd); - try { - yarnCluster = yarnDescriptor.retrieve(cmd.getOptionValue(applicationId.getOpt())); - } catch (Exception e) { - throw new RuntimeException("Could not retrieve existing Yarn application", e); - } + clusterClient = yarnClusterDescriptor.deploySessionCluster(clusterSpecification); - if (detachedMode) { - LOG.info("The Flink YARN client has been started in detached mode. 
In order to stop " + - "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill " + applicationId.getOpt()); - yarnCluster.disconnect(); - } else { - ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); - - try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor( - yarnDescriptor.getYarnClient(), - yarnCluster.getApplicationId(), - new ScheduledExecutorServiceAdapter(scheduledExecutorService))) { - runInteractiveCli( - yarnCluster, - yarnApplicationStatusMonitor, - true); - } catch (Exception e) { - LOG.info("Could not properly close the Yarn application status monitor.", e); - } finally { - // shut down the scheduled executor service - ExecutorUtils.gracefulShutdown( - 1000L, - TimeUnit.MILLISECONDS, - scheduledExecutorService); - } - } - } else { + //------------------ ClusterClient deployed, handle connection details + yarnApplicationId = ConverterUtils.toApplicationId(clusterClient.getClusterIdentifier()); - try (AbstractYarnClusterDescriptor yarnDescriptor = createClusterDescriptor(cmd)){ - final ClusterSpecification clusterSpecification; + String jobManagerAddress = + clusterClient.getJobManagerAddress().getAddress().getHostName() + + ':' + clusterClient.getJobManagerAddress().getPort(); - try { - clusterSpecification = getClusterSpecification(cmd); - } catch (FlinkException e) { - System.err.println("Error while creating the cluster specification: " + e.getMessage()); - e.printStackTrace(); - return 1; - } + System.out.println("Flink JobManager is now running on " + jobManagerAddress); + System.out.println("JobManager Web Interface: " + clusterClient.getWebInterfaceURL()); - try { - yarnCluster = yarnDescriptor.deploySessionCluster(clusterSpecification); - } catch (Exception e) { - System.err.println("Error while deploying YARN cluster: " + e.getMessage()); - e.printStackTrace(System.err); - return 1; - } - //------------------ 
ClusterClient deployed, handle connection details - String jobManagerAddress = - yarnCluster.getJobManagerAddress().getAddress().getHostName() + - ":" + yarnCluster.getJobManagerAddress().getPort(); - - System.out.println("Flink JobManager is now running on " + jobManagerAddress); - System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); - - // file that we write into the conf/ dir containing the jobManager address and the dop. - File yarnPropertiesFile = getYarnPropertiesLocation(configuration.getValue(YarnConfigOptions.PROPERTIES_FILE_LOCATION)); - - Properties yarnProps = new Properties(); - yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString()); - if (clusterSpecification.getSlotsPerTaskManager() != -1) { - String parallelism = - Integer.toString(clusterSpecification.getSlotsPerTaskManager() * clusterSpecification.getNumberTaskManagers()); - yarnProps.setProperty(YARN_PROPERTIES_PARALLELISM, parallelism); + writeYarnPropertiesFile( + yarnApplicationId, + clusterSpecification.getNumberTaskManagers() * clusterSpecification.getSlotsPerTaskManager(), + yarnClusterDescriptor.getDynamicPropertiesEncoded()); } - // add dynamic properties - if (yarnDescriptor.getDynamicPropertiesEncoded() != null) { - yarnProps.setProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, - yarnDescriptor.getDynamicPropertiesEncoded()); - } - writeYarnProperties(yarnProps, yarnPropertiesFile); - - //------------------ ClusterClient running, let user control it ------------ if (detachedMode) { - // print info and quit: LOG.info("The Flink YARN client has been started in detached mode. 
In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill " + yarnCluster.getApplicationId()); - yarnCluster.waitForClusterToBeReady(); - yarnCluster.disconnect(); + "yarn application -kill " + applicationId.getOpt()); } else { - ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1); --- End diff -- I would prefer `Executors.newSingleThreadScheduledExecutor()` because the core pool size will always be `1`; the factory method states that intent directly and returns an executor that cannot be reconfigured to use more threads. > Add support for job cluster deployment > -------------------------------------- > > Key: FLINK-8343 > URL: https://issues.apache.org/jira/browse/FLINK-8343 > Project: Flink > Issue Type: Sub-task > Components: Client > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > For FLIP-6 we have to support a different job cluster deployment mode. The > difference is that we submit the job directly when we deploy the Flink > cluster instead of following a two-step approach (first deployment, then > submission). -- This message was sent by Atlassian JIRA (v6.4.14#64029)