Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master fa61a054f -> a90908d72
[ZEPPELIN-23] Set version of default spark interpreter build profile from 1.1 to 1.3

I updated the version of spark used by default throughout the dependency listing to be 1.3.0 (to match the version used in the 1.3 profile).

Author: Lee moon soo <[email protected]>
Author: Ilya Ganelin <[email protected]>

This patch had conflicts when merged, resolved by
Committer: Lee moon soo <[email protected]>

Closes #71 from ilganeli/ZEPPELIN-23 and squashes the following commits:

d1d642e [Ilya Ganelin] Merge pull request #1 from Leemoonsoo/ZEPPELIN-23
0108781 [Lee moon soo] update isDataFrameSupported()
ac4e8bc [Lee moon soo] check spark version as a numerical number in tests
405da99 [Lee moon soo] Fix indentation
28dd7bf [Lee moon soo] Make unittest pass with spark-1.3 profile
cf2ed0c [Lee moon soo] Activate spark-1.3 profile by default. Set spark 1.3.1 as a default version of spark-1.3 profile
13bf6c1 [Lee moon soo] remove z.load() error for spark 1.2, spark 1.3
1402df2 [Ilya Ganelin] Updated default spark version to 1.3

Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/a90908d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/a90908d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/a90908d7

Branch: refs/heads/master
Commit: a90908d7213c3fda5d0ac9e8f96c7a8bde458741
Parents: fa61a05
Author: Lee moon soo <[email protected]>
Authored: Tue Jun 2 17:11:22 2015 -0700
Committer: Lee moon soo <[email protected]>
Committed: Wed Jun 3 14:26:50 2015 +0900

----------------------------------------------------------------------
 conf/log4j.properties                           |   1 +
 pom.xml                                         |   9 +-
 .../zeppelin/spark/dep/DependencyResolver.java  |  30 +++-
 .../zeppelin/spark/SparkInterpreterTest.java    | 129 +++++++------
 .../zeppelin/spark/SparkSqlInterpreterTest.java | 180 +++++++++++--------
 5 files changed, 212 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/conf/log4j.properties b/conf/log4j.properties
index a7ef28b..b132ce1 100644
--- a/conf/log4j.properties
+++ b/conf/log4j.properties
@@ -21,6 +21,7 @@
 log4j.appender.stdout = org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
 
+log4j.appender.dailyfile.DatePattern=.yyyy-MM-dd
 log4j.appender.dailyfile.Threshold = INFO
 log4j.appender.dailyfile = org.apache.log4j.DailyRollingFileAppender
 log4j.appender.dailyfile.File = ${zeppelin.log.file}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 62fe12b..fcd1a25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1397,7 +1397,7 @@
       <id>spark-1.3</id>
       <properties>
         <akka.version>2.3.4-spark</akka.version>
-        <spark.version>1.3.0</spark.version>
+        <spark.version>1.3.1</spark.version>
         <mesos.version>0.21.0</mesos.version>
         <hbase.version>0.98.7</hbase.version>
         <hbase.artifact>hbase</hbase.artifact>
@@ -1419,17 +1419,18 @@
         <snappy.version>1.1.1.6</snappy.version>
         <mesos.version>0.21.0</mesos.version>
       </properties>
-
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <dependencies>
       </dependencies>
-
     </profile>
 
     <profile>
      <id>cassandra-spark-1.3</id>
       <properties>
         <akka.version>2.3.4-spark</akka.version>
-        <spark.version>1.3.0</spark.version>
+        <spark.version>1.3.1</spark.version>
         <mesos.version>0.21.0</mesos.version>
         <hbase.version>0.98.7</hbase.version>
         <hbase.artifact>hbase</hbase.artifact>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java b/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
index 06a4022..0702948 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/dep/DependencyResolver.java
@@ -143,8 +143,9 @@ public class DependencyResolver {
 
   // Until spark 1.1.x
   // check https://github.com/apache/spark/commit/191d7cf2a655d032f160b9fa181730364681d0e7
-  private void updateRuntimeClassPath(URL[] urls) throws SecurityException, IllegalAccessException,
-      IllegalArgumentException, InvocationTargetException, NoSuchMethodException {
+  private void updateRuntimeClassPath_1_x(URL[] urls) throws SecurityException,
+      IllegalAccessException, IllegalArgumentException,
+      InvocationTargetException, NoSuchMethodException {
     ClassLoader cl = intp.classLoader().getParent();
     Method addURL;
     addURL = cl.getClass().getDeclaredMethod("addURL", new Class[] {URL.class});
@@ -154,6 +155,18 @@ public class DependencyResolver {
     }
   }
 
+  private void updateRuntimeClassPath_2_x(URL[] urls) throws SecurityException,
+      IllegalAccessException, IllegalArgumentException,
+      InvocationTargetException, NoSuchMethodException {
+    ClassLoader cl = intp.classLoader().getParent();
+    Method addURL;
+    addURL = cl.getClass().getDeclaredMethod("addNewUrl", new Class[] {URL.class});
+    addURL.setAccessible(true);
+    for (URL url : urls) {
+      addURL.invoke(cl, url);
+    }
+  }
+
   private MergedClassPath<AbstractFile> mergeUrlsIntoClassPath(JavaPlatform platform, URL[] urls) {
     IndexedSeq<ClassPath<AbstractFile>> entries =
         ((MergedClassPath<AbstractFile>) platform.classPath()).entries();
@@ -217,8 +230,11 @@ public class DependencyResolver {
 
     intp.global().new Run();
 
-    updateRuntimeClassPath(new URL[] {jarFile.toURI().toURL()});
-    updateCompilerClassPath(new URL[] {jarFile.toURI().toURL()});
+    if (sc.version().startsWith("1.1")) {
+      updateRuntimeClassPath_1_x(new URL[] {jarFile.toURI().toURL()});
+    } else {
+      updateRuntimeClassPath_2_x(new URL[] {jarFile.toURI().toURL()});
+    }
 
     if (addSparkContext) {
       sc.addJar(jarFile.getAbsolutePath());
@@ -261,7 +277,11 @@ public class DependencyResolver {
     }
 
     intp.global().new Run();
-    updateRuntimeClassPath(newClassPathList.toArray(new URL[0]));
+    if (sc.version().startsWith("1.1")) {
+      updateRuntimeClassPath_1_x(newClassPathList.toArray(new URL[0]));
+    } else {
+      updateRuntimeClassPath_2_x(newClassPathList.toArray(new URL[0]));
+    }
     updateCompilerClassPath(newClassPathList.toArray(new URL[0]));
 
     if (addSparkContext) {
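For context on the two dispatch targets above: the _1_x variant uses the long-standing URLClassLoader trick of reflectively invoking the protected addURL method, while _2_x looks up addNewUrl on the class loader Spark 1.2+ hands the REPL. A minimal, self-contained sketch of the reflective technique follows; the class name and jar path are illustrative only, and the cast assumes Java 8, where the application class loader is still a URLClassLoader.

import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;

// Sketch of the reflective classpath-extension technique used by
// updateRuntimeClassPath_1_x: URLClassLoader.addURL() is protected, so it is
// opened up via reflection and invoked once per jar.
public class AddUrlSketch {
  public static void main(String[] args) throws Exception {
    // On Java 8 the application class loader is a URLClassLoader;
    // this cast no longer holds on Java 9+.
    URLClassLoader cl = (URLClassLoader) AddUrlSketch.class.getClassLoader();

    // Illustrative jar path only.
    URL jar = new File("/tmp/extra-dep.jar").toURI().toURL();

    Method addURL = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    addURL.setAccessible(true); // addURL is protected
    addURL.invoke(cl, jar);     // classes in the jar are now loadable
  }
}

Dispatching on sc.version().startsWith("1.1") keeps both code paths alive, so a single Zeppelin build can serve either interpreter generation.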
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 87df793..c49f1e1 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -45,6 +45,21 @@ public class SparkInterpreterTest {
   private InterpreterContext context;
   private File tmpDir;
 
+
+  /**
+   * Get spark version number as a numerical value.
+   * e.g. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
+   */
+  public static int getSparkVersionNumber() {
+    if (repl == null) {
+      return 0;
+    }
+
+    String[] split = repl.getSparkContext().version().split("\\.");
+    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+    return version;
+  }
+
   @Before
   public void setUp() throws Exception {
     tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
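Two details in the hunk above are easy to miss: String.split takes a regular expression, so the dot has to be escaped as \\. (an unescaped "." matches every character and yields an empty array), and the major version is scaled by ten so that comparisons such as >= 13 read naturally as "Spark 1.3 or later". A quick standalone check of the scheme (a hypothetical class, not part of the test suite):

// Standalone check of the version-to-int scheme in getSparkVersionNumber():
// "1.1.0" => 11, "1.3.1" => 13. Note that String.split takes a regex, so the
// dot must be escaped; "1.3.1".split(".") would return an empty array.
public class VersionNumberSketch {
  static int versionNumber(String sparkVersion) {
    String[] split = sparkVersion.split("\\.");
    return Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
  }

  public static void main(String[] args) {
    System.out.println(versionNumber("1.1.0")); // 11
    System.out.println(versionNumber("1.3.1")); // 13
  }
}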
@@ -52,18 +67,19 @@ public class SparkInterpreterTest {
     tmpDir.mkdirs();
 
-	if (repl == null) {
-	  Properties p = new Properties();
+    if (repl == null) {
+      Properties p = new Properties();
 
-	  repl = new SparkInterpreter(p);
-	  repl.open();
-	}
+      repl = new SparkInterpreter(p);
+      repl.open();
+    }
 
-	InterpreterGroup intpGroup = new InterpreterGroup();
-	context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
-		new AngularObjectRegistry(intpGroup.getId(), null),
+    InterpreterGroup intpGroup = new InterpreterGroup();
+    context = new InterpreterContext("id", "title", "text",
+        new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
+            intpGroup.getId(), null),
         new LinkedList<InterpreterContextRunner>());
-  }
+  }
 
   @After
   public void tearDown() throws Exception {
@@ -83,52 +99,55 @@ public class SparkInterpreterTest {
     }
   }
 
-  @Test
-  public void testBasicIntp() {
-    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val a = 1\nval b = 2", context).code());
-
-    // when interpret incomplete expression
-    InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
-    assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
-    assertTrue(incomplete.message().length()>0); // expecting some error message
-    /*
-    assertEquals(1, repl.getValue("a"));
-    assertEquals(2, repl.getValue("b"));
-    repl.interpret("val ver = sc.version");
-    assertNotNull(repl.getValue("ver"));
-    assertEquals("HELLO\n", repl.interpret("println(\"HELLO\")").message());
-    */
-  }
-
-  @Test
-  public void testEndWithComment() {
-    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
-  }
-
-  @Test
-  public void testSparkSql(){
-    repl.interpret("case class Person(name:String, age:Int)\n", context);
-    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
-    assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
-
-    // create new interpreter
-    Properties p = new Properties();
-    SparkInterpreter repl2 = new SparkInterpreter(p);
-    repl2.open();
-
-    repl.interpret("case class Man(name:String, age:Int)", context);
-    repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
-    assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
-    repl2.getSparkContext().stop();
-  }
-
-  @Test
-  public void testReferencingUndefinedVal(){
-    InterpreterResult result = repl.interpret("def category(min: Int) = {" +
-        "   if (0 <= value) \"error\"" +
-        "}", context);
-    assertEquals(Code.ERROR, result.code());
-  }
+  @Test
+  public void testBasicIntp() {
+    assertEquals(InterpreterResult.Code.SUCCESS,
+        repl.interpret("val a = 1\nval b = 2", context).code());
+
+    // when interpret incomplete expression
+    InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
+    assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
+    assertTrue(incomplete.message().length() > 0); // expecting some error
+                                                   // message
+    /*
+     * assertEquals(1, repl.getValue("a")); assertEquals(2, repl.getValue("b"));
+     * repl.interpret("val ver = sc.version");
+     * assertNotNull(repl.getValue("ver")); assertEquals("HELLO\n",
+     * repl.interpret("println(\"HELLO\")").message());
+     */
+  }
+
+  @Test
+  public void testEndWithComment() {
+    assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code());
+  }
+
+  @Test
+  public void testSparkSql(){
+    repl.interpret("case class Person(name:String, age:Int)\n", context);
+    repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
+    assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
+
+    // Spark 1.2 and later do not allow creating multiple SparkContexts
+    // in the same JVM by default.
+    if (getSparkVersionNumber() <= 11) {
+      // create new interpreter
+      Properties p = new Properties();
+      SparkInterpreter repl2 = new SparkInterpreter(p);
+      repl2.open();
+
+      repl.interpret("case class Man(name:String, age:Int)", context);
+      repl.interpret("val man = sc.parallelize(Seq(Man(\"moon\", 33), Man(\"jobs\", 51), Man(\"gates\", 51), Man(\"park\", 34)))", context);
+      assertEquals(Code.SUCCESS, repl.interpret("man.take(3)", context).code());
+      repl2.getSparkContext().stop();
+    }
+  }
+
+  @Test
+  public void testReferencingUndefinedVal() {
+    InterpreterResult result = repl.interpret("def category(min: Int) = {" +
+        "   if (0 <= value) \"error\"" + "}", context);
+    assertEquals(Code.ERROR, result.code());
+  }
 
   @Test
   public void testZContextDependencyLoading() {
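The getSparkVersionNumber() <= 11 gate exists because, as the comment in the hunk notes, Spark 1.2 and later refuse a second SparkContext in one JVM by default. A hedged sketch of the behavior being avoided (assumes a Spark 1.x dependency on the classpath; spark.driver.allowMultipleContexts is the documented escape hatch, deliberately not used by the test):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch against Spark 1.x: constructing a second SparkContext in the same
// JVM throws by default, which is why testSparkSql() only opens a second
// interpreter when the detected version is 1.1 or older.
public class MultiContextSketch {
  public static void main(String[] args) {
    JavaSparkContext first = new JavaSparkContext(
        new SparkConf().setMaster("local").setAppName("first"));
    try {
      // Fails unless spark.driver.allowMultipleContexts=true is set.
      JavaSparkContext second = new JavaSparkContext(
          new SparkConf().setMaster("local").setAppName("second"));
      second.stop();
    } catch (Exception e) {
      System.err.println("Second context rejected: " + e.getMessage());
    } finally {
      first.stop();
    }
  }
}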
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/a90908d7/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 27198b3..eaa0a8a 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -36,81 +36,115 @@ import org.junit.Test;
 
 public class SparkSqlInterpreterTest {
-	private SparkSqlInterpreter sql;
+  private SparkSqlInterpreter sql;
   private SparkInterpreter repl;
   private InterpreterContext context;
   private InterpreterGroup intpGroup;
 
-  @Before
-  public void setUp() throws Exception {
-    Properties p = new Properties();
-
-    if (repl == null) {
-
-      if (SparkInterpreterTest.repl == null) {
-        repl = new SparkInterpreter(p);
-        repl.open();
-        SparkInterpreterTest.repl = repl;
-      } else {
-        repl = SparkInterpreterTest.repl;
-      }
-
-      sql = new SparkSqlInterpreter(p);
-
-      intpGroup = new InterpreterGroup();
-      intpGroup.add(repl);
-      intpGroup.add(sql);
-      sql.setInterpreterGroup(intpGroup);
-      sql.open();
-    }
-    context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
-        new AngularObjectRegistry(intpGroup.getId(), null),
-        new LinkedList<InterpreterContextRunner>());
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public void test() {
-    repl.interpret("case class Test(name:String, age:Int)", context);
-    repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
-    repl.interpret("test.registerAsTable(\"test\")", context);
-
-    InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(Type.TABLE, ret.type());
-    assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
-
-    assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code());
-    assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from people", context).code());
-  }
-
-  @Test
-  public void testStruct(){
-    repl.interpret("case class Person(name:String, age:Int)", context);
-    repl.interpret("case class People(group:String, person:Person)", context);
-    repl.interpret("val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))", context);
-    repl.interpret("gr.registerAsTable(\"gr\")", context);
-    InterpreterResult ret = sql.interpret("select * from gr", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-  }
-
-  @Test
-  public void test_null_value_in_row() {
-    repl.interpret("import org.apache.spark.sql._", context);
-    repl.interpret("def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}", context);
-    repl.interpret("val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))", context);
-    repl.interpret("val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", context);
-    repl.interpret("val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", context);
-    repl.interpret("val people = z.sqlContext.applySchema(raw, schema)", context);
-    repl.interpret("people.registerTempTable(\"people\")", context);
-
-    InterpreterResult ret = sql.interpret("select name, age from people where name = 'gates'", context);
-    System.err.println("RET=" + ret.message());
-    assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
-    assertEquals(Type.TABLE, ret.type());
-    assertEquals("name\tage\ngates\tnull\n", ret.message());
-  }
+  @Before
+  public void setUp() throws Exception {
+    Properties p = new Properties();
+
+    if (repl == null) {
+
+      if (SparkInterpreterTest.repl == null) {
+        repl = new SparkInterpreter(p);
+        repl.open();
+        SparkInterpreterTest.repl = repl;
+      } else {
+        repl = SparkInterpreterTest.repl;
+      }
+
+      sql = new SparkSqlInterpreter(p);
+
+      intpGroup = new InterpreterGroup();
+      intpGroup.add(repl);
+      intpGroup.add(sql);
+      sql.setInterpreterGroup(intpGroup);
+      sql.open();
+    }
+    context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LinkedList<InterpreterContextRunner>());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  boolean isDataFrameSupported() {
+    return SparkInterpreterTest.getSparkVersionNumber() >= 13;
+  }
+
+  @Test
+  public void test() {
+    repl.interpret("case class Test(name:String, age:Int)", context);
+    repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
+    if (isDataFrameSupported()) {
repl.interpret("test.toDF.registerTempTable(\"test\")", context); + } else { + repl.interpret("test.registerTempTable(\"test\")", context); + } + + InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals(Type.TABLE, ret.type()); + assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message()); + + assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code()); + assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code()); + } + + @Test + public void testStruct() { + repl.interpret("case class Person(name:String, age:Int)", context); + repl.interpret("case class People(group:String, person:Person)", context); + repl.interpret( + "val gr = sc.parallelize(Seq(People(\"g1\", Person(\"moon\",33)), People(\"g2\", Person(\"sun\",11))))", + context); + if (isDataFrameSupported()) { + repl.interpret("gr.toDF.registerTempTable(\"gr\")", context); + } else { + repl.interpret("gr.registerTempTable(\"gr\")", context); + } + + InterpreterResult ret = sql.interpret("select * from gr", context); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + } + + @Test + public void test_null_value_in_row() { + repl.interpret("import org.apache.spark.sql._", context); + if (isDataFrameSupported()) { + repl.interpret( + "import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}", + context); + } + repl.interpret( + "def toInt(s:String): Any = {try { s.trim().toInt} catch {case e:Exception => null}}", + context); + repl.interpret( + "val schema = StructType(Seq(StructField(\"name\", StringType, false),StructField(\"age\" , IntegerType, true),StructField(\"other\" , StringType, false)))", + context); + repl.interpret( + "val csv = sc.parallelize(Seq((\"jobs, 51, apple\"), (\"gates, , microsoft\")))", + context); + repl.interpret( + "val raw = csv.map(_.split(\",\")).map(p => Row(p(0),toInt(p(1)),p(2)))", + context); + repl.interpret("val people = z.sqlContext.applySchema(raw, schema)", + context); + if (isDataFrameSupported()) { + repl.interpret("people.toDF.registerTempTable(\"people\")", context); + } else { + repl.interpret("people.registerTempTable(\"people\")", context); + } + + InterpreterResult ret = sql.interpret( + "select name, age from people where name = 'gates'", context); + System.err.println("RET=" + ret.message()); + assertEquals(InterpreterResult.Code.SUCCESS, ret.code()); + assertEquals(Type.TABLE, ret.type()); + assertEquals("name\tage\ngates\tnull\n", ret.message()); + } }
