Repository: incubator-zeppelin Updated Branches: refs/heads/master 3bd2b2122 -> d0a304354
ZEPPELIN-152 Propagate error from interpreter process to the GUI https://issues.apache.org/jira/browse/ZEPPELIN-152 Errors from the remote interpreter process are not propagated correctly, so it is hard to see the actual problem without going and checking the interpreter's log. Author: Lee moon soo <[email protected]> Closes #136 from Leemoonsoo/ZEPPELIN-152 and squashes the following commits: 47ef559 [Lee moon soo] Display ERROR status correctly 650c788 [Lee moon soo] Fix unittest 34a0210 [Lee moon soo] Let exception goes 19cfc44 [Lee moon soo] Propagte remote process's exception Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/d0a30435 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/d0a30435 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/d0a30435 Branch: refs/heads/master Commit: d0a30435414726e7fa6d8b8e106e4b6ddb46da67 Parents: 3bd2b21 Author: Lee moon soo <[email protected]> Authored: Fri Jul 3 12:12:14 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Sun Jul 5 10:55:22 2015 -0700 ---------------------------------------------------------------------- .../zeppelin/spark/SparkSqlInterpreter.java | 12 +++----- .../zeppelin/spark/SparkSqlInterpreterTest.java | 9 ++++-- .../interpreter/ClassloaderInterpreter.java | 3 +- .../interpreter/remote/RemoteInterpreter.java | 4 ++- .../remote/RemoteInterpreterServer.java | 16 +++++----- .../java/org/apache/zeppelin/scheduler/Job.java | 14 +++------ .../zeppelin/scheduler/RemoteScheduler.java | 31 ++++++++++++++++--- .../remote/RemoteInterpreterTest.java | 32 ++++++++++++++++++++ .../apache/zeppelin/socket/NotebookServer.java | 4 ++- 9 files changed, 90 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java 
---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 6e30f1f..e60ff2b 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -35,6 +35,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterUtils; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; @@ -128,13 +129,10 @@ public class SparkSqlInterpreter extends Interpreter { sc.setLocalProperty("spark.scheduler.pool", null); } - try { - Object rdd = sqlc.sql(st); - String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult); - return new InterpreterResult(Code.SUCCESS, msg); - } catch (Exception e) { - return new InterpreterResult(Code.ERROR, e.getMessage()); - } + + Object rdd = sqlc.sql(st); + String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult); + return new InterpreterResult(Code.SUCCESS, msg); } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index eaa0a8a..30166a7 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ 
b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -17,7 +17,7 @@ package org.apache.zeppelin.spark; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.util.HashMap; import java.util.LinkedList; @@ -91,7 +91,12 @@ public class SparkSqlInterpreterTest { assertEquals(Type.TABLE, ret.type()); assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message()); - assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code()); + try { + sql.interpret("select wrong syntax", context); + fail("Exception not catched"); + } catch (Exception e) { + // okay + } assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code()); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java index d3d6c1c..3fb4eb4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/ClassloaderInterpreter.java @@ -55,8 +55,9 @@ public class ClassloaderInterpreter Thread.currentThread().setContextClassLoader(cl); try { return intp.interpret(st, context); + } catch (InterpreterException e) { + throw e; } catch (Exception e) { - e.printStackTrace(); throw new InterpreterException(e); } finally { cl = Thread.currentThread().getContextClassLoader(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java 
---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index cd77dc4..8992f55 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterResult.Type; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; @@ -222,7 +223,8 @@ public class RemoteInterpreter extends Interpreter { context.getGui().setForms(remoteGui.getForms()); } - return convert(remoteResult); + InterpreterResult result = convert(remoteResult); + return result; } catch (TException e) { throw new InterpreterException(e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 8b4b236..33baf9a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -42,6 +42,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; @@ -210,18 +211,15 @@ public class RemoteInterpreterServer } } + InterpreterResult result; if (job.getStatus() == Status.ERROR) { - throw new TException(job.getException()); + result = new InterpreterResult(Code.ERROR, Job.getStack(job.getException())); } else { - if (intp.getFormType() == FormType.NATIVE) { - // serialize dynamic form - - } - - return convert((InterpreterResult) job.getReturn(), - context.getConfig(), - context.getGui()); + result = (InterpreterResult) job.getReturn(); } + return convert(result, + context.getConfig(), + context.getGui()); } class InterpretJobListener implements JobListener { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 9837ad2..4c8c70a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -21,7 +21,7 @@ import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; -import org.apache.zeppelin.interpreter.InterpreterResult; +import 
org.apache.commons.lang.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,17 +191,13 @@ public abstract class Job { } } - public String getStack(Throwable e) { - StackTraceElement[] stacks = e.getStackTrace(); - if (stacks == null) { + public static String getStack(Throwable e) { + if (e == null) { return ""; } - String ss = ""; - for (StackTraceElement s : stacks) { - ss += s.toString() + "\n"; - } - return ss; + Throwable cause = ExceptionUtils.getRootCause(e); + return ExceptionUtils.getFullStackTrace(cause); } public Throwable getException() { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 1bf91d6..904dc22 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.apache.zeppelin.scheduler.Job.Status; @@ -179,6 +181,10 @@ public class RemoteScheduler implements Scheduler { } } + if (terminate) { + // terminated by shutdown + break; + } Status newStatus = getStatus(); if (newStatus == null) { // unknown @@ -187,9 +193,10 @@ public class RemoteScheduler implements Scheduler { if (newStatus != Status.READY && newStatus != 
Status.PENDING) { // we don't need more - continue; + break; } } + terminate = true; } public void shutdown() { @@ -233,9 +240,9 @@ public class RemoteScheduler implements Scheduler { if ("Unknown".equals(statusStr)) { // not found this job in the remote schedulers. // maybe not submitted, maybe already finished - Status status = getLastStatus(); - listener.afterStatusChange(job, null, status); - return status; + //Status status = getLastStatus(); + listener.afterStatusChange(job, null, null); + return job.getStatus(); } Status status = Status.valueOf(statusStr); lastStatus = status; @@ -294,6 +301,7 @@ public class RemoteScheduler implements Scheduler { listener.jobStarted(scheduler, job); } job.run(); + jobExecuted = true; jobSubmittedRemotely = true; @@ -304,7 +312,16 @@ public class RemoteScheduler implements Scheduler { logger.error("JobStatusPoller interrupted", e); } - job.setStatus(jobStatusPoller.getStatus()); + // set job status based on result. + Status lastStatus = jobStatusPoller.getStatus(); + Object jobResult = job.getReturn(); + if (jobResult != null && jobResult instanceof InterpreterResult) { + if (((InterpreterResult) jobResult).code() == Code.ERROR) { + lastStatus = Status.ERROR; + } + } + job.setStatus(lastStatus); + if (listener != null) { listener.jobFinished(scheduler, job); } @@ -331,10 +348,14 @@ public class RemoteScheduler implements Scheduler { if (after == null) { // unknown. maybe before sumitted remotely, maybe already finished. 
if (jobExecuted) { jobSubmittedRemotely = true; + Object jobResult = job.getReturn(); if (job.isAborted()) { job.setStatus(Status.ABORT); } else if (job.getException() != null) { job.setStatus(Status.ERROR); + } else if (jobResult != null && jobResult instanceof InterpreterResult + && ((InterpreterResult) jobResult).code() == Code.ERROR) { + job.setStatus(Status.ERROR); } else { job.setStatus(Status.FINISHED); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index b49f86d..4338c50 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -37,6 +37,8 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.InterpretJob; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; import org.apache.zeppelin.scheduler.Job; @@ -129,6 +131,36 @@ public class RemoteInterpreterTest { } @Test + public void testRemoteInterperterErrorStatus() throws TTransportException, IOException { + Properties p = new Properties(); + + RemoteInterpreter intpA = new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new 
File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000 + ); + + intpGroup.add(intpA); + intpA.setInterpreterGroup(intpGroup); + + intpA.open(); + InterpreterResult ret = intpA.interpret("non numeric value", + new InterpreterContext( + "id", + "title", + "text", + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>())); + + assertEquals(Code.ERROR, ret.code()); + } + + @Test public void testRemoteSchedulerSharing() throws TTransportException, IOException { Properties p = new Properties(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/d0a30435/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index c8238b4..659d4df 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -568,7 +568,9 @@ public class NotebookServer extends WebSocketServer implements @Override public void afterStatusChange(Job job, Status before, Status after) { if (after == Status.ERROR) { - job.getException().printStackTrace(); + if (job.getException() != null) { + LOG.error("Error", job.getException()); + } } if (job.isTerminated()) { LOG.info("Job {} is finished", job.getId());
