Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5215#discussion_r159224695
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
    @@ -743,6 +690,142 @@ private void logAndSysout(String message) {
                System.out.println(message);
        }
     
    +   public static void main(final String[] args) throws Exception {
    +           final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", 
""); // no prefix for the YARN session
    +
    +           final String configurationDirectory = 
CliFrontend.getConfigurationDirectoryFromEnv();
    +
    +           final Configuration flinkConfiguration = 
GlobalConfiguration.loadConfiguration();
    +           SecurityUtils.install(new 
SecurityConfiguration(flinkConfiguration));
    +           int retCode = 
SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
    +                   @Override
    +                   public Integer call() {
    +                           return cli.run(args, flinkConfiguration, 
configurationDirectory);
    +                   }
    +           });
    +           System.exit(retCode);
    +   }
    +
    +   private static void runInteractiveCli(
    +           YarnClusterClient clusterClient,
    +           YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
    +           boolean readConsoleInput) {
    +           try (BufferedReader in = new BufferedReader(new 
InputStreamReader(System.in))) {
    +                   boolean continueRepl = true;
    +                   int numTaskmanagers = 0;
    +                   long unknownStatusSince = System.currentTimeMillis();
    +
    +                   while (continueRepl) {
    +
    +                           final ApplicationStatus applicationStatus = 
yarnApplicationStatusMonitor.getApplicationStatusNow();
    +
    +                           switch (applicationStatus) {
    +                                   case FAILED:
    +                                   case CANCELED:
    +                                           System.err.println("The Flink 
Yarn cluster has failed.");
    +                                           continueRepl = false;
    +                                           break;
    +                                   case UNKNOWN:
    +                                           if (unknownStatusSince < 0L) {
    +                                                   unknownStatusSince = 
System.currentTimeMillis();
    +                                           }
    +
    +                                           if ((System.currentTimeMillis() 
- unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS) {
    +                                                   System.err.println("The 
Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
    +                                                   continueRepl = false;
    +                                           } else {
    +                                                   continueRepl = 
repStep(in, readConsoleInput);
    +                                           }
    +                                           break;
    +                                   case SUCCEEDED:
    +                                           if (unknownStatusSince > 0L) {
    +                                                   unknownStatusSince = 
-1L;
    +                                           }
    +
    +                                           // ------------------ check if 
there are updates by the cluster -----------
    +                                           try {
    +                                                   final 
GetClusterStatusResponse status = clusterClient.getClusterStatus();
    +
    +                                                   if (status != null && 
numTaskmanagers != status.numRegisteredTaskManagers()) {
    +                                                           
System.err.println("Number of connected TaskManagers changed to " +
    +                                                                   
status.numRegisteredTaskManagers() + ". " +
    +                                                                   "Slots 
available: " + status.totalNumberOfSlots());
    +                                                           numTaskmanagers 
= status.numRegisteredTaskManagers();
    +                                                   }
    +                                           } catch (Exception e) {
    +                                                   LOG.warn("Could not 
retrieve the current cluster status. Skipping current retrieval attempt ...", 
e);
    +                                           }
    +
    +                                           
printClusterMessages(clusterClient);
    +
    +                                           continueRepl = repStep(in, 
readConsoleInput);
    +                           }
    +                   }
    +           } catch (Exception e) {
    +                   LOG.warn("Exception while running the interactive 
command line interface.", e);
    +           }
    +   }
    +
    +   private static void printClusterMessages(YarnClusterClient 
clusterClient) {
    +           final List<String> messages = clusterClient.getNewMessages();
    +           if (messages != null && messages.size() > 0) {
    --- End diff --
    
    nit: ```if (!messages.isEmpty())``` should suffice because `messages` is 
never null


---

Reply via email to