Repository: spark
Updated Branches:
  refs/heads/master 8f6d5734d -> 9c21ece35


[SPARK-22836][UI] Show driver logs in UI when available.

Port code from the old executors listener to the new AppStatusListener, so
that the driver log URLs included in the application start event are kept.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #20038 from vanzin/SPARK-22836.
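
To make the description above concrete, here is a minimal, hypothetical sketch of a
standalone SparkListener that reads the same driver log URLs from the application
start event. The DriverLogPrinter class is not part of this patch; the actual change
in the diff below does the equivalent work inside AppStatusListener.

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Hypothetical listener, for illustration only: shows where the driver log
// URLs live in the application start event.
class DriverLogPrinter extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    // driverLogs is an Option[Map[String, String]] of log name -> URL; it is
    // only populated by cluster managers that report driver logs (e.g. YARN).
    event.driverLogs.foreach { logs =>
      logs.foreach { case (name, url) =>
        println(s"driver log $name: $url")
      }
    }
  }
}

Such a listener could be registered through the spark.extraListeners configuration;
the patch instead attaches the log links to the driver's existing entry in the status
store so the UI can show them.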


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c21ece3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c21ece3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c21ece3

Branch: refs/heads/master
Commit: 9c21ece35edc175edde949175a4ce701679824c8
Parents: 8f6d573
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Thu Dec 28 15:41:16 2017 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Thu Dec 28 15:41:16 2017 -0600

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala   | 11 +++++++++++
 .../spark/status/AppStatusListenerSuite.scala     | 18 ++++++++++++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c21ece3/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 5253297..487a782 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -119,6 +119,17 @@ private[spark] class AppStatusListener(
 
     kvstore.write(new ApplicationInfoWrapper(appInfo))
     kvstore.write(appSummary)
+
+    // Update the driver block manager with logs from this event. The SparkContext initialization
+    // code registers the driver before this event is sent.
+    event.driverLogs.foreach { logs =>
+      val driver = liveExecutors.get(SparkContext.DRIVER_IDENTIFIER)
+        .orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER))
+      driver.foreach { d =>
+        d.executorLogs = logs.toMap
+        update(d, System.nanoTime())
+      }
+    }
   }
 
  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9c21ece3/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index c0b3a79..997c7de 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -942,6 +942,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("driver logs") {
+    val listener = new AppStatusListener(store, conf, true)
+
+    val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(time, driver, 42L))
+    listener.onApplicationStart(SparkListenerApplicationStart(
+      "name",
+      Some("id"),
+      time,
+      "user",
+      Some("attempt"),
+      Some(Map("stdout" -> "file.txt"))))
+
+    check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d =>
+      assert(d.info.executorLogs("stdout") === "file.txt")
+    }
+  }
+
  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
 
   private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {

