Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 ea861f456 -> 0318a94b8


ZEPPELIN-3749. New Spark interpreter has to be restarted two times in order to work fine for different users

This PR fixes the scoped-mode issue in the new Spark interpreter: the SparkContext is now closed only when there are no other live sessions.
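
For reference, a minimal sketch of the counting pattern this change introduces (hypothetical, simplified names; the actual change lives in BaseSparkScalaInterpreter, shown in the diff below):

    import java.util.concurrent.atomic.AtomicInteger

    // Shared across every scoped session in the JVM, analogous to
    // BaseSparkScalaInterpreter.sessionNum in the patch.
    object SessionCounter {
      val sessionNum = new AtomicInteger(0)
    }

    class ScopedSession(stopContext: () => Unit) {
      // open() registers the session before any per-session setup runs.
      def open(): Unit = SessionCounter.sessionNum.incrementAndGet()

      // close() tears down the shared SparkContext only when this was
      // the last live session.
      def close(): Unit =
        if (SessionCounter.sessionNum.decrementAndGet() == 0) {
          stopContext()
        }
    }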

[Bug Fix]

* [ ] - Task

* https://issues.apache.org/jira/browse/ZEPPELIN-3749

* A unit test is added

* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #3166 from zjffdu/ZEPPELIN-3749 and squashes the following commits:

67ba55627 [Jeff Zhang] ZEPPELIN-3749. New Spark interpreter has to be restarted two times in order to work fine for different users

(cherry picked from commit 71bc75966a7e5a18ce35b1f767c903a4b5c02eb9)
Signed-off-by: Jeff Zhang <zjf...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/0318a94b
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/0318a94b
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/0318a94b

Branch: refs/heads/branch-0.8
Commit: 0318a94b8b85a2efb863e845f10dc9a1117aa7eb
Parents: ea861f4
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Aug 30 10:14:42 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Tue Sep 4 17:13:23 2018 +0800

----------------------------------------------------------------------
 spark/interpreter/figure/unnamed-chunk-1-1.png  | Bin 407541 -> 0 bytes
 .../zeppelin/spark/NewSparkInterpreterTest.java |  47 ++++++++++++++++++-
 .../spark/SparkScala210Interpreter.scala        |   3 --
 .../spark/BaseSparkScalaInterpreter.scala       |  30 ++++++++----
 4 files changed, 67 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0318a94b/spark/interpreter/figure/unnamed-chunk-1-1.png
----------------------------------------------------------------------
diff --git a/spark/interpreter/figure/unnamed-chunk-1-1.png b/spark/interpreter/figure/unnamed-chunk-1-1.png
deleted file mode 100644
index 6f03c95..0000000
Binary files a/spark/interpreter/figure/unnamed-chunk-1-1.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0318a94b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 18e7a53..4fe85db 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -35,6 +35,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -69,7 +70,12 @@ public class NewSparkInterpreterTest {
   // catch the interpreter output in onUpdate
   private InterpreterResultMessageOutput messageOutput;
 
-  private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class);
+  private RemoteEventClient mockRemoteEventClient;
+
+  @Before
+  public void setUp() {
+    mockRemoteEventClient = mock(RemoteEventClient.class);
+  }
 
   @Test
  public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
@@ -482,6 +488,45 @@ public class NewSparkInterpreterTest {
         any(String.class), any(Map.class));
   }
 
+  @Test
+  public void testScopedMode() throws InterpreterException {
+    Properties properties = new Properties();
+    properties.setProperty("spark.master", "local");
+    properties.setProperty("spark.app.name", "test");
+    properties.setProperty("zeppelin.spark.maxResult", "100");
+    properties.setProperty("zeppelin.spark.test", "true");
+    properties.setProperty("zeppelin.spark.useNew", "true");
+
+    SparkInterpreter interpreter1 = new SparkInterpreter(properties);
+    SparkInterpreter interpreter2 = new SparkInterpreter(properties);
+
+    InterpreterGroup interpreterGroup = new InterpreterGroup();
+    interpreter1.setInterpreterGroup(interpreterGroup);
+    interpreter2.setInterpreterGroup(interpreterGroup);
+
+    interpreterGroup.addInterpreterToSession(interpreter1, "session_1");
+    interpreterGroup.addInterpreterToSession(interpreter2, "session_2");
+
+    InterpreterContext.set(getInterpreterContext());
+    interpreter1.open();
+    interpreter2.open();
+
+    InterpreterContext context = getInterpreterContext();
+
+    InterpreterResult result1 = interpreter1.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result1.code());
+
+    InterpreterResult result2 = interpreter2.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
+
+    // interpreter2 continue to work after interpreter1 is closed
+    interpreter1.close();
+
+    result2 = interpreter2.interpret("sc.range(1, 10).sum", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
+    interpreter2.close();
+  }
+
   @After
   public void tearDown() throws InterpreterException {
     if (this.interpreter != null) {
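
The test above opens two sessions in one InterpreterGroup, closes session_1, and verifies that session_2 keeps working. A self-contained sketch of that lifecycle against the simplified counter from earlier (not Zeppelin's API):

    object ScopedLifecycleDemo extends App {
      import java.util.concurrent.atomic.AtomicInteger

      val sessions = new AtomicInteger(0)
      var contextAlive = false   // stands in for the shared SparkContext

      def open(): Unit = if (sessions.getAndIncrement() == 0) contextAlive = true
      def close(): Unit = if (sessions.decrementAndGet() == 0) contextAlive = false

      open(); open()          // session_1 and session_2 open
      close()                 // session_1 closes
      assert(contextAlive)    // session_2 can still run sc.range(1, 10).sum
      close()                 // last session closes
      assert(!contextAlive)   // only now is the context torn down
    }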

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0318a94b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
----------------------------------------------------------------------
diff --git a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
index c66aa71..995ee15 100644
--- a/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
+++ b/spark/scala-2.10/src/main/scala/org/apache/zeppelin/spark/SparkScala210Interpreter.scala
@@ -88,9 +88,6 @@ class SparkScala210Interpreter(override val conf: SparkConf,
 
   override def close(): Unit = {
     super.close()
-    if (sparkILoop != null) {
-      callMethod(sparkILoop, "org$apache$spark$repl$SparkILoop$$closeInterpreter")
-    }
   }
 
   def scalaInterpret(code: String): scala.tools.nsc.interpreter.IR.Result =

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0318a94b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
----------------------------------------------------------------------
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 61f9217..734a303 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,6 +19,7 @@ package org.apache.zeppelin.spark
 
 
 import java.io.File
+import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.{JobProgressUtil, SparkConf, SparkContext}
@@ -59,6 +60,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
 
   protected val interpreterOutput: InterpreterOutputStream
 
+
   protected def open(): Unit = {
     /* Required for scoped mode.
     * In scoped mode multiple scala compiler (repl) generates class in the same directory.
@@ -77,6 +79,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
      *
      */
    System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
+
+    BaseSparkScalaInterpreter.sessionNum.incrementAndGet()
   }
 
  def interpret(code: String, context: InterpreterContext): InterpreterResult = {
@@ -152,16 +156,20 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     bind(name, tpe, value, modifier.asScala.toList)
 
   protected def close(): Unit = {
-    if (sc != null) {
-      sc.stop()
-    }
-    sc = null
-    sqlContext = null
-    if (sparkSession != null) {
-      sparkSession.getClass.getMethod("stop").invoke(sparkSession)
-      sparkSession = null
+    if (BaseSparkScalaInterpreter.sessionNum.decrementAndGet() == 0) {
+      if (sc != null) {
+        sc.stop()
+      }
+      if (sparkHttpServer != null) {
+        sparkHttpServer.getClass.getMethod("stop").invoke(sparkHttpServer)
+      }
+      sc = null
+      sqlContext = null
+      if (sparkSession != null) {
+        sparkSession.getClass.getMethod("stop").invoke(sparkSession)
+        sparkSession = null
+      }
     }
-
   }
 
   protected def createSparkContext(): Unit = {
@@ -373,3 +381,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     depFiles.asScala.filter(!_.endsWith(".jar"))
   }
 }
+
+object BaseSparkScalaInterpreter {
+  val sessionNum = new AtomicInteger(0)
+}
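
A note on the placement: sessionNum lives in the companion object, so every interpreter instance in the JVM shares one counter, matching Spark's one-SparkContext-per-JVM model. A tiny illustration of that sharing (hypothetical names):

    import java.util.concurrent.atomic.AtomicInteger

    class Session {
      // Each instance bumps the same JVM-wide counter.
      def open(): Int = Session.live.incrementAndGet()
    }

    object Session {
      val live = new AtomicInteger(0)
    }

    // new Session().open() == 1; new Session().open() == 2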
