[FLINK-2226][YARN] fail application on failed single-job cluster job

Failing jobs executed in YARN cluster mode leave the YARN application in the "SUCCEEDED" final state. This is fine for long-running Flink YARN clusters that run multiple jobs, but for a single-job cluster the application should be marked as failed when its job fails.
This closes #838. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44b969e0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44b969e0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44b969e0 Branch: refs/heads/release-0.9 Commit: 44b969e0a58347f0262bfc5bc9626362078b373b Parents: 57810b5 Author: Maximilian Michels <m...@apache.org> Authored: Mon Jun 15 16:37:35 2015 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Jun 17 00:51:16 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 8 ++--- .../flink/client/FlinkYarnSessionCli.java | 6 ++-- .../org/apache/flink/client/program/Client.java | 1 + .../flink/runtime/jobgraph/JobStatus.java | 2 +- .../runtime/yarn/AbstractFlinkYarnCluster.java | 2 +- .../flink/yarn/YARNSessionFIFOITCase.java | 2 +- .../org/apache/flink/yarn/FlinkYarnClient.java | 4 +-- .../org/apache/flink/yarn/FlinkYarnCluster.java | 32 +++++++++++++------- .../apache/flink/yarn/ApplicationClient.scala | 2 +- .../flink/yarn/ApplicationMasterActor.scala | 10 ++++-- 10 files changed, 43 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 3e61a3b..e1bacde 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -265,6 +265,7 @@ public class CliFrontend { return handleError(t); } + int exitCode = 1; try { int userParallelism = options.getParallelism(); LOG.debug("User parallelism 
is set to {}", userParallelism); @@ -276,15 +277,14 @@ public class CliFrontend { "To use another parallelism, set it at the ./bin/flink client."); userParallelism = client.getMaxSlots(); } - int exitCode = 0; // check if detached per job yarn cluster is used to start flink if(yarnCluster != null && yarnCluster.isDetached()) { logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill "+yarnCluster.getApplicationId()+"\n" + + "yarn application -kill " + yarnCluster.getApplicationId() + "\n" + "Please also note that the temporary files of the YARN session in the home directoy will not be removed."); - executeProgram(program, client, userParallelism, false); + exitCode = executeProgram(program, client, userParallelism, false); } else { // regular (blocking) execution. exitCode = executeProgram(program, client, userParallelism, true); @@ -314,7 +314,7 @@ public class CliFrontend { finally { if (yarnCluster != null && !yarnCluster.isDetached()) { logAndSysout("Shutting down YARN cluster"); - yarnCluster.shutdown(); + yarnCluster.shutdown(exitCode != 0); } if (program != null) { program.deleteExtractedLibraries(); http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 0fa7173..c11edc7 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -302,7 +302,7 @@ public class FlinkYarnSessionCli { if (yarnCluster.hasFailed()) { System.err.println("The YARN cluster has failed"); - 
yarnCluster.shutdown(); + yarnCluster.shutdown(true); } // wait until CLIENT_POLLING_INTERVALL is over or the user entered something. @@ -439,7 +439,7 @@ public class FlinkYarnSessionCli { if (!yarnCluster.hasBeenStopped()) { LOG.info("Command Line Interface requested session shutdown"); - yarnCluster.shutdown(); + yarnCluster.shutdown(false); } try { @@ -458,7 +458,7 @@ public class FlinkYarnSessionCli { public void stop() { if (yarnCluster != null) { LOG.info("Command line interface is shutting down the yarnCluster"); - yarnCluster.shutdown(); + yarnCluster.shutdown(false); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index e219a38..c544e8d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -318,6 +318,7 @@ public class Client { ContextEnvironment.enableLocalExecution(true); } + // Job id has been set in the Client passed to the ContextEnvironment return new JobSubmissionResult(lastJobId); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java index 667a68e..eb7d017 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java @@ -51,7 +51,7 @@ public enum JobStatus { private final boolean 
terminalState; - private JobStatus(boolean terminalState) { + JobStatus(boolean terminalState) { this.terminalState = terminalState; } http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java index 398709e..c2e897f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java @@ -30,7 +30,7 @@ public abstract class AbstractFlinkYarnCluster { public abstract String getWebInterfaceURL(); - public abstract void shutdown(); + public abstract void shutdown(boolean failApplication); public abstract boolean hasBeenStopped(); http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index ccde5d8..dd32b0d 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -647,7 +647,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase { LOG.info("Shutting down cluster. 
All tests passed"); // shutdown cluster - yarnCluster.shutdown(); + yarnCluster.shutdown(false); LOG.info("Finished testJavaAPI()"); } } http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java index 118f4ad..f82f013 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java @@ -435,11 +435,11 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient { + "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC); } - if( taskManagerMemoryMb > freeClusterMem.containerLimit) { + if(taskManagerMemoryMb > freeClusterMem.containerLimit) { LOG.warn("The requested amount of memory for the TaskManagers ("+taskManagerMemoryMb+"MB) is more than " + "the largest possible YARN container: "+freeClusterMem.containerLimit + NOTE_RSC); } - if( jobManagerMemoryMb > freeClusterMem.containerLimit) { + if(jobManagerMemoryMb > freeClusterMem.containerLimit) { LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb + "MB) is more than " + "the largest possible YARN container: " + freeClusterMem.containerLimit + NOTE_RSC); } http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java index 6dd84d6..e408edb 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -297,12 +297,12 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { } ApplicationReport lastReport = pollingRunner.getLastReport(); if(lastReport == null) { - LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster. that didn't receive a status so far." + + LOG.warn("FlinkYarnCluster.hasFailed() has been called on a cluster that didn't receive a status so far." + "The system might be in an erroneous state"); return false; } else { YarnApplicationState appState = lastReport.getYarnApplicationState(); - boolean status= (appState == YarnApplicationState.FAILED || + boolean status = (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED); if(status) { LOG.warn("YARN reported application state {}", appState); @@ -381,12 +381,13 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { // -------------------------- Shutdown handling ------------------------ private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false); - @Override - public void shutdown() { - shutdownInternal(true); - } - private void shutdownInternal(boolean removeShutdownHook) { + /** + * Shutdown the YARN cluster. + * @param failApplication whether we should fail the YARN application (in case of errors in Flink) + */ + @Override + public void shutdown(boolean failApplication) { if(!isConnected) { throw new IllegalStateException("The cluster has been connected to the ApplicationMaster."); } @@ -394,16 +395,25 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { if(hasBeenShutDown.getAndSet(true)) { return; } - // the session is being stopped explicitly. 
- if(removeShutdownHook) { + + try { Runtime.getRuntime().removeShutdownHook(clientShutdownHook); + } catch (IllegalStateException e) { + // we are already in the shutdown hook } + if(actorSystem != null){ LOG.info("Sending shutdown request to the Application Master"); if(applicationClient != ActorRef.noSender()) { try { + FinalApplicationStatus finalStatus; + if (failApplication) { + finalStatus = FinalApplicationStatus.FAILED; + } else { + finalStatus = FinalApplicationStatus.SUCCEEDED; + } Future<Object> response = Patterns.ask(applicationClient, - new Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED, + new Messages.StopYarnSession(finalStatus, "Flink YARN Client requested shutdown"), new Timeout(akkaDuration)); @@ -457,7 +467,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { @Override public void run() { LOG.info("Shutting down FlinkYarnCluster from the client shutdown hook"); - shutdownInternal(false); + shutdown(true); } } http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index d6760ec..ec980d0 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -137,7 +137,7 @@ class ApplicationClient(flinkConfig: Configuration) case LocalGetYarnClusterStatus => sender() ! 
latestClusterStatus - // Forward message to Application Master + // Forward message to Application Master case msg: StopAMAfterJob => yarnJobManager foreach { _ forward msg http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala index 999610f..411808b 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala @@ -26,6 +26,7 @@ import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.configuration.ConfigConstants import org.apache.flink.runtime.ActorLogMessages +import org.apache.flink.runtime.jobgraph.JobStatus import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, JobNotFound, RequestJobStatus} import org.apache.flink.runtime.messages.Messages.Acknowledge @@ -171,8 +172,13 @@ trait ApplicationMasterActor extends ActorLogMessages { if(jobStatus.status.isTerminalState) { log.info(s"Job with ID ${jobStatus.jobID} is in terminal state ${jobStatus.status}. " + s"Shutting down YARN session") - self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED, - s"The monitored job with ID ${jobStatus.jobID} has finished.") + if (jobStatus.status == JobStatus.FINISHED) { + self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED, + s"The monitored job with ID ${jobStatus.jobID} has finished.") + } else { + self ! StopYarnSession(FinalApplicationStatus.FAILED, + s"The monitored job with ID ${jobStatus.jobID} has failed to complete.") + } } else { log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state ${jobStatus.status}") }