Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master 72f6c91c7 -> 59aa3f025


[ZEPPELIN-717] protect the whole spark repl init process

### What is this PR for?
ZeppelinContext may not be initialized properly in concurrent mode:

When I create & run multiple notebooks concurrently using the REST API, I see 
errors like the following whenever the jobs try to use ZeppelinContext:
```
<console>:23: error: not found: value z
```

I think this issue can be reproduced by:
1. Create 4-5 new notebooks with the content: `println(z)`
2. Use the REST API to run the newly created notebooks concurrently

The issue seems to go away after I expand the lock to protect the whole 
Spark REPL initialization / binding process.

### What type of PR is it?
Bug Fix

### Todos

### What is the Jira issue?
[ZEPPELIN-717](https://issues.apache.org/jira/browse/ZEPPELIN-717)

### How should this be tested?
1. Create 4-5 new notebooks with the content: `println(z)`
2. Use the REST API to run the newly created notebooks concurrently

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update?
NO

* Are there breaking changes for older versions?
NO

* Does this need documentation?
NO

Author: Zhong Wang <[email protected]>

Closes #760 from zhongneu/protect-spark-repl-init and squashes the following 
commits:

3022612 [Zhong Wang] protect the whole spark repl init process


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/59aa3f02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/59aa3f02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/59aa3f02

Branch: refs/heads/master
Commit: 59aa3f02568aedd937bc3930cc0ea59059a9cd07
Parents: 72f6c91
Author: Zhong Wang <[email protected]>
Authored: Thu Mar 3 23:00:43 2016 -0800
Committer: Lee moon soo <[email protected]>
Committed: Tue Mar 8 13:59:31 2016 -0800

----------------------------------------------------------------------
 .../apache/zeppelin/spark/SparkInterpreter.java | 72 ++++++++++----------
 1 file changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/59aa3f02/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 5a1a0fd..57d2724 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -481,18 +481,18 @@ public class SparkInterpreter extends Interpreter {
 
     System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$");
 
-    /* create scala repl */
-    this.interpreter = new SparkILoop(null, new PrintWriter(out));
+    synchronized (sharedInterpreterLock) {
+      /* create scala repl */
+      this.interpreter = new SparkILoop(null, new PrintWriter(out));
 
-    interpreter.settings_$eq(settings);
+      interpreter.settings_$eq(settings);
 
-    interpreter.createInterpreter();
+      interpreter.createInterpreter();
 
-    intp = interpreter.intp();
-    intp.setContextClassLoader();
-    intp.initializeSynchronous();
+      intp = interpreter.intp();
+      intp.setContextClassLoader();
+      intp.initializeSynchronous();
 
-    synchronized (sharedInterpreterLock) {
       if (classOutputDir == null) {
         classOutputDir = settings.outputDirs().getSingleOutput().get();
       } else {
@@ -523,35 +523,35 @@ public class SparkInterpreter extends Interpreter {
       sparkVersion = SparkVersion.fromVersionString(sc.version());
 
       sqlc = getSQLContext();
-    }
-
-    dep = getDependencyResolver();
-
-    z = new ZeppelinContext(sc, sqlc, null, dep,
-        Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
 
-    intp.interpret("@transient var _binder = new java.util.HashMap[String, 
Object]()");
-    binder = (Map<String, Object>) getValue("_binder");
-    binder.put("sc", sc);
-    binder.put("sqlc", sqlc);
-    binder.put("z", z);
-
-    intp.interpret("@transient val z = "
-                 + 
"_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
-    intp.interpret("@transient val sc = "
-                 + 
"_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
-    intp.interpret("@transient val sqlc = "
-                 + 
"_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-    intp.interpret("@transient val sqlContext = "
-                 + 
"_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
-    intp.interpret("import org.apache.spark.SparkContext._");
-
-    if (sparkVersion.oldSqlContextImplicits()) {
-      intp.interpret("import sqlContext._");
-    } else {
-      intp.interpret("import sqlContext.implicits._");
-      intp.interpret("import sqlContext.sql");
-      intp.interpret("import org.apache.spark.sql.functions._");
+      dep = getDependencyResolver();
+
+      z = new ZeppelinContext(sc, sqlc, null, dep,
+              Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
+
+      intp.interpret("@transient var _binder = new java.util.HashMap[String, 
Object]()");
+      binder = (Map<String, Object>) getValue("_binder");
+      binder.put("sc", sc);
+      binder.put("sqlc", sqlc);
+      binder.put("z", z);
+
+      intp.interpret("@transient val z = "
+              + 
"_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
+      intp.interpret("@transient val sc = "
+              + 
"_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]");
+      intp.interpret("@transient val sqlc = "
+              + 
"_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
+      intp.interpret("@transient val sqlContext = "
+              + 
"_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
+      intp.interpret("import org.apache.spark.SparkContext._");
+
+      if (sparkVersion.oldSqlContextImplicits()) {
+        intp.interpret("import sqlContext._");
+      } else {
+        intp.interpret("import sqlContext.implicits._");
+        intp.interpret("import sqlContext.sql");
+        intp.interpret("import org.apache.spark.sql.functions._");
+      }
     }
 
     /* Temporary disabling DisplayUtils. see 
https://issues.apache.org/jira/browse/ZEPPELIN-127

Reply via email to