Repository: incubator-zeppelin Updated Branches: refs/heads/master 706755a35 -> 26c89a336
fix remoteinterpreterserver deadlock problem After running the Zeppelin server for several days, we often found that paragraphs could not run even though the Zeppelin server process was still up, and restarting the interpreter did not help. We then found that the RemoteInterpreterServer was deadlocked and could not be stopped by the Zeppelin server. Unfortunately, a deadlocked RemoteInterpreterServer still holds its Hadoop resources, so we had to kill the process manually each time. Details are in: https://issues.apache.org/jira/browse/ZEPPELIN-271 We found that the deadlock is caused by a missing synchronized (intpGroup) block in SparkSqlInterpreter.java. Author: 曾林西 <[email protected]> Closes #267 from zenglinxi0615/fix-remoteinterpreterserver-deadlock and squashes the following commits: a173374 [曾林西] fix a problem in getSparkInterpreter() 160fa02 [曾林西] move ((LazyOpenInterpreter) p).open() outside of synchronized(intpGroup) block to fix the remoteinterpreterserver deadlock 93e703d [曾林西] add synchronized (intpGroup) in open() of LazyOpenInterpreter.java, to make sure thread-locking sequence consistent between multithreading 4ec953b [曾林西] fix remoteinterpreterserver deadlock problem Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/26c89a33 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/26c89a33 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/26c89a33 Branch: refs/heads/master Commit: 26c89a33646c3de2c4d85efe084d524347abf23b Parents: 706755a Author: 曾林西 <[email protected]> Authored: Tue Sep 1 23:05:12 2015 +0800 Committer: Lee moon soo <[email protected]> Committed: Wed Sep 2 20:11:27 2015 -0700 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 11 +++++--- .../zeppelin/spark/SparkSqlInterpreter.java | 27 +++++++++++++------- 2 files changed, 26 insertions(+), 12 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/26c89a33/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 852dd33..c579d21 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -344,21 +344,26 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand private SparkInterpreter getSparkInterpreter() { InterpreterGroup intpGroup = getInterpreterGroup(); + LazyOpenInterpreter lazy = null; + SparkInterpreter spark = null; synchronized (intpGroup) { for (Interpreter intp : getInterpreterGroup()){ if (intp.getClassName().equals(SparkInterpreter.class.getName())) { Interpreter p = intp; while (p instanceof WrappedInterpreter) { if (p instanceof LazyOpenInterpreter) { - ((LazyOpenInterpreter) p).open(); + lazy = (LazyOpenInterpreter) p; } p = ((WrappedInterpreter) p).getInnerInterpreter(); } - return (SparkInterpreter) p; + spark = (SparkInterpreter) p; } } } - return null; + if (lazy != null) { + lazy.open(); + } + return spark; } public ZeppelinContext getZeppelinContext() { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/26c89a33/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 053b887..7c1ba11 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -27,6 +27,7 @@ import org.apache.spark.SparkContext; import org.apache.spark.sql.SQLContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -81,19 +82,27 @@ public class SparkSqlInterpreter extends Interpreter { } private SparkInterpreter getSparkInterpreter() { - for (Interpreter intp : getInterpreterGroup()) { - if (intp.getClassName().equals(SparkInterpreter.class.getName())) { - Interpreter p = intp; - while (p instanceof WrappedInterpreter) { - if (p instanceof LazyOpenInterpreter) { - p.open(); + InterpreterGroup intpGroup = getInterpreterGroup(); + LazyOpenInterpreter lazy = null; + SparkInterpreter spark = null; + synchronized (intpGroup) { + for (Interpreter intp : getInterpreterGroup()){ + if (intp.getClassName().equals(SparkInterpreter.class.getName())) { + Interpreter p = intp; + while (p instanceof WrappedInterpreter) { + if (p instanceof LazyOpenInterpreter) { + lazy = (LazyOpenInterpreter) p; + } + p = ((WrappedInterpreter) p).getInnerInterpreter(); } - p = ((WrappedInterpreter) p).getInnerInterpreter(); + spark = (SparkInterpreter) p; } - return (SparkInterpreter) p; } } - return null; + if (lazy != null) { + lazy.open(); + } + return spark; } public boolean concurrentSQL() {
