This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new ec16666 [ZEPPELIN-4710]. Allow to inject application id into custom spark url ec16666 is described below commit ec16666bc4704842df0941cc7aee6ed441562e6f Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue Apr 7 17:15:45 2020 +0800 [ZEPPELIN-4710]. Allow to inject application id into custom spark url ### What is this PR for? This is for injecting appId into custom spark url. Currently we allow user to set `zeppelin.spark.uiWebUrl` for a custom spark ui link. But we didn't inject appId into it. this make it less flexible for some cases. So this PR is to allow inject application id into custom spark url. e.g. `url_prefix/{{applicationId}}` ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4710 ### How should this be tested? * Unit test added ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3711 from zjffdu/ZEPPELIN-4710 and squashes the following commits: a887199db [Jeff Zhang] remove code duplicate 68fceecc8 [Jeff Zhang] [ZEPPELIN-4710]. Allow to inject application id into custom spark url --- .../apache/zeppelin/spark/SparkInterpreterTest.java | 5 +++-- .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 21 ++++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index a15b046..f3f9dec 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -72,7 +72,7 @@ public class SparkInterpreterTest { properties.setProperty("spark.master", "local"); properties.setProperty("spark.app.name", "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); - properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl"); + properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl/{{applicationId}}"); // disable color output for easy testing properties.setProperty("zeppelin.spark.scala.color", "false"); properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false"); @@ -180,7 +180,8 @@ public class SparkInterpreterTest { // spark job url is sent ArgumentCaptor<Map> onParaInfosReceivedArg = ArgumentCaptor.forClass(Map.class); verify(mockRemoteEventClient).onParaInfosReceived(onParaInfosReceivedArg.capture()); - assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl")); + assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl/" + + interpreter.getJavaSparkContext().sc().applicationId())); // case class result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext()); diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 994c7ca..0361c94 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -237,6 +237,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, case None => } + initSparkWebUrl() + val hiveSiteExisted: Boolean = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null val hiveEnabled = conf.getBoolean("zeppelin.spark.useHiveContext", false) @@ -306,7 +308,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, case Some(url) => sparkUrl = url case None => } - useYarnProxyURLIfNeeded() + + initSparkWebUrl() bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient""")) bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient""")) @@ -321,6 +324,15 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, scalaInterpret("print(\"\")") } + private def initSparkWebUrl(): Unit = { + val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); + if (!StringUtils.isBlank(webUiUrl)) { + this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId); + } else { + useYarnProxyURLIfNeeded() + } + } + protected def createZeppelinContext(): Unit = { var sparkShims: SparkShims = null @@ -329,13 +341,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } else { sparkShims = SparkShims.getInstance(sc.version, properties, sc) } - var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); - if (StringUtils.isBlank(webUiUrl)) { - webUiUrl = sparkUrl; - } - useYarnProxyURLIfNeeded() - sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get) + sparkShims.setupSparkListener(sc.master, sparkUrl, InterpreterContext.get) z = new SparkZeppelinContext(sc, sparkShims, interpreterGroup.getInterpreterHookRegistry,