Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 e91837cd5 -> 45d07668f


ZEPPELIN-3339. Add more test for ZeppelinContext

### What is this PR for?

Add more tests for ZeppelinContext, especially for the run API (`z.run`, `z.runNote`) and the ResourcePool (`z.put`/`z.get`)
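
For context, both APIs are driven from notebook paragraphs in the new tests. Below is a minimal, illustrative sketch of the kinds of paragraph snippets involved; the `<noteId>`/`<paragraphId>` placeholders are hypothetical, and `z.run`/`z.runNote` are non-blocking, so callers must wait for the target paragraphs to finish:

```scala
// ResourcePool: share values across paragraphs, and across interpreters
// (e.g. a %spark.pyspark paragraph can read a resource written from %spark)
%spark
z.put("var_1", "hello world")

%spark
println(z.get("var_1"))

// run API: trigger other paragraphs programmatically (asynchronous)
%spark
z.run(1)                             // run the paragraph at index 1 of this note
z.run("<noteId>", "<paragraphId>")   // run one paragraph, possibly in another note
z.runNote("<noteId>")                // run every paragraph of the given note
```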

### What type of PR is it?
[Improvement | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3339

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2874 from zjffdu/ZEPPELIN-3339 and squashes the following commits:

83de188 [Jeff Zhang] ZEPPELIN-3339. Add more test for ZeppelinContext

(cherry picked from commit c6f7f2bdb3a41bb04508c800ec6ed4c411680b26)
Signed-off-by: Jeff Zhang <zjf...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/45d07668
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/45d07668
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/45d07668

Branch: refs/heads/branch-0.8
Commit: 45d07668f95886f35d628db8caba9a48484d1eed
Parents: e91837c
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Mar 15 16:04:57 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Fri Mar 16 14:46:49 2018 +0800

----------------------------------------------------------------------
 .../interpreter/BaseZeppelinContext.java        |   4 +
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 360 ++++++-------------
 2 files changed, 115 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/45d07668/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
----------------------------------------------------------------------
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
index e38a29f..3433c32 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java
@@ -28,6 +28,8 @@ import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
 import org.apache.zeppelin.resource.Resource;
 import org.apache.zeppelin.resource.ResourcePool;
 import org.apache.zeppelin.resource.ResourceSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -40,6 +42,7 @@ import java.util.Map;
  */
 public abstract class BaseZeppelinContext {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseZeppelinContext.class);
 
   protected InterpreterContext interpreterContext;
   protected int maxResult;
@@ -336,6 +339,7 @@ public abstract class BaseZeppelinContext {
      if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) {
         continue;
       }
+      LOGGER.debug("Run Paragraph: " + r.getParagraphId() + " of Note: " + r.getNoteId());
       r.run();
     }
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/45d07668/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index bda555a..14eb0d9 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -17,8 +17,7 @@
 package org.apache.zeppelin.rest;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterProperty;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -37,19 +36,14 @@ import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URL;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -108,6 +102,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(), "helium");
     AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName());
   }
 
@@ -132,174 +127,112 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
   public void scalaOutputTest() throws IOException {
     // create new note
     Note note = ZeppelinServer.notebook.createNote(anonymous);
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
+    Paragraph p = note.addNewParagraph(anonymous);
     p.setText("%spark import java.util.Date\n" +
         "import java.net.URL\n" +
         "println(\"hello\")\n"
     );
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
+    note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
     assertEquals("import java.util.Date\n" +
         "import java.net.URL\n" +
         "hello\n", p.getResult().message().get(0).getData());
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+
+    p.setText("%spark invalid_code");
+    note.run(p.getId(), true);
+    assertEquals(Status.ERROR, p.getStatus());
+    assertTrue(p.getResult().message().get(0).getData().contains("error: "));
   }
 
 
   @Test
   public void basicRDDTransformationAndActionTest() throws IOException {
-    // create new note
     Note note = ZeppelinServer.notebook.createNote(anonymous);
-
-    // run markdown paragraph, again
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
+    Paragraph p = note.addNewParagraph(anonymous);
     p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))");
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
+    note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
     assertEquals("55", p.getResult().message().get(0).getData());
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
 
   @Test
   public void sparkSQLTest() throws IOException {
-    // create new note
     Note note = ZeppelinServer.notebook.createNote(anonymous);
     // test basic dataframe api
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
+    Paragraph p = note.addNewParagraph(anonymous);
     p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
         "df.collect()");
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
+    note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
     assertTrue(p.getResult().message().get(0).getData().contains(
         "Array[org.apache.spark.sql.Row] = Array([hello,20])"));
 
     // test display DataFrame
-    p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
+    p = note.addNewParagraph(anonymous);
     p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
         "z.show(df)");
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
+    note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
    assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType());
     assertEquals("_1\t_2\nhello\t20\n", 
p.getResult().message().get(1).getData());
 
     // test display DataSet
     if (isSpark2()) {
-      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-      config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
+      p = note.addNewParagraph(anonymous);
       p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" +
           "z.show(ds)");
-      p.setAuthenticationInfo(anonymous);
-      note.run(p.getId());
-      waitForFinish(p);
+      note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
      assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType());
      assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData());
     }
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
 
   @Test
-  public void sparkRTest() throws IOException, InterpreterException {
-    // create new note
+  public void sparkRTest() throws IOException {
     Note note = ZeppelinServer.notebook.createNote(anonymous);
-    // restart spark interpreter
-    List<InterpreterSetting> settings =
-        ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId());
-
-    for (InterpreterSetting setting : settings) {
-      if (setting.getName().equals("spark")) {
-        ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
-        break;
-      }
-    }
 
     String sqlContextName = "sqlContext";
     if (isSpark2()) {
       sqlContextName = "spark";
     }
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
-    p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 
23, 18))\n" +
+    Paragraph p = note.addNewParagraph(anonymous);
+    p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), 
age=c(19, 23, 18))\n" +
         "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
         "count(df)"
     );
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
-    System.err.println("sparkRTest=" + p.getResult().message().get(0).getData());
+    note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
     assertEquals("[1] 3", p.getResult().message().get(0).getData().trim());
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
 
   @Test
   public void pySparkTest() throws IOException {
     // create new note
     Note note = ZeppelinServer.notebook.createNote(anonymous);
-    note.setName("note");
 
     // run markdown paragraph, again
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
-    p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: 
a + b))");
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
+    Paragraph p = note.addNewParagraph(anonymous);
+    p.setText("%spark.pyspark sc.parallelize(range(1, 11)).reduce(lambda a, b: 
a + b)");
+    note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
     assertEquals("55\n", p.getResult().message().get(0).getData());
     if (!isSpark2()) {
       // run sqlContext test
-      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-      config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
+      p = note.addNewParagraph(anonymous);
       p.setText("%pyspark from pyspark.sql import Row\n" +
           "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
           "df.collect()");
-      p.setAuthenticationInfo(anonymous);
-      note.run(p.getId());
-      waitForFinish(p);
+      note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals("[Row(age=20, id=1)]\n", 
p.getResult().message().get(0).getData());
 
       // test display Dataframe
-      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-      config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
+      p = note.addNewParagraph(anonymous);
       p.setText("%pyspark from pyspark.sql import Row\n" +
           "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
           "z.show(df)");
-      p.setAuthenticationInfo(anonymous);
-      note.run(p.getId());
+      note.run(p.getId(), true);
       waitForFinish(p);
       assertEquals(Status.FINISHED, p.getStatus());
      assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType());
@@ -307,24 +240,16 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
       assertEquals("age\tid\n20\t1\n", 
p.getResult().message().get(0).getData());
 
       // test udf
-      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-      config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
+      p = note.addNewParagraph(anonymous);
       p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" 
+
           "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()");
-      p.setAuthenticationInfo(anonymous);
-      note.run(p.getId());
-      waitForFinish(p);
+      note.run(p.getId(), true);
      assertEquals(Status.FINISHED, p.getStatus());
      assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) ||
          "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData()));
 
       // test exception
-      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-      config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
+      p = note.addNewParagraph(anonymous);
       /**
        %pyspark
        a=1
@@ -332,9 +257,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
        print(a2)
        */
       p.setText("%pyspark a=1\n\nprint(a2)");
-      p.setAuthenticationInfo(anonymous);
-      note.run(p.getId());
-      waitForFinish(p);
+      note.run(p.getId(), true);
       assertEquals(Status.ERROR, p.getStatus());
       assertTrue(p.getResult().message().get(0).getData()
           .contains("Fail to execute line 3: print(a2)"));
@@ -342,107 +265,52 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
           .contains("name 'a2' is not defined"));
     } else {
       // run SparkSession test
-      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-      config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
+      p = note.addNewParagraph(anonymous);
       p.setText("%pyspark from pyspark.sql import Row\n" +
           "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
           "df.collect()");
-      p.setAuthenticationInfo(anonymous);
-      note.run(p.getId());
-      waitForFinish(p);
+      note.run(p.getId(), true);
       assertEquals(Status.FINISHED, p.getStatus());
       assertEquals("[Row(age=20, id=1)]\n", 
p.getResult().message().get(0).getData());
 
       // test udf
-      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-      config = p.getConfig();
-      config.put("enabled", true);
-      p.setConfig(config);
+      p = note.addNewParagraph(anonymous);
       // use SQLContext to register UDF but use this UDF through SparkSession
       p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" 
+
           "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()");
-      p.setAuthenticationInfo(anonymous);
-      note.run(p.getId());
-      waitForFinish(p);
+      note.run(p.getId(), true);
      assertEquals(Status.FINISHED, p.getStatus());
      assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) ||
          "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData()));
     }
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
-  }
-
-  @Test
-  public void pySparkAutoConvertOptionTest() throws IOException {
-    // create new note
-    Note note = ZeppelinServer.notebook.createNote(anonymous);
-    note.setName("note");
-
-    // run markdown paragraph, again
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
-
-    String sqlContextName = "sqlContext";
-    if (isSpark2()) {
-      sqlContextName = "spark";
-    }
-
-    p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
-        + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', 
rand(seed=10) * 3.14).count())");
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
-    assertEquals(Status.FINISHED, p.getStatus());
-    assertEquals("10\n", p.getResult().message().get(0).getData());
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
 
   @Test
   public void zRunTest() throws IOException {
     // create new note
     Note note = ZeppelinServer.notebook.createNote(anonymous);
-    Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config0 = p0.getConfig();
-    config0.put("enabled", true);
-    p0.setConfig(config0);
+    Paragraph p0 = note.addNewParagraph(anonymous);
+    // z.run(paragraphIndex)
     p0.setText("%spark z.run(1)");
-    p0.setAuthenticationInfo(anonymous);
-    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config1 = p1.getConfig();
-    config1.put("enabled", true);
-    p1.setConfig(config1);
+    Paragraph p1 = note.addNewParagraph(anonymous);
     p1.setText("%spark val a=10");
-    p1.setAuthenticationInfo(anonymous);
-    Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config2 = p2.getConfig();
-    config2.put("enabled", true);
-    p2.setConfig(config2);
+    Paragraph p2 = note.addNewParagraph(anonymous);
     p2.setText("%spark print(a)");
-    p2.setAuthenticationInfo(anonymous);
 
-    note.run(p0.getId());
-    waitForFinish(p0);
+    note.run(p0.getId(), true);
     assertEquals(Status.FINISHED, p0.getStatus());
 
    // z.run is not a blocking call, so p1 may not be finished when p0 is done.
     waitForFinish(p1);
-    note.run(p2.getId());
-    waitForFinish(p2);
+    assertEquals(Status.FINISHED, p1.getStatus());
+    note.run(p2.getId(), true);
     assertEquals(Status.FINISHED, p2.getStatus());
     assertEquals("10", p2.getResult().message().get(0).getData());
 
-    Paragraph p3 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config3 = p3.getConfig();
-    config3.put("enabled", true);
-    p3.setConfig(config3);
+    Paragraph p3 = note.addNewParagraph(anonymous);
     p3.setText("%spark println(new java.util.Date())");
-    p3.setAuthenticationInfo(anonymous);
 
+    // run the current note via z.runNote(noteId)
     p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId()));
     note.run(p0.getId());
     waitForFinish(p0);
@@ -452,46 +320,75 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
 
     assertEquals(Status.FINISHED, p3.getStatus());
     String p3result = p3.getResult().message().get(0).getData();
-    assertNotEquals(null, p3result);
-    assertNotEquals("", p3result);
+    assertTrue(p3result.length() > 0);
 
+    // z.run(noteId, paragraphId)
    p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId()));
-    p3.setText("%%spark println(\"END\")");
+    p3.setText("%spark println(\"END\")");
 
-    note.run(p0.getId());
-    waitForFinish(p0);
+    note.run(p0.getId(), true);
     waitForFinish(p3);
+    assertEquals(Status.FINISHED, p3.getStatus());
+    assertEquals("END\n", p3.getResult().message().get(0).getData());
+
+    // run paragraph in note2 via paragraph in note1
+    Note note2 = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p20 = note2.addNewParagraph(anonymous);
+    p20.setText("%spark val a = 1");
+    Paragraph p21 = note2.addNewParagraph(anonymous);
+    p21.setText("%spark print(a)");
+
+    // run p20 of note2 via paragraph in note1
+    p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note2.getId(), p20.getId()));
+    note.run(p0.getId(), true);
+    waitForFinish(p20);
+    assertEquals(Status.FINISHED, p20.getStatus());
+    assertEquals(Status.READY, p21.getStatus());
+
+    p0.setText(String.format("%%spark z.runNote(\"%s\")", note2.getId()));
+    note.run(p0.getId(), true);
+    waitForFinish(p20);
+    waitForFinish(p21);
+    assertEquals(Status.FINISHED, p20.getStatus());
+    assertEquals(Status.FINISHED, p21.getStatus());
+    assertEquals("1", p21.getResult().message().get(0).getData());
+  }
+
+  @Test
+  public void testZeppelinContextResource() throws IOException {
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
 
-    assertNotEquals(p3result, p3.getResult().message());
+    Paragraph p1 = note.addNewParagraph(anonymous);
+    p1.setText("%spark z.put(\"var_1\", \"hello world\")");
 
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+    Paragraph p2 = note.addNewParagraph(anonymous);
+    p2.setText("%spark println(z.get(\"var_1\"))");
+
+    Paragraph p3 = note.addNewParagraph(anonymous);
+    p3.setText("%spark.pyspark print(z.get(\"var_1\"))");
+
+    note.run(p1.getId(), true);
+    note.run(p2.getId(), true);
+    note.run(p3.getId(), true);
+
+    assertEquals(Status.FINISHED, p1.getStatus());
+    assertEquals(Status.FINISHED, p2.getStatus());
+    assertEquals("hello world\n", p2.getResult().message().get(0).getData());
+    assertEquals(Status.FINISHED, p3.getStatus());
+    assertEquals("hello world\n", p3.getResult().message().get(0).getData());
   }
 
   @Test
   public void pySparkDepLoaderTest() throws IOException, InterpreterException {
-    // create new note
     Note note = ZeppelinServer.notebook.createNote(anonymous);
 
-    // restart spark interpreter
-    List<InterpreterSetting> settings =
-        ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId());
-
-    for (InterpreterSetting setting : settings) {
-      if (setting.getName().equals("spark")) {
-        ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
-        break;
-      }
-    }
+    // restart spark interpreter to make dep loader work
+    ZeppelinServer.notebook.getInterpreterSettingManager().close();
 
     // load dep
-    Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = p0.getConfig();
-    config.put("enabled", true);
-    p0.setConfig(config);
+    Paragraph p0 = note.addNewParagraph(anonymous);
     p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")");
-    p0.setAuthenticationInfo(anonymous);
-    note.run(p0.getId());
-    waitForFinish(p0);
+    note.run(p0.getId(), true);
     assertEquals(Status.FINISHED, p0.getStatus());
 
     // write test csv file
@@ -499,8 +396,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     FileUtils.write(tmpFile, "a,b\n1,2");
 
     // load data using libraries from dep loader
-    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    p1.setConfig(config);
+    Paragraph p1 = note.addNewParagraph(anonymous);
 
     String sqlContextName = "sqlContext";
     if (isSpark2()) {
@@ -509,26 +405,18 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     p1.setText("%pyspark\n" +
         "from pyspark.sql import SQLContext\n" +
         "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" 
+
-        ".load('" + tmpFile.getAbsolutePath() + "').count())");
-    p1.setAuthenticationInfo(anonymous);
-    note.run(p1.getId());
+        ".load('file://" + tmpFile.getAbsolutePath() + "').count())");
+    note.run(p1.getId(), true);
 
-    waitForFinish(p1);
     assertEquals(Status.FINISHED, p1.getStatus());
     assertEquals("2\n", p1.getResult().message().get(0).getData());
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
 
   private void verifySparkVersionNumber() throws IOException {
     Note note = ZeppelinServer.notebook.createNote(anonymous);
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    note.setName("note");
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
+    Paragraph p = note.addNewParagraph(anonymous);
+
     p.setText("%spark print(sc.version)");
-    p.setAuthenticationInfo(anonymous);
     note.run(p.getId());
     waitForFinish(p);
     assertEquals(Status.FINISHED, p.getStatus());
@@ -548,11 +436,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
   @Test
   public void testSparkZeppelinContextDynamicForms() throws IOException {
     Note note = ZeppelinServer.notebook.createNote(anonymous);
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    note.setName("note");
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
+    Paragraph p = note.addNewParagraph(anonymous);
     String code = "%spark.spark println(z.textbox(\"my_input\", 
\"default_name\"))\n" +
         "println(z.select(\"my_select\", \"1\"," +
         "Seq((\"1\", \"select_1\"), (\"2\", \"select_2\"))))\n" +
@@ -560,7 +444,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
         "Seq((\"1\", \"check_1\"), (\"2\", \"check_2\")))\n" +
         "println(items(0))";
     p.setText(code);
-    p.setAuthenticationInfo(anonymous);
     note.run(p.getId());
     waitForFinish(p);
 
@@ -577,18 +460,12 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     assertEquals("1", result[1]);
     assertEquals("items: Seq[Object] = Buffer(2)", result[2]);
     assertEquals("2", result[3]);
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
 
   @Test
   public void testPySparkZeppelinContextDynamicForms() throws IOException {
     Note note = ZeppelinServer.notebook.createNote(anonymous);
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    note.setName("note");
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
+    Paragraph p = note.addNewParagraph(anonymous);
     String code = "%spark.pyspark print(z.input('my_input', 
'default_name'))\n" +
         "print(z.select('my_select', " +
         "[('1', 'select_1'), ('2', 'select_2')], defaultValue='1'))\n" +
@@ -596,7 +473,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
         "[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" +
         "print(items[0])";
     p.setText(code);
-    p.setAuthenticationInfo(anonymous);
     note.run(p.getId());
     waitForFinish(p);
 
@@ -612,34 +488,20 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
     assertEquals("default_name", result[0]);
     assertEquals("1", result[1]);
     assertEquals("2", result[2]);
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
   }
 
   @Test
   public void testConfInterpreter() throws IOException {
     ZeppelinServer.notebook.getInterpreterSettingManager().close();
-    Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
-    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    Map config = p.getConfig();
-    config.put("enabled", true);
-    p.setConfig(config);
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p = note.addNewParagraph(anonymous);
     p.setText("%spark.conf 
spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0");
-    p.setAuthenticationInfo(anonymous);
-    note.run(p.getId());
-    waitForFinish(p);
+    note.run(p.getId(), true);
     assertEquals(Status.FINISHED, p.getStatus());
 
-    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-    p1.setConfig(config);
+    Paragraph p1 = note.addNewParagraph(anonymous);
     p1.setText("%spark\nimport com.databricks.spark.csv._");
-    p1.setAuthenticationInfo(anonymous);
-    note.run(p1.getId());
-
-    waitForFinish(p1);
+    note.run(p1.getId(), true);
     assertEquals(Status.FINISHED, p1.getStatus());
-
-    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
-
   }
 }
