Repository: incubator-zeppelin Updated Branches: refs/heads/master b554f9a31 -> 01f4884a3
ZEPPELIN-441 Make SparkInterpreter work even if Spark version is not listed https://issues.apache.org/jira/browse/ZEPPELIN-441 Currently, when the Spark version is not listed in SparkVersion.java, it throws ``` java.lang.IllegalArgumentException at org.apache.zeppelin.spark.SparkVersion.fromVersionString(SparkVersion.java:58) at org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:477) ``` This PR changes SparkVersion from 'enum' to 'class' and makes SparkInterpreter work with unlisted versions. Author: Lee moon soo <[email protected]> Closes #446 from Leemoonsoo/ZEPPELIN-441 and squashes the following commits: ddd2db4 [Lee moon soo] Add test and print version check message in SparkSqlInterpreter and SparkInterpreter 5e97d60 [Lee moon soo] Show error message on Unsupported version of spark is detected f6d3007 [Lee moon soo] enum -> class to handle unlisted Spark version Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/01f4884a Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/01f4884a Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/01f4884a Branch: refs/heads/master Commit: 01f4884a3a971ece49d668a9783d6b705cf6dbb5 Parents: b554f9a Author: Lee moon soo <[email protected]> Authored: Fri Nov 20 17:45:50 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Sun Nov 22 10:47:37 2015 +0900 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 7 +- .../apache/zeppelin/spark/SparkInterpreter.java | 5 ++ .../zeppelin/spark/SparkSqlInterpreter.java | 7 +- .../org/apache/zeppelin/spark/SparkVersion.java | 71 +++++++++++++------- .../apache/zeppelin/spark/SparkVersionTest.java | 17 ++++- 5 files changed, 78 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 9bd258b..273c897 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -284,6 +284,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand @Override public InterpreterResult interpret(String st, InterpreterContext context) { + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) { + return new InterpreterResult(Code.ERROR, "Spark " + + sparkInterpreter.getSparkVersion().toString() + " is not supported"); + } + if (!pythonscriptRunning) { return new InterpreterResult(Code.ERROR, "python process not running" + outputStream.toString()); @@ -314,7 +320,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand + outputStream.toString()); } - SparkInterpreter sparkInterpreter = getSparkInterpreter(); if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) { return new InterpreterResult(Code.ERROR, "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 528005e..54691c4 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -605,6 +605,11 @@ public class SparkInterpreter extends Interpreter { */ @Override public InterpreterResult interpret(String line, InterpreterContext context) { + if (sparkVersion.isUnsupportedVersion()) { + return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString() + + " is not supported"); + } + z.setInterpreterContext(context); if (line == null || line.trim().length() == 0) { return new InterpreterResult(Code.SUCCESS); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 1ee5f9c..88582cd 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -115,9 +115,14 @@ public class SparkSqlInterpreter extends Interpreter { @Override public InterpreterResult interpret(String st, InterpreterContext context) { SQLContext sqlc = null; + SparkInterpreter sparkInterpreter = getSparkInterpreter(); - sqlc = getSparkInterpreter().getSQLContext(); + if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) { + return new InterpreterResult(Code.ERROR, "Spark " + + sparkInterpreter.getSparkVersion().toString() + " is not supported"); + } + sqlc = getSparkInterpreter().getSQLContext(); SparkContext sc = sqlc.sparkContext(); if (concurrentSQL()) { sc.setLocalProperty("spark.scheduler.pool", "fair"); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java ---------------------------------------------------------------------- diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java index e5256b7..a362938 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java @@ -16,29 +16,47 @@ */ package org.apache.zeppelin.spark; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Provide reading comparing capability of spark version returned from SparkContext.version() */ -public enum SparkVersion { - SPARK_1_0_0, - SPARK_1_0_1, - SPARK_1_1_0, - SPARK_1_1_1, - SPARK_1_2_0, - SPARK_1_2_1, - SPARK_1_2_2, - SPARK_1_3_0, - SPARK_1_3_1, - SPARK_1_4_0, - SPARK_1_4_1, - SPARK_1_5_0, - SPARK_1_5_1, - SPARK_1_5_2; +public class SparkVersion { + Logger logger = LoggerFactory.getLogger(SparkVersion.class); + + public static final SparkVersion SPARK_1_0_0 = SparkVersion.fromVersionString("1.0.0"); + public static final SparkVersion SPARK_1_1_0 = SparkVersion.fromVersionString("1.1.0"); + public static final SparkVersion SPARK_1_2_0 = SparkVersion.fromVersionString("1.2.0"); + public static final SparkVersion SPARK_1_3_0 = SparkVersion.fromVersionString("1.3.0"); + public static final SparkVersion SPARK_1_4_0 = SparkVersion.fromVersionString("1.4.0"); + public static final SparkVersion SPARK_1_5_0 = SparkVersion.fromVersionString("1.5.0"); + public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0"); + + public static final SparkVersion MIN_SUPPORTED_VERSION = SPARK_1_0_0; + public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_1_6_0; private int version; + private String versionString; + + SparkVersion(String versionString) { + this.versionString = versionString; - SparkVersion() { - version = Integer.parseInt(name().substring("SPARK_".length()).replaceAll("_", "")); + try { + int pos = versionString.indexOf('-'); + + String numberPart = versionString; + if (pos > 0) { + numberPart = 
versionString.substring(0, pos); + } + version = Integer.parseInt(numberPart.replaceAll("\\.", "")); + } catch (Exception e) { + logger.error("Can not recognize Spark version " + versionString + + ". Assume it's a future release", e); + + // assume it is future release + version = 999; + } } public int toNumber() { @@ -46,17 +64,16 @@ public enum SparkVersion { } public String toString() { - return name().substring("SPARK_".length()).replaceAll("_", "."); + return versionString; } + public boolean isUnsupportedVersion() { + return olderThan(MIN_SUPPORTED_VERSION) || newerThanEquals(UNSUPPORTED_FUTURE_VERSION); + } + + public static SparkVersion fromVersionString(String versionString) { - for (SparkVersion v : values()) { - // Check for the beginning of the version string to allow for "1.5.0-SNAPSHOT" - if (versionString.startsWith(v.toString())) { - return v; - } - } - throw new IllegalArgumentException(); + return new SparkVersion(versionString); } public boolean isPysparkSupported() { @@ -79,6 +96,10 @@ public enum SparkVersion { return this.olderThan(SPARK_1_3_0); } + public boolean equals(Object versionToCompare) { + return version == ((SparkVersion) versionToCompare).version; + } + public boolean newerThan(SparkVersion versionToCompare) { return version > versionToCompare.version; } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java index ab54b65..5783c1e 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java @@ -23,10 +23,23 @@ import org.junit.Test; public class SparkVersionTest { @Test + public void testUnknownSparkVersion() { + assertEquals(999, 
SparkVersion.fromVersionString("DEV-10.10").toNumber()); + } + + @Test + public void testUnsupportedVersion() { + assertTrue(SparkVersion.fromVersionString("9.9.9").isUnsupportedVersion()); + assertFalse(SparkVersion.fromVersionString("1.5.9").isUnsupportedVersion()); + assertTrue(SparkVersion.fromVersionString("0.9.0").isUnsupportedVersion()); + assertTrue(SparkVersion.UNSUPPORTED_FUTURE_VERSION.isUnsupportedVersion()); + } + + @Test public void testSparkVersion() { // test equals - assertTrue(SparkVersion.SPARK_1_2_0 == SparkVersion.fromVersionString("1.2.0")); - assertTrue(SparkVersion.SPARK_1_5_0 == SparkVersion.fromVersionString("1.5.0-SNAPSHOT")); + assertEquals(SparkVersion.SPARK_1_2_0, SparkVersion.fromVersionString("1.2.0")); + assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT")); // test newer than assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0));
