Repository: zeppelin
Updated Branches:
  refs/heads/master 64bbba479 -> 500b74b19


ZEPPELIN-3242. Listener threw an exception java.lang.NPE at 
o.a.zeppelin.spark.Utils.getNoteId(Utils.java:156)

### What is this PR for?
This issue also prevents the Spark URL from being displayed in the frontend. 
The root cause is that PySparkInterpreter/IPySparkInterpreter don't set the 
JobGroup correctly.

### What type of PR is it?
[Bug Fix]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3242

### How should this be tested?
* CI passes; also manually verified.

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2822 from zjffdu/ZEPPELIN-3242 and squashes the following commits:

8254162 [Jeff Zhang] ZEPPELIN-3242. Listener threw an exception java.lang.NPE at 
o.a.zeppelin.spark.Utils.getNoteId(Utils.java:156)


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/500b74b1
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/500b74b1
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/500b74b1

Branch: refs/heads/master
Commit: 500b74b196b740c810553c43216a56e23ab9caf0
Parents: 64bbba4
Author: Jeff Zhang <zjf...@apache.org>
Authored: Tue Feb 27 20:53:54 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Wed Feb 28 10:27:08 2018 +0800

----------------------------------------------------------------------
 .../zeppelin/spark/IPySparkInterpreter.java     | 11 +++++++
 .../zeppelin/spark/PySparkInterpreter.java      | 13 +++++---
 .../zeppelin/spark/IPySparkInterpreterTest.java | 33 +++++++++++++-------
 .../zeppelin/spark/NewSparkInterpreterTest.java | 17 ++++++++--
 .../zeppelin/spark/SparkRInterpreterTest.java   | 19 +++++++++--
 .../interpreter/InterpreterContext.java         |  4 +++
 6 files changed, 77 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
index a75fda8..3691156 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/IPySparkInterpreter.java
@@ -23,6 +23,7 @@ import org.apache.zeppelin.interpreter.BaseZeppelinContext;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
 import org.apache.zeppelin.python.IPythonInterpreter;
@@ -99,6 +100,16 @@ public class IPySparkInterpreter extends IPythonInterpreter 
{
   }
 
   @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    InterpreterContext.set(context);
+    sparkInterpreter.populateSparkWebUrl(context);
+    String jobGroupId = Utils.buildJobGroupId(context);
+    String jobDesc = "Started by: " + 
Utils.getUserName(context.getAuthenticationInfo());
+    String setJobGroupStmt = "sc.setJobGroup('" +  jobGroupId + "', '" + 
jobDesc + "')";
+    return super.interpret(setJobGroupStmt +"\n" + st, context);
+  }
+
+  @Override
   public void cancel(InterpreterContext context) throws InterpreterException {
     super.cancel(context);
     sparkInterpreter.cancel(context);

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 0703ad7..f5e4793 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -406,16 +406,16 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context)
       throws InterpreterException {
+    if (iPySparkInterpreter != null) {
+      return iPySparkInterpreter.interpret(st, context);
+    }
+
     SparkInterpreter sparkInterpreter = getSparkInterpreter();
-    sparkInterpreter.populateSparkWebUrl(context);
     if (sparkInterpreter.isUnsupportedSparkVersion()) {
       return new InterpreterResult(Code.ERROR, "Spark "
           + sparkInterpreter.getSparkVersion().toString() + " is not 
supported");
     }
-
-    if (iPySparkInterpreter != null) {
-      return iPySparkInterpreter.interpret(st, context);
-    }
+    sparkInterpreter.populateSparkWebUrl(context);
 
     if (!pythonscriptRunning) {
       return new InterpreterResult(Code.ERROR, "python process not running"
@@ -467,10 +467,13 @@ public class PySparkInterpreter extends Interpreter 
implements ExecuteResultHand
     }
     String jobGroup = Utils.buildJobGroupId(context);
     String jobDesc = "Started by: " + 
Utils.getUserName(context.getAuthenticationInfo());
+
     SparkZeppelinContext __zeppelin__ = sparkInterpreter.getZeppelinContext();
     __zeppelin__.setInterpreterContext(context);
     __zeppelin__.setGui(context.getGui());
     __zeppelin__.setNoteGui(context.getNoteGui());
+    InterpreterContext.set(context);
+
     pythonInterpretRequest = new PythonInterpretRequest(st, jobGroup, jobDesc);
     statementOutput = null;
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index 5eaa42c..46a3a72 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.python.IPythonInterpreterTest;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -39,15 +40,21 @@ import java.net.URL;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class IPySparkInterpreterTest {
 
   private IPySparkInterpreter iPySparkInterpreter;
   private InterpreterGroup intpGroup;
+  private RemoteEventClient mockRemoteEventClient = 
mock(RemoteEventClient.class);
 
   @Before
   public void setup() throws InterpreterException {
@@ -69,11 +76,13 @@ public class IPySparkInterpreterTest {
     intpGroup.get("session_1").add(sparkInterpreter);
     sparkInterpreter.setInterpreterGroup(intpGroup);
     sparkInterpreter.open();
+    
sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
 
     iPySparkInterpreter = new IPySparkInterpreter(p);
     intpGroup.get("session_1").add(iPySparkInterpreter);
     iPySparkInterpreter.setInterpreterGroup(intpGroup);
     iPySparkInterpreter.open();
+    
sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
   }
 
 
@@ -91,17 +100,21 @@ public class IPySparkInterpreterTest {
 
     // rdd
     InterpreterContext context = getInterpreterContext();
-    InterpreterResult result = 
iPySparkInterpreter.interpret("sc.range(1,10).sum()", context);
+    InterpreterResult result = iPySparkInterpreter.interpret("sc.version", 
context);
     Thread.sleep(100);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    List<InterpreterResultMessage> interpreterResultMessages = 
context.out.getInterpreterResultMessages();
-    assertEquals("45", interpreterResultMessages.get(0).getData());
+    // spark url is sent
+    verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
 
     context = getInterpreterContext();
-    result = iPySparkInterpreter.interpret("sc.version", context);
+    result = iPySparkInterpreter.interpret("sc.range(1,10).sum()", context);
     Thread.sleep(100);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    interpreterResultMessages = context.out.getInterpreterResultMessages();
+    List<InterpreterResultMessage> interpreterResultMessages = 
context.out.getInterpreterResultMessages();
+    assertEquals("45", interpreterResultMessages.get(0).getData());
+    // spark job url is sent
+    verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), 
any(String.class), any(Map.class));
+
     // spark sql
     context = getInterpreterContext();
     if (interpreterResultMessages.get(0).getData().startsWith("'1.") ||
@@ -146,7 +159,6 @@ public class IPySparkInterpreterTest {
           "1   a\n" +
           "2   b\n", interpreterResultMessages.get(0).getData());
     }
-
     // cancel
     final InterpreterContext context2 = getInterpreterContext();
 
@@ -166,6 +178,7 @@ public class IPySparkInterpreterTest {
     };
     thread.start();
 
+
     // sleep 1 second to wait for the spark job starts
     Thread.sleep(1000);
     iPySparkInterpreter.cancel(context);
@@ -177,10 +190,6 @@ public class IPySparkInterpreterTest {
     assertEquals("range", completions.get(0).getValue());
 
     // pyspark streaming
-
-    Class klass = py4j.GatewayServer.class;
-    URL location = klass.getResource('/' + klass.getName().replace('.', '/') + 
".class");
-    System.out.println("py4j location: " + location);
     context = getInterpreterContext();
     result = iPySparkInterpreter.interpret(
         "from pyspark.streaming import StreamingContext\n" +
@@ -204,7 +213,7 @@ public class IPySparkInterpreterTest {
   }
 
   private InterpreterContext getInterpreterContext() {
-    return new InterpreterContext(
+    InterpreterContext context = new InterpreterContext(
         "noteId",
         "paragraphId",
         "replName",
@@ -218,5 +227,7 @@ public class IPySparkInterpreterTest {
         null,
         null,
         new InterpreterOutput(null));
+    context.setClient(mockRemoteEventClient);
+    return context;
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index cfcf2a5..3d22af3 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -29,6 +29,8 @@ import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.After;
@@ -42,12 +44,14 @@ import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 
 public class NewSparkInterpreterTest {
@@ -59,6 +63,8 @@ public class NewSparkInterpreterTest {
   // catch the interpreter output in onUpdate
   private InterpreterResultMessageOutput messageOutput;
 
+  private RemoteEventClient mockRemoteEventClient = 
mock(RemoteEventClient.class);
+
   @Test
   public void testSparkInterpreter() throws IOException, InterruptedException, 
InterpreterException {
     Properties properties = new Properties();
@@ -72,9 +78,12 @@ public class NewSparkInterpreterTest {
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     interpreter.open();
 
+    interpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
     InterpreterResult result = interpreter.interpret("val a=\"hello world\"", 
getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals("a: String = hello world\n", output);
+    // spark web url is sent
+    verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
 
     result = interpreter.interpret("print(a)", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -124,6 +133,8 @@ public class NewSparkInterpreterTest {
     result = interpreter.interpret("sc.range(1, 10).sum", 
getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertTrue(output.contains("45"));
+    // spark job url is sent
+    verify(mockRemoteEventClient).onParaInfosReceived(any(String.class), 
any(String.class), any(Map.class));
 
     // case class
     result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", 
getInterpreterContext());
@@ -349,7 +360,7 @@ public class NewSparkInterpreterTest {
 
   private InterpreterContext getInterpreterContext() {
     output = "";
-    return new InterpreterContext(
+    InterpreterContext context = new InterpreterContext(
         "noteId",
         "paragraphId",
         "replName",
@@ -385,5 +396,7 @@ public class NewSparkInterpreterTest {
               }
             })
     );
+    context.setClient(mockRemoteEventClient);
+    return context;
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
index 2d585f5..0bd88d4 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java
@@ -24,21 +24,27 @@ import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClient;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class SparkRInterpreterTest {
 
   private SparkRInterpreter sparkRInterpreter;
   private SparkInterpreter sparkInterpreter;
-
+  private RemoteEventClient mockRemoteEventClient = 
mock(RemoteEventClient.class);
 
   @Test
   public void testSparkRInterpreter() throws IOException, 
InterruptedException, InterpreterException {
@@ -60,10 +66,13 @@ public class SparkRInterpreterTest {
     sparkInterpreter.setInterpreterGroup(interpreterGroup);
 
     sparkRInterpreter.open();
+    
sparkInterpreter.getZeppelinContext().setEventClient(mockRemoteEventClient);
 
     InterpreterResult result = sparkRInterpreter.interpret("1+1", 
getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertTrue(result.message().get(0).getData().contains("2"));
+    // spark web url is sent
+    verify(mockRemoteEventClient).onMetaInfosReceived(any(Map.class));
 
     result = sparkRInterpreter.interpret("sparkR.version()", 
getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -72,16 +81,20 @@ public class SparkRInterpreterTest {
       result = sparkRInterpreter.interpret("df <- 
as.DataFrame(faithful)\nhead(df)", getInterpreterContext());
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains("eruptions 
waiting"));
+      // spark job url is sent
+      verify(mockRemoteEventClient, 
atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), 
any(Map.class));
     } else {
       // spark 1.x
       result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, 
faithful)\nhead(df)", getInterpreterContext());
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       assertTrue(result.message().get(0).getData().contains("eruptions 
waiting"));
+      // spark job url is sent
+      verify(mockRemoteEventClient, 
atLeastOnce()).onParaInfosReceived(any(String.class), any(String.class), 
any(Map.class));
     }
   }
 
   private InterpreterContext getInterpreterContext() {
-    return new InterpreterContext(
+    InterpreterContext context = new InterpreterContext(
         "noteId",
         "paragraphId",
         "replName",
@@ -95,5 +108,7 @@ public class SparkRInterpreterTest {
         null,
         null,
         null);
+    context.setClient(mockRemoteEventClient);
+    return context;
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/500b74b1/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 293f9bf..8fa0904 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -226,6 +226,10 @@ public class InterpreterContext {
     return client;
   }
 
+  public void setClient(RemoteEventClientWrapper client) {
+    this.client = client;
+  }
+
   public RemoteWorksController getRemoteWorksController() {
     return remoteWorksController;
   }

Reply via email to