Repository: incubator-zeppelin Updated Branches: refs/heads/master 72f6c91c7 -> 59aa3f025
[ZEPPELIN-717] protect the whole spark repl init process ### What is this PR for? ZeppelinContext may not be initialized properly in concurrent mode: when I create and run multiple notebooks concurrently using the REST API, I see errors like the following if the jobs try to use ZeppelinContext: ``` <console>:23: error: not found: value z ``` I think this issue can be reproduced by: 1. create 4 - 5 new notebooks with content: `println(z)` 2. use the REST API to run the newly created notebooks concurrently The issue seems to be gone after I expand the lock to protect the whole process of Spark REPL initialization / binding. ### What type of PR is it? Bug Fix ### Todos ### What is the Jira issue? [ZEPPELIN-717](https://issues.apache.org/jira/browse/ZEPPELIN-717) ### How should this be tested? 1. create 4 - 5 new notebooks with content: `println(z)` 2. use the REST API to run the newly created notebooks concurrently ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? NO * Are there breaking changes for older versions? NO * Does this need documentation? 
NO Author: Zhong Wang <[email protected]> Closes #760 from zhongneu/protect-spark-repl-init and squashes the following commits: 3022612 [Zhong Wang] protect the whole spark repl init process Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/59aa3f02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/59aa3f02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/59aa3f02 Branch: refs/heads/master Commit: 59aa3f02568aedd937bc3930cc0ea59059a9cd07 Parents: 72f6c91 Author: Zhong Wang <[email protected]> Authored: Thu Mar 3 23:00:43 2016 -0800 Committer: Lee moon soo <[email protected]> Committed: Tue Mar 8 13:59:31 2016 -0800 ---------------------------------------------------------------------- .../apache/zeppelin/spark/SparkInterpreter.java | 72 ++++++++++---------- 1 file changed, 36 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/59aa3f02/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 5a1a0fd..57d2724 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -481,18 +481,18 @@ public class SparkInterpreter extends Interpreter { System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$"); - /* create scala repl */ - this.interpreter = new SparkILoop(null, new PrintWriter(out)); + synchronized (sharedInterpreterLock) { + /* create scala repl */ + this.interpreter = new SparkILoop(null, new PrintWriter(out)); - interpreter.settings_$eq(settings); + 
interpreter.settings_$eq(settings); - interpreter.createInterpreter(); + interpreter.createInterpreter(); - intp = interpreter.intp(); - intp.setContextClassLoader(); - intp.initializeSynchronous(); + intp = interpreter.intp(); + intp.setContextClassLoader(); + intp.initializeSynchronous(); - synchronized (sharedInterpreterLock) { if (classOutputDir == null) { classOutputDir = settings.outputDirs().getSingleOutput().get(); } else { @@ -523,35 +523,35 @@ public class SparkInterpreter extends Interpreter { sparkVersion = SparkVersion.fromVersionString(sc.version()); sqlc = getSQLContext(); - } - - dep = getDependencyResolver(); - - z = new ZeppelinContext(sc, sqlc, null, dep, - Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); - intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); - binder = (Map<String, Object>) getValue("_binder"); - binder.put("sc", sc); - binder.put("sqlc", sqlc); - binder.put("z", z); - - intp.interpret("@transient val z = " - + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]"); - intp.interpret("@transient val sc = " - + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]"); - intp.interpret("@transient val sqlc = " - + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); - intp.interpret("@transient val sqlContext = " - + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); - intp.interpret("import org.apache.spark.SparkContext._"); - - if (sparkVersion.oldSqlContextImplicits()) { - intp.interpret("import sqlContext._"); - } else { - intp.interpret("import sqlContext.implicits._"); - intp.interpret("import sqlContext.sql"); - intp.interpret("import org.apache.spark.sql.functions._"); + dep = getDependencyResolver(); + + z = new ZeppelinContext(sc, sqlc, null, dep, + Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); + + intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()"); + binder = 
(Map<String, Object>) getValue("_binder"); + binder.put("sc", sc); + binder.put("sqlc", sqlc); + binder.put("z", z); + + intp.interpret("@transient val z = " + + "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]"); + intp.interpret("@transient val sc = " + + "_binder.get(\"sc\").asInstanceOf[org.apache.spark.SparkContext]"); + intp.interpret("@transient val sqlc = " + + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); + intp.interpret("@transient val sqlContext = " + + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); + intp.interpret("import org.apache.spark.SparkContext._"); + + if (sparkVersion.oldSqlContextImplicits()) { + intp.interpret("import sqlContext._"); + } else { + intp.interpret("import sqlContext.implicits._"); + intp.interpret("import sqlContext.sql"); + intp.interpret("import org.apache.spark.sql.functions._"); + } } /* Temporary disabling DisplayUtils. see https://issues.apache.org/jira/browse/ZEPPELIN-127
