Repository: zeppelin Updated Branches: refs/heads/master 64bbba479 -> 500b74b19
ZEPPELIN-3242. Listener threw an exception java.lang.NPE at o.a.zeppelin.spark.Utils.getNoteId(Utils.java:156) ### What is this PR for? This issue also causes the Spark URL to not be displayed in the frontend. The root cause is that PySparkInterpreter/IPySparkInterpreter doesn't set the JobGroup correctly. ### What type of PR is it? [Bug Fix] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3242 ### How should this be tested? * CI passes, and it was also manually verified. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? NO * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2822 from zjffdu/ZEPPELIN-3242 and squashes the following commits: 8254162 [Jeff Zhang] ZEPPELIN-3242. Listener threw an exception java.lang.NPE at o.a.zeppelin.spark.Utils.getNoteId(Utils.java:156) Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/500b74b1 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/500b74b1 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/500b74b1 Branch: refs/heads/master Commit: 500b74b196b740c810553c43216a56e23ab9caf0 Parents: 64bbba4 Author: Jeff Zhang <zjf...@apache.org> Authored: Tue Feb 27 20:53:54 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Wed Feb 28 10:27:08 2018 +0800 ---------------------------------------------------------------------- .../zeppelin/spark/IPySparkInterpreter.java | 11 +++++++ .../zeppelin/spark/PySparkInterpreter.java | 13 +++++--- .../zeppelin/spark/IPySparkInterpreterTest.java | 33 +++++++++++++------- .../zeppelin/spark/NewSparkInterpreterTest.java | 17 ++++++++-- .../zeppelin/spark/SparkRInterpreterTest.java | 19 +++++++++-- .../interpreter/InterpreterContext.java | 4 +++ 6 files changed, 77 insertions(+), 20 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java index a75fda8..3691156 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java @@ -23,6 +23,7 @@ import org.apache.zeppelin.interpreter.BaseZeppelinContext; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.python.IPythonInterpreter; @@ -99,6 +100,16 @@ public class IPySparkInterpreter extends IPythonInterpreter { } @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + InterpreterContext.set(context); + sparkInterpreter.populateSparkWebUrl(context); + String jobGroupId = Utils.buildJobGroupId(context); + String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo()); + String setJobGroupStmt = "sc.setJobGroup('" + jobGroupId + "', '" + jobDesc + "')"; + return super.interpret(setJobGroupStmt +"\n" + st, context); + } + + @Override public void cancel(InterpreterContext context) throws InterpreterException { super.cancel(context); sparkInterpreter.cancel(context); 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 0703ad7..f5e4793 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -406,16 +406,16 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException { + if (iPySparkInterpreter != null) { + return iPySparkInterpreter.interpret(st, context); + } + SparkInterpreter sparkInterpreter = getSparkInterpreter(); - sparkInterpreter.populateSparkWebUrl(context); if (sparkInterpreter.isUnsupportedSparkVersion()) { return new InterpreterResult(Code.ERROR, "Spark " + sparkInterpreter.getSparkVersion().toString() + " is not supported"); } - - if (iPySparkInterpreter != null) { - return iPySparkInterpreter.interpret(st, context); - } + sparkInterpreter.populateSparkWebUrl(context); if (!pythonscriptRunning) { return new InterpreterResult(Code.ERROR, "python process not running" @@ -467,10 +467,13 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } String jobGroup = Utils.buildJobGroupId(context); String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo()); + SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext(); __zeppelin__.setInterpreterContext(context); __zeppelin__.setGui(context.getGui()); __zeppelin__.setNoteGui(context.getNoteGui()); + InterpreterContext.set(context); + pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup, 
jobDesc); statementOutput = null; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java index 5eaa42c..46a3a72 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java @@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; +import org.apache.zeppelin.interpreter.remote.RemoteEventClient; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.python.IPythonInterpreterTest; import org.apache.zeppelin.user.AuthenticationInfo; @@ -39,15 +40,21 @@ import java.net.URL; import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; public class IPySparkInterpreterTest { private IPySparkInterpreter iPySparkInterpreter; private InterpreterGroup intpGroup; + private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class); @Before public void setup() throws InterpreterException { @@ -69,11 +76,13 @@ public class IPySparkInterpreterTest { intpGroup.get("session_1").add(sparkInterpreter); 
sparkInterpreter.setInterpreterGroup(intpGroup); sparkInterpreter.open(); + sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient); iPySparkInterpreter = new IPySparkInterpreter(p); intpGroup.get("session_1").add(iPySparkInterpreter); iPySparkInterpreter.setInterpreterGroup(intpGroup); iPySparkInterpreter.open(); + sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient); } @@ -91,17 +100,21 @@ public class IPySparkInterpreterTest { // rdd InterpreterContext context = getInterpreterContext(); - InterpreterResult result = iPySparkInterpreter.interpret("sc.range(1,10).sum()", context); + InterpreterResult result = iPySparkInterpreter.interpret("sc.version", context); Thread.sleep(100); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages(); - assertEquals("45", interpreterResultMessages.get(0).getData()); + // spark url is sent + verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class)); context = getInterpreterContext(); - result = iPySparkInterpreter.interpret("sc.version", context); + result = iPySparkInterpreter.interpret("sc.range(1,10).sum()", context); Thread.sleep(100); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - interpreterResultMessages = context.out.getInterpreterResultMessages(); + List<InterpreterResultMessage> interpreterResultMessages = context.out.getInterpreterResultMessages(); + assertEquals("45", interpreterResultMessages.get(0).getData()); + // spark job url is sent + verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), any(String.class), any(Map.class)); + // spark sql context = getInterpreterContext(); if (interpreterResultMessages.get(0).getData().startsWith("'1.") || @@ -146,7 +159,6 @@ public class IPySparkInterpreterTest { "1 a\n" + "2 b\n", interpreterResultMessages.get(0).getData()); } - // cancel final InterpreterContext context2 = 
getInterpreterContext(); @@ -166,6 +178,7 @@ public class IPySparkInterpreterTest { }; thread.start(); + // sleep 1 second to wait for the spark job starts Thread.sleep(1000); iPySparkInterpreter.cancel(context); @@ -177,10 +190,6 @@ public class IPySparkInterpreterTest { assertEquals("range", completions.get(0).getValue()); // pyspark streaming - - Class klass = py4j.GatewayServer.class; - URL location = klass.getResource('/' + klass.getName().replace('.', '/') + ".class"); - System.out.println("py4j location: " + location); context = getInterpreterContext(); result = iPySparkInterpreter.interpret( "from pyspark.streaming import StreamingContext\n" + @@ -204,7 +213,7 @@ public class IPySparkInterpreterTest { } private InterpreterContext getInterpreterContext() { - return new InterpreterContext( + InterpreterContext context = new InterpreterContext( "noteId", "paragraphId", "replName", @@ -218,5 +227,7 @@ public class IPySparkInterpreterTest { null, null, new InterpreterOutput(null)); + context.setClient(mockRemoteEventClient); + return context; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java index cfcf2a5..3d22af3 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java @@ -29,6 +29,8 @@ import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput; +import 
org.apache.zeppelin.interpreter.remote.RemoteEventClient; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.After; @@ -42,12 +44,14 @@ import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; public class NewSparkInterpreterTest { @@ -59,6 +63,8 @@ public class NewSparkInterpreterTest { // catch the interpreter output in onUpdate private InterpreterResultMessageOutput messageOutput; + private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class); + @Test public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException { Properties properties = new Properties(); @@ -72,9 +78,12 @@ public class NewSparkInterpreterTest { interpreter.setInterpreterGroup(mock(InterpreterGroup.class)); interpreter.open(); + interpreter.getZeppelinContext().setEventClient(mockRemoteEventClient); InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("a: String = hello world\n", output); + // spark web url is sent + verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class)); result = interpreter.interpret("print(a)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -124,6 +133,8 @@ public class NewSparkInterpreterTest { result = interpreter.interpret("sc.range(1, 10).sum", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, 
result.code()); assertTrue(output.contains("45")); + // spark job url is sent + verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), any(String.class), any(Map.class)); // case class result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext()); @@ -349,7 +360,7 @@ public class NewSparkInterpreterTest { private InterpreterContext getInterpreterContext() { output = ""; - return new InterpreterContext( + InterpreterContext context = new InterpreterContext( "noteId", "paragraphId", "replName", @@ -385,5 +396,7 @@ public class NewSparkInterpreterTest { } }) ); + context.setClient(mockRemoteEventClient); + return context; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java index 2d585f5..0bd88d4 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java @@ -24,21 +24,27 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteEventClient; import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.Test; import java.io.IOException; import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static 
org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; public class SparkRInterpreterTest { private SparkRInterpreter sparkRInterpreter; private SparkInterpreter sparkInterpreter; - + private RemoteEventClient mockRemoteEventClient = mock(RemoteEventClient.class); @Test public void testSparkRInterpreter() throws IOException, InterruptedException, InterpreterException { @@ -60,10 +66,13 @@ public class SparkRInterpreterTest { sparkInterpreter.setInterpreterGroup(interpreterGroup); sparkRInterpreter.open(); + sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient); InterpreterResult result = sparkRInterpreter.interpret("1+1", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(result.message().get(0).getData().contains("2")); + // spark web url is sent + verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class)); result = sparkRInterpreter.interpret("sparkR.version()", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); @@ -72,16 +81,20 @@ public class SparkRInterpreterTest { result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(result.message().get(0).getData().contains("eruptions waiting")); + // spark job url is sent + verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class)); } else { // spark 1.x result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(result.message().get(0).getData().contains("eruptions waiting")); + // spark job url is sent + verify(mockRemoteEventClient, atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), any(Map.class)); } } private InterpreterContext getInterpreterContext() { - return 
new InterpreterContext( + InterpreterContext context = new InterpreterContext( "noteId", "paragraphId", "replName", @@ -95,5 +108,7 @@ public class SparkRInterpreterTest { null, null, null); + context.setClient(mockRemoteEventClient); + return context; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 293f9bf..8fa0904 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -226,6 +226,10 @@ public class InterpreterContext { return client; } + public void setClient(RemoteEventClientWrapper client) { + this.client = client; + } + public RemoteWorksController getRemoteWorksController() { return remoteWorksController; }