Repository: zeppelin Updated Branches: refs/heads/master fc44693fe -> 64bbba479
ZEPPELIN-3255. Can not run spark1 and spark2 in one zeppelin instance ### What is this PR for? Although #2750 enabled the support of spark 2.3, it breaks the support of spark 1.6. Users have to build zeppelin against spark 1.6 to make zeppelin work with that. But previously one zeppelin instance could work with multiple versions of spark. This PR introduces a spark shims module to resolve the api incompatibility issues between different versions of spark, so that one zeppelin instance can work with multiple versions of spark. ### What type of PR is it? [ Improvement | Refactoring] ### Todos * https://issues.apache.org/jira/browse/ZEPPELIN-3254 Although zeppelin should support running multiple versions of spark in one instance, our travis test doesn't cover it; ZEPPELIN-3254 would do that. ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3255 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #2814 from zjffdu/ZEPPELIN-3255 and squashes the following commits: 68fa437 [Jeff Zhang] Remove akka from spark-dependencies 14fef45 [Jeff Zhang] ZEPPELIN-3255. 
Can not run spark1 and spark2 in one zeppelin instance Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/64bbba47 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/64bbba47 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/64bbba47 Branch: refs/heads/master Commit: 64bbba4796fe1ddfd1ca1facde7dcda33ac86ef7 Parents: fc44693 Author: Jeff Zhang <zjf...@apache.org> Authored: Sun Feb 25 21:24:08 2018 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Tue Feb 27 14:41:57 2018 +0800 ---------------------------------------------------------------------- spark/interpreter/figure/null-1.png | Bin 13599 -> 0 bytes spark/interpreter/pom.xml | 12 + .../zeppelin/spark/NewSparkInterpreter.java | 67 +----- .../zeppelin/spark/OldSparkInterpreter.java | 235 +------------------ .../zeppelin/spark/OldSparkInterpreterTest.java | 10 +- spark/pom.xml | 41 +--- spark/spark-dependencies/pom.xml | 41 +--- spark/spark-shims/pom.xml | 70 ++++++ .../org/apache/zeppelin/spark/SparkShims.java | 110 +++++++++ spark/spark1-shims/pom.xml | 89 +++++++ .../org/apache/zeppelin/spark/Spark1Shims.java | 57 +++++ spark/spark2-shims/pom.xml | 88 +++++++ .../org/apache/zeppelin/spark/Spark2Shims.java | 36 +++ 13 files changed, 489 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/figure/null-1.png ---------------------------------------------------------------------- diff --git a/spark/interpreter/figure/null-1.png b/spark/interpreter/figure/null-1.png deleted file mode 100644 index 8b1ce07..0000000 Binary files a/spark/interpreter/figure/null-1.png and /dev/null differ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/pom.xml ---------------------------------------------------------------------- diff --git a/spark/interpreter/pom.xml 
b/spark/interpreter/pom.xml index e8d57a2..c89cfa6 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -83,6 +83,18 @@ <dependency> <groupId>org.apache.zeppelin</groupId> + <artifactId>spark1-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>spark2-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> <artifactId>zeppelin-python</artifactId> <version>${project.version}</version> <exclusions> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java index 1d3ccd6..c8efa7a 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java @@ -69,6 +69,7 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter { private SparkVersion sparkVersion; private boolean enableSupportedVersionCheck; private String sparkUrl; + private SparkShims sparkShims; private static InterpreterHookRegistry hooks; @@ -117,7 +118,8 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter { sqlContext = this.innerInterpreter.sqlContext(); sparkSession = this.innerInterpreter.sparkSession(); sparkUrl = this.innerInterpreter.sparkUrl(); - setupListeners(); + sparkShims = SparkShims.getInstance(sc.version()); + sparkShims.setupSparkListener(sparkUrl); hooks = getInterpreterGroup().getInterpreterHookRegistry(); z = new SparkZeppelinContext(sc, hooks, @@ -125,7 +127,7 @@ public class NewSparkInterpreter extends 
AbstractSparkInterpreter { this.innerInterpreter.bind("z", z.getClass().getCanonicalName(), z, Lists.newArrayList("@transient")); } catch (Exception e) { - LOGGER.error(ExceptionUtils.getStackTrace(e)); + LOGGER.error("Fail to open SparkInterpreter", ExceptionUtils.getStackTrace(e)); throw new InterpreterException("Fail to open SparkInterpreter", e); } } @@ -213,67 +215,6 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter { return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context); } - private void setupListeners() { - JobProgressListener pl = new JobProgressListener(sc.getConf()) { - @Override - public synchronized void onJobStart(SparkListenerJobStart jobStart) { - super.onJobStart(jobStart); - int jobId = jobStart.jobId(); - String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id"); - String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled"); - String jobUrl = getJobUrl(jobId); - String noteId = Utils.getNoteId(jobGroupId); - String paragraphId = Utils.getParagraphId(jobGroupId); - // Button visible if Spark UI property not set, set as invalid boolean or true - java.lang.Boolean showSparkUI = - uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); - if (showSparkUI && jobUrl != null) { - RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); - Map<String, String> infos = new java.util.HashMap<>(); - infos.put("jobUrl", jobUrl); - infos.put("label", "SPARK JOB"); - infos.put("tooltip", "View in Spark web UI"); - if (eventClient != null) { - eventClient.onParaInfosReceived(noteId, paragraphId, infos); - } - } - } - - private String getJobUrl(int jobId) { - String jobUrl = null; - if (sparkUrl != null) { - jobUrl = sparkUrl + "/jobs/job?id=" + jobId; - } - return jobUrl; - } - }; - try { - Object listenerBus = sc.getClass().getMethod("listenerBus").invoke(sc); - Method[] methods = listenerBus.getClass().getMethods(); - Method addListenerMethod = null; - for 
(Method m : methods) { - if (!m.getName().equals("addListener")) { - continue; - } - Class<?>[] parameterTypes = m.getParameterTypes(); - if (parameterTypes.length != 1) { - continue; - } - if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) { - continue; - } - addListenerMethod = m; - break; - } - if (addListenerMethod != null) { - addListenerMethod.invoke(listenerBus, pl); - } - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - LOGGER.error(e.toString(), e); - } - } - public SparkZeppelinContext getZeppelinContext() { return this.z; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java index ff3a2ca..1f59d18 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java @@ -151,6 +151,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { private JavaSparkContext jsc; private boolean enableSupportedVersionCheck; + private SparkShims sparkShims; + public OldSparkInterpreter(Properties property) { super(property); out = new InterpreterOutputStream(logger); @@ -158,10 +160,10 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { public OldSparkInterpreter(Properties property, SparkContext sc) { this(property); - this.sc = sc; env = SparkEnv.get(); - sparkListener = setupListeners(this.sc); + sparkShims = SparkShims.getInstance(sc.version()); + sparkShims.setupSparkListener(sparkUrl); } public SparkContext getSparkContext() { @@ -169,7 +171,6 @@ public class 
OldSparkInterpreter extends AbstractSparkInterpreter { if (sc == null) { sc = createSparkContext(); env = SparkEnv.get(); - sparkListener = setupListeners(sc); } return sc; } @@ -190,157 +191,6 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { } } - static SparkListener setupListeners(SparkContext context) { - SparkListener pl = new SparkListener() { - @Override - public synchronized void onJobStart(SparkListenerJobStart jobStart) { - int jobId = jobStart.jobId(); - String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id"); - String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled"); - String jobUrl = getJobUrl(jobId); - String noteId = Utils.getNoteId(jobGroupId); - String paragraphId = Utils.getParagraphId(jobGroupId); - // Button visible if Spark UI property not set, set as invalid boolean or true - java.lang.Boolean showSparkUI = - uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); - if (showSparkUI && jobUrl != null) { - RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); - Map<String, String> infos = new java.util.HashMap<>(); - infos.put("jobUrl", jobUrl); - infos.put("label", "SPARK JOB"); - infos.put("tooltip", "View in Spark web UI"); - if (eventClient != null) { - eventClient.onParaInfosReceived(noteId, paragraphId, infos); - } - } - } - - private String getJobUrl(int jobId) { - String jobUrl = null; - if (sparkUrl != null) { - jobUrl = sparkUrl + "/jobs/job/?id=" + jobId; - } - return jobUrl; - } - - @Override - public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { - - } - - @Override - public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { - - } - - @Override - public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { - - } - - @Override - public void onExecutorMetricsUpdate( - SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { - - } - - @Override - public void 
onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { - - } - - @Override - public void onApplicationStart(SparkListenerApplicationStart applicationStart) { - - } - - @Override - public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { - - } - - @Override - public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { - - } - - @Override - public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { - - } - - @Override - public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { - - } - - @Override - public void onJobEnd(SparkListenerJobEnd jobEnd) { - - } - - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { - - } - - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - - } - - @Override - public void onTaskEnd(SparkListenerTaskEnd taskEnd) { - - } - - @Override - public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { - - } - - @Override - public void onTaskStart(SparkListenerTaskStart taskStart) { - - } - }; - try { - Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context); - - Method[] methods = listenerBus.getClass().getMethods(); - Method addListenerMethod = null; - for (Method m : methods) { - if (!m.getName().equals("addListener")) { - continue; - } - - Class<?>[] parameterTypes = m.getParameterTypes(); - - if (parameterTypes.length != 1) { - continue; - } - - if (!parameterTypes[0].isAssignableFrom(SparkListener.class)) { - continue; - } - - addListenerMethod = m; - break; - } - - if (addListenerMethod != null) { - addListenerMethod.invoke(listenerBus, pl); - } else { - return null; - } - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - logger.error(e.toString(), e); - return null; - } - return pl; - } - private boolean useHiveContext() { return 
java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")); } @@ -1020,6 +870,10 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { } } + sparkUrl = getSparkUIUrl(); + sparkShims = SparkShims.getInstance(sc.version()); + sparkShims.setupSparkListener(sparkUrl); + numReferenceOfSparkContext.incrementAndGet(); } @@ -1373,75 +1227,6 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { return JobProgressUtil.progress(sc, jobGroup); } - private int[] getProgressFromStage_1_0x(SparkListener sparkListener, Object stage) - throws IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException, SecurityException { - int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage); - int completedTasks = 0; - - int id = (int) stage.getClass().getMethod("id").invoke(stage); - - Object completedTaskInfo = null; - - completedTaskInfo = JavaConversions.mapAsJavaMap( - (HashMap<Object, Object>) sparkListener.getClass() - .getMethod("stageIdToTasksComplete").invoke(sparkListener)).get(id); - - if (completedTaskInfo != null) { - completedTasks += (int) completedTaskInfo; - } - List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) stage.getClass() - .getMethod("parents").invoke(stage)); - if (parents != null) { - for (Object s : parents) { - int[] p = getProgressFromStage_1_0x(sparkListener, s); - numTasks += p[0]; - completedTasks += p[1]; - } - } - - return new int[] {numTasks, completedTasks}; - } - - private int[] getProgressFromStage_1_1x(SparkListener sparkListener, Object stage) - throws IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException, SecurityException { - int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage); - int completedTasks = 0; - int id = (int) stage.getClass().getMethod("id").invoke(stage); - - try { - Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData"); - 
HashMap<Tuple2<Object, Object>, Object> stageIdData = - (HashMap<Tuple2<Object, Object>, Object>) stageIdToData.invoke(sparkListener); - Class<?> stageUIDataClass = - this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData"); - - Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks"); - Set<Tuple2<Object, Object>> keys = - JavaConverters.setAsJavaSetConverter(stageIdData.keySet()).asJava(); - for (Tuple2<Object, Object> k : keys) { - if (id == (int) k._1()) { - Object uiData = stageIdData.get(k).get(); - completedTasks += (int) numCompletedTasks.invoke(uiData); - } - } - } catch (Exception e) { - logger.error("Error on getting progress information", e); - } - - List<Object> parents = JavaConversions.seqAsJavaList((Seq<Object>) stage.getClass() - .getMethod("parents").invoke(stage)); - if (parents != null) { - for (Object s : parents) { - int[] p = getProgressFromStage_1_1x(sparkListener, s); - numTasks += p[0]; - completedTasks += p[1]; - } - } - return new int[] {numTasks, completedTasks}; - } - private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { return Code.SUCCESS; @@ -1479,10 +1264,6 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { return FormType.NATIVE; } - public SparkListener getJobProgressListener() { - return sparkListener; - } - @Override public Scheduler getScheduler() { return SchedulerFactory.singleton().createOrGetFIFOScheduler( http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java index 14214a2..068ff50 100644 --- 
a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java @@ -192,13 +192,7 @@ public class OldSparkInterpreterTest { public void testEndWithComment() throws InterpreterException { assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code()); } - - @Test - public void testListener() { - SparkContext sc = repl.getSparkContext(); - assertNotNull(OldSparkInterpreter.setupListeners(sc)); - } - + @Test public void testCreateDataFrame() throws InterpreterException { if (getSparkVersionNumber(repl) >= 13) { @@ -362,7 +356,7 @@ public class OldSparkInterpreterTest { } String sparkUIUrl = repl.getSparkUIUrl(); assertNotNull(jobUrl); - assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job/?id=")); + assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id=")); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml index 7a0c7c2..def865e 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -63,21 +63,26 @@ <module>scala-2.10</module> <module>scala-2.11</module> <module>spark-dependencies</module> + <module>spark-shims</module> + <module>spark1-shims</module> + <module>spark2-shims</module> </modules> <dependencies> <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>zeppelin-interpreter</artifactId> - <version>${project.version}</version> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>org.apache.zeppelin</groupId> - <artifactId>zeppelin-display</artifactId> - <version>${project.version}</version> - <scope>test</scope> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> </dependency> <dependency> @@ 
-92,28 +97,6 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.datanucleus</groupId> - <artifactId>datanucleus-core</artifactId> - <version>${datanucleus.core.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.datanucleus</groupId> - <artifactId>datanucleus-api-jdo</artifactId> - <version>${datanucleus.apijdo.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.datanucleus</groupId> - <artifactId>datanucleus-rdbms</artifactId> - <version>${datanucleus.rdbms.version}</version> - <scope>test</scope> - </dependency> - </dependencies> <build> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark-dependencies/pom.xml ---------------------------------------------------------------------- diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml index 58977b4..4e90a93 100644 --- a/spark/spark-dependencies/pom.xml +++ b/spark/spark-dependencies/pom.xml @@ -294,46 +294,7 @@ <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> - - - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <version>${protobuf.version}</version> - </dependency> - - <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-actor_${scala.binary.version}</artifactId> - <version>${akka.version}</version> - </dependency> - <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-remote_${scala.binary.version}</artifactId> - <version>${akka.version}</version> - </dependency> - <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-slf4j_${scala.binary.version}</artifactId> - <version>${akka.version}</version> - </dependency> - <dependency> - <groupId>${akka.group}</groupId> - <artifactId>akka-testkit_${scala.binary.version}</artifactId> - <version>${akka.version}</version> - </dependency> - <dependency> - 
<groupId>${akka.group}</groupId> - <artifactId>akka-zeromq_${scala.binary.version}</artifactId> - <version>${akka.version}</version> - <exclusions> - <exclusion> - <groupId>${akka.group}</groupId> - <artifactId>akka-actor_${scala.binary.version}</artifactId> - </exclusion> - </exclusions> - </dependency> - + <!-- yarn (not supported for Spark v1.5.0 or higher) --> <dependency> <groupId>org.apache.spark</groupId> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark-shims/pom.xml ---------------------------------------------------------------------- diff --git a/spark/spark-shims/pom.xml b/spark/spark-shims/pom.xml new file mode 100644 index 0000000..619c7a4 --- /dev/null +++ b/spark/spark-shims/pom.xml @@ -0,0 +1,70 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <artifactId>spark-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.zeppelin</groupId> + <artifactId>spark-shims</artifactId> + <version>0.9.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>Zeppelin: Spark Shims</name> + + <dependencies> + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-interpreter-setting</id> + <phase>none</phase> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java ---------------------------------------------------------------------- diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java new file mode 100644 index 0000000..acf717c --- /dev/null +++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.spark; + + +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.util.Map; +import java.util.Properties; + +/** + * This is abstract class for anything that is api incompatible between spark1 and spark2. + * It will load the correct version of SparkShims based on the version of Spark. 
+ */ +public abstract class SparkShims { + + private static final Logger LOGGER = LoggerFactory.getLogger(SparkShims.class); + + private static SparkShims sparkShims; + + private static SparkShims loadShims(String sparkVersion) throws ReflectiveOperationException { + Class<?> sparkShimsClass; + if ("2".equals(sparkVersion)) { + LOGGER.info("Initializing shims for Spark 2.x"); + sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims"); + } else { + LOGGER.info("Initializing shims for Spark 1.x"); + sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims"); + } + + Constructor c = sparkShimsClass.getConstructor(); + return (SparkShims) c.newInstance(); + } + + public static SparkShims getInstance(String sparkVersion) { + if (sparkShims == null) { + String sparkMajorVersion = getSparkMajorVersion(sparkVersion); + try { + sparkShims = loadShims(sparkMajorVersion); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + return sparkShims; + } + + private static String getSparkMajorVersion(String sparkVersion) { + return sparkVersion.startsWith("2") ? "2" : "1"; + } + + /** + * This is due to SparkListener api change between spark1 and spark2. + * SparkListener is trait in spark1 while it is abstract class in spark2. 
+ */ + public abstract void setupSparkListener(String sparkWebUrl); + + + protected String getNoteId(String jobgroupId) { + int indexOf = jobgroupId.indexOf("-"); + int secondIndex = jobgroupId.indexOf("-", indexOf + 1); + return jobgroupId.substring(indexOf + 1, secondIndex); + } + + protected String getParagraphId(String jobgroupId) { + int indexOf = jobgroupId.indexOf("-"); + int secondIndex = jobgroupId.indexOf("-", indexOf + 1); + return jobgroupId.substring(secondIndex + 1, jobgroupId.length()); + } + + protected void buildSparkJobUrl(String sparkWebUrl, int jobId, Properties jobProperties) { + String jobGroupId = jobProperties.getProperty("spark.jobGroup.id"); + String uiEnabled = jobProperties.getProperty("spark.ui.enabled"); + String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId; + String noteId = getNoteId(jobGroupId); + String paragraphId = getParagraphId(jobGroupId); + // Button visible if Spark UI property not set, set as invalid boolean or true + boolean showSparkUI = + uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); + if (showSparkUI && jobUrl != null) { + RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); + Map<String, String> infos = new java.util.HashMap<String, String>(); + infos.put("jobUrl", jobUrl); + infos.put("label", "SPARK JOB"); + infos.put("tooltip", "View in Spark web UI"); + if (eventClient != null) { + eventClient.onParaInfosReceived(noteId, paragraphId, infos); + } + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark1-shims/pom.xml ---------------------------------------------------------------------- diff --git a/spark/spark1-shims/pom.xml b/spark/spark1-shims/pom.xml new file mode 100644 index 0000000..93640c6 --- /dev/null +++ b/spark/spark1-shims/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. 
See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <artifactId>spark-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.zeppelin</groupId> + <artifactId>spark1-shims</artifactId> + <version>0.9.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>Zeppelin: Spark1 Shims</name> + + <properties> + <scala.binary.version>2.10</scala.binary.version> + <spark.version>1.6.3</spark.version> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>spark-shims</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + 
</dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <executions> + <execution> + <id>copy-interpreter-setting</id> + <phase>none</phase> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java ---------------------------------------------------------------------- diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java new file mode 100644 index 0000000..9f23313 --- /dev/null +++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; +import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; +import org.apache.spark.scheduler.SparkListenerBlockUpdated; +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorAdded; +import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorRemoved; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.SparkListenerTaskGettingResult; +import org.apache.spark.scheduler.SparkListenerTaskStart; +import org.apache.spark.scheduler.SparkListenerUnpersistRDD; +import org.apache.spark.ui.jobs.JobProgressListener; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; + +import java.util.Map; + +public class Spark1Shims extends SparkShims { + + public void setupSparkListener(final String sparkWebUrl) { + SparkContext sc = SparkContext.getOrCreate(); + sc.addSparkListener(new JobProgressListener(sc.getConf()) { + @Override + public void onJobStart(SparkListenerJobStart jobStart) { + buildSparkJobUrl(sparkWebUrl, jobStart.jobId(), jobStart.properties()); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/64bbba47/spark/spark2-shims/pom.xml ---------------------------------------------------------------------- diff --git 
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements.  See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License.  You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <parent>
    <artifactId>spark-parent</artifactId>
    <groupId>org.apache.zeppelin</groupId>
    <version>0.9.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>

  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.zeppelin</groupId>
  <artifactId>spark2-shims</artifactId>
  <version>0.9.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>Zeppelin: Spark2 Shims</name>

  <properties>
    <!-- Compile-time Spark/Scala versions only; Spark itself is provided at runtime -->
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>2.1.2</spark.version>
  </properties>

  <dependencies>

    <!-- version-agnostic shim API this module implements -->
    <dependency>
      <groupId>org.apache.zeppelin</groupId>
      <artifactId>spark-shims</artifactId>
      <version>${project.version}</version>
    </dependency>

    <!-- provided: the user's own Spark 2.x distribution supplies this at runtime -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.zeppelin</groupId>
      <artifactId>zeppelin-interpreter</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- shim jars are loaded by the interpreter, not copied as dependencies -->
      <plugin>
        <artifactId>maven-dependency-plugin</artifactId>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>

      <!-- no interpreter-setting.json in a shim module; disable the parent's copy step -->
      <plugin>
        <artifactId>maven-resources-plugin</artifactId>
        <executions>
          <execution>
            <id>copy-interpreter-setting</id>
            <phase>none</phase>
            <configuration>
              <skip>true</skip>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>
---------------------------------------------------------------------- diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java new file mode 100644 index 0000000..4b39610 --- /dev/null +++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerJobStart; + +public class Spark2Shims extends SparkShims { + + public void setupSparkListener(final String sparkWebUrl) { + SparkContext sc = SparkContext.getOrCreate(); + sc.addSparkListener(new SparkListener() { + @Override + public void onJobStart(SparkListenerJobStart jobStart) { + buildSparkJobUrl(sparkWebUrl, jobStart.jobId(), jobStart.properties()); + } + }); + } +}