[FLINK-2226][YARN] fail application on failed single-job cluster job

Failing jobs executed in YARN cluster mode leave the application
container in the "SUCCEEDED" final state. This is fine for long-running
Flink YARN clusters where multiple jobs are run, but for a single-job
cluster it is appropriate to mark the application as failed.

This closes #838.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/44b969e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/44b969e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/44b969e0

Branch: refs/heads/release-0.9
Commit: 44b969e0a58347f0262bfc5bc9626362078b373b
Parents: 57810b5
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Jun 15 16:37:35 2015 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Jun 17 00:51:16 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  8 ++---
 .../flink/client/FlinkYarnSessionCli.java       |  6 ++--
 .../org/apache/flink/client/program/Client.java |  1 +
 .../flink/runtime/jobgraph/JobStatus.java       |  2 +-
 .../runtime/yarn/AbstractFlinkYarnCluster.java  |  2 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  2 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  4 +--
 .../org/apache/flink/yarn/FlinkYarnCluster.java | 32 +++++++++++++-------
 .../apache/flink/yarn/ApplicationClient.scala   |  2 +-
 .../flink/yarn/ApplicationMasterActor.scala     | 10 ++++--
 10 files changed, 43 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3e61a3b..e1bacde 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -265,6 +265,7 @@ public class CliFrontend {
                        return handleError(t);
                }
 
+               int exitCode = 1;
                try {
                        int userParallelism = options.getParallelism();
                        LOG.debug("User parallelism is set to {}", 
userParallelism);
@@ -276,15 +277,14 @@ public class CliFrontend {
                                                "To use another parallelism, 
set it at the ./bin/flink client.");
                                userParallelism = client.getMaxSlots();
                        }
-                       int exitCode = 0;
 
                        // check if detached per job yarn cluster is used to 
start flink
                        if(yarnCluster != null && yarnCluster.isDetached()) {
                                logAndSysout("The Flink YARN client has been 
started in detached mode. In order to stop " +
                                                "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
-                                               "yarn application -kill 
"+yarnCluster.getApplicationId()+"\n" +
+                                               "yarn application -kill " + 
yarnCluster.getApplicationId() + "\n" +
                                                "Please also note that the 
temporary files of the YARN session in the home directoy will not be removed.");
-                               executeProgram(program, client, 
userParallelism, false);
+                               exitCode = executeProgram(program, client, 
userParallelism, false);
                        } else {
                                // regular (blocking) execution.
                                exitCode = executeProgram(program, client, 
userParallelism, true);
@@ -314,7 +314,7 @@ public class CliFrontend {
                finally {
                        if (yarnCluster != null && !yarnCluster.isDetached()) {
                                logAndSysout("Shutting down YARN cluster");
-                               yarnCluster.shutdown();
+                               yarnCluster.shutdown(exitCode != 0);
                        }
                        if (program != null) {
                                program.deleteExtractedLibraries();

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 0fa7173..c11edc7 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -302,7 +302,7 @@ public class FlinkYarnSessionCli {
 
                                if (yarnCluster.hasFailed()) {
                                        System.err.println("The YARN cluster 
has failed");
-                                       yarnCluster.shutdown();
+                                       yarnCluster.shutdown(true);
                                }
 
                                // wait until CLIENT_POLLING_INTERVALL is over 
or the user entered something.
@@ -439,7 +439,7 @@ public class FlinkYarnSessionCli {
 
                                if (!yarnCluster.hasBeenStopped()) {
                                        LOG.info("Command Line Interface 
requested session shutdown");
-                                       yarnCluster.shutdown();
+                                       yarnCluster.shutdown(false);
                                }
 
                                try {
@@ -458,7 +458,7 @@ public class FlinkYarnSessionCli {
        public void stop() {
                if (yarnCluster != null) {
                        LOG.info("Command line interface is shutting down the 
yarnCluster");
-                       yarnCluster.shutdown();
+                       yarnCluster.shutdown(false);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e219a38..c544e8d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -318,6 +318,7 @@ public class Client {
                                ContextEnvironment.enableLocalExecution(true);
                        }
 
+                       // Job id has been set in the Client passed to the 
ContextEnvironment
                        return new JobSubmissionResult(lastJobId);
                }
                else {

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index 667a68e..eb7d017 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -51,7 +51,7 @@ public enum JobStatus {
        
        private final boolean terminalState;
        
-       private JobStatus(boolean terminalState) {
+       JobStatus(boolean terminalState) {
                this.terminalState = terminalState;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
index 398709e..c2e897f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnCluster.java
@@ -30,7 +30,7 @@ public abstract class AbstractFlinkYarnCluster {
 
        public abstract String getWebInterfaceURL();
 
-       public abstract void shutdown();
+       public abstract void shutdown(boolean failApplication);
 
        public abstract boolean hasBeenStopped();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index ccde5d8..dd32b0d 100644
--- 
a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ 
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -647,7 +647,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
                LOG.info("Shutting down cluster. All tests passed");
                // shutdown cluster
-               yarnCluster.shutdown();
+               yarnCluster.shutdown(false);
                LOG.info("Finished testJavaAPI()");
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index 118f4ad..f82f013 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -435,11 +435,11 @@ public class FlinkYarnClient extends 
AbstractFlinkYarnClient {
                                        + "There are currently only " + 
freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);
 
                }
-               if( taskManagerMemoryMb > freeClusterMem.containerLimit) {
+               if(taskManagerMemoryMb > freeClusterMem.containerLimit) {
                        LOG.warn("The requested amount of memory for the 
TaskManagers ("+taskManagerMemoryMb+"MB) is more than "
                                        + "the largest possible YARN container: 
"+freeClusterMem.containerLimit + NOTE_RSC);
                }
-               if( jobManagerMemoryMb > freeClusterMem.containerLimit) {
+               if(jobManagerMemoryMb > freeClusterMem.containerLimit) {
                        LOG.warn("The requested amount of memory for the 
JobManager (" + jobManagerMemoryMb + "MB) is more than "
                                        + "the largest possible YARN container: 
" + freeClusterMem.containerLimit + NOTE_RSC);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 6dd84d6..e408edb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -297,12 +297,12 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                }
                ApplicationReport lastReport = pollingRunner.getLastReport();
                if(lastReport == null) {
-                       LOG.warn("FlinkYarnCluster.hasFailed() has been called 
on a cluster. that didn't receive a status so far." +
+                       LOG.warn("FlinkYarnCluster.hasFailed() has been called 
on a cluster that didn't receive a status so far." +
                                        "The system might be in an erroneous 
state");
                        return false;
                } else {
                        YarnApplicationState appState = 
lastReport.getYarnApplicationState();
-                       boolean status= (appState == 
YarnApplicationState.FAILED ||
+                       boolean status = (appState == 
YarnApplicationState.FAILED ||
                                        appState == 
YarnApplicationState.KILLED);
                        if(status) {
                                LOG.warn("YARN reported application state {}", 
appState);
@@ -381,12 +381,13 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
        // -------------------------- Shutdown handling ------------------------
 
        private AtomicBoolean hasBeenShutDown = new AtomicBoolean(false);
-       @Override
-       public void shutdown() {
-               shutdownInternal(true);
-       }
 
-       private void shutdownInternal(boolean removeShutdownHook) {
+       /**
+        * Shutdown the YARN cluster.
+        * @param failApplication whether we should fail the YARN application 
(in case of errors in Flink)
+        */
+       @Override
+       public void shutdown(boolean failApplication) {
                if(!isConnected) {
                        throw new IllegalStateException("The cluster has been 
connected to the ApplicationMaster.");
                }
@@ -394,16 +395,25 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                if(hasBeenShutDown.getAndSet(true)) {
                        return;
                }
-               // the session is being stopped explicitly.
-               if(removeShutdownHook) {
+
+               try {
                        
Runtime.getRuntime().removeShutdownHook(clientShutdownHook);
+               } catch (IllegalStateException e) {
+                       // we are already in the shutdown hook
                }
+
                if(actorSystem != null){
                        LOG.info("Sending shutdown request to the Application 
Master");
                        if(applicationClient != ActorRef.noSender()) {
                                try {
+                                       FinalApplicationStatus finalStatus;
+                                       if (failApplication) {
+                                               finalStatus = 
FinalApplicationStatus.FAILED;
+                                       } else {
+                                               finalStatus = 
FinalApplicationStatus.SUCCEEDED;
+                                       }
                                        Future<Object> response = 
Patterns.ask(applicationClient,
-                                                       new 
Messages.StopYarnSession(FinalApplicationStatus.SUCCEEDED,
+                                                       new 
Messages.StopYarnSession(finalStatus,
                                                                        "Flink 
YARN Client requested shutdown"),
                                                        new 
Timeout(akkaDuration));
 
@@ -457,7 +467,7 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                @Override
                public void run() {
                        LOG.info("Shutting down FlinkYarnCluster from the 
client shutdown hook");
-                       shutdownInternal(false);
+                       shutdown(true);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index d6760ec..ec980d0 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -137,7 +137,7 @@ class ApplicationClient(flinkConfig: Configuration)
     case LocalGetYarnClusterStatus =>
       sender() ! latestClusterStatus
 
-      // Forward message to Application Master
+    // Forward message to Application Master
     case msg: StopAMAfterJob =>
       yarnJobManager foreach {
         _ forward msg

http://git-wip-us.apache.org/repos/asf/flink/blob/44b969e0/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 999610f..411808b 100644
--- 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -26,6 +26,7 @@ import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.runtime.ActorLogMessages
+import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, 
JobNotFound, RequestJobStatus}
 import org.apache.flink.runtime.messages.Messages.Acknowledge
@@ -171,8 +172,13 @@ trait ApplicationMasterActor extends ActorLogMessages {
           if(jobStatus.status.isTerminalState) {
             log.info(s"Job with ID ${jobStatus.jobID} is in terminal state 
${jobStatus.status}. " +
               s"Shutting down YARN session")
-            self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
-              s"The monitored job with ID ${jobStatus.jobID} has finished.")
+            if (jobStatus.status == JobStatus.FINISHED) {
+              self ! StopYarnSession(FinalApplicationStatus.SUCCEEDED,
+                s"The monitored job with ID ${jobStatus.jobID} has finished.")
+            } else {
+              self ! StopYarnSession(FinalApplicationStatus.FAILED,
+                s"The monitored job with ID ${jobStatus.jobID} has failed to 
complete.")
+            }
           } else {
             log.debug(s"Monitored job with ID ${jobStatus.jobID} is in state 
${jobStatus.status}")
           }

Reply via email to