Repository: zeppelin
Updated Branches:
  refs/heads/master 733bed591 -> a2473daf8
ZEPPELIN-3821. Yarn app is not killed after flink interpreter is restarted

### What is this PR for?
Trivial PR to shut down the cluster when the Flink shell is closed.

### What type of PR is it?
[Hot Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://jira.apache.org/jira/browse/ZEPPELIN-3821

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3208 from zjffdu/ZEPPELIN-3821 and squashes the following commits:

8482bf31d [Jeff Zhang] ZEPPELIN-3821. Yarn app is not killed after flink interpreter is restarted

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/a2473daf
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/a2473daf
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/a2473daf

Branch: refs/heads/master
Commit: a2473daf8e7400fb1cb635ba97f66835cad58ffc
Parents: 733bed5
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Oct 23 17:37:14 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Fri Oct 26 16:25:25 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/flink/FlinkScalaInterpreter.scala | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a2473daf/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 0653c2a..14f8959 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -213,10 +213,18 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
     if (cluster != null) {
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
-        case Some(Right(yarnCluster)) => yarnCluster.shutdown()
-        case _ =>
+        case Some(Left(Left(legacyMiniCluster))) =>
+          LOGGER.info("Shutdown LegacyMiniCluster")
+          legacyMiniCluster.close()
+        case Some(Left(Right(newMiniCluster))) =>
+          LOGGER.info("Shutdown NewMiniCluster")
+          newMiniCluster.close()
+        case Some(Right(yarnCluster)) =>
+          LOGGER.info("Shutdown YarnCluster")
+          yarnCluster.shutDownCluster()
+          yarnCluster.shutdown()
+        case e =>
+          LOGGER.error("Unrecognized cluster type: " + e.getClass.getSimpleName)
       }
     }
   }