Repository: zeppelin Updated Branches: refs/heads/master 74c0408d3 -> 8d03920b9
[Zeppelin-802] Support for Zeppelin Context redefinition on Python and Pyspark ### What is this PR for? If you override the reserved word ZeppelinContext such as `z` in the python language, the whole paragraph output problem occurred. I have taken care to avoid this issue. `z` == `_zc` == `zeppelin context` ### What type of PR is it? Improvement ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-802 ### How should this be tested? The error should not occur in the following situations: ``` %python z = 1 print("Hello Zeppelin") ``` ``` %pyspark z = 1 print("Hello Zeppelin") ``` ### Screenshots (if appropriate) #### before ![replace zeppelin context-err](https://cloud.githubusercontent.com/assets/10525473/24521772/319946be-15c8-11e7-96cf-7fdf41c70a66.png) #### after ![replace zeppelin context](https://cloud.githubusercontent.com/assets/10525473/24521775/349fa7cc-15c8-11e7-8fe4-4f3f5597deff.png) ### Questions: * Does the licenses files need update? no * Is there breaking changes for older versions? no * Does this needs documentation? no Author: CloverHearts <cloverhearts...@gmail.com> Author: cloverhearts <cloverhearts...@gmail.com> Closes #2207 from cloverhearts/ZEPPELIN-802-pyspark-zeppelin-context and squashes the following commits: cc986010 [CloverHearts] added completion on namespace 14695cb8 [CloverHearts] Recovering a member name that is not associated with a namespace 31af92ab [CloverHearts] fix test case _zc to __zeppelin__ 6697d677 [CloverHearts] apply to namespace and replace name _zc to __zeppelin__ ca795cff [cloverhearts] replace output 1d372df4 [cloverhearts] change name logger 4e8435ac [CloverHearts] added test case on python b6b804ad [CloverHearts] replace name zeppelin context on python 9fbf70d6 [CloverHearts] fix pyspark test case 987e2118 [CloverHearts] added test code 5da3d6ed [CloverHearts] replace name zeppelin context on pyspark Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/8d03920b Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/8d03920b Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/8d03920b Branch: refs/heads/master Commit: 8d03920b9bec86dd8e2fc343d32749a08f501362 Parents: 74c0408 Author: CloverHearts <cloverhearts...@gmail.com> Authored: Wed Apr 12 17:14:36 2017 +0900 Committer: CloverHearts <cloverhearts...@gmail.com> Committed: Mon Apr 17 12:39:02 2017 +0900 ---------------------------------------------------------------------- .../zeppelin/python/PythonInterpreter.java | 2 +- .../python/PythonInterpreterPandasSql.java | 3 +- .../main/resources/python/zeppelin_python.py | 16 +++++--- .../zeppelin/python/PythonInterpreterTest.java | 13 ++++++ .../zeppelin/spark/PySparkInterpreter.java | 8 ++-- .../main/resources/python/zeppelin_pyspark.py | 42 ++++++++++++++------ .../zeppelin/spark/PySparkInterpreterTest.java | 15 +++++++ 7 files changed, 75 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d03920b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java index df62406..7f6a7eb 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java @@ -222,7 +222,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl // Add matplotlib display hook InterpreterGroup intpGroup = getInterpreterGroup(); if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) { - registerHook(HookType.POST_EXEC_DEV, "z._displayhook()"); + registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()"); } // Add matplotlib display hook try { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d03920b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java ---------------------------------------------------------------------- diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java index 6bf1970..e73d7b3 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreterPandasSql.java @@ -87,7 +87,8 @@ public class PythonInterpreterPandasSql extends Interpreter { LOG.info("Running SQL query: '{}' over Pandas DataFrame", st); Interpreter python = getPythonInterpreter(); - return python.interpret("z.show(pysqldf('" + st + "'))\nz._displayhook()", context); + return python.interpret( + "__zeppelin__.show(pysqldf('" + st + "'))\n__zeppelin__._displayhook()", context); } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d03920b/python/src/main/resources/python/zeppelin_python.py ---------------------------------------------------------------------- diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py index a537c5d..31b993d 100644 --- a/python/src/main/resources/python/zeppelin_python.py +++ b/python/src/main/resources/python/zeppelin_python.py @@ -195,6 +195,7 @@ host = "127.0.0.1" if len(sys.argv) >= 3: host = sys.argv[2] +_zcUserQueryNameSpace = {} client = GatewayClient(address=host, port=int(sys.argv[1])) #gateway = JavaGateway(client, auto_convert = True) @@ -204,8 +205,11 @@ intp = gateway.entry_point intp.onPythonScriptInitialized(os.getpid()) java_import(gateway.jvm, "org.apache.zeppelin.display.Input") -z = PyZeppelinContext(intp) -z._setup_matplotlib() +z = __zeppelin__ = PyZeppelinContext(intp) +__zeppelin__._setup_matplotlib() + +_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__ +_zcUserQueryNameSpace["z"] = z output = Logger() sys.stdout = output @@ -227,7 +231,7 @@ while True : global_hook = None try: - user_hook = z.getHook('post_exec') + user_hook = __zeppelin__.getHook('post_exec') except: user_hook = None @@ -263,17 +267,17 @@ while True : for node in to_run_exec: mod = ast.Module([node]) code = compile(mod, '<stdin>', 'exec') - exec(code) + exec(code, _zcUserQueryNameSpace) for node in to_run_single: mod = ast.Interactive([node]) code = compile(mod, '<stdin>', 'single') - exec(code) + exec(code, _zcUserQueryNameSpace) for node in to_run_hooks: mod = ast.Module([node]) code = compile(mod, '<stdin>', 'exec') - exec(code) + exec(code, _zcUserQueryNameSpace) except: raise Exception(traceback.format_exc()) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d03920b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java ---------------------------------------------------------------------- diff --git a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java index b5cd680..837626c 100644 --- a/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java +++ b/python/src/test/java/org/apache/zeppelin/python/PythonInterpreterTest.java @@ -106,6 +106,19 @@ public class PythonInterpreterTest implements InterpreterOutputListener { assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("hi\nhi\nhi")); } + @Test + public void testRedefinitionZeppelinContext() { + String pyRedefinitionCode = "z = 1\n"; + String pyRestoreCode = "z = __zeppelin__\n"; + String pyValidCode = "z.input(\"test\")\n"; + + assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyValidCode, context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyRedefinitionCode, context).code()); + assertEquals(InterpreterResult.Code.ERROR, pythonInterpreter.interpret(pyValidCode, context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyRestoreCode, context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, pythonInterpreter.interpret(pyValidCode, context).code()); + } + @Override public void onUpdateAll(InterpreterOutput out) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d03920b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 6e957ed..da99b9f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -113,7 +113,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand // Add matplotlib display hook InterpreterGroup intpGroup = getInterpreterGroup(); if (intpGroup != null && intpGroup.getInterpreterHookRegistry() != null) { - registerHook(HookType.POST_EXEC_DEV, "z._displayhook()"); + registerHook(HookType.POST_EXEC_DEV, "__zeppelin__._displayhook()"); } DepInterpreter depInterpreter = getDepInterpreter(); @@ -390,9 +390,9 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand return new InterpreterResult(Code.ERROR, errorMessage); } String jobGroup = Utils.buildJobGroupId(context); - ZeppelinContext z = sparkInterpreter.getZeppelinContext(); - z.setInterpreterContext(context); - z.setGui(context.getGui()); + ZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext(); + __zeppelin__.setInterpreterContext(context); + __zeppelin__.setGui(context.getGui()); pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup); statementOutput = null; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d03920b/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 6c39400..5029d59 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -271,19 +271,37 @@ else: java_import(gateway.jvm, "scala.Tuple2") +_zcUserQueryNameSpace = {} + jconf = intp.getSparkConf() conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) -sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) +sc = _zsc_ = SparkContext(jsc=jsc, gateway=gateway, conf=conf) +_zcUserQueryNameSpace["_zsc_"] = _zsc_ +_zcUserQueryNameSpace["sc"] = sc + if sparkVersion.isSpark2(): - spark = SparkSession(sc, intp.getSparkSession()) - sqlc = spark._wrapped + spark = __zSpark__ = SparkSession(sc, intp.getSparkSession()) + sqlc = __zSqlc__ = __zSpark__._wrapped + _zcUserQueryNameSpace["sqlc"] = sqlc + _zcUserQueryNameSpace["__zSqlc__"] = __zSqlc__ + _zcUserQueryNameSpace["spark"] = spark + _zcUserQueryNameSpace["__zSpark__"] = __zSpark__ else: - sqlc = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) -sqlContext = sqlc + sqlc = __zSqlc__ = SQLContext(sparkContext=sc, sqlContext=intp.getSQLContext()) + _zcUserQueryNameSpace["sqlc"] = sqlc + _zcUserQueryNameSpace["__zSqlc__"] = sqlc + +sqlContext = __zSqlc__ +_zcUserQueryNameSpace["sqlContext"] = sqlContext + +completion = __zeppelin_completion__ = PySparkCompletion(intp) +_zcUserQueryNameSpace["completion"] = completion +_zcUserQueryNameSpace["__zeppelin_completion__"] = __zeppelin_completion__ -completion = PySparkCompletion(intp) -z = PyZeppelinContext(intp.getZeppelinContext()) -z._setup_matplotlib() +z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext()) +__zeppelin__._setup_matplotlib() +_zcUserQueryNameSpace["z"] = z +_zcUserQueryNameSpace["__zeppelin__"] = __zeppelin__ while True : req = intp.getStatements() @@ -299,7 +317,7 @@ while True : global_hook = None try: - user_hook = z.getHook('post_exec') + user_hook = __zeppelin__.getHook('post_exec') except: user_hook = None @@ -334,17 +352,17 @@ while True : for node in to_run_exec: mod = ast.Module([node]) code = compile(mod, '<stdin>', 'exec') - exec(code) + exec(code, _zcUserQueryNameSpace) for node in to_run_single: mod = ast.Interactive([node]) code = compile(mod, '<stdin>', 'single') - exec(code) + exec(code, _zcUserQueryNameSpace) for node in to_run_hooks: mod = ast.Module([node]) code = compile(mod, '<stdin>', 'exec') - exec(code) + exec(code, _zcUserQueryNameSpace) except: raise Exception(traceback.format_exc()) http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d03920b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index 3697512..d47a8bd 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -123,6 +123,21 @@ public class PySparkInterpreterTest { } } + @Test + public void testRedefinitionZeppelinContext() { + if (getSparkVersionNumber() > 11) { + String redefinitionCode = "z = 1\n"; + String restoreCode = "z = __zeppelin__\n"; + String validCode = "z.input(\"test\")\n"; + + assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(redefinitionCode, context).code()); + assertEquals(InterpreterResult.Code.ERROR, pySparkInterpreter.interpret(validCode, context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(restoreCode, context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret(validCode, context).code()); + } + } + private class infinityPythonJob implements Runnable { @Override public void run() {