Repository: zeppelin
Updated Branches:
  refs/heads/master fc44693fe -> 64bbba479


ZEPPELIN-3255. Can not run spark1 and spark2 in one zeppelin instance

### What is this PR for?
Although #2750 enable the support of spark 2.3, it breaks the support of spark 
1.6. Users have to build zeppelin against spark 1.6 to make zeppelin work with 
that. But previously, one zeppelin instance could work with multiple versions 
of spark. This PR introduces a spark shims module which resolves the api 
incompatibility issues between different versions of spark, so that one 
zeppelin instance can work with multiple versions of spark.

### What type of PR is it?
[ Improvement | Refactoring]

### Todos
* https://issues.apache.org/jira/browse/ZEPPELIN-3254  Although zeppelin should 
support running multiple versions of spark in one instance, our travis test 
doesn't cover it; ZEPPELIN-3254 would do that.

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3255

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2814 from zjffdu/ZEPPELIN-3255 and squashes the following commits:

68fa437 [Jeff Zhang] Remove akka from spark-dependencies
14fef45 [Jeff Zhang] ZEPPELIN-3255. Can not run spark1 and spark2 in one 
zeppelin instance


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/64bbba47
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/64bbba47
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/64bbba47

Branch: refs/heads/master
Commit: 64bbba4796fe1ddfd1ca1facde7dcda33ac86ef7
Parents: fc44693
Author: Jeff Zhang <zjf...@apache.org>
Authored: Sun Feb 25 21:24:08 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Tue Feb 27 14:41:57 2018 +0800

----------------------------------------------------------------------
 spark/interpreter/figure/null-1.png             | Bin 13599 -> 0 bytes
 spark/interpreter/pom.xml                       |  12 +
 .../zeppelin/spark/NewSparkInterpreter.java     |  67 +-----
 .../zeppelin/spark/OldSparkInterpreter.java     | 235 +------------------
 .../zeppelin/spark/OldSparkInterpreterTest.java |  10 +-
 spark/pom.xml                                   |  41 +---
 spark/spark-dependencies/pom.xml                |  41 +---
 spark/spark-shims/pom.xml                       |  70 ++++++
 .../org/apache/zeppelin/spark/SparkShims.java   | 110 +++++++++
 spark/spark1-shims/pom.xml                      |  89 +++++++
 .../org/apache/zeppelin/spark/Spark1Shims.java  |  57 +++++
 spark/spark2-shims/pom.xml                      |  88 +++++++
 .../org/apache/zeppelin/spark/Spark2Shims.java  |  36 +++
 13 files changed, 489 insertions(+), 367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/figure/null-1.png
----------------------------------------------------------------------
diff --git a/spark/interpreter/figure/null-1.png 
b/spark/interpreter/figure/null-1.png
deleted file mode 100644
index 8b1ce07..0000000
Binary files a/spark/interpreter/figure/null-1.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/pom.xml
----------------------------------------------------------------------
diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml
index e8d57a2..c89cfa6 100644
--- a/spark/interpreter/pom.xml
+++ b/spark/interpreter/pom.xml
@@ -83,6 +83,18 @@
 
     <dependency>
       <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark1-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>spark2-shims</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-python</artifactId>
       <version>${project.version}</version>
       <exclusions>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
index 1d3ccd6..c8efa7a 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
@@ -69,6 +69,7 @@ public class NewSparkInterpreter extends 
AbstractSparkInterpreter {
   private SparkVersion sparkVersion;
   private boolean enableSupportedVersionCheck;
   private String sparkUrl;
+  private SparkShims sparkShims;
 
   private static InterpreterHookRegistry hooks;
 
@@ -117,7 +118,8 @@ public class NewSparkInterpreter extends 
AbstractSparkInterpreter {
       sqlContext = this.innerInterpreter.sqlContext();
       sparkSession = this.innerInterpreter.sparkSession();
       sparkUrl = this.innerInterpreter.sparkUrl();
-      setupListeners();
+      sparkShims = SparkShims.getInstance(sc.version());
+      sparkShims.setupSparkListener(sparkUrl);
 
       hooks = getInterpreterGroup().getInterpreterHookRegistry();
       z = new SparkZeppelinContext(sc, hooks,
@@ -125,7 +127,7 @@ public class NewSparkInterpreter extends 
AbstractSparkInterpreter {
       this.innerInterpreter.bind("z", z.getClass().getCanonicalName(), z,
           Lists.newArrayList("@transient"));
     } catch (Exception e) {
-      LOGGER.error(ExceptionUtils.getStackTrace(e));
+      LOGGER.error("Fail to open SparkInterpreter", 
ExceptionUtils.getStackTrace(e));
       throw new InterpreterException("Fail to open SparkInterpreter", e);
     }
   }
@@ -213,67 +215,6 @@ public class NewSparkInterpreter extends 
AbstractSparkInterpreter {
     return innerInterpreter.getProgress(Utils.buildJobGroupId(context), 
context);
   }
 
-  private void setupListeners() {
-    JobProgressListener pl = new JobProgressListener(sc.getConf()) {
-      @Override
-      public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-        super.onJobStart(jobStart);
-        int jobId = jobStart.jobId();
-        String jobGroupId = 
jobStart.properties().getProperty("spark.jobGroup.id");
-        String uiEnabled = 
jobStart.properties().getProperty("spark.ui.enabled");
-        String jobUrl = getJobUrl(jobId);
-        String noteId = Utils.getNoteId(jobGroupId);
-        String paragraphId = Utils.getParagraphId(jobGroupId);
-        // Button visible if Spark UI property not set, set as invalid boolean 
or true
-        java.lang.Boolean showSparkUI =
-            uiEnabled == null || 
!uiEnabled.trim().toLowerCase().equals("false");
-        if (showSparkUI && jobUrl != null) {
-          RemoteEventClientWrapper eventClient = 
BaseZeppelinContext.getEventClient();
-          Map<String, String> infos = new java.util.HashMap<>();
-          infos.put("jobUrl", jobUrl);
-          infos.put("label", "SPARK JOB");
-          infos.put("tooltip", "View in Spark web UI");
-          if (eventClient != null) {
-            eventClient.onParaInfosReceived(noteId, paragraphId, infos);
-          }
-        }
-      }
-
-      private String getJobUrl(int jobId) {
-        String jobUrl = null;
-        if (sparkUrl != null) {
-          jobUrl = sparkUrl + "/jobs/job?id=" + jobId;
-        }
-        return jobUrl;
-      }
-    };
-    try {
-      Object listenerBus = sc.getClass().getMethod("listenerBus").invoke(sc);
-      Method[] methods = listenerBus.getClass().getMethods();
-      Method addListenerMethod = null;
-      for (Method m : methods) {
-        if (!m.getName().equals("addListener")) {
-          continue;
-        }
-        Class<?>[] parameterTypes = m.getParameterTypes();
-        if (parameterTypes.length != 1) {
-          continue;
-        }
-        if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) {
-          continue;
-        }
-        addListenerMethod = m;
-        break;
-      }
-      if (addListenerMethod != null) {
-        addListenerMethod.invoke(listenerBus, pl);
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      LOGGER.error(e.toString(), e);
-    }
-  }
-
   public SparkZeppelinContext getZeppelinContext() {
     return this.z;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
index ff3a2ca..1f59d18 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
@@ -151,6 +151,8 @@ public class OldSparkInterpreter extends 
AbstractSparkInterpreter {
   private JavaSparkContext jsc;
   private boolean enableSupportedVersionCheck;
 
+  private SparkShims sparkShims;
+
   public OldSparkInterpreter(Properties property) {
     super(property);
     out = new InterpreterOutputStream(logger);
@@ -158,10 +160,10 @@ public class OldSparkInterpreter extends 
AbstractSparkInterpreter {
 
   public OldSparkInterpreter(Properties property, SparkContext sc) {
     this(property);
-
     this.sc = sc;
     env = SparkEnv.get();
-    sparkListener = setupListeners(this.sc);
+    sparkShims = SparkShims.getInstance(sc.version());
+    sparkShims.setupSparkListener(sparkUrl);
   }
 
   public SparkContext getSparkContext() {
@@ -169,7 +171,6 @@ public class OldSparkInterpreter extends 
AbstractSparkInterpreter {
       if (sc == null) {
         sc = createSparkContext();
         env = SparkEnv.get();
-        sparkListener = setupListeners(sc);
       }
       return sc;
     }
@@ -190,157 +191,6 @@ public class OldSparkInterpreter extends 
AbstractSparkInterpreter {
     }
   }
 
-  static SparkListener setupListeners(SparkContext context) {
-    SparkListener pl = new SparkListener() {
-      @Override
-      public synchronized void onJobStart(SparkListenerJobStart jobStart) {
-        int jobId = jobStart.jobId();
-        String jobGroupId = 
jobStart.properties().getProperty("spark.jobGroup.id");
-        String uiEnabled = 
jobStart.properties().getProperty("spark.ui.enabled");
-        String jobUrl = getJobUrl(jobId);
-        String noteId = Utils.getNoteId(jobGroupId);
-        String paragraphId = Utils.getParagraphId(jobGroupId);
-        // Button visible if Spark UI property not set, set as invalid boolean 
or true
-        java.lang.Boolean showSparkUI =
-            uiEnabled == null || 
!uiEnabled.trim().toLowerCase().equals("false");
-        if (showSparkUI && jobUrl != null) {
-          RemoteEventClientWrapper eventClient = 
BaseZeppelinContext.getEventClient();
-          Map<String, String> infos = new java.util.HashMap<>();
-          infos.put("jobUrl", jobUrl);
-          infos.put("label", "SPARK JOB");
-          infos.put("tooltip", "View in Spark web UI");
-          if (eventClient != null) {
-            eventClient.onParaInfosReceived(noteId, paragraphId, infos);
-          }
-        }
-      }
-
-      private String getJobUrl(int jobId) {
-        String jobUrl = null;
-        if (sparkUrl != null) {
-          jobUrl = sparkUrl + "/jobs/job/?id=" + jobId;
-        }
-        return jobUrl;
-      }
-
-      @Override
-      public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
-
-      }
-
-      @Override
-      public void onExecutorRemoved(SparkListenerExecutorRemoved 
executorRemoved) {
-
-      }
-
-      @Override
-      public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
-
-      }
-
-      @Override
-      public void onExecutorMetricsUpdate(
-          SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
-
-      }
-
-      @Override
-      public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) 
{
-
-      }
-
-      @Override
-      public void onApplicationStart(SparkListenerApplicationStart 
applicationStart) {
-
-      }
-
-      @Override
-      public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
-
-      }
-
-      @Override
-      public void onBlockManagerAdded(SparkListenerBlockManagerAdded 
blockManagerAdded) {
-
-      }
-
-      @Override
-      public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved 
blockManagerRemoved) {
-
-      }
-
-      @Override
-      public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate 
environmentUpdate) {
-
-      }
-
-      @Override
-      public void onJobEnd(SparkListenerJobEnd jobEnd) {
-
-      }
-      
-      @Override
-      public void onStageCompleted(SparkListenerStageCompleted stageCompleted) 
{
-
-      }
-
-      @Override
-      public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) 
{
-
-      }
-
-      @Override
-      public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
-
-      }
-
-      @Override
-      public void onTaskGettingResult(SparkListenerTaskGettingResult 
taskGettingResult) {
-
-      }
-
-      @Override
-      public void onTaskStart(SparkListenerTaskStart taskStart) {
-
-      }
-    };
-    try {
-      Object listenerBus = 
context.getClass().getMethod("listenerBus").invoke(context);
-
-      Method[] methods = listenerBus.getClass().getMethods();
-      Method addListenerMethod = null;
-      for (Method m : methods) {
-        if (!m.getName().equals("addListener")) {
-          continue;
-        }
-
-        Class<?>[] parameterTypes = m.getParameterTypes();
-
-        if (parameterTypes.length != 1) {
-          continue;
-        }
-
-        if (!parameterTypes[0].isAssignableFrom(SparkListener.class)) {
-          continue;
-        }
-
-        addListenerMethod = m;
-        break;
-      }
-
-      if (addListenerMethod != null) {
-        addListenerMethod.invoke(listenerBus, pl);
-      } else {
-        return null;
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      logger.error(e.toString(), e);
-      return null;
-    }
-    return pl;
-  }
-
   private boolean useHiveContext() {
     return 
java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
   }
@@ -1020,6 +870,10 @@ public class OldSparkInterpreter extends 
AbstractSparkInterpreter {
       }
     }
 
+    sparkUrl = getSparkUIUrl();
+    sparkShims = SparkShims.getInstance(sc.version());
+    sparkShims.setupSparkListener(sparkUrl);
+
     numReferenceOfSparkContext.incrementAndGet();
   }
 
@@ -1373,75 +1227,6 @@ public class OldSparkInterpreter extends 
AbstractSparkInterpreter {
     return JobProgressUtil.progress(sc, jobGroup);
   }
 
-  private int[] getProgressFromStage_1_0x(SparkListener sparkListener, Object 
stage)
-      throws IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException, NoSuchMethodException, SecurityException {
-    int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
-    int completedTasks = 0;
-
-    int id = (int) stage.getClass().getMethod("id").invoke(stage);
-
-    Object completedTaskInfo = null;
-
-    completedTaskInfo = JavaConversions.mapAsJavaMap(
-        (HashMap<Object, Object>) sparkListener.getClass()
-            
.getMethod("stageIdToTasksComplete").invoke(sparkListener)).get(id);
-
-    if (completedTaskInfo != null) {
-      completedTasks += (int) completedTaskInfo;
-    }
-    List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) 
stage.getClass()
-        .getMethod("parents").invoke(stage));
-    if (parents != null) {
-      for (Object s : parents) {
-        int[] p = getProgressFromStage_1_0x(sparkListener, s);
-        numTasks += p[0];
-        completedTasks += p[1];
-      }
-    }
-
-    return new int[] {numTasks, completedTasks};
-  }
-
-  private int[] getProgressFromStage_1_1x(SparkListener sparkListener, Object 
stage)
-      throws IllegalAccessException, IllegalArgumentException,
-      InvocationTargetException, NoSuchMethodException, SecurityException {
-    int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage);
-    int completedTasks = 0;
-    int id = (int) stage.getClass().getMethod("id").invoke(stage);
-
-    try {
-      Method stageIdToData = 
sparkListener.getClass().getMethod("stageIdToData");
-      HashMap<Tuple2<Object, Object>, Object> stageIdData =
-          (HashMap<Tuple2<Object, Object>, Object>) 
stageIdToData.invoke(sparkListener);
-      Class<?> stageUIDataClass =
-          
this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData");
-
-      Method numCompletedTasks = 
stageUIDataClass.getMethod("numCompleteTasks");
-      Set<Tuple2<Object, Object>> keys =
-          JavaConverters.setAsJavaSetConverter(stageIdData.keySet()).asJava();
-      for (Tuple2<Object, Object> k : keys) {
-        if (id == (int) k._1()) {
-          Object uiData = stageIdData.get(k).get();
-          completedTasks += (int) numCompletedTasks.invoke(uiData);
-        }
-      }
-    } catch (Exception e) {
-      logger.error("Error on getting progress information", e);
-    }
-
-    List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) 
stage.getClass()
-        .getMethod("parents").invoke(stage));
-    if (parents != null) {
-      for (Object s : parents) {
-        int[] p = getProgressFromStage_1_1x(sparkListener, s);
-        numTasks += p[0];
-        completedTasks += p[1];
-      }
-    }
-    return new int[] {numTasks, completedTasks};
-  }
-
   private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
     if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
       return Code.SUCCESS;
@@ -1479,10 +1264,6 @@ public class OldSparkInterpreter extends 
AbstractSparkInterpreter {
     return FormType.NATIVE;
   }
 
-  public SparkListener getJobProgressListener() {
-    return sparkListener;
-  }
-
   @Override
   public Scheduler getScheduler() {
     return SchedulerFactory.singleton().createOrGetFIFOScheduler(

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
index 14214a2..068ff50 100644
--- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
+++ 
b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java
@@ -192,13 +192,7 @@ public class OldSparkInterpreterTest {
   public void testEndWithComment() throws InterpreterException {
     assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val 
c=1\n//comment", context).code());
   }
-
-  @Test
-  public void testListener() {
-    SparkContext sc = repl.getSparkContext();
-    assertNotNull(OldSparkInterpreter.setupListeners(sc));
-  }
-
+  
   @Test
   public void testCreateDataFrame() throws InterpreterException {
     if (getSparkVersionNumber(repl) >= 13) {
@@ -362,7 +356,7 @@ public class OldSparkInterpreterTest {
     }
     String sparkUIUrl = repl.getSparkUIUrl();
     assertNotNull(jobUrl);
-    assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job/?id="));
+    assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id="));
 
   }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 7a0c7c2..def865e 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -63,21 +63,26 @@
         <module>scala-2.10</module>
         <module>scala-2.11</module>
         <module>spark-dependencies</module>
+        <module>spark-shims</module>
+        <module>spark1-shims</module>
+        <module>spark2-shims</module>
     </modules>
 
     <dependencies>
 
         <dependency>
-            <groupId>org.apache.zeppelin</groupId>
-            <artifactId>zeppelin-interpreter</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.zeppelin</groupId>
-            <artifactId>zeppelin-display</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
         </dependency>
 
         <dependency>
@@ -92,28 +97,6 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.datanucleus</groupId>
-            <artifactId>datanucleus-core</artifactId>
-            <version>${datanucleus.core.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.datanucleus</groupId>
-            <artifactId>datanucleus-api-jdo</artifactId>
-            <version>${datanucleus.apijdo.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.datanucleus</groupId>
-            <artifactId>datanucleus-rdbms</artifactId>
-            <version>${datanucleus.rdbms.version}</version>
-            <scope>test</scope>
-        </dependency>
-
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml
index 58977b4..4e90a93 100644
--- a/spark/spark-dependencies/pom.xml
+++ b/spark/spark-dependencies/pom.xml
@@ -294,46 +294,7 @@
       <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
     </dependency>
-
-
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <version>${protobuf.version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-actor_${scala.binary.version}</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-remote_${scala.binary.version}</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-testkit_${scala.binary.version}</artifactId>
-      <version>${akka.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
-      <version>${akka.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>${akka.group}</groupId>
-          <artifactId>akka-actor_${scala.binary.version}</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
+    
     <!-- yarn (not supported for Spark v1.5.0 or higher) -->
     <dependency>
       <groupId>org.apache.spark</groupId>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark-shims/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark-shims/pom.xml b/spark/spark-shims/pom.xml
new file mode 100644
index 0000000..619c7a4
--- /dev/null
+++ b/spark/spark-shims/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+  <parent>
+    <artifactId>spark-parent</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>spark-shims</artifactId>
+  <version>0.9.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  <name>Zeppelin: Spark Shims</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-interpreter</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-interpreter-setting</id>
+            <phase>none</phase>
+            <configuration>
+              <skip>true</skip>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
----------------------------------------------------------------------
diff --git 
a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java 
b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
new file mode 100644
index 0000000..acf717c
--- /dev/null
+++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This is abstract class for anything that is api incompatible between spark1 
and spark2.
+ * It will load the correct version of SparkShims based on the version of 
Spark.
+ */
+public abstract class SparkShims {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkShims.class);
+
+  private static SparkShims sparkShims;
+
+  private static SparkShims loadShims(String sparkVersion) throws 
ReflectiveOperationException {
+    Class<?> sparkShimsClass;
+    if ("2".equals(sparkVersion)) {
+      LOGGER.info("Initializing shims for Spark 2.x");
+      sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims");
+    } else {
+      LOGGER.info("Initializing shims for Spark 1.x");
+      sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims");
+    }
+
+    Constructor c = sparkShimsClass.getConstructor();
+    return (SparkShims) c.newInstance();
+  }
+
+  public static SparkShims getInstance(String sparkVersion) {
+    if (sparkShims == null) {
+      String sparkMajorVersion = getSparkMajorVersion(sparkVersion);
+      try {
+        sparkShims = loadShims(sparkMajorVersion);
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return sparkShims;
+  }
+
+  private static String getSparkMajorVersion(String sparkVersion) {
+    return sparkVersion.startsWith("2") ? "2" : "1";
+  }
+
+  /**
+   * This is due to SparkListener api change between spark1 and spark2.
+   * SparkListener is trait in spark1 while it is abstract class in spark2.
+   */
+  public abstract void setupSparkListener(String sparkWebUrl);
+
+
+  protected String getNoteId(String jobgroupId) {
+    int indexOf = jobgroupId.indexOf("-");
+    int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
+    return jobgroupId.substring(indexOf + 1, secondIndex);
+  }
+
+  protected String getParagraphId(String jobgroupId) {
+    int indexOf = jobgroupId.indexOf("-");
+    int secondIndex = jobgroupId.indexOf("-", indexOf + 1);
+    return jobgroupId.substring(secondIndex + 1, jobgroupId.length());
+  }
+
+  protected void buildSparkJobUrl(String sparkWebUrl, int jobId, Properties 
jobProperties) {
+    String jobGroupId = jobProperties.getProperty("spark.jobGroup.id");
+    String uiEnabled = jobProperties.getProperty("spark.ui.enabled");
+    String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId;
+    String noteId = getNoteId(jobGroupId);
+    String paragraphId = getParagraphId(jobGroupId);
+    // Button visible if Spark UI property not set, set as invalid boolean or 
true
+    boolean showSparkUI =
+        uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false");
+    if (showSparkUI && jobUrl != null) {
+      RemoteEventClientWrapper eventClient = 
BaseZeppelinContext.getEventClient();
+      Map<String, String> infos = new java.util.HashMap<String, String>();
+      infos.put("jobUrl", jobUrl);
+      infos.put("label", "SPARK JOB");
+      infos.put("tooltip", "View in Spark web UI");
+      if (eventClient != null) {
+        eventClient.onParaInfosReceived(noteId, paragraphId, infos);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark1-shims/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark1-shims/pom.xml b/spark/spark1-shims/pom.xml
new file mode 100644
index 0000000..93640c6
--- /dev/null
+++ b/spark/spark1-shims/pom.xml
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<!-- Shim module implementing the spark-shims API against the spark 1.x
     line, so one Zeppelin instance can work with multiple spark versions. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <parent>
    <artifactId>spark-parent</artifactId>
    <groupId>org.apache.zeppelin</groupId>
    <version>0.9.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.zeppelin</groupId>
  <artifactId>spark1-shims</artifactId>
  <version>0.9.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Zeppelin: Spark1 Shims</name>

  <properties>
    <!-- compile-time versions only; spark itself is "provided" below -->
    <scala.binary.version>2.10</scala.binary.version>
    <spark.version>1.6.3</spark.version>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.apache.zeppelin</groupId>
      <artifactId>spark-shims</artifactId>
      <version>${project.version}</version>
    </dependency>

    <!-- provided: the actual spark jars come from the user's spark
         installation at runtime -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.zeppelin</groupId>
      <artifactId>zeppelin-interpreter</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- skip interpreter-style packaging steps inherited from spark-parent;
           this module is a plain jar, not an interpreter -->
      <plugin>
        <artifactId>maven-dependency-plugin</artifactId>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>

      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-interpreter-setting</id>
            <phase>none</phase>
            <configuration>
              <skip>true</skip>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
----------------------------------------------------------------------
diff --git 
a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java 
b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
new file mode 100644
index 0000000..9f23313
--- /dev/null
+++ 
b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
+import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
+import org.apache.spark.scheduler.SparkListenerBlockUpdated;
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorAdded;
+import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
+import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+import org.apache.spark.ui.jobs.JobProgressListener;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+
+import java.util.Map;
+
+public class Spark1Shims extends SparkShims {
+
+  public void setupSparkListener(final String sparkWebUrl) {
+    SparkContext sc = SparkContext.getOrCreate();
+    sc.addSparkListener(new JobProgressListener(sc.getConf()) {
+      @Override
+      public void onJobStart(SparkListenerJobStart jobStart) {
+        buildSparkJobUrl(sparkWebUrl, jobStart.jobId(), jobStart.properties());
+      }
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark2-shims/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark2-shims/pom.xml b/spark/spark2-shims/pom.xml
new file mode 100644
index 0000000..000e3ab
--- /dev/null
+++ b/spark/spark2-shims/pom.xml
@@ -0,0 +1,88 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<!-- Shim module implementing the spark-shims API against the spark 2.x
     line, so one Zeppelin instance can work with multiple spark versions. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>spark-parent</artifactId>
    <groupId>org.apache.zeppelin</groupId>
    <version>0.9.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.zeppelin</groupId>
  <artifactId>spark2-shims</artifactId>
  <version>0.9.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Zeppelin: Spark2 Shims</name>

  <properties>
    <!-- compile-time versions only; spark itself is "provided" below -->
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.1.2</spark.version>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.apache.zeppelin</groupId>
      <artifactId>spark-shims</artifactId>
      <version>${project.version}</version>
    </dependency>

    <!-- provided: the actual spark jars come from the user's spark
         installation at runtime -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.zeppelin</groupId>
      <artifactId>zeppelin-interpreter</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- skip interpreter-style packaging steps inherited from spark-parent;
           this module is a plain jar, not an interpreter -->
      <plugin>
        <artifactId>maven-dependency-plugin</artifactId>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>

      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-interpreter-setting</id>
            <phase>none</phase>
            <configuration>
              <skip>true</skip>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
----------------------------------------------------------------------
diff --git 
a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java 
b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
new file mode 100644
index 0000000..4b39610
--- /dev/null
+++ 
b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.spark;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+
+public class Spark2Shims extends SparkShims {
+
+  public void setupSparkListener(final String sparkWebUrl) {
+    SparkContext sc = SparkContext.getOrCreate();
+    sc.addSparkListener(new SparkListener() {
+      @Override
+      public void onJobStart(SparkListenerJobStart jobStart) {
+        buildSparkJobUrl(sparkWebUrl, jobStart.jobId(), jobStart.properties());
+      }
+    });
+  }
+}

Reply via email to