This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d753db27757376def72eda1d3c312a3d128a4bae Author: Sebastian Klemke <pac...@nerdheim.de> AuthorDate: Sat Aug 11 22:43:48 2018 +0200 [FLINK-9891] Added hook to shutdown cluster if a session was created in per-job mode. This closes #6540. --- .../java/org/apache/flink/client/cli/CliFrontend.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index e2a260c..98df8df 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -58,6 +58,7 @@ import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Options; @@ -250,13 +251,22 @@ public class CliFrontend { LOG.info("Could not properly shut down the client.", e); } } else { + final Thread shutdownHook; if (clusterId != null) { client = clusterDescriptor.retrieve(clusterId); + shutdownHook = null; } else { // also in job mode we have to deploy a session cluster because the job // might consist of multiple parts (e.g. when using collect) final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine); client = clusterDescriptor.deploySessionCluster(clusterSpecification); + // if not running in detached mode, add a shutdown hook to shut down cluster if client exits + // there's a race-condition here if cli is killed before shutdown hook is installed + if (!runOptions.getDetachedMode()) { + shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG); + } else { + shutdownHook = null; + } } try { @@ -279,12 +289,12 @@ public class CliFrontend { executeProgram(program, client, userParallelism); } finally { - if (clusterId == null && !client.isDetached()) { + if (shutdownHook != null) { // terminate the cluster only if we have started it before and if it's not detached try { - client.shutDownCluster(); - } catch (final Exception e) { - LOG.info("Could not properly terminate the Flink cluster.", e); + shutdownHook.run(); + } finally { + ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG); } }