[FLINK-8224] [flip6] Shut down application when the job terminates in job mode

This closes #5139.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4ecc7ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4ecc7ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4ecc7ff

Branch: refs/heads/master
Commit: a4ecc7ffe4ba16a68de06c1053c7916e6082b413
Parents: c1734f4
Author: shuai.xus <[email protected]>
Authored: Fri Dec 8 18:02:42 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jan 25 15:33:29 2018 +0100

----------------------------------------------------------------------
 .../clusterframework/MesosResourceManager.java  |  4 +++-
 .../entrypoint/JobClusterEntrypoint.java        | 23 +++++++++++++++-----
 .../resourcemanager/ResourceManager.java        | 14 ++++++++----
 .../StandaloneResourceManager.java              |  6 +++--
 .../resourcemanager/TestingResourceManager.java |  4 +++-
 .../apache/flink/yarn/YarnResourceManager.java  |  6 +++--
 6 files changed, 42 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index cabb7d7..8b67257 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -75,6 +75,8 @@ import org.apache.mesos.SchedulerDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -371,7 +373,7 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
        @Override
        protected void shutDownApplication(
                        ApplicationStatus finalStatus,
-                       String optionalDiagnostics) throws 
ResourceManagerException {
+                       @Nullable String optionalDiagnostics) throws 
ResourceManagerException {
                LOG.info("Shutting down and unregistering as a Mesos 
framework.");
 
                Exception exception = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index cb1b086..ede8d13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -53,6 +54,7 @@ import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
 import akka.actor.ActorSystem;
 
@@ -258,8 +260,15 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                }
        }
 
-       private void shutDownAndTerminate(boolean cleanupHaData) {
+       private void shutDownAndTerminate(
+                       boolean cleanupHaData,
+                       ApplicationStatus status,
+                       @Nullable String optionalDiagnostics) {
                try {
+                       if (resourceManager != null) {
+                               resourceManager.shutDownCluster(status, 
optionalDiagnostics);
+                       }
+
                        shutDown(cleanupHaData);
                } catch (Throwable t) {
                        LOG.error("Could not properly shut down cluster 
entrypoint.", t);
@@ -292,23 +301,27 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                public void jobFinished(JobResult result) {
                        LOG.info("Job({}) finished.", jobId);
 
-                       shutDownAndTerminate(true);
+                       shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, 
null);
                }
 
                @Override
                public void jobFailed(JobResult result) {
                        
checkArgument(result.getSerializedThrowable().isPresent());
 
-                       LOG.info("Job({}) failed.", jobId, 
result.getSerializedThrowable().get().getMessage());
+                       final SerializedThrowable serializedThrowable = 
result.getSerializedThrowable().get();
+
+                       final String errorMessage = 
serializedThrowable.getMessage();
+
+                       LOG.info("Job({}) failed: {}.", jobId, errorMessage);
 
-                       shutDownAndTerminate(false);
+                       shutDownAndTerminate(true, ApplicationStatus.FAILED, 
errorMessage);
                }
 
                @Override
                public void jobFinishedByOther() {
                        LOG.info("Job({}) was finished by another JobManager.", 
jobId);
 
-                       shutDownAndTerminate(false);
+                       shutDownAndTerminate(false, ApplicationStatus.UNKNOWN, 
"Job was finished by another master");
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a0ff5f4..e5fef14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -63,6 +63,8 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -479,10 +481,12 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
         * Cleanup application and shut down cluster.
         *
         * @param finalStatus of the Flink application
-        * @param optionalDiagnostics for the Flink application
+        * @param optionalDiagnostics diagnostics message for the Flink 
application or {@code null}
         */
        @Override
-       public void shutDownCluster(final ApplicationStatus finalStatus, final 
String optionalDiagnostics) {
+       public void shutDownCluster(
+                       final ApplicationStatus finalStatus,
+                       @Nullable final String optionalDiagnostics) {
                log.info("Shut down cluster because application is in {}, 
diagnostics {}.", finalStatus, optionalDiagnostics);
 
                try {
@@ -930,10 +934,12 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
         * yet are returned.
         *
         * @param finalStatus The application status to report.
-        * @param optionalDiagnostics An optional diagnostics message.
+        * @param optionalDiagnostics A diagnostics message or {@code null}.
         * @throws ResourceManagerException if the application could not be 
shut down.
         */
-       protected abstract void shutDownApplication(ApplicationStatus 
finalStatus, String optionalDiagnostics) throws ResourceManagerException;
+       protected abstract void shutDownApplication(
+               ApplicationStatus finalStatus,
+               @Nullable String optionalDiagnostics) throws 
ResourceManagerException;
 
        /**
         * Allocates a resource using the resource profile.

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 624f31d..886a046 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -29,11 +29,13 @@ import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import javax.annotation.Nullable;
+
 /**
  * A standalone implementation of the resource manager. Used when the system 
is started in
  * standalone mode (via scripts), rather than via a resource framework like 
YARN or Mesos.
  *
- * This ResourceManager doesn't acquire new resources.
+ * <p>This ResourceManager doesn't acquire new resources.
  */
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
@@ -67,7 +69,7 @@ public class StandaloneResourceManager extends 
ResourceManager<ResourceID> {
        }
 
        @Override
-       protected void shutDownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+       protected void shutDownApplication(ApplicationStatus finalStatus, 
@Nullable String optionalDiagnostics) {
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
index 0d30822..2af024e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import javax.annotation.Nullable;
+
 /**
  * Simple {@link ResourceManager} implementation for testing purposes.
  */
@@ -54,7 +56,7 @@ public class TestingResourceManager extends 
ResourceManager<ResourceID> {
        }
 
        @Override
-       protected void shutDownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) throws ResourceManagerException {
+       protected void shutDownApplication(ApplicationStatus finalStatus, 
@Nullable String optionalDiagnostics) throws ResourceManagerException {
                // noop
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a4ecc7ff/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 0fa0dda..910172d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -252,11 +252,13 @@ public class YarnResourceManager extends 
ResourceManager<YarnWorkerNode> impleme
        }
 
        @Override
-       protected void shutDownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+       protected void shutDownApplication(
+               ApplicationStatus finalStatus,
+               @Nullable String optionalDiagnostics) {
 
                // first, de-register from YARN
                FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
-               log.info("Unregister application from the YARN Resource 
Manager");
+               log.info("Unregister application from the YARN Resource Manager 
with final status {}.", yarnStatus);
 
                try {
                        
resourceManagerClient.unregisterApplicationMaster(yarnStatus, 
optionalDiagnostics, "");

Reply via email to