Repository: incubator-zeppelin
Updated Branches:
  refs/heads/master b554f9a31 -> 01f4884a3


ZEPPELIN-441 Make SparkInterpreter work even if Spark version is not listed

https://issues.apache.org/jira/browse/ZEPPELIN-441

Currently, when the Spark version is not listed in SparkVersion.java, it throws:

```
java.lang.IllegalArgumentException
    at org.apache.zeppelin.spark.SparkVersion.fromVersionString(SparkVersion.java:58)
    at org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:477)
```

This PR changes SparkVersion from an 'enum' to a 'class' and makes SparkInterpreter
work with unlisted versions.
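
As a rough illustration of the new behavior (the version strings below are only hypothetical examples; the method names match this diff):

```
// Any version string can now be parsed without throwing.
SparkVersion listed   = SparkVersion.fromVersionString("1.4.1");          // parsed as 141, supported
SparkVersion snapshot = SparkVersion.fromVersionString("1.5.2-SNAPSHOT"); // "-SNAPSHOT" suffix dropped, parsed as 152
SparkVersion unknown  = SparkVersion.fromVersionString("DEV-10.10");      // unparsable, assumed to be a future release (999)

// Instead of failing in open(), interpreters now return an error result for such versions:
if (unknown.isUnsupportedVersion()) {
  // e.g. new InterpreterResult(Code.ERROR, "Spark " + unknown.toString() + " is not supported")
}
```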

Author: Lee moon soo <[email protected]>

Closes #446 from Leemoonsoo/ZEPPELIN-441 and squashes the following commits:

ddd2db4 [Lee moon soo] Add test and print version check message in SparkSqlInterpreter and SparkInterpreter
5e97d60 [Lee moon soo] Show error message on Unsupported version of spark is detected
f6d3007 [Lee moon soo] enum -> class to handle unlisted Spark version


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/01f4884a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/01f4884a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/01f4884a

Branch: refs/heads/master
Commit: 01f4884a3a971ece49d668a9783d6b705cf6dbb5
Parents: b554f9a
Author: Lee moon soo <[email protected]>
Authored: Fri Nov 20 17:45:50 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Sun Nov 22 10:47:37 2015 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/PySparkInterpreter.java      |  7 +-
 .../apache/zeppelin/spark/SparkInterpreter.java |  5 ++
 .../zeppelin/spark/SparkSqlInterpreter.java     |  7 +-
 .../org/apache/zeppelin/spark/SparkVersion.java | 71 +++++++++++++-------
 .../apache/zeppelin/spark/SparkVersionTest.java | 17 ++++-
 5 files changed, 78 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 9bd258b..273c897 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -284,6 +284,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
 
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
+    if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
+      return new InterpreterResult(Code.ERROR, "Spark "
+          + sparkInterpreter.getSparkVersion().toString() + " is not supported");
+    }
+
     if (!pythonscriptRunning) {
       return new InterpreterResult(Code.ERROR, "python process not running"
           + outputStream.toString());
@@ -314,7 +320,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
           + outputStream.toString());
     }
 
-    SparkInterpreter sparkInterpreter = getSparkInterpreter();
     if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
       return new InterpreterResult(Code.ERROR, "pyspark "
          + sparkInterpreter.getSparkContext().version() + " is not supported");

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 528005e..54691c4 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -605,6 +605,11 @@ public class SparkInterpreter extends Interpreter {
    */
   @Override
   public InterpreterResult interpret(String line, InterpreterContext context) {
+    if (sparkVersion.isUnsupportedVersion()) {
+      return new InterpreterResult(Code.ERROR, "Spark " + sparkVersion.toString()
+          + " is not supported");
+    }
+
     z.setInterpreterContext(context);
     if (line == null || line.trim().length() == 0) {
       return new InterpreterResult(Code.SUCCESS);

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 1ee5f9c..88582cd 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -115,9 +115,14 @@ public class SparkSqlInterpreter extends Interpreter {
   @Override
   public InterpreterResult interpret(String st, InterpreterContext context) {
     SQLContext sqlc = null;
+    SparkInterpreter sparkInterpreter = getSparkInterpreter();
 
-    sqlc = getSparkInterpreter().getSQLContext();
+    if (sparkInterpreter.getSparkVersion().isUnsupportedVersion()) {
+      return new InterpreterResult(Code.ERROR, "Spark "
+          + sparkInterpreter.getSparkVersion().toString() + " is not supported");
+    }
 
+    sqlc = getSparkInterpreter().getSQLContext();
     SparkContext sc = sqlc.sparkContext();
     if (concurrentSQL()) {
       sc.setLocalProperty("spark.scheduler.pool", "fair");

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index e5256b7..a362938 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -16,29 +16,47 @@
  */
 package org.apache.zeppelin.spark;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
 * Provide reading comparing capability of spark version returned from SparkContext.version()
  */
-public enum SparkVersion {
-  SPARK_1_0_0,
-  SPARK_1_0_1,
-  SPARK_1_1_0,
-  SPARK_1_1_1,
-  SPARK_1_2_0,
-  SPARK_1_2_1,
-  SPARK_1_2_2,
-  SPARK_1_3_0,
-  SPARK_1_3_1,
-  SPARK_1_4_0,
-  SPARK_1_4_1,
-  SPARK_1_5_0,
-  SPARK_1_5_1,
-  SPARK_1_5_2;
+public class SparkVersion {
+  Logger logger = LoggerFactory.getLogger(SparkVersion.class);
+
+  public static final SparkVersion SPARK_1_0_0 = SparkVersion.fromVersionString("1.0.0");
+  public static final SparkVersion SPARK_1_1_0 = SparkVersion.fromVersionString("1.1.0");
+  public static final SparkVersion SPARK_1_2_0 = SparkVersion.fromVersionString("1.2.0");
+  public static final SparkVersion SPARK_1_3_0 = SparkVersion.fromVersionString("1.3.0");
+  public static final SparkVersion SPARK_1_4_0 = SparkVersion.fromVersionString("1.4.0");
+  public static final SparkVersion SPARK_1_5_0 = SparkVersion.fromVersionString("1.5.0");
+  public static final SparkVersion SPARK_1_6_0 = SparkVersion.fromVersionString("1.6.0");
+
+  public static final SparkVersion MIN_SUPPORTED_VERSION =  SPARK_1_0_0;
+  public static final SparkVersion UNSUPPORTED_FUTURE_VERSION = SPARK_1_6_0;
 
   private int version;
+  private String versionString;
+
+  SparkVersion(String versionString) {
+    this.versionString = versionString;
 
-  SparkVersion() {
-    version = Integer.parseInt(name().substring("SPARK_".length()).replaceAll("_", ""));
+    try {
+      int pos = versionString.indexOf('-');
+
+      String numberPart = versionString;
+      if (pos > 0) {
+        numberPart = versionString.substring(0, pos);
+      }
+      version = Integer.parseInt(numberPart.replaceAll("\\.", ""));
+    } catch (Exception e) {
+      logger.error("Can not recognize Spark version " + versionString +
+          ". Assume it's a future release", e);
+
+      // assume it is future release
+      version = 999;
+    }
   }
 
   public int toNumber() {
@@ -46,17 +64,16 @@ public enum SparkVersion {
   }
 
   public String toString() {
-    return name().substring("SPARK_".length()).replaceAll("_", ".");
+    return versionString;
   }
 
+  public boolean isUnsupportedVersion() {
+    return olderThan(MIN_SUPPORTED_VERSION) || newerThanEquals(UNSUPPORTED_FUTURE_VERSION);
+  }
+
+
   public static SparkVersion fromVersionString(String versionString) {
-    for (SparkVersion v : values()) {
-      // Check for the beginning of the version string to allow for "1.5.0-SNAPSHOT"
-      if (versionString.startsWith(v.toString())) {
-        return v;
-      }
-    }
-    throw new IllegalArgumentException();
+    return new SparkVersion(versionString);
   }
 
   public boolean isPysparkSupported() {
@@ -79,6 +96,10 @@ public enum SparkVersion {
     return this.olderThan(SPARK_1_3_0);
   }
 
+  public boolean equals(Object versionToCompare) {
+    return version == ((SparkVersion) versionToCompare).version;
+  }
+
   public boolean newerThan(SparkVersion versionToCompare) {
     return version > versionToCompare.version;
   }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/01f4884a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
----------------------------------------------------------------------
diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
index ab54b65..5783c1e 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkVersionTest.java
@@ -23,10 +23,23 @@ import org.junit.Test;
 public class SparkVersionTest {
 
   @Test
+  public void testUnknownSparkVersion() {
+    assertEquals(999, SparkVersion.fromVersionString("DEV-10.10").toNumber());
+  }
+
+  @Test
+  public void testUnsupportedVersion() {
+    assertTrue(SparkVersion.fromVersionString("9.9.9").isUnsupportedVersion());
+    assertFalse(SparkVersion.fromVersionString("1.5.9").isUnsupportedVersion());
+    assertTrue(SparkVersion.fromVersionString("0.9.0").isUnsupportedVersion());
+    assertTrue(SparkVersion.UNSUPPORTED_FUTURE_VERSION.isUnsupportedVersion());
+  }
+
+  @Test
   public void testSparkVersion() {
     // test equals
-    assertTrue(SparkVersion.SPARK_1_2_0 == SparkVersion.fromVersionString("1.2.0"));
-    assertTrue(SparkVersion.SPARK_1_5_0 == SparkVersion.fromVersionString("1.5.0-SNAPSHOT"));
+    assertEquals(SparkVersion.SPARK_1_2_0, SparkVersion.fromVersionString("1.2.0"));
+    assertEquals(SparkVersion.SPARK_1_5_0, SparkVersion.fromVersionString("1.5.0-SNAPSHOT"));
 
     // test newer than
     assertFalse(SparkVersion.SPARK_1_2_0.newerThan(SparkVersion.SPARK_1_2_0));
