This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 4f1da64 [ZEPPELIN-4710]. Allow to inject application id into custom spark url 4f1da64 is described below commit 4f1da64d4aefaffee561a0db7f6eed912dcf4d9f Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Tue Apr 7 17:15:45 2020 +0800 [ZEPPELIN-4710]. Allow to inject application id into custom spark url ### What is this PR for? This is for injecting the appId into a custom spark url. Currently we allow the user to set `zeppelin.spark.uiWebUrl` for a custom spark ui link, but we didn't inject the appId into it. This makes it less flexible for some cases. So this PR is to allow injecting the application id into the custom spark url, e.g. `url_prefix/{{applicationId}}` ### What type of PR is it? [Improvement] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-4710 ### How should this be tested? * Unit test added ### Screenshots (if appropriate) ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3711 from zjffdu/ZEPPELIN-4710 and squashes the following commits: a887199db [Jeff Zhang] remove code duplicate 68fceecc8 [Jeff Zhang] [ZEPPELIN-4710]. 
Allow to inject application id into custom spark url (cherry picked from commit ec16666bc4704842df0941cc7aee6ed441562e6f) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../apache/zeppelin/spark/SparkInterpreterTest.java | 5 +++-- .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 21 ++++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index a15b046..f3f9dec 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -72,7 +72,7 @@ public class SparkInterpreterTest { properties.setProperty("spark.master", "local"); properties.setProperty("spark.app.name", "test"); properties.setProperty("zeppelin.spark.maxResult", "100"); - properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl"); + properties.setProperty("zeppelin.spark.uiWebUrl", "fake_spark_weburl/{{applicationId}}"); // disable color output for easy testing properties.setProperty("zeppelin.spark.scala.color", "false"); properties.setProperty("zeppelin.spark.deprecatedMsg.show", "false"); @@ -180,7 +180,8 @@ public class SparkInterpreterTest { // spark job url is sent ArgumentCaptor<Map> onParaInfosReceivedArg = ArgumentCaptor.forClass(Map.class); verify(mockRemoteEventClient).onParaInfosReceived(onParaInfosReceivedArg.capture()); - assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl")); + assertTrue(((String) onParaInfosReceivedArg.getValue().get("jobUrl")).startsWith("fake_spark_weburl/" + + interpreter.getJavaSparkContext().sc().applicationId())); // case class result = interpreter.interpret("val bankText = sc.textFile(\"bank.csv\")", getInterpreterContext()); diff --git 
a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala index 994c7ca..0361c94 100644 --- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala +++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala @@ -237,6 +237,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, case None => } + initSparkWebUrl() + val hiveSiteExisted: Boolean = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null val hiveEnabled = conf.getBoolean("zeppelin.spark.useHiveContext", false) @@ -306,7 +308,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, case Some(url) => sparkUrl = url case None => } - useYarnProxyURLIfNeeded() + + initSparkWebUrl() bind("spark", sparkSession.getClass.getCanonicalName, sparkSession, List("""@transient""")) bind("sc", "org.apache.spark.SparkContext", sc, List("""@transient""")) @@ -321,6 +324,15 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, scalaInterpret("print(\"\")") } + private def initSparkWebUrl(): Unit = { + val webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); + if (!StringUtils.isBlank(webUiUrl)) { + this.sparkUrl = webUiUrl.replace("{{applicationId}}", sc.applicationId); + } else { + useYarnProxyURLIfNeeded() + } + } + protected def createZeppelinContext(): Unit = { var sparkShims: SparkShims = null @@ -329,13 +341,8 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf, } else { sparkShims = SparkShims.getInstance(sc.version, properties, sc) } - var webUiUrl = properties.getProperty("zeppelin.spark.uiWebUrl"); - if (StringUtils.isBlank(webUiUrl)) { - webUiUrl = sparkUrl; - } - useYarnProxyURLIfNeeded() - sparkShims.setupSparkListener(sc.master, webUiUrl, InterpreterContext.get) + sparkShims.setupSparkListener(sc.master, 
sparkUrl, InterpreterContext.get) z = new SparkZeppelinContext(sc, sparkShims, interpreterGroup.getInterpreterHookRegistry,