Repository: zeppelin
Updated Branches:
  refs/heads/branch-0.8 d7918ab8d -> d17535bca
ZEPPELIN-2221 Show `Jobs` page when `jobId` is missing

### What is this PR for?
Because of a Yarn bug, Spark's `jobId` is not passed through, which leaves some Spark UI links broken. In that case, showing the `Jobs` page is a reasonable fallback. The Yarn bug is fixed only in the latest releases, so it has to be handled on the Zeppelin side.

### What type of PR is it?
[Bug Fix]

### Todos
* [x] - Return `Jobs` page when `Job` page is not available

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-2221

### How should this be tested?
1. Install an outdated Yarn
2. Run a script
3. Click `Spark UI` and verify that the `Jobs` page is shown instead of a 404

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jongyoul Lee <jongy...@gmail.com>

Closes #2964 from jongyoul/ZEPPELIN-2221 and squashes the following commits:

d595028b5 [Jongyoul Lee] Remove unused packages from import statements
          Add spark's master value as a parameter of setupSparkListener
          Add parameterized test for various yarn versions
          Add a test checking jobUrl
          Add a logic to check a hadoop version and yarn version to support passing `get` parameters
          Add hadoop dependency as a `provided` scope
8b6f9076d [Jongyoul Lee] Fix to cover spark1 and spark2
8fdc46763 [Jongyoul Lee] Fix to cover spark1 and spark2
7ffcc55ca [Jongyoul Lee] Consume httpEntity
431457308 [Jongyoul Lee] Add httpClient to test if `jobUrl` is valid or not
          Add test cases for that feature

(cherry picked from commit b6beda64e7d25fa93958737fc8d2dad4eae309cc)
Signed-off-by: Jongyoul Lee <jongy...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d17535bc
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d17535bc
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d17535bc

Branch: refs/heads/branch-0.8
Commit: d17535bca7db8f41a037e8f8026bec4b221632e7
Parents: d7918ab
Author: Jongyoul Lee <jongy...@gmail.com>
Authored: Mon May 21 15:15:48 2018 +0900
Committer: Jongyoul Lee <jongy...@apache.org>
Committed: Wed May 30 13:03:27 2018 +0900

----------------------------------------------------------------------
 .../zeppelin/spark/NewSparkInterpreter.java     |   8 +-
 .../zeppelin/spark/OldSparkInterpreter.java     |   4 +-
 .../apache/zeppelin/spark/SparkShimsTest.java   | 153 +++++++++++++++++++
 spark/spark-shims/pom.xml                       |  11 ++
 .../org/apache/zeppelin/spark/SparkShims.java   |  59 +++++--
 .../org/apache/zeppelin/spark/Spark1Shims.java  |  25 +--
 .../org/apache/zeppelin/spark/Spark2Shims.java  |   4 +-
 7 files changed, 217 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d17535bc/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
index 591ef96..2fa1093 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java
@@ -23,10 +23,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.scheduler.SparkListenerJobStart;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.ui.jobs.JobProgressListener;
-import org.apache.zeppelin.interpreter.BaseZeppelinContext;
 import org.apache.zeppelin.interpreter.DefaultInterpreterProperty;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -34,7 +31,6 @@ import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.spark.dep.SparkDependencyContext;
 import org.slf4j.Logger;
@@ -42,8 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -119,7 +113,7 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter {
       sparkSession = this.innerInterpreter.sparkSession();
       sparkUrl = this.innerInterpreter.sparkUrl();
       sparkShims = SparkShims.getInstance(sc.version());
-      sparkShims.setupSparkListener(sparkUrl);
+      sparkShims.setupSparkListener(sc.master(), sparkUrl);
 
       hooks = getInterpreterGroup().getInterpreterHookRegistry();
       z = new SparkZeppelinContext(sc, hooks,

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d17535bc/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
index 0dfe3cb..83d3d6a 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java
@@ -163,7 +163,7 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter {
     this.sc = sc;
     env = SparkEnv.get();
     sparkShims = SparkShims.getInstance(sc.version());
-    sparkShims.setupSparkListener(sparkUrl);
+    sparkShims.setupSparkListener(sc.master(), sparkUrl);
   }
 
   public SparkContext getSparkContext() {
@@ -873,7 +873,7 @@
     sparkUrl = getSparkUIUrl();
     sparkShims = SparkShims.getInstance(sc.version());
-    sparkShims.setupSparkListener(sparkUrl);
+    sparkShims.setupSparkListener(sc.master(), sparkUrl);
 
     numReferenceOfSparkContext.incrementAndGet();
   }
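Both interpreters now hand the shim the Spark master string along with the web UI URL; further down, `SparkShims.buildSparkJobUrl` applies a simple substring test to that value to decide whether the Yarn fallback applies. A minimal sketch of that check (the class and the sample master values here are illustrative only, not part of the patch):

```java
// Illustrative only: which master strings the yarn-specific fallback in
// SparkShims.buildSparkJobUrl would consider, using the same substring test.
public class MasterCheckSketch {
  static boolean isYarnMaster(String master) {
    return master.toLowerCase().contains("yarn");
  }

  public static void main(String[] args) {
    String[] masters = {"local[*]", "yarn", "yarn-client", "yarn-cluster", "spark://host:7077"};
    for (String master : masters) {
      System.out.println(master + " -> yarn fallback candidate: " + isYarnMaster(master));
    }
  }
}
```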
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d17535bc/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
----------------------------------------------------------------------
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
new file mode 100644
index 0000000..25afd4e
--- /dev/null
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkShimsTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(Enclosed.class)
+public class SparkShimsTest {
+
+  @RunWith(Parameterized.class)
+  public static class ParamTests {
+    @Parameters(name = "Hadoop {0} supports jobUrl: {1}")
+    public static Collection<Object[]> data() {
+      return Arrays.asList(
+          new Object[][] {
+            {"2.6.0", false},
+            {"2.6.1", false},
+            {"2.6.2", false},
+            {"2.6.3", false},
+            {"2.6.4", false},
+            {"2.6.5", false},
+            {"2.6.6", true},        // The latest fixed version
+            {"2.6.7", true},        // Future version
+            {"2.7.0", false},
+            {"2.7.1", false},
+            {"2.7.2", false},
+            {"2.7.3", false},
+            {"2.7.4", true},        // The latest fixed version
+            {"2.7.5", true},        // Future versions
+            {"2.8.0", false},
+            {"2.8.1", false},
+            {"2.8.2", true},        // The latest fixed version
+            {"2.8.3", true},        // Future versions
+            {"2.9.0", true},        // The latest fixed version
+            {"2.9.1", true},        // Future versions
+            {"3.0.0", true},        // The latest fixed version
+            {"3.0.0-alpha4", true}, // The latest fixed version
+            {"3.0.1", true},        // Future versions
+          });
+    }
+
+    @Parameter public String version;
+
+    @Parameter(1)
+    public boolean expected;
+
+    @Test
+    public void checkYarnVersionTest() {
+      SparkShims sparkShims =
+          new SparkShims() {
+            @Override
+            public void setupSparkListener(String master, String sparkWebUrl) {}
+          };
+      assertEquals(expected, sparkShims.supportYarn6615(version));
+    }
+  }
+
+  @RunWith(PowerMockRunner.class)
+  @PrepareForTest({BaseZeppelinContext.class, VersionInfo.class})
+  @PowerMockIgnore({"javax.net.*", "javax.security.*"})
+  public static class SingleTests {
+    @Mock Properties mockProperties;
+    @Captor ArgumentCaptor<Map<String, String>> argumentCaptor;
+
+    SparkShims sparkShims;
+
+    @Before
+    public void setUp() {
+      PowerMockito.mockStatic(BaseZeppelinContext.class);
+      RemoteEventClientWrapper mockRemoteEventClientWrapper = mock(RemoteEventClientWrapper.class);
+
+      when(BaseZeppelinContext.getEventClient()).thenReturn(mockRemoteEventClientWrapper);
+      doNothing()
+          .when(mockRemoteEventClientWrapper)
+          .onParaInfosReceived(anyString(), anyString(), argumentCaptor.capture());
+
+      when(mockProperties.getProperty("spark.jobGroup.id")).thenReturn("job-note-paragraph");
+
+      try {
+        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_2_0_0.toString());
+      } catch (Throwable ignore) {
+        sparkShims = SparkShims.getInstance(SparkVersion.SPARK_1_6_0.toString());
+      }
+    }
+
+    @Test
+    public void runUnderLocalTest() {
+      sparkShims.buildSparkJobUrl("local", "http://sparkurl", 0, mockProperties);
+
+      Map<String, String> mapValue = argumentCaptor.getValue();
+      assertTrue(mapValue.keySet().contains("jobUrl"));
+      assertTrue(mapValue.get("jobUrl").contains("/jobs/job?id="));
+    }
+
+    @Test
+    public void runUnderYarnTest() {
+      sparkShims.buildSparkJobUrl("yarn", "http://sparkurl", 0, mockProperties);
+
+      Map<String, String> mapValue = argumentCaptor.getValue();
+      assertTrue(mapValue.keySet().contains("jobUrl"));
+
+      if (sparkShims.supportYarn6615(VersionInfo.getVersion())) {
+        assertTrue(mapValue.get("jobUrl").contains("/jobs/job?id="));
+      } else {
+        assertFalse(mapValue.get("jobUrl").contains("/jobs/job?id="));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d17535bc/spark/spark-shims/pom.xml
----------------------------------------------------------------------
diff --git a/spark/spark-shims/pom.xml b/spark/spark-shims/pom.xml
index 81275e4..03b6bf9 100644
--- a/spark/spark-shims/pom.xml
+++ b/spark/spark-shims/pom.xml
@@ -41,6 +41,17 @@
       <version>${project.version}</version>
       <scope>provided</scope>
     </dependency>
+
+    <!--
+      This is for ZEPPELIN-2221: VersionInfo is used to check the version of Yarn.
+      VersionInfo has been verified to be compatible from 2.2.0 up to the latest version.
+    -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>2.2.0</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
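The new hadoop-common dependency exists only for `VersionInfo` and `VersionUtil`, which report and compare the running Hadoop version. A minimal sketch of how they are used, assuming hadoop-common is on the classpath as the `provided` scope implies:

```java
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;

// Sketch: read the running Hadoop version and compare it against a threshold,
// the same way supportYarn6615 below compares against its range boundaries.
public class HadoopVersionSketch {
  public static void main(String[] args) {
    String running = VersionInfo.getVersion(); // e.g. "2.7.3"
    // compareVersions(a, b) is negative when a is lower than b,
    // so "<= 0" here means "running is at least 2.7.4"
    boolean atLeast274 = VersionUtil.compareVersions("2.7.4", running) <= 0;
    System.out.println("Hadoop " + running + " >= 2.7.4: " + atLeast274);
  }
}
```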
It will + * load the correct version of SparkShims based on the version of Spark. */ public abstract class SparkShims { + // the following lines for checking specific versions + private static final String HADOOP_VERSION_2_6_6 = "2.6.6"; + private static final String HADOOP_VERSION_2_7_0 = "2.7.0"; + private static final String HADOOP_VERSION_2_7_4 = "2.7.4"; + private static final String HADOOP_VERSION_2_8_0 = "2.8.0"; + private static final String HADOOP_VERSION_2_8_2 = "2.8.2"; + private static final String HADOOP_VERSION_2_9_0 = "2.9.0"; + private static final String HADOOP_VERSION_3_0_0 = "3.0.0"; + private static final String HADOOP_VERSION_3_0_0_ALPHA4 = "3.0.0-alpha4"; + private static final Logger LOGGER = LoggerFactory.getLogger(SparkShims.class); private static SparkShims sparkShims; @@ -69,11 +79,10 @@ public abstract class SparkShims { } /** - * This is due to SparkListener api change between spark1 and spark2. - * SparkListener is trait in spark1 while it is abstract class in spark2. + * This is due to SparkListener api change between spark1 and spark2. SparkListener is trait in + * spark1 while it is abstract class in spark2. */ - public abstract void setupSparkListener(String sparkWebUrl); - + public abstract void setupSparkListener(String master, String sparkWebUrl); protected String getNoteId(String jobgroupId) { int indexOf = jobgroupId.indexOf("-"); @@ -87,18 +96,24 @@ public abstract class SparkShims { return jobgroupId.substring(secondIndex + 1, jobgroupId.length()); } - protected void buildSparkJobUrl(String sparkWebUrl, int jobId, Properties jobProperties) { + protected void buildSparkJobUrl( + String master, String sparkWebUrl, int jobId, Properties jobProperties) { String jobGroupId = jobProperties.getProperty("spark.jobGroup.id"); String uiEnabled = jobProperties.getProperty("spark.ui.enabled"); String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId; + + String version = VersionInfo.getVersion(); + if (master.toLowerCase().contains("yarn") && !supportYarn6615(version)) { + jobUrl = sparkWebUrl + "/jobs"; + } + String noteId = getNoteId(jobGroupId); String paragraphId = getParagraphId(jobGroupId); // Button visible if Spark UI property not set, set as invalid boolean or true - boolean showSparkUI = - uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); - if (showSparkUI && jobUrl != null) { + boolean showSparkUI = uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); + if (showSparkUI) { RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); - Map<String, String> infos = new java.util.HashMap<String, String>(); + Map<String, String> infos = new java.util.HashMap<>(); infos.put("jobUrl", jobUrl); infos.put("label", "SPARK JOB"); infos.put("tooltip", "View in Spark web UI"); @@ -107,4 +122,22 @@ public abstract class SparkShims { } } } + + /** + * This is temporal patch for support old versions of Yarn which is not adopted YARN-6615 + * + * @return true if YARN-6615 is patched, false otherwise + */ + protected boolean supportYarn6615(String version) { + return (VersionUtil.compareVersions(HADOOP_VERSION_2_6_6, version) <= 0 + && VersionUtil.compareVersions(HADOOP_VERSION_2_7_0, version) > 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_2_7_4, version) <= 0 + && VersionUtil.compareVersions(HADOOP_VERSION_2_8_0, version) > 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_2_8_2, version) <= 0 + && VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) > 0) + || 
(VersionUtil.compareVersions(HADOOP_VERSION_2_9_0, version) <= 0 + && VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) > 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0_ALPHA4, version) <= 0) + || (VersionUtil.compareVersions(HADOOP_VERSION_3_0_0, version) <= 0); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d17535bc/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java ---------------------------------------------------------------------- diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java index 8934c1e..c842188 100644 --- a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java +++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java @@ -19,41 +19,20 @@ package org.apache.zeppelin.spark; import org.apache.spark.SparkContext; -import org.apache.spark.scheduler.SparkListener; -import org.apache.spark.scheduler.SparkListenerApplicationEnd; -import org.apache.spark.scheduler.SparkListenerApplicationStart; -import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; -import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; -import org.apache.spark.scheduler.SparkListenerBlockUpdated; -import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorAdded; -import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; -import org.apache.spark.scheduler.SparkListenerExecutorRemoved; -import org.apache.spark.scheduler.SparkListenerJobEnd; import org.apache.spark.scheduler.SparkListenerJobStart; -import org.apache.spark.scheduler.SparkListenerStageCompleted; -import org.apache.spark.scheduler.SparkListenerStageSubmitted; -import org.apache.spark.scheduler.SparkListenerTaskEnd; -import org.apache.spark.scheduler.SparkListenerTaskGettingResult; -import org.apache.spark.scheduler.SparkListenerTaskStart; -import org.apache.spark.scheduler.SparkListenerUnpersistRDD; import org.apache.spark.ui.jobs.JobProgressListener; -import org.apache.zeppelin.interpreter.BaseZeppelinContext; -import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; - -import java.util.Map; /** * Shims for Spark 1.x */ public class Spark1Shims extends SparkShims { - public void setupSparkListener(final String sparkWebUrl) { + public void setupSparkListener(final String master, final String sparkWebUrl) { SparkContext sc = SparkContext.getOrCreate(); sc.addSparkListener(new JobProgressListener(sc.getConf()) { @Override public void onJobStart(SparkListenerJobStart jobStart) { - buildSparkJobUrl(sparkWebUrl, jobStart.jobId(), jobStart.properties()); + buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties()); } }); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d17535bc/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java ---------------------------------------------------------------------- diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java index e4aea5c..5a6c5b2 100644 --- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java +++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java @@ -27,12 +27,12 @@ import org.apache.spark.scheduler.SparkListenerJobStart; */ public class Spark2Shims extends SparkShims 
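The heart of the change is the fallback in `buildSparkJobUrl` above: on YARN clusters whose Hadoop build predates the YARN-6615 fix, the web proxy does not pass the `get` parameters through, so the `/jobs/job?id=` deep link breaks and Zeppelin links to the `Jobs` overview page instead. A condensed, illustrative restatement of that decision (the class and method names here are not the actual Zeppelin API; the real method also resolves the note/paragraph ids and pushes the link to the front end):

```java
// Illustrative sketch of the URL fallback applied by SparkShims.buildSparkJobUrl.
public class JobUrlSketch {
  static String jobUrl(String master, String sparkWebUrl, int jobId, boolean yarn6615Fixed) {
    if (master.toLowerCase().contains("yarn") && !yarn6615Fixed) {
      return sparkWebUrl + "/jobs";               // overview page: needs no query parameter
    }
    return sparkWebUrl + "/jobs/job?id=" + jobId; // deep link to the single job
  }

  public static void main(String[] args) {
    System.out.println(jobUrl("yarn-cluster", "http://rm:8088/proxy/application_1", 3, false));
    System.out.println(jobUrl("local[*]", "http://localhost:4040", 3, false));
  }
}
```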
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d17535bc/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
----------------------------------------------------------------------
diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
index e4aea5c..5a6c5b2 100644
--- a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
+++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java
@@ -27,12 +27,12 @@ import org.apache.spark.scheduler.SparkListenerJobStart;
  */
 public class Spark2Shims extends SparkShims {
 
-  public void setupSparkListener(final String sparkWebUrl) {
+  public void setupSparkListener(final String master, final String sparkWebUrl) {
     SparkContext sc = SparkContext.getOrCreate();
     sc.addSparkListener(new SparkListener() {
       @Override
       public void onJobStart(SparkListenerJobStart jobStart) {
-        buildSparkJobUrl(sparkWebUrl, jobStart.jobId(), jobStart.properties());
+        buildSparkJobUrl(master, sparkWebUrl, jobStart.jobId(), jobStart.properties());
       }
     });
   }
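For reference, the version gate can also be exercised outside the test suite. A sketch that instantiates the shim the same way the new `SparkShimsTest.ParamTests` does; it assumes `SparkShims` keeps an accessible no-arg constructor and that this file sits in the `org.apache.zeppelin.spark` package so the protected `supportYarn6615` is reachable:

```java
package org.apache.zeppelin.spark;

// Illustrative probe, not part of the patch: prints which sample Hadoop versions
// carry the YARN-6615 fix according to SparkShims.supportYarn6615.
public class Yarn6615Probe extends SparkShims {
  @Override
  public void setupSparkListener(String master, String sparkWebUrl) {
    // no-op: only the version check is exercised here
  }

  public static void main(String[] args) {
    Yarn6615Probe probe = new Yarn6615Probe();
    for (String v : new String[] {"2.6.5", "2.6.6", "2.7.3", "2.7.4", "2.8.2", "2.9.0", "3.0.0"}) {
      System.out.println("Hadoop " + v + " -> YARN-6615 fixed: " + probe.supportYarn6615(v));
    }
  }
}
```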