Repository: spark Updated Branches: refs/heads/master 380dfcc0d -> 74c30049a
[SPARK-2533] Add locality levels on stage summary view Author: Jean-Baptiste Onofré <jbono...@apache.org> Closes #9487 from jbonofre/SPARK-2533-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74c30049 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74c30049 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74c30049 Branch: refs/heads/master Commit: 74c30049a8bf9841eeca48f827572c2044912e21 Parents: 380dfcc Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Thu Nov 12 15:46:21 2015 -0800 Committer: Andrew Or <and...@databricks.com> Committed: Thu Nov 12 15:46:21 2015 -0800 ---------------------------------------------------------------------- .../org/apache/spark/ui/jobs/StagePage.scala | 21 +++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/74c30049/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 51425e5..1b34ba9 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.{InternalAccumulator, SparkConf} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} +import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality} import org.apache.spark.ui._ import org.apache.spark.ui.jobs.UIData._ import org.apache.spark.util.{Utils, Distribution} @@ -70,6 +70,21 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true) + private def getLocalitySummaryString(stageData: StageUIData): String = { + val localities = stageData.taskData.values.map(_.taskInfo.taskLocality) + val localityCounts = localities.groupBy(identity).mapValues(_.size) + val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) => + val localityName = locality match { + case TaskLocality.PROCESS_LOCAL => "Process local" + case TaskLocality.NODE_LOCAL => "Node local" + case TaskLocality.RACK_LOCAL => "Rack local" + case TaskLocality.ANY => "Any" + } + s"$localityName: $count" + } + localityNamesAndCounts.sorted.mkString("; ") + } + def render(request: HttpServletRequest): Seq[Node] = { progressListener.synchronized { val parameterId = request.getParameter("id") @@ -129,6 +144,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { <strong>Total Time Across All Tasks: </strong> {UIUtils.formatDuration(stageData.executorRunTime)} </li> + <li> + <strong>Locality Level Summary: </strong> + {getLocalitySummaryString(stageData)} + </li> {if (stageData.hasInput) { <li> <strong>Input Size / Records: </strong> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org