This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new cbb20fd  [SPARK-32598][SCHEDULER] Fix missing driver logs under UI App-Executors tab in standalone cluster mode
cbb20fd is described below

commit cbb20fdad101984a83afebc4288611f4fdb543ac
Author: KevinSmile <kevinwang...@hotmail.com>
AuthorDate: Fri Jan 15 09:01:26 2021 -0600

    [SPARK-32598][SCHEDULER] Fix missing driver logs under UI App-Executors tab in standalone cluster mode
    
    ### What changes were proposed in this pull request?
    Fix [SPARK-32598] (missing driver logs under the UI-ApplicationDetails-Executors tab in standalone cluster mode).
    
    The direct cause is that `StandaloneSchedulerBackend` never overrides `getDriverLogUrls`, so the default from `SchedulerBackend`, which returns no URLs, is used:
    
    https://github.com/apache/spark/blob/1de272f98d0ff22d0dd151797f22b8faf310963a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala#L70-L75
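    
    For context, the default in question is simply a method that returns no URLs. A close paraphrase of the linked lines (not an exact quote):
    
    ```scala
    // SchedulerBackend.scala: default used when a backend does not override it.
    // When defined, the map feeds the driver row of the UI Executors tab.
    def getDriverLogUrls: Option[Map[String, String]] = None
    ```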
    
    So this patch sets the driver log URLs as environment variables in `DriverRunner`, and retrieves them in `StandaloneSchedulerBackend`.
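    
    A minimal sketch of that round trip, condensed from the diff below (`builder`, `workerUrlRef`, and `driverId` are locals of `DriverRunner`):
    
    ```scala
    import java.util.Locale
    
    // Worker side (DriverRunner): expose the worker's log-page URLs to the driver JVM
    // through its process environment.
    builder.environment.put("SPARK_DRIVER_LOG_URL_STDOUT",
      s"$workerUrlRef/logPage?driverId=$driverId&logType=stdout")
    builder.environment.put("SPARK_DRIVER_LOG_URL_STDERR",
      s"$workerUrlRef/logPage?driverId=$driverId&logType=stderr")
    
    // Driver side (StandaloneSchedulerBackend): fold every SPARK_DRIVER_LOG_URL_*
    // variable into a (logType -> url) map that the UI Executors tab renders.
    val prefix = "SPARK_DRIVER_LOG_URL_"
    val driverLogUrls = sys.env.filterKeys(_.startsWith(prefix))
      .map { case (k, v) => (k.stripPrefix(prefix).toLowerCase(Locale.ROOT), v) }
      .toMap
    ```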
    
    ### Why are the changes needed?
    Fix bug [SPARK-32598].
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Users will now see driver logs (standalone cluster mode) under the UI-ApplicationDetails-Executors tab.
    
    Before:
    
![image](https://user-images.githubusercontent.com/17903517/93901055-b5de8600-fd28-11ea-879a-d97e6f70cc6e.png)
    
    After:
    
![image](https://user-images.githubusercontent.com/17903517/93901080-baa33a00-fd28-11ea-8895-3787c5efbf88.png)
    
    ### How was this patch tested?
    Re-checked the real case from [SPARK-32598] and confirmed that this user-facing bug is fixed.
    
    Closes #29644 from KevinSmile/kw-dev-master.
    
    Authored-by: KevinSmile <kevinwang...@hotmail.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
    (cherry picked from commit c75c29dcaa9458a9ce0dd7a4d5fafbffb4b7f6a6)
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../scala/org/apache/spark/deploy/worker/DriverRunner.scala   | 11 +++++++++++
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala    |  1 +
 .../spark/scheduler/cluster/StandaloneSchedulerBackend.scala  |  8 ++++++++
 .../test/scala/org/apache/spark/deploy/DeployTestUtils.scala  |  1 +
 .../org/apache/spark/deploy/worker/DriverRunnerTest.scala     |  3 ++-
 5 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 4f9c497..1c3aa3a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -33,9 +33,11 @@ import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.deploy.master.DriverState.DriverState
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DRIVER_RESOURCES_FILE, SPARK_DRIVER_PREFIX}
+import org.apache.spark.internal.config.UI.UI_REVERSE_PROXY
 import org.apache.spark.internal.config.Worker.WORKER_DRIVER_TERMINATE_TIMEOUT
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}
 
 /**
@@ -50,6 +52,7 @@ private[deploy] class DriverRunner(
     val driverDesc: DriverDescription,
     val worker: RpcEndpointRef,
     val workerUrl: String,
+    val workerWebUiUrl: String,
     val securityManager: SecurityManager,
     val resources: Map[String, ResourceInformation] = Map.empty)
   extends Logging {
@@ -190,6 +193,14 @@ private[deploy] class DriverRunner(
     val builder = CommandUtils.buildProcessBuilder(driverDesc.command.copy(javaOpts = javaOpts),
       securityManager, driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
 
+    // add WebUI driver log url to environment
+    val reverseProxy = conf.get(UI_REVERSE_PROXY)
+    val workerUrlRef = UIUtils.makeHref(reverseProxy, driverId, workerWebUiUrl)
+    builder.environment.put("SPARK_DRIVER_LOG_URL_STDOUT",
+      s"$workerUrlRef/logPage?driverId=$driverId&logType=stdout")
+    builder.environment.put("SPARK_DRIVER_LOG_URL_STDERR",
+      s"$workerUrlRef/logPage?driverId=$driverId&logType=stderr")
+
     runDriver(builder, driverDir, driverDesc.supervise)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a6092f6..fd14c15 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -651,6 +651,7 @@ private[deploy] class Worker(
         driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
         self,
         workerUri,
+        workerWebUiUrl,
         securityMgr,
         resources_)
       drivers(driverId) = driver
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index c14b2d4..7a05569 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
+import java.util.Locale
 import java.util.concurrent.Semaphore
 import java.util.concurrent.atomic.AtomicBoolean
 
@@ -235,6 +236,13 @@ private[spark] class StandaloneSchedulerBackend(
     }
   }
 
+  override def getDriverLogUrls: Option[Map[String, String]] = {
+    val prefix = "SPARK_DRIVER_LOG_URL_"
+    val driverLogUrls = sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap
+    if (driverLogUrls.nonEmpty) Some(driverLogUrls) else None
+  }
+
   private def waitForRegistration() = {
     registrationBarrier.acquire()
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 31f065e..b182b11 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -102,6 +102,7 @@ private[deploy] object DeployTestUtils {
       createDriverDesc(),
       null,
       "spark://worker",
+      "http://publicAddress:80";,
       new SecurityManager(conf))
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index c3b580e..e429ddf 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -40,7 +40,8 @@ class DriverRunnerTest extends SparkFunSuite {
     val worker = mock(classOf[RpcEndpointRef])
     doNothing().when(worker).send(any())
     spy(new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
-      driverDescription, worker, "spark://1.2.3.4/worker/", new SecurityManager(conf)))
+      driverDescription, worker, "spark://1.2.3.4/worker/", "http://publicAddress:80",
+      new SecurityManager(conf)))
   }
 
   private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
