Repository: zeppelin Updated Branches: refs/heads/master 6eecdecb5 -> 5cd806dc3
ZEPPELIN-2445. Display which line of python code raise exception ### What is this PR for? When Python code raises an exception, the user can only see the stack trace and cannot tell which line caused it. This is inconvenient, especially when you write many lines of code. This PR displays the line that raised the exception. Besides that, I also fixed another issue where the error message was duplicated in Python 3 (see screenshots). ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2445 ### How should this be tested? A unit test has been added. ### Screenshots (if appropriate) Python 2 Before ![image](https://cloud.githubusercontent.com/assets/164491/25367168/89ce67fa-29a6-11e7-900c-cff2c66a6df4.png) Python 2 After ![image](https://cloud.githubusercontent.com/assets/164491/25366936/65d246d8-29a5-11e7-8ad7-6786252e7913.png) Python 3 Before ![image](https://cloud.githubusercontent.com/assets/164491/25367181/9d9a1608-29a6-11e7-8a62-5404b5941256.png) Python 3 After ![image](https://cloud.githubusercontent.com/assets/164491/25366943/6f0200c2-29a5-11e7-8fe6-80794e7c7d86.png) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2280 from zjffdu/ZEPPELIN-2445 and squashes the following commits: fa0cfba [Jeff Zhang] ZEPPELIN-2445. 
Display which line of python code raise exception Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/5cd806dc Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/5cd806dc Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/5cd806dc Branch: refs/heads/master Commit: 5cd806dc3290cbcdcfb7db4f160e32077cf2e522 Parents: 6eecdec Author: Jeff Zhang <zjf...@apache.org> Authored: Mon Apr 24 15:55:14 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Thu Apr 27 13:10:06 2017 +0800 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 32 ++++++------ .../main/resources/python/zeppelin_pyspark.py | 51 ++++++++------------ spark/src/test/resources/log4j.properties | 47 ++++++++++++++++++ .../util/InterpreterOutputStream.java | 2 +- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 21 ++++++++ .../src/test/resources/log4j.properties | 3 +- 6 files changed, 109 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index bf0a915..56cec94 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -63,7 +63,7 @@ import py4j.GatewayServer; * */ public class PySparkInterpreter extends Interpreter implements ExecuteResultHandler { - Logger logger = LoggerFactory.getLogger(PySparkInterpreter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreter.class); private GatewayServer gatewayServer; 
private DefaultExecutor executor; private int port; @@ -106,7 +106,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand throw new InterpreterException(e); } - logger.info("File {} created", scriptPath); + LOGGER.info("File {} created", scriptPath); } @Override @@ -131,7 +131,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand try { urlList.add(f.toURI().toURL()); } catch (MalformedURLException e) { - logger.error("Error", e); + LOGGER.error("Error", e); } } } @@ -148,7 +148,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand try { urlList.add(f.toURI().toURL()); } catch (MalformedURLException e) { - logger.error("Error", e); + LOGGER.error("Error", e); } } } @@ -162,7 +162,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand Thread.currentThread().setContextClassLoader(newCl); createGatewayServerAndStartScript(); } catch (Exception e) { - logger.error("Error", e); + LOGGER.error("Error", e); throw new InterpreterException(e); } finally { Thread.currentThread().setContextClassLoader(oldCl); @@ -217,7 +217,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand cmd.addArgument(Integer.toString(port), false); cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false); executor = new DefaultExecutor(); - outputStream = new InterpreterOutputStream(logger); + outputStream = new InterpreterOutputStream(LOGGER); PipedOutputStream ps = new PipedOutputStream(); in = null; try { @@ -313,6 +313,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand public void setStatementsFinished(String out, boolean error) { synchronized (statementFinishedNotifier) { + LOGGER.debug("Setting python statement output: " + out + ", error: " + error); statementOutput = out; statementError = error; statementFinishedNotifier.notify(); @@ -325,12 +326,14 @@ public class 
PySparkInterpreter extends Interpreter implements ExecuteResultHand public void onPythonScriptInitialized(long pid) { pythonPid = pid; synchronized (pythonScriptInitializeNotifier) { + LOGGER.debug("onPythonScriptInitialized is called"); pythonScriptInitialized = true; pythonScriptInitializeNotifier.notifyAll(); } } public void appendOutput(String message) throws IOException { + LOGGER.debug("Output from python process: " + message); outputStream.getInterpreterOutput().write(message); } @@ -358,6 +361,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand try { pythonScriptInitializeNotifier.wait(1000); } catch (InterruptedException e) { + e.printStackTrace(); } } } @@ -426,10 +430,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand public void interrupt() throws IOException { if (pythonPid > -1) { - logger.info("Sending SIGINT signal to PID : " + pythonPid); + LOGGER.info("Sending SIGINT signal to PID : " + pythonPid); Runtime.getRuntime().exec("kill -SIGINT " + pythonPid); } else { - logger.warn("Non UNIX/Linux system, close the interpreter"); + LOGGER.warn("Non UNIX/Linux system, close the interpreter"); close(); } } @@ -441,7 +445,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand try { interrupt(); } catch (IOException e) { - logger.error("Error", e); + LOGGER.error("Error", e); } } @@ -486,13 +490,13 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand && pythonscriptRunning) { try { if (System.currentTimeMillis() - startTime > MAX_TIMEOUT_SEC * 1000) { - logger.error("pyspark completion didn't have response for {}sec.", MAX_TIMEOUT_SEC); + LOGGER.error("pyspark completion didn't have response for {}sec.", MAX_TIMEOUT_SEC); break; } statementFinishedNotifier.wait(1000); } catch (InterruptedException e) { // not working - logger.info("wait drop"); + LOGGER.info("wait drop"); return new LinkedList<>(); } } @@ -527,7 +531,7 @@ 
public class PySparkInterpreter extends Interpreter implements ExecuteResultHand completionScriptText = text.substring(0, cursor); } catch (Exception e) { - logger.error(e.toString()); + LOGGER.error(e.toString()); return null; } completionEndPosition = completionScriptText.length(); @@ -637,12 +641,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public void onProcessComplete(int exitValue) { pythonscriptRunning = false; - logger.info("python process terminated. exit code " + exitValue); + LOGGER.info("python process terminated. exit code " + exitValue); } @Override public void onProcessFailed(ExecuteException e) { pythonscriptRunning = false; - logger.error("python process failed", e); + LOGGER.error("python process failed", e); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index da4d794..4376888 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -21,15 +21,7 @@ from py4j.java_gateway import java_import, JavaGateway, GatewayClient from py4j.protocol import Py4JJavaError from pyspark.conf import SparkConf from pyspark.context import SparkContext -from pyspark.rdd import RDD -from pyspark.files import SparkFiles -from pyspark.storagelevel import StorageLevel -from pyspark.accumulators import Accumulator, AccumulatorParam -from pyspark.broadcast import Broadcast -from pyspark.serializers import MarshalSerializer, PickleSerializer -import warnings import ast -import traceback import warnings # for back compatibility @@ -231,20 +223,13 @@ class PySparkCompletion: result = json.dumps(list(filter(lambda x : not re.match("^__.*", x), list(completionList)))) 
self.interpreterObject.setStatementsFinished(result, False) - -output = Logger() -sys.stdout = output -sys.stderr = output - client = GatewayClient(port=int(sys.argv[1])) sparkVersion = SparkVersion(int(sys.argv[2])) - if sparkVersion.isSpark2(): from pyspark.sql import SparkSession else: from pyspark.sql import SchemaRDD - if sparkVersion.isAutoConvertEnabled(): gateway = JavaGateway(client, auto_convert = True) else: @@ -257,6 +242,9 @@ java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") intp = gateway.entry_point +output = Logger() +sys.stdout = output +sys.stderr = output intp.onPythonScriptInitialized(os.getpid()) jsc = intp.getJavaSparkContext() @@ -310,7 +298,6 @@ while True : try: stmts = req.statements().split("\n") jobGroup = req.jobGroup() - final_code = [] # Get post-execute hooks try: @@ -328,22 +315,11 @@ while True : if hook: nhooks += 1 - for s in stmts: - if s == None: - continue - - # skip comment - s_stripped = s.strip() - if len(s_stripped) == 0 or s_stripped.startswith("#"): - continue - - final_code.append(s) - - if final_code: + if stmts: # use exec mode to compile the statements except the last statement, # so that the last statement's evaluation will be printed to stdout sc.setJobGroup(jobGroup, "Zeppelin") - code = compile('\n'.join(final_code), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1) + code = compile('\n'.join(stmts), '<stdin>', 'exec', ast.PyCF_ONLY_AST, 1) to_run_hooks = [] if (nhooks > 0): to_run_hooks = code.body[-nhooks:] @@ -365,10 +341,23 @@ while True : mod = ast.Module([node]) code = compile(mod, '<stdin>', 'exec') exec(code, _zcUserQueryNameSpace) + + intp.setStatementsFinished("", False) + except Py4JJavaError: + # raise it to outside try except + raise except: - raise Exception(traceback.format_exc()) + exception = traceback.format_exc() + m = re.search("File \"<stdin>\", line (\d+).*", exception) + if m: + line_no = int(m.group(1)) + 
intp.setStatementsFinished( + "Fail to execute line {}: {}\n".format(line_no, stmts[line_no - 1]) + exception, True) + else: + intp.setStatementsFinished(exception, True) + else: + intp.setStatementsFinished("", False) - intp.setStatementsFinished("", False) except Py4JJavaError: excInnerError = traceback.format_exc() # format_tb() does not return the inner exception innerErrorStart = excInnerError.find("Py4JJavaError:") http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/spark/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/spark/src/test/resources/log4j.properties b/spark/src/test/resources/log4j.properties new file mode 100644 index 0000000..b0d1067 --- /dev/null +++ b/spark/src/test/resources/log4j.properties @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c:%L - %m%n +#log4j.appender.stdout.layout.ConversionPattern= +#%5p [%t] (%F:%L) - %m%n +#%-4r [%t] %-5p %c %x - %m%n +# + +# Root logger option +log4j.rootLogger=INFO, stdout + +#mute some noisy guys +log4j.logger.org.apache.hadoop.mapred=WARN +log4j.logger.org.apache.hadoop.hive.ql=WARN +log4j.logger.org.apache.hadoop.hive.metastore=WARN +log4j.logger.org.apache.haadoop.hive.service.HiveServer=WARN +log4j.logger.org.apache.zeppelin.scheduler=WARN + +log4j.logger.org.quartz=WARN +log4j.logger.DataNucleus=WARN +log4j.logger.DataNucleus.MetaData=ERROR +log4j.logger.DataNucleus.Datastore=ERROR + +# Log all JDBC parameters +log4j.logger.org.hibernate.type=ALL + +log4j.logger.org.apache.zeppelin.interpreter=DEBUG +log4j.logger.org.apache.zeppelin.spark=DEBUG http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java index 6bdc2db..6f2a0b4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/InterpreterOutputStream.java @@ -28,7 +28,7 @@ import java.io.IOException; * Can be used to channel output from interpreters. 
*/ public class InterpreterOutputStream extends LogOutputStream { - public static Logger logger; + private Logger logger; InterpreterOutput interpreterOutput; boolean ignoreLeadingNewLinesFromScalaReporter = false; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index ec0e0bd..2714352 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -275,6 +275,27 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { waitForFinish(p); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("[Row(len=u'3')]\n", p.getResult().message().get(0).getData()); + + // test exception + p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); + config = p.getConfig(); + config.put("enabled", true); + p.setConfig(config); + /** + %pyspark + a=1 + + print(a2) + */ + p.setText("%pyspark a=1\n\nprint(a2)"); + p.setAuthenticationInfo(anonymous); + note.run(p.getId()); + waitForFinish(p); + assertEquals(Status.ERROR, p.getStatus()); + assertTrue(p.getResult().message().get(0).getData() + .contains("Fail to execute line 3: print(a2)")); + assertTrue(p.getResult().message().get(0).getData() + .contains("name 'a2' is not defined")); } if (sparkVersion >= 20) { // run SparkSession test http://git-wip-us.apache.org/repos/asf/zeppelin/blob/5cd806dc/zeppelin-server/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties index 
041daf0..b0d1067 100644 --- a/zeppelin-server/src/test/resources/log4j.properties +++ b/zeppelin-server/src/test/resources/log4j.properties @@ -43,4 +43,5 @@ log4j.logger.DataNucleus.Datastore=ERROR # Log all JDBC parameters log4j.logger.org.hibernate.type=ALL -log4j.logger.org.apache.zeppelin.interpreter=DEBUG \ No newline at end of file +log4j.logger.org.apache.zeppelin.interpreter=DEBUG +log4j.logger.org.apache.zeppelin.spark=DEBUG