http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
index c33db1f..20c336d 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/IPySparkInterpreterTest.java
@@ -17,19 +17,8 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
 import com.google.common.io.Files;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -43,11 +32,23 @@ import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.python.IPythonInterpreterTest;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
 public class IPySparkInterpreterTest extends IPythonInterpreterTest {
 
   private InterpreterGroup intpGroup;
-  private RemoteInterpreterEventClient mockIntpEventClient =
-      mock(RemoteInterpreterEventClient.class);
+  private RemoteInterpreterEventClient mockIntpEventClient = mock(RemoteInterpreterEventClient.class);
 
   @Override
   protected Properties initIntpProperties() {
@@ -66,14 +67,15 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     return p;
   }
 
+  @Override
   protected void startInterpreter(Properties properties) throws InterpreterException {
     InterpreterContext context = getInterpreterContext();
     context.setIntpEventClient(mockIntpEventClient);
     InterpreterContext.set(context);
 
-    LazyOpenInterpreter sparkInterpreter =
-        new LazyOpenInterpreter(new SparkInterpreter(properties));
+    LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
+        new SparkInterpreter(properties));
     intpGroup = new InterpreterGroup();
     intpGroup.put("session_1", new ArrayList<Interpreter>());
     intpGroup.get("session_1").add(sparkInterpreter);
@@ -91,6 +93,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     interpreter.open();
   }
 
+  @Override
   public void tearDown() throws InterpreterException {
     intpGroup.close();
@@ -103,8 +106,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     testPySpark(interpreter, mockIntpEventClient);
   }
 
-  public static void testPySpark(
-      final Interpreter interpreter, RemoteInterpreterEventClient mockIntpEventClient)
+  public static void testPySpark(final Interpreter interpreter, RemoteInterpreterEventClient mockIntpEventClient)
       throws InterpreterException, IOException, InterruptedException {
     reset(mockIntpEventClient);
     // rdd
@@ -118,8 +120,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     result = interpreter.interpret("sc.range(1,10).sum()", context);
     Thread.sleep(100);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    List<InterpreterResultMessage> interpreterResultMessages =
-        context.out.toInterpreterResultMessage();
+    List<InterpreterResultMessage> interpreterResultMessages = context.out.toInterpreterResultMessage();
     assertEquals("45", interpreterResultMessages.get(0).getData().trim());
     // spark job url is sent
     verify(mockIntpEventClient).onParaInfosReceived(any(Map.class));
@@ -127,73 +128,69 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     // spark sql
     context = createInterpreterContext(mockIntpEventClient);
     if (!isSpark2(sparkVersion)) {
-      result =
-          interpreter.interpret(
-              "df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
+      result = interpreter.interpret("df = sqlContext.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.toInterpreterResultMessage();
       assertEquals(
-          "+---+---+\n"
-              + "| _1| _2|\n"
-              + "+---+---+\n"
-              + "|  1|  a|\n"
-              + "|  2|  b|\n"
-              + "+---+---+",
-          interpreterResultMessages.get(0).getData().trim());
+          "+---+---+\n" +
+              "| _1| _2|\n" +
+              "+---+---+\n" +
+              "|  1|  a|\n" +
+              "|  2|  b|\n" +
+              "+---+---+", interpreterResultMessages.get(0).getData().trim());
 
       context = createInterpreterContext(mockIntpEventClient);
       result = interpreter.interpret("z.show(df)", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.toInterpreterResultMessage();
-      assertEquals("_1 _2\n" + "1 a\n" + "2 b", interpreterResultMessages.get(0).getData().trim());
+      assertEquals(
+          "_1 _2\n" +
+              "1 a\n" +
+              "2 b", interpreterResultMessages.get(0).getData().trim());
     } else {
-      result =
-          interpreter.interpret(
-              "df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
+      result = interpreter.interpret("df = spark.createDataFrame([(1,'a'),(2,'b')])\ndf.show()", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.toInterpreterResultMessage();
       assertEquals(
-          "+---+---+\n"
-              + "| _1| _2|\n"
-              + "+---+---+\n"
-              + "|  1|  a|\n"
-              + "|  2|  b|\n"
-              + "+---+---+",
-          interpreterResultMessages.get(0).getData().trim());
+          "+---+---+\n" +
+              "| _1| _2|\n" +
+              "+---+---+\n" +
+              "|  1|  a|\n" +
+              "|  2|  b|\n" +
+              "+---+---+", interpreterResultMessages.get(0).getData().trim());
 
       context = createInterpreterContext(mockIntpEventClient);
       result = interpreter.interpret("z.show(df)", context);
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
       interpreterResultMessages = context.out.toInterpreterResultMessage();
-      assertEquals("_1 _2\n" + "1 a\n" + "2 b", interpreterResultMessages.get(0).getData().trim());
+      assertEquals(
+          "_1 _2\n" +
+              "1 a\n" +
+              "2 b", interpreterResultMessages.get(0).getData().trim());
     }
 
     // cancel
     if (interpreter instanceof IPySparkInterpreter) {
       final InterpreterContext context2 = createInterpreterContext(mockIntpEventClient);
 
-      Thread thread =
-          new Thread() {
-            @Override
-            public void run() {
-              InterpreterResult result = null;
-              try {
-                result =
-                    interpreter.interpret(
-                        "import time\nsc.range(1,10).foreach(lambda x: time.sleep(1))", context2);
-              } catch (InterpreterException e) {
-                e.printStackTrace();
-              }
-              assertEquals(InterpreterResult.Code.ERROR, result.code());
-              List<InterpreterResultMessage> interpreterResultMessages = null;
-              try {
-                interpreterResultMessages = context2.out.toInterpreterResultMessage();
-                assertTrue(
-                    interpreterResultMessages.get(0).getData().contains("KeyboardInterrupt"));
-              } catch (IOException e) {
-                e.printStackTrace();
-              }
-            }
-          };
+      Thread thread = new Thread() {
+        @Override
+        public void run() {
+          InterpreterResult result = null;
+          try {
+            result = interpreter.interpret("import time\nsc.range(1,10).foreach(lambda x: time.sleep(1))", context2);
+          } catch (InterpreterException e) {
+            e.printStackTrace();
+          }
+          assertEquals(InterpreterResult.Code.ERROR, result.code());
+          List<InterpreterResultMessage> interpreterResultMessages = null;
+          try {
+            interpreterResultMessages = context2.out.toInterpreterResultMessage();
+            assertTrue(interpreterResultMessages.get(0).getData().contains("KeyboardInterrupt"));
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+      };
       thread.start();
 
       // sleep 1 second to wait for the spark job starts
@@ -203,8 +200,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     }
 
     // completions
-    List<InterpreterCompletion> completions =
-        interpreter.completion("sc.ran", 6, createInterpreterContext(mockIntpEventClient));
+    List<InterpreterCompletion> completions = interpreter.completion("sc.ran", 6, createInterpreterContext(mockIntpEventClient));
     assertEquals(1, completions.size());
     assertEquals("range", completions.get(0).getValue());
 
@@ -212,8 +208,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("range", "range", ""));
 
-    completions =
-        interpreter.completion("1+1\nsc.", 7, createInterpreterContext(mockIntpEventClient));
+    completions = interpreter.completion("1+1\nsc.", 7, createInterpreterContext(mockIntpEventClient));
     assertTrue(completions.size() > 0);
     completions.contains(new InterpreterCompletion("range", "range", ""));
 
@@ -223,22 +218,20 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     // pyspark streaming
     context = createInterpreterContext(mockIntpEventClient);
-    result =
-        interpreter.interpret(
-            "from pyspark.streaming import StreamingContext\n"
-                + "import time\n"
-                + "ssc = StreamingContext(sc, 1)\n"
-                + "rddQueue = []\n"
-                + "for i in range(5):\n"
-                + "    rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n"
-                + "inputStream = ssc.queueStream(rddQueue)\n"
-                + "mappedStream = inputStream.map(lambda x: (x % 10, 1))\n"
-                + "reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n"
-                + "reducedStream.pprint()\n"
-                + "ssc.start()\n"
-                + "time.sleep(6)\n"
-                + "ssc.stop(stopSparkContext=False, stopGraceFully=True)",
-            context);
+    result = interpreter.interpret(
+        "from pyspark.streaming import StreamingContext\n" +
+            "import time\n" +
+            "ssc = StreamingContext(sc, 1)\n" +
+            "rddQueue = []\n" +
+            "for i in range(5):\n" +
+            "    rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" +
+            "inputStream = ssc.queueStream(rddQueue)\n" +
+            "mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" +
+            "reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" +
+            "reducedStream.pprint()\n" +
+            "ssc.start()\n" +
+            "time.sleep(6)\n" +
+            "ssc.stop(stopSparkContext=False, stopGraceFully=True)", context);
     Thread.sleep(1000);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     interpreterResultMessages = context.out.toInterpreterResultMessage();
@@ -250,8 +243,7 @@ public class IPySparkInterpreterTest extends IPythonInterpreterTest {
     return sparkVersion.startsWith("'2.") || sparkVersion.startsWith("u'2.");
   }
 
-  private static InterpreterContext createInterpreterContext(
-      RemoteInterpreterEventClient mockRemoteEventClient) {
+  private static InterpreterContext createInterpreterContext(RemoteInterpreterEventClient mockRemoteEventClient) {
     return InterpreterContext.builder()
         .setNoteId("noteId")
         .setParagraphId("paragraphId")
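
Every hunk in this commit repeats the same two mechanical changes — static imports move from above the regular imports to a separate block below them, and assignments that had been wrapped after the '=' are re-joined onto one line — so the remaining diffs below are easier to scan once that pattern is recognized. A minimal compilable sketch of the two layout conventions, using only JDK names rather than Zeppelin's classes (ImportOrderSketch is a hypothetical example name, not part of this patch):

    import java.util.Properties;

    import static java.util.Objects.requireNonNull;  // static imports now come last, after a blank line

    public class ImportOrderSketch {
      // Before this commit (google-java-format style), the initializer was wrapped after '=':
      //   private Properties props =
      //       requireNonNull(new Properties());
      // After this commit, the whole assignment stays on a single line:
      private Properties props = requireNonNull(new Properties());

      public static void main(String[] args) {
        // Behavior is identical either way; only the source layout differs.
        System.out.println(new ImportOrderSketch().props.isEmpty());  // prints "true"
      }
    }
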
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 2722bfb..ea19866 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -17,24 +17,7 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
 import com.google.common.io.Files;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.ui.CheckBox;
 import org.apache.zeppelin.display.ui.Password;
@@ -54,6 +37,26 @@ import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+
+
 public class NewSparkInterpreterTest {
 
   private SparkInterpreter interpreter;
@@ -64,12 +67,10 @@ public class NewSparkInterpreterTest {
   // catch the interpreter output in onUpdate
   private InterpreterResultMessageOutput messageOutput;
 
-  private RemoteInterpreterEventClient mockRemoteEventClient =
-      mock(RemoteInterpreterEventClient.class);
+  private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class);
 
   @Test
-  public void testSparkInterpreter()
-      throws IOException, InterruptedException, InterpreterException {
+  public void testSparkInterpreter() throws IOException, InterruptedException, InterpreterException {
     Properties properties = new Properties();
     properties.setProperty("spark.master", "local");
     properties.setProperty("spark.app.name", "test");
@@ -78,12 +79,11 @@ public class NewSparkInterpreterTest {
     properties.setProperty("zeppelin.spark.useNew", "true");
     properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl");
 
-    InterpreterContext context =
-        InterpreterContext.builder()
-            .setInterpreterOut(new InterpreterOutput(null))
-            .setIntpEventClient(mockRemoteEventClient)
-            .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
-            .build();
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
AngularObjectRegistry("spark", null)) + .build(); InterpreterContext.set(context); interpreter = new SparkInterpreter(properties); @@ -93,8 +93,7 @@ public class NewSparkInterpreterTest { assertEquals("fake_spark_weburl", interpreter.getSparkUIUrl()); - InterpreterResult result = - interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); + InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("a: String = hello world\n", output); @@ -121,13 +120,11 @@ public class NewSparkInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // single line comment - result = - interpreter.interpret("print(\"hello world\")/*comment here*/", getInterpreterContext()); + result = interpreter.interpret("print(\"hello world\")/*comment here*/", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals("hello world", output); - result = - interpreter.interpret("/*comment here*/\nprint(\"hello world\")", getInterpreterContext()); + result = interpreter.interpret("/*comment here*/\nprint(\"hello world\")", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // multiple line comment @@ -135,32 +132,27 @@ public class NewSparkInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // test function - result = - interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", getInterpreterContext()); + result = interpreter.interpret("def add(x:Int, y:Int)\n{ return x+y }", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); result = interpreter.interpret("print(add(1,2))", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = - interpreter.interpret( - "/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext()); + result = interpreter.interpret("/*line 1 \n line 2*/print(\"hello world\")", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + // Companion object with case class - result = - interpreter.interpret( - "import scala.math._\n" - + "object Circle {\n" - + " private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n" - + "}\n" - + "case class Circle(radius: Double) {\n" - + " import Circle._\n" - + " def area: Double = calculateArea(radius)\n" - + "}\n" - + "\n" - + "val circle1 = new Circle(5.0)", - getInterpreterContext()); + result = interpreter.interpret("import scala.math._\n" + + "object Circle {\n" + + " private def calculateArea(radius: Double): Double = Pi * pow(radius, 2.0)\n" + + "}\n" + + "case class Circle(radius: Double) {\n" + + " import Circle._\n" + + " def area: Double = calculateArea(radius)\n" + + "}\n" + + "\n" + + "val circle1 = new Circle(5.0)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); // spark rdd operation @@ -172,22 +164,19 @@ public class NewSparkInterpreterTest { verify(mockRemoteEventClient).onParaInfosReceived(any(Map.class)); // case class - result = - interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext()); + result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = - interpreter.interpret( - "case class Bank(age:Integer, job:String, marital : String, education : String, balance : 
-            "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n"
-                + "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n"
-                + "    s => Bank(s(0).toInt, \n"
-                + "        s(1).replaceAll(\"\\\"\", \"\"),\n"
-                + "        s(2).replaceAll(\"\\\"\", \"\"),\n"
-                + "        s(3).replaceAll(\"\\\"\", \"\"),\n"
-                + "        s(5).replaceAll(\"\\\"\", \"\").toInt\n"
-                + "    )\n"
-                + ").toDF()",
-            getInterpreterContext());
+    result = interpreter.interpret(
+        "case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)\n" +
+            "val bank = bankText.map(s=>s.split(\";\")).filter(s => s(0)!=\"\\\"age\\\"\").map(\n" +
+            "    s => Bank(s(0).toInt, \n" +
+            "        s(1).replaceAll(\"\\\"\", \"\"),\n" +
+            "        s(2).replaceAll(\"\\\"\", \"\"),\n" +
+            "        s(3).replaceAll(\"\\\"\", \"\"),\n" +
+            "        s(5).replaceAll(\"\\\"\", \"\").toInt\n" +
+            "    )\n" +
+            ").toDF()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // spark version
@@ -200,36 +189,32 @@ public class NewSparkInterpreterTest {
     result = interpreter.interpret("sqlContext", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    result =
-        interpreter.interpret(
-            "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" + "df.show()",
-            getInterpreterContext());
+    result = interpreter.interpret(
+        "val df = sqlContext.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+            "df.show()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    assertTrue(
-        output.contains(
-            "+---+----+\n"
-                + "| _1|  _2|\n"
-                + "+---+----+\n"
-                + "|  1|   a|\n"
-                + "|  2|null|\n"
-                + "+---+----+"));
+    assertTrue(output.contains(
+        "+---+----+\n" +
+            "| _1|  _2|\n" +
+            "+---+----+\n" +
+            "|  1|   a|\n" +
+            "|  2|null|\n" +
+            "+---+----+"));
     } else if (version.contains("String = 2.")) {
       result = interpreter.interpret("spark", getInterpreterContext());
       assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    result =
-        interpreter.interpret(
-            "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" + "df.show()",
-            getInterpreterContext());
+    result = interpreter.interpret(
+        "val df = spark.createDataFrame(Seq((1,\"a\"),(2, null)))\n" +
+            "df.show()", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    assertTrue(
-        output.contains(
-            "+---+----+\n"
-                + "| _1|  _2|\n"
-                + "+---+----+\n"
-                + "|  1|   a|\n"
-                + "|  2|null|\n"
-                + "+---+----+"));
+    assertTrue(output.contains(
+        "+---+----+\n" +
+            "| _1|  _2|\n" +
+            "+---+----+\n" +
+            "|  1|   a|\n" +
+            "|  2|null|\n" +
+            "+---+----+"));
     }
 
     // ZeppelinContext
@@ -257,10 +242,7 @@ public class NewSparkInterpreterTest {
     assertEquals("pwd", pwd.getName());
 
     context = getInterpreterContext();
-    result =
-        interpreter.interpret(
-            "z.checkbox(\"checkbox_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))",
-            context);
+    result = interpreter.interpret("z.checkbox(\"checkbox_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals(1, context.getGui().getForms().size());
     assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox);
@@ -275,17 +257,13 @@ public class NewSparkInterpreterTest {
     assertEquals("name_2", checkBox.getOptions()[1].getDisplayName());
 
     context = getInterpreterContext();
-    result =
-        interpreter.interpret(
-            "z.select(\"select_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))",
-            context);
interpreter.interpret("z.select(\"select_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, context.getGui().getForms().size()); assertTrue(context.getGui().getForms().get("select_1") instanceof Select); Select select = (Select) context.getGui().getForms().get("select_1"); assertEquals("select_1", select.getName()); - // TODO(zjffdu) it seems a bug of GUI, the default value should be 'value_2', but it is - // List(value_2) + // TODO(zjffdu) it seems a bug of GUI, the default value should be 'value_2', but it is List(value_2) // assertEquals("value_2", select.getDefaultValue()); assertEquals(2, select.getOptions().length); assertEquals("value_1", select.getOptions()[0].getValue()); @@ -293,9 +271,9 @@ public class NewSparkInterpreterTest { assertEquals("value_2", select.getOptions()[1].getValue()); assertEquals("name_2", select.getOptions()[1].getDisplayName()); + // completions - List<InterpreterCompletion> completions = - interpreter.completion("a.", 2, getInterpreterContext()); + List<InterpreterCompletion> completions = interpreter.completion("a.", 2, getInterpreterContext()); assertTrue(completions.size() > 0); completions = interpreter.completion("a.isEm", 6, getInterpreterContext()); @@ -306,57 +284,43 @@ public class NewSparkInterpreterTest { assertEquals(1, completions.size()); assertEquals("range", completions.get(0).name); + // Zeppelin-Display - result = - interpreter.interpret( - "import org.apache.zeppelin.display.angular.notebookscope._\n" + "import AngularElem._", - getInterpreterContext()); + result = interpreter.interpret("import org.apache.zeppelin.display.angular.notebookscope._\n" + + "import AngularElem._", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - result = - interpreter.interpret( - "<div style=\"color:blue\">\n" - + "<h4>Hello Angular Display System</h4>\n" - + "</div>.display", - getInterpreterContext()); + result = interpreter.interpret("<div style=\"color:blue\">\n" + + "<h4>Hello Angular Display System</h4>\n" + + "</div>.display", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType()); - assertTrue( - messageOutput - .toInterpreterResultMessage() - .getData() - .contains("Hello Angular Display System")); - - result = - interpreter.interpret( - "<div class=\"btn btn-success\">\n" - + " Click me\n" - + "</div>.onClick{() =>\n" - + " println(\"hello world\")\n" - + "}.display", - getInterpreterContext()); + assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Hello Angular Display System")); + + result = interpreter.interpret("<div class=\"btn btn-success\">\n" + + " Click me\n" + + "</div>.onClick{() =>\n" + + " println(\"hello world\")\n" + + "}.display", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(InterpreterResult.Type.ANGULAR, messageOutput.getType()); assertTrue(messageOutput.toInterpreterResultMessage().getData().contains("Click me")); // getProgress final InterpreterContext context2 = getInterpreterContext(); - Thread interpretThread = - new Thread() { - @Override - public void run() { - InterpreterResult result = null; - try { - result = - interpreter.interpret( - "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", - context2); - } catch (InterpreterException e) { - e.printStackTrace(); - } - 
-            assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-          }
-        };
+    Thread interpretThread = new Thread() {
+      @Override
+      public void run() {
+        InterpreterResult result = null;
+        try {
+          result = interpreter.interpret(
+              "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", context2);
+        } catch (InterpreterException e) {
+          e.printStackTrace();
+        }
+        assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+      }
+    };
     interpretThread.start();
     boolean nonZeroProgress = false;
     int progress = 0;
@@ -372,23 +336,20 @@ public class NewSparkInterpreterTest {
 
     // cancel
     final InterpreterContext context3 = getInterpreterContext();
-    interpretThread =
-        new Thread() {
-          @Override
-          public void run() {
-            InterpreterResult result = null;
-            try {
-              result =
-                  interpreter.interpret(
-                      "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))",
-                      context3);
-            } catch (InterpreterException e) {
-              e.printStackTrace();
-            }
-            assertEquals(InterpreterResult.Code.ERROR, result.code());
-            assertTrue(output.contains("cancelled"));
-          }
-        };
+    interpretThread = new Thread() {
+      @Override
+      public void run() {
+        InterpreterResult result = null;
+        try {
+          result = interpreter.interpret(
+              "val df = sc.parallelize(1 to 10, 2).foreach(e=>Thread.sleep(1000))", context3);
+        } catch (InterpreterException e) {
+          e.printStackTrace();
+        }
+        assertEquals(InterpreterResult.Code.ERROR, result.code());
+        assertTrue(output.contains("cancelled"));
+      }
+    };
     interpretThread.start();
 
     // sleep 1 second to wait for the spark job start
@@ -406,9 +367,7 @@ public class NewSparkInterpreterTest {
     properties.setProperty("zeppelin.spark.useNew", "true");
 
     // download spark-avro jar
-    URL website =
-        new URL(
-            "http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
+    URL website = new URL("http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
     ReadableByteChannel rbc = Channels.newChannel(website.openStream());
     File avroJarFile = new File("spark-avro_2.11-3.2.0.jar");
     FileOutputStream fos = new FileOutputStream(avroJarFile);
@@ -421,13 +380,11 @@ public class NewSparkInterpreterTest {
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     interpreter.open();
 
-    InterpreterResult result =
-        interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
+    InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
   }
 
-  // TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the
-  // classpath issue is fixed.
+  //TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed.
   @Ignore
   public void testDepInterpreter() throws InterpreterException {
     Properties properties = new Properties();
@@ -449,8 +406,7 @@ public class NewSparkInterpreterTest {
     depInterpreter.open();
 
     InterpreterResult result =
-        depInterpreter.interpret(
-            "z.load(\"com.databricks:spark-avro_2.11:3.2.0\")", getInterpreterContext());
+        depInterpreter.interpret("z.load(\"com.databricks:spark-avro_2.11:3.2.0\")", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     interpreter.open();
@@ -474,8 +430,7 @@ public class NewSparkInterpreterTest {
     interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
     interpreter.open();
 
-    InterpreterResult result =
-        interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
+    InterpreterResult result = interpreter.interpret("val a=\"hello world\"", getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     // no output for define new variable
     assertEquals("", output);
@@ -577,17 +532,19 @@ public class NewSparkInterpreterTest {
 
   private InterpreterContext getInterpreterContext() {
     output = "";
-    InterpreterContext context =
-        InterpreterContext.builder()
-            .setInterpreterOut(new InterpreterOutput(null))
-            .setIntpEventClient(mockRemoteEventClient)
-            .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
-            .build();
+    InterpreterContext context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mockRemoteEventClient)
+        .setAngularObjectRegistry(new AngularObjectRegistry("spark", null))
+        .build();
     context.out = new InterpreterOutput(
+
         new InterpreterOutputListener() {
           @Override
-          public void onUpdateAll(InterpreterOutput out) {}
+          public void onUpdateAll(InterpreterOutput out) {
+
+          }
 
           @Override
           public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
index cac3295..ed91ffe 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkSqlInterpreterTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.LinkedList;
-import java.util.Properties;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -37,6 +31,13 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
 public class NewSparkSqlInterpreterTest {
 
   private static SparkSqlInterpreter sqlInterpreter;
@@ -63,16 +64,15 @@ public class NewSparkSqlInterpreterTest {
     intpGroup.get("session_1").add(sparkInterpreter);
     intpGroup.get("session_1").add(sqlInterpreter);
 
-    context =
-        InterpreterContext.builder()
-            .setNoteId("noteId")
-            .setParagraphId("paragraphId")
-            .setParagraphTitle("title")
-            .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
-            .setResourcePool(new LocalResourcePool("id"))
-            .setInterpreterOut(new InterpreterOutput(null))
-            .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
-            .build();
+    context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setParagraphTitle("title")
+        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+        .setResourcePool(new LocalResourcePool("id"))
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+        .build();
     InterpreterContext.set(context);
 
     sparkInterpreter.open();
@@ -88,13 +88,10 @@ public class NewSparkSqlInterpreterTest {
   @Test
   public void test() throws InterpreterException {
     sparkInterpreter.interpret("case class Test(name:String, age:Int)", context);
-    sparkInterpreter.interpret(
-        "val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))",
-        context);
+    sparkInterpreter.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
     sparkInterpreter.interpret("test.toDF.registerTempTable(\"test\")", context);
 
-    InterpreterResult ret =
-        sqlInterpreter.interpret("select name, age from test where age < 40", context);
+    InterpreterResult ret = sqlInterpreter.interpret("select name, age from test where age < 40", context);
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
     assertEquals(Type.TABLE, ret.message().get(0).getType());
     assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
@@ -103,11 +100,7 @@ public class NewSparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertTrue(ret.message().get(0).getData().length() > 0);
 
-    assertEquals(
-        InterpreterResult.Code.SUCCESS,
-        sqlInterpreter
-            .interpret("select case when name='aa' then name else name end from test", context)
-            .code());
+    assertEquals(InterpreterResult.Code.SUCCESS, sqlInterpreter.interpret("select case when name='aa' then name else name end from test", context).code());
   }
 
   @Test
@@ -136,14 +129,17 @@ public class NewSparkSqlInterpreterTest {
         "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
         context);
     sparkInterpreter.interpret(
-        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", context);
+        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+        context);
     sparkInterpreter.interpret(
-        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context);
-    sparkInterpreter.interpret("val people = sqlContext.createDataFrame(raw, schema)", context);
+        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+        context);
+    sparkInterpreter.interpret("val people = sqlContext.createDataFrame(raw, schema)",
+        context);
     sparkInterpreter.interpret("people.toDF.registerTempTable(\"people\")", context);
 
-    InterpreterResult ret =
-        sqlInterpreter.interpret("select name, age from people where name = 'gates'", context);
+    InterpreterResult ret = sqlInterpreter.interpret(
+        "select name, age from people where name = 'gates'", context);
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
     assertEquals(Type.TABLE, ret.message().get(0).getType());
assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData()); @@ -165,38 +161,34 @@ public class NewSparkSqlInterpreterTest { @Test public void testConcurrentSQL() throws InterpreterException, InterruptedException { if (sparkInterpreter.getSparkVersion().isSpark2()) { - sparkInterpreter.interpret( - "spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); + sparkInterpreter.interpret("spark.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); } else { - sparkInterpreter.interpret( - "sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); + sparkInterpreter.interpret("sqlContext.udf.register(\"sleep\", (e:Int) => {Thread.sleep(e*1000); e})", context); } - Thread thread1 = - new Thread() { - @Override - public void run() { - try { - InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } catch (InterpreterException e) { - e.printStackTrace(); - } - } - }; - - Thread thread2 = - new Thread() { - @Override - public void run() { - try { - InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context); - assertEquals(InterpreterResult.Code.SUCCESS, result.code()); - } catch (InterpreterException e) { - e.printStackTrace(); - } - } - }; + Thread thread1 = new Thread() { + @Override + public void run() { + try { + InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + } catch (InterpreterException e) { + e.printStackTrace(); + } + } + }; + + Thread thread2 = new Thread() { + @Override + public void run() { + try { + InterpreterResult result = sqlInterpreter.interpret("select sleep(10)", context); + assertEquals(InterpreterResult.Code.SUCCESS, result.code()); + } catch (InterpreterException e) { + e.printStackTrace(); + } + } + }; // start running 2 spark sql, each would sleep 10 seconds, the totally running time should // be less than 20 seconds, which means they run concurrently. 
@@ -206,6 +198,8 @@ public class NewSparkSqlInterpreterTest {
     thread1.join();
     thread2.join();
     long end = System.currentTimeMillis();
-    assertTrue("running time must be less than 20 seconds", ((end - start) / 1000) < 20);
+    assertTrue("running time must be less than 20 seconds", ((end - start)/1000) < 20);
+
   }
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
index 12e3252..8ae66b2 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
@@ -17,14 +17,6 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
 import org.apache.spark.SparkConf;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.Interpreter;
@@ -49,10 +41,21 @@ import org.junit.runners.MethodSorters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class OldSparkInterpreterTest {
 
-  @ClassRule public static TemporaryFolder tmpDir = new TemporaryFolder();
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
 
   static SparkInterpreter repl;
   static InterpreterGroup intpGroup;
@@ -60,7 +63,8 @@ public class OldSparkInterpreterTest {
   static Logger LOGGER = LoggerFactory.getLogger(OldSparkInterpreterTest.class);
 
   /**
-   * Get spark version number as a numerical value. eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+   * Get spark version number as a numerical value.
+   * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
    */
   public static int getSparkVersionNumber(SparkInterpreter repl) {
     if (repl == null) {
@@ -87,16 +91,15 @@ public class OldSparkInterpreterTest {
   @BeforeClass
   public static void setUp() throws Exception {
     intpGroup = new InterpreterGroup();
-    context =
-        InterpreterContext.builder()
-            .setNoteId("noteId")
-            .setParagraphId("paragraphId")
-            .setParagraphTitle("title")
-            .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
-            .setResourcePool(new LocalResourcePool("id"))
-            .setInterpreterOut(new InterpreterOutput(null))
-            .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
-            .build();
+    context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setParagraphTitle("title")
+        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+        .setResourcePool(new LocalResourcePool("id"))
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+        .build();
     InterpreterContext.set(context);
 
     intpGroup.put("note", new LinkedList<Interpreter>());
@@ -105,10 +108,10 @@ public class OldSparkInterpreterTest {
     intpGroup.get("note").add(repl);
     repl.open();
     // The first para interpretdr will set the Eventclient wrapper
-    // SparkInterpreter.interpret(String, InterpreterContext) ->
-    // SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
-    // ZeppelinContext.setEventClient(RemoteEventClientWrapper)
-    // running a dummy to ensure that we dont have any race conditions among tests
+    //SparkInterpreter.interpret(String, InterpreterContext) ->
+    //SparkInterpreter.populateSparkWebUrl(InterpreterContext) ->
+    //ZeppelinContext.setEventClient(RemoteEventClientWrapper)
+    //running a dummy to ensure that we dont have any race conditions among tests
     repl.interpret("sc", context);
   }
 
@@ -119,14 +122,14 @@ public class OldSparkInterpreterTest {
   @Test
   public void testBasicIntp() throws InterpreterException {
-    assertEquals(
-        InterpreterResult.Code.SUCCESS, repl.interpret("val a = 1\nval b = 2", context).code());
+    assertEquals(InterpreterResult.Code.SUCCESS,
+        repl.interpret("val a = 1\nval b = 2", context).code());
 
     // when interpret incomplete expression
     InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
     assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
     assertTrue(incomplete.message().get(0).getData().length() > 0); // expecting some error
-    // message
+                                                                    // message
 
     /*
      * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b"));
@@ -150,41 +153,30 @@ public class OldSparkInterpreterTest {
   @Test
   public void testNextLineComments() throws InterpreterException {
-    assertEquals(
-        InterpreterResult.Code.SUCCESS,
-        repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n/*comment here\n*/.toInt", context).code());
   }
 
   @Test
   public void testNextLineCompanionObject() throws InterpreterException {
-    String code =
-        "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n  def apply(x: Long) = new Counter()\n}";
+    String code = "class Counter {\nvar value: Long = 0\n}\n // comment\n\n object Counter {\n  def apply(x: Long) = new Counter()\n}";
     assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret(code, context).code());
   }
 
   @Test
   public void testEndWithComment() throws InterpreterException {
-    assertEquals(
-        InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
   }
 
   @Test
   public void testCreateDataFrame() throws InterpreterException {
     if (getSparkVersionNumber(repl) >= 13) {
       repl.interpret("case class Person(name:String, age:Int)\n", context);
-      repl.interpret(
-          "val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n",
-          context);
+      repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
       repl.interpret("people.toDF.count", context);
-      assertEquals(
-          new Long(4),
-          context
-              .getResourcePool()
-              .get(
-                  context.getNoteId(),
-                  context.getParagraphId(),
-                  WellKnownResourceName.ZeppelinReplResult.toString())
-              .get());
+      assertEquals(new Long(4), context.getResourcePool().get(
+          context.getNoteId(),
+          context.getParagraphId(),
+          WellKnownResourceName.ZeppelinReplResult.toString()).get());
     }
   }
 
@@ -192,26 +184,23 @@ public class OldSparkInterpreterTest {
   public void testZShow() throws InterpreterException {
     String code = "";
     repl.interpret("case class Person(name:String, age:Int)\n", context);
-    repl.interpret(
-        "val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n",
-        context);
+    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
     if (getSparkVersionNumber(repl) < 13) {
       repl.interpret("people.registerTempTable(\"people\")", context);
       code = "z.show(sqlc.sql(\"select * from people\"))";
     } else {
       code = "z.show(people.toDF)";
    }
-    assertEquals(Code.SUCCESS, repl.interpret(code, context).code());
+    assertEquals(Code.SUCCESS,  repl.interpret(code, context).code());
   }
 
   @Test
   public void testSparkSql() throws IOException, InterpreterException {
     repl.interpret("case class Person(name:String, age:Int)\n", context);
-    repl.interpret(
-        "val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n",
-        context);
+    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
     assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
 
+
     if (getSparkVersionNumber(repl) <= 11) { // spark 1.2 or later does not allow create multiple
       // SparkContext in the same jvm by default.
       // create new interpreter
@@ -221,9 +210,7 @@ public class OldSparkInterpreterTest {
       repl2.open();
 
       repl2.interpret("case class Man(name:String, age:Int)", context);
-      repl2.interpret(
-          "val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))",
-          context);
+      repl2.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
       assertEquals(Code.SUCCESS, repl2.interpret("man.take(3)", context).code());
       repl2.close();
     }
@@ -231,9 +218,8 @@ public class OldSparkInterpreterTest {
 
   @Test
   public void testReferencingUndefinedVal() throws InterpreterException {
-    InterpreterResult result =
-        repl.interpret(
-            "def category(min: Int) = {" + "  if (0 <= value) \"error\"" + "}", context);
+    InterpreterResult result = repl.interpret("def category(min: Int) = {" +
+        "  if (0 <= value) \"error\"" + "}", context);
     assertEquals(Code.ERROR, result.code());
   }
 
@@ -246,26 +232,23 @@ public class OldSparkInterpreterTest {
       String value = (String) intpProperty.get(key);
       LOGGER.debug(String.format("[%s]: [%s]", key, value));
       if (key.startsWith("spark.") && value.isEmpty()) {
-        assertTrue(
-            String.format("configuration starting from 'spark.' should not be empty. [%s]", key),
-            !sparkConf.contains(key) || !sparkConf.get(key).isEmpty());
+        assertTrue(String.format("configuration starting from 'spark.' should not be empty. [%s]", key), !sparkConf.contains(key) || !sparkConf.get(key).isEmpty());
       }
     }
   }
 
   @Test
-  public void shareSingleSparkContext()
-      throws InterruptedException, IOException, InterpreterException {
+  public void shareSingleSparkContext() throws InterruptedException, IOException, InterpreterException {
     // create another SparkInterpreter
     SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir));
     repl2.setInterpreterGroup(intpGroup);
     intpGroup.get("note").add(repl2);
     repl2.open();
 
-    assertEquals(
-        Code.SUCCESS, repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
-    assertEquals(
-        Code.SUCCESS, repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
+    assertEquals(Code.SUCCESS,
+        repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
+    assertEquals(Code.SUCCESS,
+        repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
 
     repl2.close();
   }
@@ -314,17 +297,17 @@ public class OldSparkInterpreterTest {
   @Test
   public void testMultilineCompletion() throws InterpreterException {
     String buf = "val x = 1\nsc.";
-    List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
+    List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
     assertTrue(completions.size() > 0);
   }
 
   @Test
   public void testMultilineCompletionNewVar() throws InterpreterException {
     Assume.assumeFalse("this feature does not work with scala 2.10", Utils.isScala2_10());
-    Assume.assumeTrue(
-        "This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7());
+    Assume.assumeTrue("This feature does not work with scala < 2.11.8", Utils.isCompilerAboveScala2_11_7());
     String buf = "val x = sc\nx.";
-    List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
+    List<InterpreterCompletion> completions = repl.completion(buf, buf.length(), null);
     assertTrue(completions.size() > 0);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
index f69eb0c..fa1e257 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkSqlInterpreterTest.java
@@ -17,12 +17,6 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.LinkedList;
-import java.util.Properties;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -39,9 +33,17 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.util.LinkedList;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
 public class OldSparkSqlInterpreterTest {
 
-  @ClassRule public static TemporaryFolder tmpDir = new TemporaryFolder();
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
 
   static SparkSqlInterpreter sql;
   static SparkInterpreter repl;
@@ -72,16 +74,15 @@ public class OldSparkSqlInterpreterTest {
     sql.setInterpreterGroup(intpGroup);
     sql.open();
 
-    context =
-        InterpreterContext.builder()
-            .setNoteId("noteId")
-            .setParagraphId("paragraphId")
-            .setParagraphTitle("title")
-            .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
-            .setResourcePool(new LocalResourcePool("id"))
-            .setInterpreterOut(new InterpreterOutput(null))
-            .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
-            .build();
+    context = InterpreterContext.builder()
+        .setNoteId("noteId")
+        .setParagraphId("paragraphId")
+        .setParagraphTitle("title")
+        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+        .setResourcePool(new LocalResourcePool("id"))
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+        .build();
   }
 
   @AfterClass
@@ -97,9 +98,7 @@ public class OldSparkSqlInterpreterTest {
   @Test
   public void test() throws InterpreterException {
     repl.interpret("case class Test(name:String, age:Int)", context);
-    repl.interpret(
-        "val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))",
-        context);
+    repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
     if (isDataFrameSupported()) {
       repl.interpret("test.toDF.registerTempTable(\"test\")", context);
     } else {
@@ -115,10 +114,7 @@ public class OldSparkSqlInterpreterTest {
     assertEquals(InterpreterResult.Code.ERROR, ret.code());
     assertTrue(ret.message().get(0).getData().length() > 0);
 
-    assertEquals(
-        InterpreterResult.Code.SUCCESS,
-        sql.interpret("select case when name==\"aa\" then name else name end from test", context)
-            .code());
+    assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
   }
 
   @Test
@@ -153,19 +149,23 @@ public class OldSparkSqlInterpreterTest {
         "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))",
         context);
     repl.interpret(
-        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", context);
+        "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))",
+        context);
     repl.interpret(
-        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context);
+        "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))",
+        context);
     if (isDataFrameSupported()) {
-      repl.interpret("val people = sqlContext.createDataFrame(raw, schema)", context);
+      repl.interpret("val people = sqlContext.createDataFrame(raw, schema)",
+          context);
       repl.interpret("people.toDF.registerTempTable(\"people\")", context);
     } else {
-      repl.interpret("val people = sqlContext.applySchema(raw, schema)", context);
+      repl.interpret("val people = sqlContext.applySchema(raw, schema)",
+          context);
       repl.interpret("people.registerTempTable(\"people\")", context);
     }
 
-    InterpreterResult ret =
-        sql.interpret("select name, age from people where name = 'gates'", context);
+    InterpreterResult ret = sql.interpret(
+        "select name, age from people where name = 'gates'", context);
     System.err.println("RET=" + ret.message());
     assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
     assertEquals(Type.TABLE, ret.message().get(0).getType());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
index c365160..5a05ad5 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterMatplotlibTest.java
@@ -17,15 +17,6 @@
 
 package org.apache.zeppelin.spark;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -46,10 +37,21 @@ import org.junit.runners.MethodSorters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class PySparkInterpreterMatplotlibTest {
 
-  @ClassRule public static TemporaryFolder tmpDir = new TemporaryFolder();
+  @ClassRule
+  public static TemporaryFolder tmpDir = new TemporaryFolder();
 
   static SparkInterpreter sparkInterpreter;
   static PySparkInterpreter pyspark;
@@ -59,21 +61,21 @@ public class PySparkInterpreterMatplotlibTest {
 
   public static class AltPySparkInterpreter extends PySparkInterpreter {
     /**
-     * Since pyspark output is sent to an outputstream rather than being directly provided by
-     * interpret(), this subclass is created to override interpret() to append the result from the
-     * outputStream for the sake of convenience in testing.
+     * Since pyspark output is sent to an outputstream rather than
+     * being directly provided by interpret(), this subclass is created to
+     * override interpret() to append the result from the outputStream
+     * for the sake of convenience in testing.
      */
     public AltPySparkInterpreter(Properties property) {
       super(property);
     }
 
    /**
-     * This code is mainly copied from RemoteInterpreterServer.java which normally handles this in
-     * real use cases.
+     * This code is mainly copied from RemoteInterpreterServer.java which
+     * normally handles this in real use cases.
      */
     @Override
-    public InterpreterResult interpret(String st, InterpreterContext context)
-        throws InterpreterException {
+    public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
       context.out.clear();
       InterpreterResult result = super.interpret(st, context);
       List<InterpreterResultMessage> resultMessages = null;
@@ -104,7 +106,8 @@ public class PySparkInterpreterMatplotlibTest {
   }
 
   /**
-   * Get spark version number as a numerical value. eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+   * Get spark version number as a numerical value.
+   * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
    */
   public static int getSparkVersionNumber() {
     if (sparkInterpreter == null) {
@@ -120,13 +123,12 @@ public class PySparkInterpreterMatplotlibTest {
   public static void setUp() throws Exception {
     intpGroup = new InterpreterGroup();
     intpGroup.put("note", new LinkedList<Interpreter>());
-    context =
-        InterpreterContext.builder()
-            .setNoteId("note")
-            .setInterpreterOut(new InterpreterOutput(null))
-            .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
-            .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
-            .build();
+    context = InterpreterContext.builder()
+        .setNoteId("note")
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
+        .setAngularObjectRegistry(new AngularObjectRegistry(intpGroup.getId(), null))
+        .build();
     InterpreterContext.set(context);
 
     sparkInterpreter = new SparkInterpreter(getPySparkTestProperties());
@@ -181,8 +183,7 @@ public class PySparkInterpreterMatplotlibTest {
     InterpreterResult ret2;
     ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
     ret = pyspark.interpret("plt.close()", context);
-    ret =
-        pyspark.interpret("z.configure_mpl(interactive=False, close=True, angular=False)", context);
+    ret = pyspark.interpret("z.configure_mpl(interactive=False, close=True, angular=False)", context);
     ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
     ret1 = pyspark.interpret("plt.show()", context);
@@ -209,9 +210,7 @@ public class PySparkInterpreterMatplotlibTest {
     InterpreterResult ret2;
     ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
     ret = pyspark.interpret("plt.close()", context);
-    ret =
-        pyspark.interpret(
-            "z.configure_mpl(interactive=False, close=False, angular=False)", context);
+    ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=False)", context);
     ret = pyspark.interpret("plt.plot([1, 2, 3])", context);
     ret1 = pyspark.interpret("plt.show()", context);
@@ -236,8 +235,7 @@ public class PySparkInterpreterMatplotlibTest {
     InterpreterResult ret;
     ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
     ret = pyspark.interpret("plt.close()", context);
-    ret =
pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context); + ret = pyspark.interpret("z.configure_mpl(interactive=False, close=False, angular=True)", context); ret = pyspark.interpret("plt.plot([1, 2, 3])", context); ret = pyspark.interpret("plt.show()", context); assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index 0a3677c..64f1ff5 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -17,12 +17,8 @@ package org.apache.zeppelin.spark; -import static org.mockito.Mockito.mock; import com.google.common.io.Files; -import java.io.IOException; -import java.util.LinkedList; -import java.util.Properties; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; @@ -33,10 +29,16 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.apache.zeppelin.python.PythonInterpreterTest; import org.junit.Test; +import java.io.IOException; +import java.util.LinkedList; +import java.util.Properties; + +import static org.mockito.Mockito.mock; + + public class PySparkInterpreterTest extends PythonInterpreterTest { - private RemoteInterpreterEventClient mockRemoteEventClient = - mock(RemoteInterpreterEventClient.class); + private RemoteInterpreterEventClient mockRemoteEventClient = mock(RemoteInterpreterEventClient.class); @Override public void setUp() throws InterpreterException { @@ -57,11 +59,10 @@ public class PySparkInterpreterTest extends PythonInterpreterTest { intpGroup = new InterpreterGroup(); intpGroup.put("note", new LinkedList<Interpreter>()); - InterpreterContext context = - InterpreterContext.builder() - .setInterpreterOut(new InterpreterOutput(null)) - .setIntpEventClient(mockRemoteEventClient) - .build(); + InterpreterContext context = InterpreterContext.builder() + .setInterpreterOut(new InterpreterOutput(null)) + .setIntpEventClient(mockRemoteEventClient) + .build(); InterpreterContext.set(context); LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(new SparkInterpreter(properties)); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java index e685d8a..fb9ad62 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkRInterpreterTest.java @@ -17,17 +17,6 @@ package org.apache.zeppelin.spark; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static 
org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; @@ -39,12 +28,23 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + public class SparkRInterpreterTest { private SparkRInterpreter sparkRInterpreter; private SparkInterpreter sparkInterpreter; - private RemoteInterpreterEventClient mockRemoteIntpEventClient = - mock(RemoteInterpreterEventClient.class); + private RemoteInterpreterEventClient mockRemoteIntpEventClient = mock(RemoteInterpreterEventClient.class); @Before public void setUp() throws InterpreterException { @@ -63,10 +63,8 @@ public class SparkRInterpreterTest { sparkInterpreter = new SparkInterpreter(properties); InterpreterGroup interpreterGroup = new InterpreterGroup(); - interpreterGroup.addInterpreterToSession( - new LazyOpenInterpreter(sparkRInterpreter), "session_1"); - interpreterGroup.addInterpreterToSession( - new LazyOpenInterpreter(sparkInterpreter), "session_1"); + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkRInterpreter), "session_1"); + interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(sparkInterpreter), "session_1"); sparkRInterpreter.setInterpreterGroup(interpreterGroup); sparkInterpreter.setInterpreterGroup(interpreterGroup); @@ -81,6 +79,7 @@ public class SparkRInterpreterTest { @Test public void testSparkRInterpreter() throws InterpreterException, InterruptedException { + InterpreterResult result = sparkRInterpreter.interpret("1+1", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(result.message().get(0).getData().contains("2")); @@ -89,9 +88,7 @@ public class SparkRInterpreterTest { assertEquals(InterpreterResult.Code.SUCCESS, result.code()); if (result.message().get(0).getData().contains("2.")) { // spark 2.x - result = - sparkRInterpreter.interpret( - "df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext()); + result = sparkRInterpreter.interpret("df <- as.DataFrame(faithful)\nhead(df)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(result.message().get(0).getData().contains("eruptions waiting")); // spark job url is sent @@ -99,36 +96,30 @@ public class SparkRInterpreterTest { // cancel final InterpreterContext context = getInterpreterContext(); - Thread thread = - new Thread() { - @Override - public void run() { - try { - InterpreterResult result = - sparkRInterpreter.interpret( - "ldf <- dapplyCollect(\n" - + " df,\n" - + " function(x) {\n" - + " Sys.sleep(3)\n" - + " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" - + " })\n" - + "head(ldf, 3)", - context); - assertTrue(result.message().get(0).getData().contains("cancelled")); - } catch (InterpreterException e) { - fail("Should not throw InterpreterException"); - } - } - }; + Thread thread = new Thread() { + @Override + public void 
run() { + try { + InterpreterResult result = sparkRInterpreter.interpret("ldf <- dapplyCollect(\n" + + " df,\n" + + " function(x) {\n" + + " Sys.sleep(3)\n" + + " x <- cbind(x, \"waiting_secs\" = x$waiting * 60)\n" + + " })\n" + + "head(ldf, 3)", context); + assertTrue(result.message().get(0).getData().contains("cancelled")); + } catch (InterpreterException e) { + fail("Should not throw InterpreterException"); + } + } + }; thread.setName("Cancel-Thread"); thread.start(); Thread.sleep(1000); sparkRInterpreter.cancel(context); } else { // spark 1.x - result = - sparkRInterpreter.interpret( - "df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext()); + result = sparkRInterpreter.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertTrue(result.message().get(0).getData().contains("eruptions waiting")); // spark job url is sent @@ -145,11 +136,8 @@ public class SparkRInterpreterTest { assertTrue(result.message().get(0).getData().contains("<img src=")); assertTrue(result.message().get(0).getData().contains("width=\"100\"")); - result = - sparkRInterpreter.interpret( - "library(ggplot2)\n" - + "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()", - getInterpreterContext()); + result = sparkRInterpreter.interpret("library(ggplot2)\n" + + "ggplot(diamonds, aes(x=carat, y=price, color=cut)) + geom_point()", getInterpreterContext()); assertEquals(InterpreterResult.Code.SUCCESS, result.code()); assertEquals(1, result.message().size()); assertEquals(InterpreterResult.Type.HTML, result.message().get(0).getType()); @@ -163,14 +151,14 @@ public class SparkRInterpreterTest { } private InterpreterContext getInterpreterContext() { - InterpreterContext context = - InterpreterContext.builder() - .setNoteId("note_1") - .setParagraphId("paragraph_1") - .setIntpEventClient(mockRemoteIntpEventClient) - .setInterpreterOut(new InterpreterOutput(null)) - .setLocalProperties(new HashMap<>()) - .build(); + InterpreterContext context = InterpreterContext.builder() + .setNoteId("note_1") + .setParagraphId("paragraph_1") + .setIntpEventClient(mockRemoteIntpEventClient) + .setInterpreterOut(new InterpreterOutput(null)) + .setLocalProperties(new HashMap<>()) + .build(); return context; } } + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java index d6170b7..48d0055 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java @@ -41,6 +41,7 @@ import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.Mock; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; @@ -90,8 +91,9 @@ public class SparkShimsTest { SparkShims sparkShims = new SparkShims(new Properties()) { @Override - public void setupSparkListener( - String master, String sparkWebUrl, InterpreterContext context) {} + public void 
setupSparkListener(String master, + String sparkWebUrl, + InterpreterContext context) {} @Override public String showDataFrame(Object obj, int maxResult) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java ---------------------------------------------------------------------- diff --git a/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java b/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java index e22e003..bee8cf1 100644 --- a/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java +++ b/zeppelin-examples/zeppelin-example-clock/src/main/java/org/apache/zeppelin/example/app/clock/Clock.java @@ -16,9 +16,6 @@ */ package org.apache.zeppelin.example.app.clock; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; import org.apache.zeppelin.helium.Application; import org.apache.zeppelin.helium.ApplicationContext; import org.apache.zeppelin.helium.ApplicationException; @@ -27,7 +24,14 @@ import org.apache.zeppelin.resource.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Basic example application. Get java.util.Date from resource pool and display it */ +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Basic example application. + * Get java.util.Date from resource pool and display it + */ public class Clock extends Application { private final Logger logger = LoggerFactory.getLogger(Clock.class); @@ -56,30 +60,31 @@ public class Clock extends Application { } } + public void start() { - updateThread = - new Thread() { - public void run() { - while (!shutdown) { - // format date - SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - // put formatted string to angular object. - context().getAngularObjectRegistry().add("date", df.format(date)); - - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // nothing todo - } - date = new Date(date.getTime() + 1000); - } + updateThread = new Thread() { + public void run() { + while (!shutdown) { + // format date + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + // put formatted string to angular object. + context().getAngularObjectRegistry().add("date", df.format(date)); + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // nothing todo } - }; + date = new Date(date.getTime() + 1000); + } + } + }; updateThread.start(); } + @Override public void unload() throws ApplicationException { shutdown = true; @@ -91,13 +96,16 @@ public class Clock extends Application { context().getAngularObjectRegistry().remove("date"); } - /** Development mode */ + /** + * Development mode + */ public static void main(String[] args) throws Exception { LocalResourcePool pool = new LocalResourcePool("dev"); pool.put("date", new Date()); - ZeppelinApplicationDevServer devServer = - new ZeppelinApplicationDevServer(Clock.class.getName(), pool.getAll()); + ZeppelinApplicationDevServer devServer = new ZeppelinApplicationDevServer( + Clock.class.getName(), + pool.getAll()); devServer.start(); devServer.join();