[ https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308120#comment-16308120 ]
ASF GitHub Bot commented on FLINK-8328: --------------------------------------- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5215#discussion_r159224695 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -743,6 +690,142 @@ private void logAndSysout(String message) { System.out.println(message); } + public static void main(final String[] args) throws Exception { + final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session + + final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv(); + + final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(); + SecurityUtils.install(new SecurityConfiguration(flinkConfiguration)); + int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() { + @Override + public Integer call() { + return cli.run(args, flinkConfiguration, configurationDirectory); + } + }); + System.exit(retCode); + } + + private static void runInteractiveCli( + YarnClusterClient clusterClient, + YarnApplicationStatusMonitor yarnApplicationStatusMonitor, + boolean readConsoleInput) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) { + boolean continueRepl = true; + int numTaskmanagers = 0; + long unknownStatusSince = System.currentTimeMillis(); + + while (continueRepl) { + + final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow(); + + switch (applicationStatus) { + case FAILED: + case CANCELED: + System.err.println("The Flink Yarn cluster has failed."); + continueRepl = false; + break; + case UNKNOWN: + if (unknownStatusSince < 0L) { + unknownStatusSince = System.currentTimeMillis(); + } + + if ((System.currentTimeMillis() - unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS) { + System.err.println("The Flink Yarn cluster is in an unknown state. 
Please check the Yarn cluster."); + continueRepl = false; + } else { + continueRepl = repStep(in, readConsoleInput); + } + break; + case SUCCEEDED: + if (unknownStatusSince > 0L) { + unknownStatusSince = -1L; + } + + // ------------------ check if there are updates by the cluster ----------- + try { + final GetClusterStatusResponse status = clusterClient.getClusterStatus(); + + if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) { + System.err.println("Number of connected TaskManagers changed to " + + status.numRegisteredTaskManagers() + ". " + + "Slots available: " + status.totalNumberOfSlots()); + numTaskmanagers = status.numRegisteredTaskManagers(); + } + } catch (Exception e) { + LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e); + } + + printClusterMessages(clusterClient); + + continueRepl = repStep(in, readConsoleInput); + } + } + } catch (Exception e) { + LOG.warn("Exception while running the interactive command line interface.", e); + } + } + + private static void printClusterMessages(YarnClusterClient clusterClient) { + final List<String> messages = clusterClient.getNewMessages(); + if (messages != null && messages.size() > 0) { --- End diff -- nit: ```if (!messages.isEmpty())``` should suffice because `messages` is never null > Pull Yarn ApplicationStatus polling out of YarnClusterClient > ------------------------------------------------------------ > > Key: FLINK-8328 > URL: https://issues.apache.org/jira/browse/FLINK-8328 > Project: Flink > Issue Type: Sub-task > Components: Client > Affects Versions: 1.5.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to > pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}. > I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This also > has the benefit of separating concerns better. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)