Repository: zeppelin
Updated Branches:
  refs/heads/master 733bed591 -> a2473daf8


ZEPPELIN-3821. Yarn app is not killed after flink interpreter is restarted

### What is this PR for?
Trivial PR that shuts down the cluster (including the YARN application) when the Flink shell is closed.
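
For readers unfamiliar with the nested `Option[Either[...]]` shape the interpreter uses for its cluster handle, here is a minimal, self-contained sketch of the shutdown dispatch. The stub classes, the `Cluster` type alias, and the returned labels are hypothetical stand-ins, not Flink or Zeppelin APIs. Only the method names `close()`, `shutDownCluster()`, and `shutdown()` mirror the calls in this change. The sketch assumes that `shutDownCluster()` stops the remote YARN application while `shutdown()` only releases the local client.

```scala
object ShutdownSketch {
  // Hypothetical stand-in for both mini cluster flavours.
  final class MiniClusterStub {
    var closed = false
    def close(): Unit = { closed = true }
  }

  // Hypothetical stand-in for the YARN cluster client.
  final class YarnClusterStub {
    var appKilled = false
    var clientClosed = false
    // Assumption: stops the remote YARN application.
    def shutDownCluster(): Unit = { appKilled = true }
    // Assumption: releases the local client only.
    def shutdown(): Unit = { clientClosed = true }
  }

  // Same nesting as in the diff: legacy mini cluster, new mini cluster, or YARN.
  type Cluster =
    Option[Either[Either[MiniClusterStub, MiniClusterStub], YarnClusterStub]]

  // Returns a label for the branch taken so the dispatch is easy to check.
  def shutdown(cluster: Cluster): String = cluster match {
    case Some(Left(Left(legacyMiniCluster))) =>
      legacyMiniCluster.close()
      "legacy"
    case Some(Left(Right(newMiniCluster))) =>
      newMiniCluster.close()
      "new"
    case Some(Right(yarnCluster)) =>
      yarnCluster.shutDownCluster() // stop the YARN app first
      yarnCluster.shutdown()        // then release the client
      "yarn"
    case None =>
      "none"
  }
}
```

Unlike the catch-all `case e` in the actual change, this sketch matches `None` explicitly, so the absence of a cluster is not reported as an unrecognized type.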

### What type of PR is it?
[Hot Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://jira.apache.org/jira/browse/ZEPPELIN-3821

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3208 from zjffdu/ZEPPELIN-3821 and squashes the following commits:

8482bf31d [Jeff Zhang] ZEPPELIN-3821. Yarn app is not killed after flink interpreter is restarted


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/a2473daf
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/a2473daf
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/a2473daf

Branch: refs/heads/master
Commit: a2473daf8e7400fb1cb635ba97f66835cad58ffc
Parents: 733bed5
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Oct 23 17:37:14 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Fri Oct 26 16:25:25 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/flink/FlinkScalaInterpreter.scala      | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/a2473daf/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 0653c2a..14f8959 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -213,10 +213,18 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
     if (cluster != null) {
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
-        case Some(Right(yarnCluster)) => yarnCluster.shutdown()
-        case _ =>
+        case Some(Left(Left(legacyMiniCluster))) =>
+          LOGGER.info("Shutdown LegacyMiniCluster")
+          legacyMiniCluster.close()
+        case Some(Left(Right(newMiniCluster))) =>
+          LOGGER.info("Shutdown NewMiniCluster")
+          newMiniCluster.close()
+        case Some(Right(yarnCluster)) =>
+          LOGGER.info("Shutdown YarnCluster")
+          yarnCluster.shutDownCluster()
+          yarnCluster.shutdown()
+        case e =>
+          LOGGER.error("Unrecognized cluster type: " + e.getClass.getSimpleName)
       }
     }
   }
