Repository: spark
Updated Branches:
  refs/heads/master 935fe65ff -> 72e9021ea
[SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener

This should reduce memory usage for the web ui as well as slightly increase its speed in draining the UI event queue.

@andrewor14

Author: Reynold Xin <r...@apache.org>

Closes #1262 from rxin/ui-consolidate-hashtables and squashes the following commits:

1ac3f97 [Reynold Xin] Oops. Properly handle description.
f5736ad [Reynold Xin] Code review comments.
b8828dc [Reynold Xin] Merge branch 'master' into ui-consolidate-hashtables
7a7b6c4 [Reynold Xin] Revert css change.
f959bb8 [Reynold Xin] [SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener to speed it up.
63256f5 [Reynold Xin] [SPARK-2320] Reduce <pre> block font size.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72e9021e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72e9021e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72e9021e

Branch: refs/heads/master
Commit: 72e9021eaf26f31a82120505f8b764b18fbe8d48
Parents: 935fe65
Author: Reynold Xin <r...@apache.org>
Authored: Thu Jul 17 18:58:48 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Thu Jul 17 18:58:48 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/ui/jobs/ExecutorSummary.scala  |  36 -----
 .../apache/spark/ui/jobs/ExecutorTable.scala    |  29 ++--
 .../spark/ui/jobs/JobProgressListener.scala     | 156 +++++++------------
 .../org/apache/spark/ui/jobs/StagePage.scala    |  37 ++---
 .../org/apache/spark/ui/jobs/StageTable.scala   |  73 ++++-----
 .../scala/org/apache/spark/ui/jobs/UIData.scala |  62 ++++++++
 .../ui/jobs/JobProgressListenerSuite.scala      |  36 +++--
 7 files changed, 205 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
deleted file mode 100644
index c4a8996..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Class for reporting aggregated metrics for each executor in stage UI.
- */
-@DeveloperApi
-class ExecutorSummary {
-  var taskTime : Long = 0
-  var failedTasks : Int = 0
-  var succeededTasks : Int = 0
-  var inputBytes: Long = 0
-  var shuffleRead : Long = 0
-  var shuffleWrite : Long = 0
-  var memoryBytesSpilled : Long = 0
-  var diskBytesSpilled : Long = 0
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 5202095..0cc51c8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 import scala.xml.Node
 
 import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
 /** Page showing executor summary */
@@ -64,11 +65,9 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
       executorIdToAddress.put(executorId, address)
     }
 
-    val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
-    executorIdToSummary match {
-      case Some(x) =>
-        x.toSeq.sortBy(_._1).map { case (k, v) => {
-          // scalastyle:off
+    listener.stageIdToData.get(stageId) match {
+      case Some(stageData: StageUIData) =>
+        stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
           <tr>
             <td>{k}</td>
             <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
@@ -76,16 +75,20 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>
-            <td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
-            <td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
-            <td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
-            <td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
-            <td sorttable_customekey={v.diskBytesSpilled.toString} >{Utils.bytesToString(v.diskBytesSpilled)}</td>
+            <td sorttable_customekey={v.inputBytes.toString}>
+              {Utils.bytesToString(v.inputBytes)}</td>
+            <td sorttable_customekey={v.shuffleRead.toString}>
+              {Utils.bytesToString(v.shuffleRead)}</td>
+            <td sorttable_customekey={v.shuffleWrite.toString}>
+              {Utils.bytesToString(v.shuffleWrite)}</td>
+            <td sorttable_customekey={v.memoryBytesSpilled.toString}>
+              {Utils.bytesToString(v.memoryBytesSpilled)}</td>
+            <td sorttable_customekey={v.diskBytesSpilled.toString}>
+              {Utils.bytesToString(v.diskBytesSpilled)}</td>
           </tr>
-          // scalastyle:on
         }
-      }
-      case _ => Seq[Node]()
+      case None =>
+        Seq.empty[Node]
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2286a7f..efb527b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.jobs.UIData._
 
 /**
  * :: DeveloperApi ::
@@ -35,7 +36,7 @@ import org.apache.spark.storage.BlockManagerId
  * updating the internal data structures concurrently.
  */
 @DeveloperApi
-class JobProgressListener(conf: SparkConf) extends SparkListener {
+class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   import JobProgressListener._
 
@@ -46,20 +47,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
 
-  // TODO: Should probably consolidate all following into a single hash map.
-  val stageIdToTime = HashMap[Int, Long]()
-  val stageIdToInputBytes = HashMap[Int, Long]()
-  val stageIdToShuffleRead = HashMap[Int, Long]()
-  val stageIdToShuffleWrite = HashMap[Int, Long]()
-  val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
-  val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
-  val stageIdToTasksActive = HashMap[Int, HashMap[Long, TaskInfo]]()
-  val stageIdToTasksComplete = HashMap[Int, Int]()
-  val stageIdToTasksFailed = HashMap[Int, Int]()
-  val stageIdToTaskData = HashMap[Int, HashMap[Long, TaskUIData]]()
-  val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
-  val stageIdToPool = HashMap[Int, String]()
-  val stageIdToDescription = HashMap[Int, String]()
+  val stageIdToData = new HashMap[Int, StageUIData]
+
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
 
   val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
@@ -71,8 +60,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
     val stageId = stage.stageId
-    // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
-    poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
+    val stageData = stageIdToData.getOrElseUpdate(stageId, {
+      logWarning("Stage completed for unknown stage " + stageId)
+      new StageUIData
+    })
+
+    poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
     activeStages.remove(stageId)
     if (stage.failureReason.isEmpty) {
       completedStages += stage
@@ -87,21 +80,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s =>
-        stageIdToTime.remove(s.stageId)
-        stageIdToInputBytes.remove(s.stageId)
-        stageIdToShuffleRead.remove(s.stageId)
-        stageIdToShuffleWrite.remove(s.stageId)
-        stageIdToMemoryBytesSpilled.remove(s.stageId)
-        stageIdToDiskBytesSpilled.remove(s.stageId)
-        stageIdToTasksActive.remove(s.stageId)
-        stageIdToTasksComplete.remove(s.stageId)
-        stageIdToTasksFailed.remove(s.stageId)
-        stageIdToTaskData.remove(s.stageId)
-        stageIdToExecutorSummaries.remove(s.stageId)
-        stageIdToPool.remove(s.stageId)
-        stageIdToDescription.remove(s.stageId)
-      }
+      stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
       stages.trimStart(toRemove)
     }
   }
@@ -114,26 +93,27 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
     val poolName = Option(stageSubmitted.properties).map { p =>
       p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
-    stageIdToPool(stage.stageId) = poolName
 
-    val description = Option(stageSubmitted.properties).flatMap {
+    val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
+    stageData.schedulingPool = poolName
+
+    stageData.description = Option(stageSubmitted.properties).flatMap {
       p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
-    description.map(d => stageIdToDescription(stage.stageId) = d)
 
     val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
     stages(stage.stageId) = stage
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val sid = taskStart.stageId
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
-      tasksActive(taskInfo.taskId) = taskInfo
-      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
-      stageIdToTaskData(sid) = taskMap
+      val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
+        logWarning("Task start for unknown stage " + taskStart.stageId)
+        new StageUIData
+      })
+      stageData.numActiveTasks += 1
+      stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
     }
   }
 
@@ -143,88 +123,76 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val sid = taskEnd.stageId
     val info = taskEnd.taskInfo
-
     if (info != null) {
+      val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
+        logWarning("Task end for unknown stage " + taskEnd.stageId)
+        new StageUIData
+      })
+
       // create executor summary map if necessary
-      val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
-        op = new HashMap[String, ExecutorSummary]())
+      val executorSummaryMap = stageData.executorSummary
       executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)
 
-      val executorSummary = executorSummaryMap.get(info.executorId)
-      executorSummary match {
-        case Some(y) => {
-          // first update failed-task, succeed-task
-          taskEnd.reason match {
-            case Success =>
-              y.succeededTasks += 1
-            case _ =>
-              y.failedTasks += 1
-          }
-
-          // update duration
-          y.taskTime += info.duration
-
-          val metrics = taskEnd.taskMetrics
-          if (metrics != null) {
-            metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
-            metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
-            metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
-            y.memoryBytesSpilled += metrics.memoryBytesSpilled
-            y.diskBytesSpilled += metrics.diskBytesSpilled
-          }
+      executorSummaryMap.get(info.executorId).foreach { y =>
+        // first update failed-task, succeed-task
+        taskEnd.reason match {
+          case Success =>
+            y.succeededTasks += 1
+          case _ =>
+            y.failedTasks += 1
+        }
+
+        // update duration
+        y.taskTime += info.duration
+
+        val metrics = taskEnd.taskMetrics
+        if (metrics != null) {
+          metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
+          metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
+          metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+          y.memoryBytesSpilled += metrics.memoryBytesSpilled
+          y.diskBytesSpilled += metrics.diskBytesSpilled
        }
-        case _ => {}
      }
 
-      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
-      // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
-      tasksActive.remove(info.taskId)
+      stageData.numActiveTasks -= 1

      val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
        taskEnd.reason match {
          case org.apache.spark.Success =>
-            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+            stageData.numCompleteTasks += 1
            (None, Option(taskEnd.taskMetrics))
          case e: ExceptionFailure =>  // Handle ExceptionFailure because we might have metrics
-            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            stageData.numFailedTasks += 1
            (Some(e.toErrorString), e.metrics)
          case e: TaskFailedReason =>  // All other failure cases
-            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            stageData.numFailedTasks += 1
            (Some(e.toErrorString), None)
        }
 
-      stageIdToTime.getOrElseUpdate(sid, 0L)
-      val time = metrics.map(_.executorRunTime).getOrElse(0L)
-      stageIdToTime(sid) += time
-      stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+      val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L)
+      stageData.executorRunTime += taskRunTime
      val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
-      stageIdToInputBytes(sid) += inputBytes
+      stageData.inputBytes += inputBytes
 
-      stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
      val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
-      stageIdToShuffleRead(sid) += shuffleRead
+      stageData.shuffleReadBytes += shuffleRead
 
-      stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
      val shuffleWrite =
        metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
-      stageIdToShuffleWrite(sid) += shuffleWrite
+      stageData.shuffleWriteBytes += shuffleWrite
 
-      stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
      val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
-      stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+      stageData.memoryBytesSpilled += memoryBytesSpilled
 
-      stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
      val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
-      stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
+      stageData.diskBytesSpilled += diskBytesSpilled
 
-      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
-      stageIdToTaskData(sid) = taskMap
+      stageData.taskData(info.taskId) = new TaskUIData(info, metrics, errorMessage)
    }
-  }
+  }  // end of onTaskEnd
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
@@ -252,12 +220,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 }
 
-@DeveloperApi
-case class TaskUIData(
-    taskInfo: TaskInfo,
-    taskMetrics: Option[TaskMetrics] = None,
-    errorMessage: Option[String] = None)
-
 private object JobProgressListener {
   val DEFAULT_POOL_NAME = "default"
   val DEFAULT_RETAINED_STAGES = 1000

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8c3821b..cab26b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
+import org.apache.spark.ui.jobs.UIData._
 import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
@@ -34,8 +35,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
+      val stageDataOption = listener.stageIdToData.get(stageId)
 
-      if (!listener.stageIdToTaskData.contains(stageId)) {
+      if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
        val content =
          <div>
            <h4>Summary Metrics</h4> No tasks have started yet
@@ -45,23 +47,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
          "Details for Stage %s".format(stageId), parent.headerTabs, parent)
      }
 
-      val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
+      val stageData = stageDataOption.get
+      val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
      val numCompleted = tasks.count(_.taskInfo.finished)
-      val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
-      val hasInput = inputBytes > 0
-      val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
-      val hasShuffleRead = shuffleReadBytes > 0
-      val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
-      val hasShuffleWrite = shuffleWriteBytes > 0
-      val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
-      val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
-      val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
-
-      var activeTime = 0L
-      val now = System.currentTimeMillis
-      val tasksActive = listener.stageIdToTasksActive(stageId).values
-      tasksActive.foreach(activeTime += _.timeRunning(now))
+      val hasInput = stageData.inputBytes > 0
+      val hasShuffleRead = stageData.shuffleReadBytes > 0
+      val hasShuffleWrite = stageData.shuffleWriteBytes > 0
+      val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
 
      // scalastyle:off
      val summary =
@@ -69,34 +62,34 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
          <ul class="unstyled">
            <li>
              <strong>Total task time across all tasks: </strong>
-              {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+              {UIUtils.formatDuration(stageData.executorRunTime)}
            </li>
            {if (hasInput)
              <li>
                <strong>Input: </strong>
-                {Utils.bytesToString(inputBytes)}
+                {Utils.bytesToString(stageData.inputBytes)}
              </li>
            }
            {if (hasShuffleRead)
              <li>
                <strong>Shuffle read: </strong>
-                {Utils.bytesToString(shuffleReadBytes)}
+                {Utils.bytesToString(stageData.shuffleReadBytes)}
              </li>
            }
            {if (hasShuffleWrite)
              <li>
                <strong>Shuffle write: </strong>
-                {Utils.bytesToString(shuffleWriteBytes)}
+                {Utils.bytesToString(stageData.shuffleWriteBytes)}
              </li>
            }
            {if (hasBytesSpilled)
              <li>
                <strong>Shuffle spill (memory): </strong>
-                {Utils.bytesToString(memoryBytesSpilled)}
+                {Utils.bytesToString(stageData.memoryBytesSpilled)}
              </li>
              <li>
                <strong>Shuffle spill (disk): </strong>
-                {Utils.bytesToString(diskBytesSpilled)}
+                {Utils.bytesToString(stageData.diskBytesSpilled)}
              </li>
            }
          </ul>

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index fd8d0b5..5f45c0c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.ui.jobs
 
-import java.util.Date
-
-import scala.collection.mutable.HashMap
 import scala.xml.Node
 
-import org.apache.spark.scheduler.{StageInfo, TaskInfo}
+import java.util.Date
+
+import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.ui.{ToolTips, UIUtils}
 import org.apache.spark.util.Utils
 
@@ -71,14 +70,14 @@ private[ui] class StageTableBase(
     </table>
   }
 
-  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
+  private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] =
   {
     val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
     val startWidth = "width: %s%%".format((started.toDouble/total)*100)
 
     <div class="progress">
       <span style="text-align:center; position:absolute; width:100%; left:0;">
-        {completed}/{total} {failed}
+        {completed}/{total} { if (failed > 0) s"($failed failed)" else "" }
      </span>
      <div class="bar bar-completed" style={completeWidth}></div>
      <div class="bar bar-running" style={startWidth}></div>
@@ -108,13 +107,23 @@ private[ui] class StageTableBase(
       <pre class="stage-details collapsed">{s.details}</pre>
     }
 
-    listener.stageIdToDescription.get(s.stageId)
-      .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
-      .getOrElse(<div>{nameLink} {killLink} {details}</div>)
+    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
+    if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
+      val desc = stageDataOption.get.description
+      <div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
+    } else {
+      <div>{killLink} {nameLink} {details}</div>
+    }
   }
 
   protected def stageRow(s: StageInfo): Seq[Node] = {
-    val poolName = listener.stageIdToPool.get(s.stageId)
+    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    if (stageDataOption.isEmpty) {
+      return <td>{s.stageId}</td><td>No data available for this stage</td>
+    }
+
+    val stageData = stageDataOption.get
     val submissionTime = s.submissionTime match {
       case Some(t) => UIUtils.formatDate(new Date(t))
       case None => "Unknown"
@@ -124,35 +133,20 @@ private[ui] class StageTableBase(
       if (finishTime > t) finishTime - t else System.currentTimeMillis - t
     }
     val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
-    val startedTasks =
-      listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
-    val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
-    val failedTasks = listener.stageIdToTasksFailed.getOrElse(s.stageId, 0) match {
-      case f if f > 0 => "(%s failed)".format(f)
-      case _ => ""
-    }
-    val totalTasks = s.numTasks
-    val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
-    val inputRead = inputSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
-    val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
-    val shuffleRead = shuffleReadSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
-    val shuffleWriteSortable = listener.stageIdToShuffleWrite.getOrElse(s.stageId, 0L)
-    val shuffleWrite = shuffleWriteSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
+
+    val inputRead = stageData.inputBytes
+    val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
+    val shuffleRead = stageData.shuffleReadBytes
+    val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
+    val shuffleWrite = stageData.shuffleWriteBytes
+    val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
+
    <td>{s.stageId}</td> ++
    {if (isFairScheduler) {
      <td>
        <a href={"%s/stages/pool?poolname=%s"
-          .format(UIUtils.prependBaseUri(basePath), poolName.get)}>
-          {poolName.get}
+          .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
+          {stageData.schedulingPool}
        </a>
      </td>
    } else {
@@ -162,11 +156,12 @@ private[ui] class StageTableBase(
     <td valign="middle">{submissionTime}</td>
     <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
     <td class="progress-cell">
-      {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
+      {makeProgressBar(stageData.numActiveTasks, stageData.numCompleteTasks,
+        stageData.numFailedTasks, s.numTasks)}
    </td>
-    <td sorttable_customekey={inputSortable.toString}>{inputRead}</td>
-    <td sorttable_customekey={shuffleReadSortable.toString}>{shuffleRead}</td>
-    <td sorttable_customekey={shuffleWriteSortable.toString}>{shuffleWrite}</td>
+    <td sorttable_customekey={inputRead.toString}>{inputReadWithUnit}</td>
+    <td sorttable_customekey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
+    <td sorttable_customekey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
   }
 
   /** Render an HTML row that represents a stage */

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
new file mode 100644
index 0000000..be11a11
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
+
+import scala.collection.mutable.HashMap
+
+private[jobs] object UIData {
+
+  class ExecutorSummary {
+    var taskTime : Long = 0
+    var failedTasks : Int = 0
+    var succeededTasks : Int = 0
+    var inputBytes : Long = 0
+    var shuffleRead : Long = 0
+    var shuffleWrite : Long = 0
+    var memoryBytesSpilled : Long = 0
+    var diskBytesSpilled : Long = 0
+  }
+
+  class StageUIData {
+    var numActiveTasks: Int = _
+    var numCompleteTasks: Int = _
+    var numFailedTasks: Int = _
+
+    var executorRunTime: Long = _
+
+    var inputBytes: Long = _
+    var shuffleReadBytes: Long = _
+    var shuffleWriteBytes: Long = _
+    var memoryBytesSpilled: Long = _
+    var diskBytesSpilled: Long = _
+
+    var schedulingPool: String = ""
+    var description: Option[String] = None
+
+    var taskData = new HashMap[Long, TaskUIData]
+    var executorSummary = new HashMap[String, ExecutorSummary]
+  }
+
+  case class TaskUIData(
+      taskInfo: TaskInfo,
+      taskMetrics: Option[TaskMetrics] = None,
+      errorMessage: Option[String] = None)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index fa43b66..a855662 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -47,11 +47,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     }
 
     listener.completedStages.size should be (5)
-    listener.completedStages.filter(_.stageId == 50).size should be (1)
-    listener.completedStages.filter(_.stageId == 49).size should be (1)
-    listener.completedStages.filter(_.stageId == 48).size should be (1)
-    listener.completedStages.filter(_.stageId == 47).size should be (1)
-    listener.completedStages.filter(_.stageId == 46).size should be (1)
+    listener.completedStages.count(_.stageId == 50) should be (1)
+    listener.completedStages.count(_.stageId == 49) should be (1)
+    listener.completedStages.count(_.stageId == 48) should be (1)
+    listener.completedStages.count(_.stageId == 47) should be (1)
+    listener.completedStages.count(_.stageId == 46) should be (1)
   }
 
   test("test executor id to summary") {
@@ -59,9 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)
     val taskMetrics = new TaskMetrics()
     val shuffleReadMetrics = new ShuffleReadMetrics()
-
-    // nothing in it
-    assert(listener.stageIdToExecutorSummaries.size == 0)
+    assert(listener.stageIdToData.size === 0)
 
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -71,8 +69,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     var task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
-      .shuffleRead == 1000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+      .shuffleRead === 1000)
 
     // finish a task with unknown executor-id, nothing should happen
     taskInfo =
@@ -80,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.size == 1)
+    assert(listener.stageIdToData.size === 1)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -89,8 +87,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
-      .shuffleRead == 2000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+      .shuffleRead === 2000)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -99,8 +97,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
-      .shuffleRead == 1000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
+      .shuffleRead === 1000)
   }
 
   test("test task success vs failure counting for different task end reasons") {
@@ -121,13 +119,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       TaskKilled,
       ExecutorLostFailure,
       UnknownReason)
+    var failCount = 0
     for (reason <- taskFailedReasons) {
       listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
-      assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+      failCount += 1
+      assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
+      assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
    }
 
     // Make sure we count success as success.
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
-    assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+    assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
+    assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
   }
 }
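
---

For readers skimming the diff, the core pattern is easiest to see in JobProgressListener.scala: roughly a dozen parallel stageIdTo* hash maps become a single stageIdToData: HashMap[Int, StageUIData], so each listener event does one map lookup and stage cleanup is a single remove. Below is a minimal, self-contained Scala sketch of that pattern; the StageData class, handler signatures, and main driver are illustrative stand-ins for this email, not Spark's actual API:

import scala.collection.mutable.HashMap

object StageDataSketch {

  // Plays the role of StageUIData: all per-stage counters live in one object.
  class StageData {
    var numActiveTasks: Int = 0
    var numCompleteTasks: Int = 0
    var numFailedTasks: Int = 0
    var executorRunTime: Long = 0L
  }

  // One map keyed by stage ID, replacing stageIdToTasksActive,
  // stageIdToTasksComplete, stageIdToTasksFailed, stageIdToTime, and friends.
  val stageIdToData = new HashMap[Int, StageData]

  def onTaskStart(stageId: Int): Unit = {
    // A single getOrElseUpdate per event, instead of one lookup per metric map.
    val stageData = stageIdToData.getOrElseUpdate(stageId, new StageData)
    stageData.numActiveTasks += 1
  }

  def onTaskEnd(stageId: Int, succeeded: Boolean, runTime: Long): Unit = {
    val stageData = stageIdToData.getOrElseUpdate(stageId, new StageData)
    stageData.numActiveTasks -= 1
    if (succeeded) stageData.numCompleteTasks += 1 else stageData.numFailedTasks += 1
    stageData.executorRunTime += runTime
  }

  // Evicting a retired stage is one remove; with parallel maps, every map had
  // to be cleaned individually, and a missed one leaked memory.
  def removeStage(stageId: Int): Unit = stageIdToData.remove(stageId)

  def main(args: Array[String]): Unit = {
    onTaskStart(0)
    onTaskEnd(0, succeeded = true, runTime = 42L)
    println(stageIdToData(0).numCompleteTasks)  // prints 1
  }
}

The memory claim in the commit message follows from the same structure: one hash-table entry (plus one small aggregate object) per stage, rather than an entry in each of a dozen tables.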