[FLINK-4486] detached YarnSession: wait until cluster startup is complete. This closes #2423.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4cdeb118 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4cdeb118 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4cdeb118 Branch: refs/heads/release-1.1 Commit: 4cdeb11854956ac6cf1189d7cfa43628fb3be328 Parents: 28da995 Author: Maximilian Michels <m...@apache.org> Authored: Fri Aug 19 14:57:14 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Mon Aug 29 18:20:00 2016 +0200 ---------------------------------------------------------------------- .../flink/client/program/ClusterClient.java | 17 ++- .../client/program/StandaloneClusterClient.java | 3 + .../apache/flink/yarn/YarnClusterClient.java | 112 ++++++++++--------- .../flink/yarn/cli/FlinkYarnSessionCli.java | 1 + 4 files changed, 79 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index c3c666b..b8d3400 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -386,7 +386,10 @@ public abstract class ClusterClient { * @throws ProgramInvocationException */ public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { - LeaderRetrievalService leaderRetrievalService; + + waitForClusterToBeReady(); + + final LeaderRetrievalService leaderRetrievalService; try { leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(flinkConfig); } catch (Exception e) { 
@@ -411,8 +414,10 @@ public abstract class ClusterClient { * @throws ProgramInvocationException */ public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { - ActorGateway jobManagerGateway; + waitForClusterToBeReady(); + + final ActorGateway jobManagerGateway; try { jobManagerGateway = getJobManagerGateway(); } catch (Exception e) { @@ -655,6 +660,14 @@ public abstract class ClusterClient { // ------------------------------------------------------------------------ /** + * Blocks until the client has determined that the cluster is ready for Job submission. + * + * This is delayed until right before job submission to report any other errors first + * (e.g. invalid job definitions/errors in the user jar) + */ + public abstract void waitForClusterToBeReady(); + + /** * Returns an URL (as a string) to the JobManager web interface */ public abstract String getWebInterfaceURL(); http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java index d25c9d1..3343b69 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StandaloneClusterClient.java @@ -41,6 +41,9 @@ public class StandaloneClusterClient extends ClusterClient { super(config); } + @Override + public void waitForClusterToBeReady() {} + @Override public String getWebInterfaceURL() { http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index e76b7e8..75bfeed 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -90,7 +90,8 @@ public class YarnClusterClient extends ClusterClient { private boolean isConnected = true; - private final boolean perJobCluster; + /** Indicator whether this cluster has just been created */ + private final boolean newlyCreatedCluster; /** * Create a new Flink on YARN cluster. @@ -100,7 +101,7 @@ public class YarnClusterClient extends ClusterClient { * @param appReport the YARN application ID * @param flinkConfig Flink configuration * @param sessionFilesDir Location of files required for YARN session - * @param perJobCluster Indicator whether this cluster is only created for a single job and then shutdown + * @param newlyCreatedCluster Indicator whether this cluster has just been created * @throws IOException * @throws YarnException */ @@ -110,7 +111,7 @@ public class YarnClusterClient extends ClusterClient { final ApplicationReport appReport, org.apache.flink.configuration.Configuration flinkConfig, Path sessionFilesDir, - boolean perJobCluster) throws IOException, YarnException { + boolean newlyCreatedCluster) throws IOException, YarnException { super(flinkConfig); @@ -123,13 +124,13 @@ public class YarnClusterClient extends ClusterClient { this.appReport = appReport; this.appId = appReport.getApplicationId(); this.trackingURL = appReport.getTrackingUrl(); - this.perJobCluster = perJobCluster; + this.newlyCreatedCluster = newlyCreatedCluster; - this.applicationClient = new LazApplicationClientLoader(); + this.applicationClient = new LazApplicationClientLoader(flinkConfig, actorSystemLoader); - pollingRunner = new PollingThread(yarnClient, appId); - pollingRunner.setDaemon(true); - pollingRunner.start(); + this.pollingRunner 
= new PollingThread(yarnClient, appId); + this.pollingRunner.setDaemon(true); + this.pollingRunner.start(); Runtime.getRuntime().addShutdownHook(clientShutdownHook); } @@ -199,7 +200,7 @@ public class YarnClusterClient extends ClusterClient { @Override protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { if (isDetached()) { - if (perJobCluster) { + if (newlyCreatedCluster) { stopAfterJob(jobGraph.getJobID()); } return super.runDetached(jobGraph, classLoader); @@ -342,8 +343,7 @@ public class YarnClusterClient extends ClusterClient { */ @Override public void finalizeCluster() { - if (isDetached() || !perJobCluster) { - // only disconnect if we are not running a per job cluster + if (isDetached() || !newlyCreatedCluster) { disconnect(); } else { shutdownCluster(); @@ -369,20 +369,16 @@ public class YarnClusterClient extends ClusterClient { // we are already in the shutdown hook } - if(actorSystemLoader.isLoaded()){ - LOG.info("Sending shutdown request to the Application Master"); - if(applicationClient.get() != ActorRef.noSender()) { - try { - Future<Object> response = - Patterns.ask(applicationClient.get(), - new YarnMessages.LocalStopYarnSession(getApplicationStatus(), - "Flink YARN Client requested shutdown"), - new Timeout(akkaDuration)); - Await.ready(response, akkaDuration); - } catch(Exception e) { - LOG.warn("Error while stopping YARN Application Client", e); - } - } + LOG.info("Sending shutdown request to the Application Master"); + try { + Future<Object> response = + Patterns.ask(applicationClient.get(), + new YarnMessages.LocalStopYarnSession(getApplicationStatus(), + "Flink YARN Client requested shutdown"), + new Timeout(akkaDuration)); + Await.ready(response, akkaDuration); + } catch(Exception e) { + LOG.warn("Error while stopping YARN cluster.", e); } try { @@ -518,14 +514,52 @@ public class YarnClusterClient extends ClusterClient { return super.isDetached() || 
clusterDescriptor.isDetachedMode(); } + /** + * Blocks until all TaskManagers are connected to the JobManager. + */ + @Override + public void waitForClusterToBeReady() { + logAndSysout("Waiting until all TaskManagers have connected"); + + for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) { + currentStatus = getClusterStatus(); + if (currentStatus != null && !currentStatus.equals(lastStatus)) { + logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/" + + clusterDescriptor.getTaskManagerCount() + ")"); + if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) { + logAndSysout("All TaskManagers are connected"); + break; + } + } else if (lastStatus == null) { + logAndSysout("No status updates from the YARN cluster received so far. Waiting ..."); + } + + try { + Thread.sleep(250); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting for TaskManagers", e); + } + } + } + public ApplicationId getApplicationId() { return appId; } - protected class LazApplicationClientLoader { + private static class LazApplicationClientLoader { + + private final org.apache.flink.configuration.Configuration flinkConfig; + private final LazyActorSystemLoader actorSystemLoader; private ActorRef applicationClient; + private LazApplicationClientLoader( + org.apache.flink.configuration.Configuration flinkConfig, + LazyActorSystemLoader actorSystemLoader) { + this.flinkConfig = flinkConfig; + this.actorSystemLoader = actorSystemLoader; + } + /** * Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem. 
* @return ActorSystem @@ -549,32 +583,6 @@ public class YarnClusterClient extends ClusterClient { flinkConfig, leaderRetrievalService), "applicationClient"); - - if (perJobCluster) { - - logAndSysout("Waiting until all TaskManagers have connected"); - - for (GetClusterStatusResponse currentStatus, lastStatus = null;; lastStatus = currentStatus) { - currentStatus = getClusterStatus(); - if (currentStatus != null && !currentStatus.equals(lastStatus)) { - logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/" - + clusterDescriptor.getTaskManagerCount() + ")"); - if (currentStatus.numRegisteredTaskManagers() >= clusterDescriptor.getTaskManagerCount()) { - logAndSysout("All TaskManagers are connected"); - break; - } - } else if (lastStatus == null) { - logAndSysout("No status updates from the YARN cluster received so far. Waiting ..."); - } - - try { - Thread.sleep(250); - } catch (InterruptedException e) { - LOG.error("Interrupted while waiting for TaskManagers"); - throw new RuntimeException("Interrupted while waiting for TaskManagers", e); - } - } - } } return applicationClient; http://git-wip-us.apache.org/repos/asf/flink/blob/4cdeb118/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index bee6a7a..28d8fb8 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -628,6 +628,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient> "yarn application -kill " + yarnCluster.getApplicationId() + System.lineSeparator() + "Please also note that the temporary files of the YARN session in {} will not be removed.", 
yarnDescriptor.getSessionFilesDir()); + yarnCluster.waitForClusterToBeReady(); yarnCluster.disconnect(); } else { runInteractiveCli(yarnCluster, acceptInteractiveInput);