Repository: spark Updated Branches: refs/heads/master 8f6d5734d -> 9c21ece35
[SPARK-22836][UI] Show driver logs in UI when available. Port code from the old executors listener to the new one, so that the driver logs present in the application start event are kept. Author: Marcelo Vanzin <van...@cloudera.com> Closes #20038 from vanzin/SPARK-22836. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c21ece3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c21ece3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c21ece3 Branch: refs/heads/master Commit: 9c21ece35edc175edde949175a4ce701679824c8 Parents: 8f6d573 Author: Marcelo Vanzin <van...@cloudera.com> Authored: Thu Dec 28 15:41:16 2017 -0600 Committer: Imran Rashid <iras...@cloudera.com> Committed: Thu Dec 28 15:41:16 2017 -0600 ---------------------------------------------------------------------- .../apache/spark/status/AppStatusListener.scala | 11 +++++++++++ .../spark/status/AppStatusListenerSuite.scala | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9c21ece3/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 5253297..487a782 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -119,6 +119,17 @@ private[spark] class AppStatusListener( kvstore.write(new ApplicationInfoWrapper(appInfo)) kvstore.write(appSummary) + + // Update the driver block manager with logs from this event. The SparkContext initialization + // code registers the driver before this event is sent. + event.driverLogs.foreach { logs => + val driver = liveExecutors.get(SparkContext.DRIVER_IDENTIFIER) + .orElse(liveExecutors.get(SparkContext.LEGACY_DRIVER_IDENTIFIER)) + driver.foreach { d => + d.executorLogs = logs.toMap + update(d, System.nanoTime()) + } + } } override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/9c21ece3/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index c0b3a79..997c7de 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -942,6 +942,24 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { } } + test("driver logs") { + val listener = new AppStatusListener(store, conf, true) + + val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(time, driver, 42L)) + listener.onApplicationStart(SparkListenerApplicationStart( + "name", + Some("id"), + time, + "user", + Some("attempt"), + Some(Map("stdout" -> "file.txt")))) + + check[ExecutorSummaryWrapper](SparkContext.DRIVER_IDENTIFIER) { d => + assert(d.info.executorLogs("stdout") === "file.txt") + } + } + private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId) private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org