Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/542#discussion_r27452585 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java --- @@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient, final ApplicationId appId, this.sessionFilesDir = sessionFilesDir; this.applicationId = appId; this.detached = detached; + this.flinkConfig = flinkConfig; + this.appId = appId; // get one application report manually intialAppReport = yarnClient.getApplicationReport(appId); String jobManagerHost = intialAppReport.getHost(); int jobManagerPort = intialAppReport.getRpcPort(); this.jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort); + } - if(!detached) { - // start actor system - LOG.info("Start actor system."); - InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM - actorSystem = AkkaUtils.createActorSystem(flinkConfig, - new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0))); + /** + * Connect the FlinkYarnCluster to the ApplicationMaster. + * + * Detached YARN sessions don't need to connect to the ApplicationMaster. + * Detached per job YARN sessions need to connect until the required number of TaskManagers have been started. 
+ * + * @throws IOException + */ + public void connectToCluster() throws IOException { + if(isConnected) { + throw new IllegalStateException("Can not connect to the cluster again"); + } - // start application client - LOG.info("Start application client."); + // start actor system + LOG.info("Start actor system."); + InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM + actorSystem = AkkaUtils.createActorSystem(flinkConfig, + new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0))); - applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient"); + // start application client + LOG.info("Start application client."); - // instruct ApplicationClient to start a periodical status polling - applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient); + applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), "applicationClient"); + // instruct ApplicationClient to start a periodical status polling + applicationClient.tell(new Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient); - // add hook to ensure proper shutdown - Runtime.getRuntime().addShutdownHook(clientShutdownHook); - actorRunner = new Thread(new Runnable() { - @Override - public void run() { - // blocks until ApplicationMaster has been stopped - actorSystem.awaitTermination(); + actorRunner = new Thread(new Runnable() { + @Override + public void run() { + // blocks until ApplicationMaster has been stopped + actorSystem.awaitTermination(); - // get final application report - try { - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - - LOG.info("Application " + appId + " finished with state " + appReport - .getYarnApplicationState() + " and final state " + appReport - .getFinalApplicationStatus() + " at " + appReport.getFinishTime()); - - if 
(appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState() - == YarnApplicationState.KILLED) { - LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics()); - LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve " - + "the full application log using this command:\n" - + "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n" - + "(It sometimes takes a few seconds until the logs are aggregated)"); - } - } catch (Exception e) { - LOG.warn("Error while getting final application report", e); - } + // get final application report + try { + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + + LOG.info("Application " + appId + " finished with state " + appReport + .getYarnApplicationState() + " and final state " + appReport + .getFinalApplicationStatus() + " at " + appReport.getFinishTime()); + + if (appReport.getYarnApplicationState() == YarnApplicationState.FAILED || appReport.getYarnApplicationState() + == YarnApplicationState.KILLED) { + LOG.warn("Application failed. Diagnostics " + appReport.getDiagnostics()); + LOG.warn("If log aggregation is activated in the Hadoop cluster, we recommend to retrieve " + + "the full application log using this command:\n" + + "\tyarn logs -applicationId " + appReport.getApplicationId() + "\n" + + "(It sometimes takes a few seconds until the logs are aggregated)"); } - }); - actorRunner.setDaemon(true); - actorRunner.start(); + } catch (Exception e) { + LOG.warn("Error while getting final application report", e); + } + } --- End diff -- Is the indentation misaligned here?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---