This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70b48d63e5aed6411ccbdb443af0222515331867
Author: Sebastian Klemke <pac...@nerdheim.de>
AuthorDate: Sat Aug 11 22:43:48 2018 +0200

    [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode.
    
    This closes #6540.
---
 .../java/org/apache/flink/client/cli/CliFrontend.java  | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 2e78e4a..780f814 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -249,13 +250,22 @@ public class CliFrontend {
                                        LOG.info("Could not properly shut down 
the client.", e);
                                }
                        } else {
+                               final Thread shutdownHook;
                                if (clusterId != null) {
                                        client = 
clusterDescriptor.retrieve(clusterId);
+                                       shutdownHook = null;
                                } else {
                                        // also in job mode we have to deploy a 
session cluster because the job
                                        // might consist of multiple parts 
(e.g. when using collect)
                                        final ClusterSpecification 
clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
                                        client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
+                                       // if not running in detached mode, add 
a shutdown hook to shut down cluster if client exits
+                                       // there's a race-condition here if cli 
is killed before shutdown hook is installed
+                                       if (!runOptions.getDetachedMode()) {
+                                               shutdownHook = 
ShutdownHookUtil.addShutdownHook(client::shutDownCluster, 
client.getClass().getSimpleName(), LOG);
+                                       } else {
+                                               shutdownHook = null;
+                                       }
                                }
 
                                try {
@@ -278,12 +288,12 @@ public class CliFrontend {
 
                                        executeProgram(program, client, 
userParallelism);
                                } finally {
-                                       if (clusterId == null && 
!client.isDetached()) {
+                                       if (shutdownHook != null) {
                                                // terminate the cluster only 
if we have started it before and if it's not detached
                                                try {
-                                                       
client.shutDownCluster();
-                                               } catch (final Exception e) {
-                                                       LOG.info("Could not 
properly terminate the Flink cluster.", e);
+                                                       shutdownHook.run();
+                                               } finally {
+                                                       
ShutdownHookUtil.removeShutdownHook(shutdownHook, 
client.getClass().getSimpleName(), LOG);
                                                }
                                        }
 

Reply via email to