[FLINK-4486] detached YarnSession: wait until cluster startup is complete

This closes #2423


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cdeb118
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cdeb118
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cdeb118

Branch: refs/heads/release-1.1
Commit: 4cdeb11854956ac6cf1189d7cfa43628fb3be328
Parents: 28da995
Author: Maximilian Michels <m...@apache.org>
Authored: Fri Aug 19 14:57:14 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Aug 29 18:20:00 2016 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  17 ++-
 .../client/program/StandaloneClusterClient.java |   3 +
 .../apache/flink/yarn/YarnClusterClient.java    | 112 ++++++++++---------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   1 +
 4 files changed, 79 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index c3c666b..b8d3400 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -386,7 +386,10 @@ public abstract class ClusterClient {
         * @throws ProgramInvocationException
         */
        public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-               LeaderRetrievalService leaderRetrievalService;
+
+               waitForClusterToBeReady();
+
+               final LeaderRetrievalService leaderRetrievalService;
                try {
                        leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig);
                } catch (Exception e) {
@@ -411,8 +414,10 @@ public abstract class ClusterClient {
         * @throws ProgramInvocationException
         */
        public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
-               ActorGateway jobManagerGateway;
 
+               waitForClusterToBeReady();
+
+               final ActorGateway jobManagerGateway;
                try {
                        jobManagerGateway = getJobManagerGateway();
                } catch (Exception e) {
@@ -655,6 +660,14 @@ public abstract class ClusterClient {
        // ------------------------------------------------------------------------
 
        /**
+        * Blocks until the client has determined that the cluster is ready for Job submission.
+        *
+        * This is delayed until right before job submission to report any other errors first
+        * (e.g. invalid job definitions/errors in the user jar)
+        */
+       public abstract void waitForClusterToBeReady();
+
+       /**
         * Returns an URL (as a string) to the JobManager web interface
         */
        public abstract String getWebInterfaceURL();

http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
index d25c9d1..3343b69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java
@@ -41,6 +41,9 @@ public class StandaloneClusterClient extends ClusterClient {
                super(config);
        }
 
+       @Override
+       public void waitForClusterToBeReady() {}
+
 
        @Override
        public String getWebInterfaceURL() {

http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index e76b7e8..75bfeed 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -90,7 +90,8 @@ public class YarnClusterClient extends ClusterClient {
 
        private boolean isConnected = true;
 
-       private final boolean perJobCluster;
+       /** Indicator whether this cluster has just been created */
+       private final boolean newlyCreatedCluster;
 
        /**
         * Create a new Flink on YARN cluster.
@@ -100,7 +101,7 @@ public class YarnClusterClient extends ClusterClient {
         * @param appReport the YARN application ID
         * @param flinkConfig Flink configuration
         * @param sessionFilesDir Location of files required for YARN session
-        * @param perJobCluster Indicator whether this cluster is only created 
for a single job and then shutdown
+        * @param newlyCreatedCluster Indicator whether this cluster has just 
been created
         * @throws IOException
         * @throws YarnException
         */
@@ -110,7 +111,7 @@ public class YarnClusterClient extends ClusterClient {
                final ApplicationReport appReport,
                org.apache.flink.configuration.Configuration flinkConfig,
                Path sessionFilesDir,
-               boolean perJobCluster) throws IOException, YarnException {
+               boolean newlyCreatedCluster) throws IOException, YarnException {
 
                super(flinkConfig);
 
@@ -123,13 +124,13 @@ public class YarnClusterClient extends ClusterClient {
                this.appReport = appReport;
                this.appId = appReport.getApplicationId();
                this.trackingURL = appReport.getTrackingUrl();
-               this.perJobCluster = perJobCluster;
+               this.newlyCreatedCluster = newlyCreatedCluster;
 
-               this.applicationClient = new LazApplicationClientLoader();
+               this.applicationClient = new 
LazApplicationClientLoader(flinkConfig, actorSystemLoader);
 
-               pollingRunner = new PollingThread(yarnClient, appId);
-               pollingRunner.setDaemon(true);
-               pollingRunner.start();
+               this.pollingRunner = new PollingThread(yarnClient, appId);
+               this.pollingRunner.setDaemon(true);
+               this.pollingRunner.start();
 
                Runtime.getRuntime().addShutdownHook(clientShutdownHook);
        }
@@ -199,7 +200,7 @@ public class YarnClusterClient extends ClusterClient {
        @Override
        protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
                if (isDetached()) {
-                       if (perJobCluster) {
+                       if (newlyCreatedCluster) {
                                stopAfterJob(jobGraph.getJobID());
                        }
                        return super.runDetached(jobGraph, classLoader);
@@ -342,8 +343,7 @@ public class YarnClusterClient extends ClusterClient {
         */
        @Override
        public void finalizeCluster() {
-               if (isDetached() || !perJobCluster) {
-                       // only disconnect if we are not running a per job 
cluster
+               if (isDetached() || !newlyCreatedCluster) {
                        disconnect();
                } else {
                        shutdownCluster();
@@ -369,20 +369,16 @@ public class YarnClusterClient extends ClusterClient {
                        // we are already in the shutdown hook
                }
 
-               if(actorSystemLoader.isLoaded()){
-                       LOG.info("Sending shutdown request to the Application 
Master");
-                       if(applicationClient.get() != ActorRef.noSender()) {
-                               try {
-                                       Future<Object> response =
-                                               
Patterns.ask(applicationClient.get(),
-                                                       new 
YarnMessages.LocalStopYarnSession(getApplicationStatus(),
-                                                                       "Flink 
YARN Client requested shutdown"),
-                                                       new 
Timeout(akkaDuration));
-                                       Await.ready(response, akkaDuration);
-                               } catch(Exception e) {
-                                       LOG.warn("Error while stopping YARN 
Application Client", e);
-                               }
-                       }
+               LOG.info("Sending shutdown request to the Application Master");
+               try {
+                       Future<Object> response =
+                               Patterns.ask(applicationClient.get(),
+                                       new 
YarnMessages.LocalStopYarnSession(getApplicationStatus(),
+                                                       "Flink YARN Client 
requested shutdown"),
+                                       new Timeout(akkaDuration));
+                       Await.ready(response, akkaDuration);
+               } catch(Exception e) {
+                       LOG.warn("Error while stopping YARN cluster.", e);
                }
 
                try {
@@ -518,14 +514,52 @@ public class YarnClusterClient extends ClusterClient {
                return super.isDetached() || clusterDescriptor.isDetachedMode();
        }
 
+       /**
+        * Blocks until all TaskManagers are connected to the JobManager.
+        */
+       @Override
+       public void waitForClusterToBeReady() {
+               logAndSysout("Waiting until all TaskManagers have connected");
+
+               for (GetClusterStatusResponse currentStatus, lastStatus = 
null;; lastStatus = currentStatus) {
+                       currentStatus = getClusterStatus();
+                       if (currentStatus != null && 
!currentStatus.equals(lastStatus)) {
+                               logAndSysout("TaskManager status (" + 
currentStatus.numRegisteredTaskManagers() + "/"
+                                       + 
clusterDescriptor.getTaskManagerCount() + ")");
+                               if (currentStatus.numRegisteredTaskManagers() 
>= clusterDescriptor.getTaskManagerCount()) {
+                                       logAndSysout("All TaskManagers are 
connected");
+                                       break;
+                               }
+                       } else if (lastStatus == null) {
+                               logAndSysout("No status updates from the YARN 
cluster received so far. Waiting ...");
+                       }
+
+                       try {
+                               Thread.sleep(250);
+                       } catch (InterruptedException e) {
+                               throw new RuntimeException("Interrupted while 
waiting for TaskManagers", e);
+                       }
+               }
+       }
+
        public ApplicationId getApplicationId() {
                return appId;
        }
 
-       protected class LazApplicationClientLoader {
+       private static class LazApplicationClientLoader {
+
+               private final org.apache.flink.configuration.Configuration 
flinkConfig;
+               private final LazyActorSystemLoader actorSystemLoader;
 
                private ActorRef applicationClient;
 
+               private LazApplicationClientLoader(
+                               org.apache.flink.configuration.Configuration 
flinkConfig,
+                               LazyActorSystemLoader actorSystemLoader) {
+                       this.flinkConfig = flinkConfig;
+                       this.actorSystemLoader = actorSystemLoader;
+               }
+
                /**
                 * Creates a new ApplicationClient actor or returns an existing 
one. May start an ActorSystem.
                 * @return ActorSystem
@@ -549,32 +583,6 @@ public class YarnClusterClient extends ClusterClient {
                                                flinkConfig,
                                                leaderRetrievalService),
                                        "applicationClient");
-
-                               if (perJobCluster) {
-
-                                       logAndSysout("Waiting until all 
TaskManagers have connected");
-
-                                       for (GetClusterStatusResponse 
currentStatus, lastStatus = null;; lastStatus = currentStatus) {
-                                               currentStatus = 
getClusterStatus();
-                                               if (currentStatus != null && 
!currentStatus.equals(lastStatus)) {
-                                                       
logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() 
+ "/"
-                                                               + 
clusterDescriptor.getTaskManagerCount() + ")");
-                                                       if 
(currentStatus.numRegisteredTaskManagers() >= 
clusterDescriptor.getTaskManagerCount()) {
-                                                               
logAndSysout("All TaskManagers are connected");
-                                                               break;
-                                                       }
-                                               } else if (lastStatus == null) {
-                                                       logAndSysout("No status 
updates from the YARN cluster received so far. Waiting ...");
-                                               }
-
-                                               try {
-                                                       Thread.sleep(250);
-                                               } catch (InterruptedException 
e) {
-                                                       LOG.error("Interrupted 
while waiting for TaskManagers");
-                                                       throw new 
RuntimeException("Interrupted while waiting for TaskManagers", e);
-                                               }
-                                       }
-                               }
                        }
 
                        return applicationClient;

http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index bee6a7a..28d8fb8 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -628,6 +628,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
                                                "yarn application -kill " + 
yarnCluster.getApplicationId() + System.lineSeparator() +
                                                "Please also note that the 
temporary files of the YARN session in {} will not be removed.",
                                                
yarnDescriptor.getSessionFilesDir());
+                               yarnCluster.waitForClusterToBeReady();
                                yarnCluster.disconnect();
                        } else {
                                runInteractiveCli(yarnCluster, 
acceptInteractiveInput);

Reply via email to