Repository: incubator-zeppelin Updated Branches: refs/heads/master 48f875d42 -> 754c55ec4
ZEPPELIN-273 Spark 1.5 support https://issues.apache.org/jira/browse/ZEPPELIN-273 "spark-1.5" profile is added Author: Lee moon soo <[email protected]> Closes #269 from Leemoonsoo/spark_1.5 and squashes the following commits: 6ba2dce [Lee moon soo] Add missing import after rebase 5279d26 [Lee moon soo] improve 8c19b09 [Lee moon soo] Add SparkVersion enum and test 699b05b [Lee moon soo] Add spark-1.5 profile 67023fa [Lee moon soo] allow spark 1.5 Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/754c55ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/754c55ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/754c55ec Branch: refs/heads/master Commit: 754c55ec4aa5475014b7ae24dc866fe072728182 Parents: 48f875d Author: Lee moon soo <[email protected]> Authored: Thu Sep 3 19:37:38 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Fri Sep 4 08:12:55 2015 -0700 ---------------------------------------------------------------------- README.md | 4 + spark-dependencies/pom.xml | 13 +++ .../zeppelin/spark/PySparkInterpreter.java | 6 +- .../apache/zeppelin/spark/SparkInterpreter.java | 37 +++----- .../zeppelin/spark/SparkSqlInterpreter.java | 1 - .../org/apache/zeppelin/spark/SparkVersion.java | 94 ++++++++++++++++++++ .../main/resources/python/zeppelin_pyspark.py | 28 ++++-- .../apache/zeppelin/spark/SparkVersionTest.java | 52 +++++++++++ 8 files changed, 197 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index a3bc0ad..e7761f0 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,10 @@ Spark 1.1.x ``` mvn clean package -Pspark-1.1 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests ``` +Spark 1.5.x +``` +mvn clean package -Pspark-1.5 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests +``` CDH 5.X ``` mvn clean package -Pspark-1.2 -Dhadoop.version=2.5.0-cdh5.3.0 -Phadoop-2.4 -DskipTests http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/spark-dependencies/pom.xml b/spark-dependencies/pom.xml index 9d10cf8..eac301f 100644 --- a/spark-dependencies/pom.xml +++ b/spark-dependencies/pom.xml @@ -456,6 +456,19 @@ </profile> <profile> + <id>spark-1.5</id> + <properties> + <spark.version>1.5.0</spark.version> + <akka.group>com.typesafe.akka</akka.group> + <akka.version>2.3.11</akka.version> + <protobuf.version>2.5.0</protobuf.version> + </properties> + + <dependencies> + </dependencies> + </profile> + + <profile> <id>hadoop-0.23</id> <!-- SPARK-1121: Adds an explicit dependency on Avro to work around a Hadoop 0.23.X issue --> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index c579d21..0e58729 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -137,7 +137,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand CommandLine cmd = CommandLine.parse(getProperty("zeppelin.pyspark.python")); cmd.addArgument(scriptPath, false); cmd.addArgument(Integer.toString(port), false); - cmd.addArgument(getJavaSparkContext().version(), false); + cmd.addArgument(Integer.toString(getSparkInterpreter().getSparkVersion().toNumber()), false); executor = new DefaultExecutor(); outputStream = new ByteArrayOutputStream(); PipedOutputStream ps = new PipedOutputStream(); @@ -286,9 +286,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand } SparkInterpreter sparkInterpreter = getSparkInterpreter(); - if (!sparkInterpreter.getSparkContext().version().startsWith("1.2") && - !sparkInterpreter.getSparkContext().version().startsWith("1.3") && - !sparkInterpreter.getSparkContext().version().startsWith("1.4")) { + if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) { return new InterpreterResult(Code.ERROR, "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index e684c52..82f8556 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -128,6 +128,7 @@ public class SparkInterpreter extends Interpreter { private Map<String, Object> binder; private SparkEnv env; + private SparkVersion sparkVersion; public SparkInterpreter(Properties property) { @@ -438,6 +439,8 @@ public class SparkInterpreter extends Interpreter { sc.taskScheduler().rootPool().addSchedulable(pool); } + sparkVersion = SparkVersion.fromVersionString(sc.version()); + sqlc = getSQLContext(); dep = getDependencyResolver(); @@ -462,15 +465,9 @@ public class SparkInterpreter extends Interpreter { + "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]"); intp.interpret("import org.apache.spark.SparkContext._"); - if (sc.version().startsWith("1.1")) { - intp.interpret("import sqlContext._"); - } else if (sc.version().startsWith("1.2")) { + if (sparkVersion.oldSqlContextImplicits()) { intp.interpret("import sqlContext._"); - } else if (sc.version().startsWith("1.3")) { - intp.interpret("import sqlContext.implicits._"); - intp.interpret("import sqlContext.sql"); - intp.interpret("import org.apache.spark.sql.functions._"); - } else if (sc.version().startsWith("1.4")) { + } else { intp.interpret("import sqlContext.implicits._"); intp.interpret("import sqlContext.sql"); intp.interpret("import org.apache.spark.sql.functions._"); @@ -488,14 +485,10 @@ public class SparkInterpreter extends Interpreter { */ try { - if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) { + if (sparkVersion.oldLoadFilesMethodName()) { Method loadFiles = this.interpreter.getClass().getMethod("loadFiles", Settings.class); loadFiles.invoke(this.interpreter, settings); - } else if (sc.version().startsWith("1.3")) { - Method loadFiles = this.interpreter.getClass().getMethod( - "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class); - loadFiles.invoke(this.interpreter, settings); - } else if (sc.version().startsWith("1.4")) { + } else { Method loadFiles = this.interpreter.getClass().getMethod( "org$apache$spark$repl$SparkILoop$$loadFiles", Settings.class); loadFiles.invoke(this.interpreter, settings); @@ -682,18 +675,10 @@ public class SparkInterpreter extends Interpreter { int[] progressInfo = null; try { Object finalStage = job.getClass().getMethod("finalStage").invoke(job); - if (sc.version().startsWith("1.0")) { + if (sparkVersion.getProgress1_0()) { progressInfo = getProgressFromStage_1_0x(sparkListener, finalStage); - } else if (sc.version().startsWith("1.1")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); - } else if (sc.version().startsWith("1.2")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); - } else if (sc.version().startsWith("1.3")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); - } else if (sc.version().startsWith("1.4")) { - progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); } else { - continue; + progressInfo = getProgressFromStage_1_1x(sparkListener, finalStage); } } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException @@ -818,4 +803,8 @@ public class SparkInterpreter extends Interpreter { public ZeppelinContext getZeppelinContext() { return z; } + + public SparkVersion getSparkVersion() { + return sparkVersion; + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 7c1ba11..7b26e34 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -162,7 +162,6 @@ public class SparkSqlInterpreter extends Interpreter { return sparkInterpreter.getProgress(context); } - @Override public Scheduler getScheduler() { if (concurrentSQL()) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java new file mode 100644 index 0000000..0a52fe2 --- /dev/null +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.spark; + +/** + * Provide reading comparing capability of spark version returned from SparkContext.version() + */ +public enum SparkVersion { + SPARK_1_0_0, + SPARK_1_0_1, + SPARK_1_1_0, + SPARK_1_1_1, + SPARK_1_2_0, + SPARK_1_2_1, + SPARK_1_2_2, + SPARK_1_3_0, + SPARK_1_3_1, + SPARK_1_4_0, + SPARK_1_4_1, + SPARK_1_5_0; + + private int version; + + SparkVersion() { + version = Integer.parseInt(name().substring("SPARK_".length()).replaceAll("_", "")); + } + + public int toNumber() { + return version; + } + + public String toString() { + return name().substring("SPARK_".length()).replaceAll("_", "."); + } + + public static SparkVersion fromVersionString(String versionString) { + for (SparkVersion v : values()) { + if (v.toString().equals(versionString)) { + return v; + } + } + throw new IllegalArgumentException(); + } + + public boolean isPysparkSupported() { + return this.newerThanEquals(SPARK_1_2_0); + } + + public boolean hasDataFrame() { + return this.newerThanEquals(SPARK_1_4_0); + } + + public boolean getProgress1_0() { + return this.olderThan(SPARK_1_1_0); + } + + public boolean oldLoadFilesMethodName() { + return this.olderThan(SPARK_1_3_0); + } + + public boolean oldSqlContextImplicits() { + return this.olderThan(SPARK_1_3_0); + } + + public boolean newerThan(SparkVersion versionToCompare) { + return version > versionToCompare.version; + } + + public boolean newerThanEquals(SparkVersion versionToCompare) { + return version >= versionToCompare.version; + } + + public boolean olderThan(SparkVersion versionToCompare) { + return version < versionToCompare.version; + } + + public boolean olderThanEquals(SparkVersion versionToCompare) { + return version <= versionToCompare.version; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 794fbc7..e57190e 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -78,15 +78,28 @@ class PyZeppelinContext(dict): def get(self, key): return self.__getitem__(key) +class SparkVersion(object): + SPARK_1_4_0 = 140 + SPARK_1_3_0 = 130 + + def __init__(self, versionNumber): + self.version = versionNumber + + def isAutoConvertEnabled(self): + return self.version >= self.SPARK_1_4_0 + + def isImportAllPackageUnderSparkSql(self): + return self.version >= self.SPARK_1_3_0 + output = Logger() sys.stdout = output sys.stderr = output client = GatewayClient(port=int(sys.argv[1])) -sparkVersion = sys.argv[2] +sparkVersion = SparkVersion(int(sys.argv[2])) -if sparkVersion.startswith("1.4"): +if sparkVersion.isAutoConvertEnabled(): gateway = JavaGateway(client, auto_convert = True) else: gateway = JavaGateway(client) @@ -102,17 +115,14 @@ intp.onPythonScriptInitialized() jsc = intp.getJavaSparkContext() -if sparkVersion.startswith("1.2"): +if sparkVersion.isImportAllPackageUnderSparkSql(): + java_import(gateway.jvm, "org.apache.spark.sql.*") + java_import(gateway.jvm, "org.apache.spark.sql.hive.*") +else: java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") -elif sparkVersion.startswith("1.3"): - java_import(gateway.jvm, "org.apache.spark.sql.*") - java_import(gateway.jvm, "org.apache.spark.sql.hive.*") -elif sparkVersion.startswith("1.4"): - java_import(gateway.jvm, "org.apache.spark.sql.*") - java_import(gateway.jvm, "org.apache.spark.sql.hive.*") java_import(gateway.jvm, "scala.Tuple2") http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/754c55ec/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java new file mode 100644 index 0000000..dd00f73 --- /dev/null +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.spark; + +import static org.junit.Assert.*; + +import org.junit.Test; + +public class SparkVersionTest { + + @Test + public void testSparkVersion() { + // test equals + assertTrue(SparkVersion.SPARK_1_2_0 == SparkVersion.fromVersionString("1.2.0")); + + // test newer than + assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0)); + assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_3_0)); + assertTrue(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_1_0)); + + assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_2_0)); + assertFalse(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_3_0)); + assertTrue(SparkVersion.SPARK_1_2_0.newerThanEquals(SparkVersion.SPARK_1_1_0)); + + // test older than + assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_2_0)); + assertFalse(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_1_0)); + assertTrue(SparkVersion.SPARK_1_2_0.olderThan(SparkVersion.SPARK_1_3_0)); + + assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_2_0)); + assertFalse(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_1_0)); + assertTrue(SparkVersion.SPARK_1_2_0.olderThanEquals(SparkVersion.SPARK_1_3_0)); + + // conversion + assertEquals(120, SparkVersion.SPARK_1_2_0.toNumber()); + assertEquals("1.2.0", SparkVersion.SPARK_1_2_0.toString()); + } +}
