Repository: zeppelin Updated Branches: refs/heads/branch-0.8 e91837cd5 -> 45d07668f
ZEPPELIN-3339. Add more test for ZeppelinContext ### What is this PR for? Add more test for ZeppelinContext, especially for run api and ResourcePool ### What type of PR is it? [Improvement | Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3339 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2874 from zjffdu/ZEPPELIN-3339 and squashes the following commits: 83de188 [Jeff Zhang] ZEPPELIN-3339. Add more test for ZeppelinContext (cherry picked from commit c6f7f2bdb3a41bb04508c800ec6ed4c411680b26) Signed-off-by: Jeff Zhang <zjf...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/45d07668 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/45d07668 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/45d07668 Branch: refs/heads/branch-0.8 Commit: 45d07668f95886f35d628db8caba9a48484d1eed Parents: e91837c Author: Jeff Zhang <zjf...@apache.org> Authored: Thu Mar 15 16:04:57 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Fri Mar 16 14:46:49 2018 +0800 ---------------------------------------------------------------------- .../interpreter/BaseZeppelinContext.java | 4 + .../zeppelin/rest/ZeppelinSparkClusterTest.java | 360 ++++++------------- 2 files changed, 115 insertions(+), 249 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/45d07668/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java index e38a29f..3433c32 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java @@ -28,6 +28,8 @@ import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.ResourceSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -40,6 +42,7 @@ import java.util.Map; */ public abstract class BaseZeppelinContext { + private static final Logger LOGGER = LoggerFactory.getLogger(BaseZeppelinContext.class); protected InterpreterContext interpreterContext; protected int maxResult; @@ -336,6 +339,7 @@ public abstract class BaseZeppelinContext { if (r.getNoteId().equals(runningNoteId) && r.getParagraphId().equals(runningParagraphId)) { continue; } + LOGGER.debug("Run Paragraph: " + r.getParagraphId() + " of Note: " + r.getNoteId()); r.run(); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/45d07668/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index bda555a..14eb0d9 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -17,8 +17,7 @@ package org.apache.zeppelin.rest; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterProperty; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -37,19 +36,14 @@ import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URL; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; /** @@ -108,6 +102,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { @BeforeClass public static void setUp() throws Exception { + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HELIUM_REGISTRY.getVarName(), "helium"); AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName()); } @@ -132,174 +127,112 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { public void scalaOutputTest() throws IOException { // create new note Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + Paragraph p = note.addNewParagraph(anonymous); p.setText("%spark import java.util.Date\n" + "import java.net.URL\n" + "println(\"hello\")\n" ); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("import java.util.Date\n" + "import java.net.URL\n" + "hello\n", p.getResult().message().get(0).getData()); - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + + p.setText("%spark invalid_code"); + note.run(p.getId(), true); + assertEquals(Status.ERROR, p.getStatus()); + assertTrue(p.getResult().message().get(0).getData().contains("error: ")); } @Test public void basicRDDTransformationAndActionTest() throws IOException { - // create new note Note note = ZeppelinServer.notebook.createNote(anonymous); - - // run markdown paragraph, again - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + Paragraph p = note.addNewParagraph(anonymous); p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("55", p.getResult().message().get(0).getData()); - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @Test public void sparkSQLTest() throws IOException { - // create new note Note note = ZeppelinServer.notebook.createNote(anonymous); // test basic dataframe api - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + Paragraph p = note.addNewParagraph(anonymous); p.setText("%spark val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" + "df.collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertTrue(p.getResult().message().get(0).getData().contains( "Array[org.apache.spark.sql.Row] = Array([hello,20])")); // test display DataFrame - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + p = note.addNewParagraph(anonymous); p.setText("%spark val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" + "z.show(df)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType()); assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData()); // test display DataSet if (isSpark2()) { - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + p = note.addNewParagraph(anonymous); p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" + "z.show(ds)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(1).getType()); assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(1).getData()); } - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @Test - public void sparkRTest() throws IOException, InterpreterException { - // create new note + public void sparkRTest() throws IOException { Note note = ZeppelinServer.notebook.createNote(anonymous); - // restart spark interpreter - List<InterpreterSetting> settings = - ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId()); - - for (InterpreterSetting setting : settings) { - if (setting.getName().equals("spark")) { - ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); - break; - } - } String sqlContextName = "sqlContext"; if (isSpark2()) { sqlContextName = "spark"; } - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + + Paragraph p = note.addNewParagraph(anonymous); + p.setText("%spark.r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + "df <- createDataFrame(" + sqlContextName + ", localDF)\n" + "count(df)" ); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - System.err.println("sparkRTest=" + p.getResult().message().get(0).getData()); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("[1] 3", p.getResult().message().get(0).getData().trim()); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @Test public void pySparkTest() throws IOException { // create new note Note note = ZeppelinServer.notebook.createNote(anonymous); - note.setName("note"); // run markdown paragraph, again - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b))"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + Paragraph p = note.addNewParagraph(anonymous); + p.setText("%spark.pyspark sc.parallelize(range(1, 11)).reduce(lambda a, b: a + b)"); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("55\n", p.getResult().message().get(0).getData()); if (!isSpark2()) { // run sqlContext test - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + p = note.addNewParagraph(anonymous); p.setText("%pyspark from pyspark.sql import Row\n" + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + "df.collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("[Row(age=20, id=1)]\n", p.getResult().message().get(0).getData()); // test display Dataframe - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + p = note.addNewParagraph(anonymous); p.setText("%pyspark from pyspark.sql import Row\n" + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + "z.show(df)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); + note.run(p.getId(), true); waitForFinish(p); assertEquals(Status.FINISHED, p.getStatus()); assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType()); @@ -307,24 +240,16 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("age\tid\n20\t1\n", p.getResult().message().get(0).getData()); // test udf - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + p = note.addNewParagraph(anonymous); p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) || "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData())); // test exception - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + p = note.addNewParagraph(anonymous); /** %pyspark a=1 @@ -332,9 +257,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { print(a2) */ p.setText("%pyspark a=1\n\nprint(a2)"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.ERROR, p.getStatus()); assertTrue(p.getResult().message().get(0).getData() .contains("Fail to execute line 3: print(a2)")); @@ -342,107 +265,52 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { .contains("name 'a2' is not defined")); } else { // run SparkSession test - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + p = note.addNewParagraph(anonymous); p.setText("%pyspark from pyspark.sql import Row\n" + "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" + "df.collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertEquals("[Row(age=20, id=1)]\n", p.getResult().message().get(0).getData()); // test udf - p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + p = note.addNewParagraph(anonymous); // use SQLContext to register UDF but use this UDF through SparkSession p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" + "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) || "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData())); } - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); - } - - @Test - public void pySparkAutoConvertOptionTest() throws IOException { - // create new note - Note note = ZeppelinServer.notebook.createNote(anonymous); - note.setName("note"); - - // run markdown paragraph, again - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); - - String sqlContextName = "sqlContext"; - if (isSpark2()) { - sqlContextName = "spark"; - } - - p.setText("%pyspark\nfrom pyspark.sql.functions import *\n" - + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); - assertEquals(Status.FINISHED, p.getStatus()); - assertEquals("10\n", p.getResult().message().get(0).getData()); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @Test public void zRunTest() throws IOException { // create new note Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config0 = p0.getConfig(); - config0.put("enabled", true); - p0.setConfig(config0); + Paragraph p0 = note.addNewParagraph(anonymous); + // z.run(paragraphIndex) p0.setText("%spark z.run(1)"); - p0.setAuthenticationInfo(anonymous); - Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config1 = p1.getConfig(); - config1.put("enabled", true); - p1.setConfig(config1); + Paragraph p1 = note.addNewParagraph(anonymous); p1.setText("%spark val a=10"); - p1.setAuthenticationInfo(anonymous); - Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config2 = p2.getConfig(); - config2.put("enabled", true); - p2.setConfig(config2); + Paragraph p2 = note.addNewParagraph(anonymous); p2.setText("%spark print(a)"); - p2.setAuthenticationInfo(anonymous); - note.run(p0.getId()); - waitForFinish(p0); + note.run(p0.getId(), true); assertEquals(Status.FINISHED, p0.getStatus()); // z.run is not blocking call. So p1 may not be finished when p0 is done. waitForFinish(p1); - note.run(p2.getId()); - waitForFinish(p2); + assertEquals(Status.FINISHED, p1.getStatus()); + note.run(p2.getId(), true); assertEquals(Status.FINISHED, p2.getStatus()); assertEquals("10", p2.getResult().message().get(0).getData()); - Paragraph p3 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config3 = p3.getConfig(); - config3.put("enabled", true); - p3.setConfig(config3); + Paragraph p3 = note.addNewParagraph(anonymous); p3.setText("%spark println(new java.util.Date())"); - p3.setAuthenticationInfo(anonymous); + // run current Node, z.runNote(noteId) p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId())); note.run(p0.getId()); waitForFinish(p0); @@ -452,46 +320,75 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals(Status.FINISHED, p3.getStatus()); String p3result = p3.getResult().message().get(0).getData(); - assertNotEquals(null, p3result); - assertNotEquals("", p3result); + assertTrue(p3result.length() > 0); + // z.run(noteId, paragraphId) p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId())); - p3.setText("%%spark println(\"END\")"); + p3.setText("%spark println(\"END\")"); - note.run(p0.getId()); - waitForFinish(p0); + note.run(p0.getId(), true); waitForFinish(p3); + assertEquals(Status.FINISHED, p3.getStatus()); + assertEquals("END\n", p3.getResult().message().get(0).getData()); + + // run paragraph in note2 via paragraph in note1 + Note note2 = ZeppelinServer.notebook.createNote(anonymous); + Paragraph p20 = note2.addNewParagraph(anonymous); + p20.setText("%spark val a = 1"); + Paragraph p21 = note2.addNewParagraph(anonymous); + p21.setText("%spark print(a)"); + + // run p20 of note2 via paragraph in note1 + p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note2.getId(), p20.getId())); + note.run(p0.getId(), true); + waitForFinish(p20); + assertEquals(Status.FINISHED, p20.getStatus()); + assertEquals(Status.READY, p21.getStatus()); + + p0.setText(String.format("%%spark z.runNote(\"%s\")", note2.getId())); + note.run(p0.getId(), true); + waitForFinish(p20); + waitForFinish(p21); + assertEquals(Status.FINISHED, p20.getStatus()); + assertEquals(Status.FINISHED, p21.getStatus()); + assertEquals("1", p21.getResult().message().get(0).getData()); + } + + @Test + public void testZeppelinContextResource() throws IOException { + Note note = ZeppelinServer.notebook.createNote(anonymous); - assertNotEquals(p3result, p3.getResult().message()); + Paragraph p1 = note.addNewParagraph(anonymous); + p1.setText("%spark z.put(\"var_1\", \"hello world\")"); - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); + Paragraph p2 = note.addNewParagraph(anonymous); + p2.setText("%spark println(z.get(\"var_1\"))"); + + Paragraph p3 = note.addNewParagraph(anonymous); + p3.setText("%spark.pyspark print(z.get(\"var_1\"))"); + + note.run(p1.getId(), true); + note.run(p2.getId(), true); + note.run(p3.getId(), true); + + assertEquals(Status.FINISHED, p1.getStatus()); + assertEquals(Status.FINISHED, p2.getStatus()); + assertEquals("hello world\n", p2.getResult().message().get(0).getData()); + assertEquals(Status.FINISHED, p3.getStatus()); + assertEquals("hello world\n", p3.getResult().message().get(0).getData()); } @Test public void pySparkDepLoaderTest() throws IOException, InterpreterException { - // create new note Note note = ZeppelinServer.notebook.createNote(anonymous); - // restart spark interpreter - List<InterpreterSetting> settings = - ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId()); - - for (InterpreterSetting setting : settings) { - if (setting.getName().equals("spark")) { - ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId()); - break; - } - } + // restart spark interpreter to make dep loader work + ZeppelinServer.notebook.getInterpreterSettingManager().close(); // load dep - Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p0.getConfig(); - config.put("enabled", true); - p0.setConfig(config); + Paragraph p0 = note.addNewParagraph(anonymous); p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")"); - p0.setAuthenticationInfo(anonymous); - note.run(p0.getId()); - waitForFinish(p0); + note.run(p0.getId(), true); assertEquals(Status.FINISHED, p0.getStatus()); // write test csv file @@ -499,8 +396,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { FileUtils.write(tmpFile, "a,b\n1,2"); // load data using libraries from dep loader - Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setConfig(config); + Paragraph p1 = note.addNewParagraph(anonymous); String sqlContextName = "sqlContext"; if (isSpark2()) { @@ -509,26 +405,18 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { p1.setText("%pyspark\n" + "from pyspark.sql import SQLContext\n" + "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" + - ".load('" + tmpFile.getAbsolutePath() + "').count())"); - p1.setAuthenticationInfo(anonymous); - note.run(p1.getId()); + ".load('file://" + tmpFile.getAbsolutePath() + "').count())"); + note.run(p1.getId(), true); - waitForFinish(p1); assertEquals(Status.FINISHED, p1.getStatus()); assertEquals("2\n", p1.getResult().message().get(0).getData()); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } private void verifySparkVersionNumber() throws IOException { Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - note.setName("note"); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + Paragraph p = note.addNewParagraph(anonymous); + p.setText("%spark print(sc.version)"); - p.setAuthenticationInfo(anonymous); note.run(p.getId()); waitForFinish(p); assertEquals(Status.FINISHED, p.getStatus()); @@ -548,11 +436,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { @Test public void testSparkZeppelinContextDynamicForms() throws IOException { Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - note.setName("note"); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + Paragraph p = note.addNewParagraph(anonymous); String code = "%spark.spark println(z.textbox(\"my_input\", \"default_name\"))\n" + "println(z.select(\"my_select\", \"1\"," + "Seq((\"1\", \"select_1\"), (\"2\", \"select_2\"))))\n" + @@ -560,7 +444,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { "Seq((\"1\", \"check_1\"), (\"2\", \"check_2\")))\n" + "println(items(0))"; p.setText(code); - p.setAuthenticationInfo(anonymous); note.run(p.getId()); waitForFinish(p); @@ -577,18 +460,12 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("1", result[1]); assertEquals("items: Seq[Object] = Buffer(2)", result[2]); assertEquals("2", result[3]); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @Test public void testPySparkZeppelinContextDynamicForms() throws IOException { Note note = ZeppelinServer.notebook.createNote(anonymous); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - note.setName("note"); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + Paragraph p = note.addNewParagraph(anonymous); String code = "%spark.pyspark print(z.input('my_input', 'default_name'))\n" + "print(z.select('my_select', " + "[('1', 'select_1'), ('2', 'select_2')], defaultValue='1'))\n" + @@ -596,7 +473,6 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { "[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" + "print(items[0])"; p.setText(code); - p.setAuthenticationInfo(anonymous); note.run(p.getId()); waitForFinish(p); @@ -612,34 +488,20 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi { assertEquals("default_name", result[0]); assertEquals("1", result[1]); assertEquals("2", result[2]); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); } @Test public void testConfInterpreter() throws IOException { ZeppelinServer.notebook.getInterpreterSettingManager().close(); - Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS); - Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - Map config = p.getConfig(); - config.put("enabled", true); - p.setConfig(config); + Note note = ZeppelinServer.notebook.createNote(anonymous); + Paragraph p = note.addNewParagraph(anonymous); p.setText("%spark.conf spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0"); - p.setAuthenticationInfo(anonymous); - note.run(p.getId()); - waitForFinish(p); + note.run(p.getId(), true); assertEquals(Status.FINISHED, p.getStatus()); - Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS); - p1.setConfig(config); + Paragraph p1 = note.addNewParagraph(anonymous); p1.setText("%spark\nimport com.databricks.spark.csv._"); - p1.setAuthenticationInfo(anonymous); - note.run(p1.getId()); - - waitForFinish(p1); + note.run(p1.getId(), true); assertEquals(Status.FINISHED, p1.getStatus()); - - ZeppelinServer.notebook.removeNote(note.getId(), anonymous); - } }