Repository: spark
Updated Branches:
  refs/heads/master ef8fb3612 -> c037d2548
[SPARK-12149][WEB UI] Executor UI improvement suggestions - Color UI

Added color coding to the Executors page for Active Tasks, Failed Tasks,
Completed Tasks and Task Time.

Active Tasks is shaded blue, with its range based on the percentage of total
cores used.
Failed Tasks is shaded red, ranging over the first 10% of total tasks failed.
Completed Tasks is shaded green, ranging over 10% of total tasks (including
failed and active tasks), but only when there are active or failed tasks on
that executor.
Task Time is shaded red when GC Time goes over 10% of total time, with its
range corresponding directly to the percent of total time.

Author: Alex Bozarth <ajboz...@us.ibm.com>

Closes #10154 from ajbozarth/spark12149.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c037d254
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c037d254
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c037d254

Branch: refs/heads/master
Commit: c037d25482ea63430fb42bfd86124c268be5a4a4
Parents: ef8fb36
Author: Alex Bozarth <ajboz...@us.ibm.com>
Authored: Mon Jan 25 14:42:44 2016 -0600
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Mon Jan 25 14:42:44 2016 -0600

----------------------------------------------------------------------
 .../org/apache/spark/status/api/v1/api.scala    |  2 +
 .../scala/org/apache/spark/ui/SparkUI.scala     |  2 +-
 .../scala/org/apache/spark/ui/ToolTips.scala    |  3 +
 .../apache/spark/ui/exec/ExecutorsPage.scala    | 98 ++++++++++++++++----
 .../org/apache/spark/ui/exec/ExecutorsTab.scala | 10 +-
 .../executor_list_json_expectation.json         |  2 +
 project/MimaExcludes.scala                      |  6 ++
 7 files changed, 103 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c037d254/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index fe37211..3adf5b1 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -55,11 +55,13 @@ class ExecutorSummary private[spark](
     val rddBlocks: Int,
     val memoryUsed: Long,
     val diskUsed: Long,
+    val maxTasks: Int,
     val activeTasks: Int,
     val failedTasks: Int,
     val completedTasks: Int,
     val totalTasks: Int,
     val totalDuration: Long,
+    val totalGCTime: Long,
     val totalInputBytes: Long,
     val totalShuffleRead: Long,
     val totalShuffleWrite: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/c037d254/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index eb53aa8..cf45414 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -195,7 +195,7 @@ private[spark] object SparkUI {
     val environmentListener = new EnvironmentListener
     val storageStatusListener = new StorageStatusListener
-    val executorsListener = new ExecutorsListener(storageStatusListener)
+    val executorsListener = new ExecutorsListener(storageStatusListener, conf)
     val storageListener = new StorageListener(storageStatusListener)
     val operationGraphListener = new RDDOperationGraphListener(conf)
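Since ExecutorSummary is the model behind the executor list endpoint of the status
REST API, the two new fields also appear in that JSON (the expectation file further
down reflects this). A minimal sketch of reading it from a running application,
where the host, port, and application id are placeholders, not values from this commit:

    // Sketch only: fetch the executor list from the v1 REST API of a running app.
    import scala.io.Source

    val appId = "app-20160125144244-0000"  // placeholder application id
    val url = s"http://localhost:4040/api/v1/applications/$appId/executors"
    // Each executor entry now also carries "maxTasks" and "totalGCTime".
    val executorsJson = Source.fromURL(url).mkString
    println(executorsJson)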
http://git-wip-us.apache.org/repos/asf/spark/blob/c037d254/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index cb122ea..2d2d80b 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -87,4 +87,7 @@ private[spark] object ToolTips {
        multiple operations (e.g. two map() functions) if they can be pipelined. Some operations
        also create multiple RDDs internally. Cached RDDs are shown in green.
     """
+
+  val TASK_TIME =
+    "Shaded red when garbage collection (GC) time is over 10% of task time"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c037d254/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 440dfa2..e36b96b 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -50,6 +50,8 @@ private[ui] class ExecutorsPage(
     threadDumpEnabled: Boolean)
   extends WebUIPage("") {
   private val listener = parent.listener
+  // When GCTimePercent is edited change ToolTips.TASK_TIME to match
+  private val GCTimePercent = 0.1
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val (storageStatusList, execInfo) = listener.synchronized {
@@ -77,7 +79,7 @@ private[ui] class ExecutorsPage(
         <th>Failed Tasks</th>
         <th>Complete Tasks</th>
         <th>Total Tasks</th>
-        <th>Task Time</th>
+        <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th>
         <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
         <th>
@@ -129,13 +131,8 @@ private[ui] class ExecutorsPage(
       <td sorttable_customkey={diskUsed.toString}>
         {Utils.bytesToString(diskUsed)}
       </td>
-      <td>{info.activeTasks}</td>
-      <td>{info.failedTasks}</td>
-      <td>{info.completedTasks}</td>
-      <td>{info.totalTasks}</td>
-      <td sorttable_customkey={info.totalDuration.toString}>
-        {Utils.msDurationToString(info.totalDuration)}
-      </td>
+      {taskData(info.maxTasks, info.activeTasks, info.failedTasks, info.completedTasks,
+        info.totalTasks, info.totalDuration, info.totalGCTime)}
       <td sorttable_customkey={info.totalInputBytes.toString}>
         {Utils.bytesToString(info.totalInputBytes)}
       </td>
@@ -177,7 +174,6 @@ private[ui] class ExecutorsPage(
     val maximumMemory = execInfo.map(_.maxMemory).sum
     val memoryUsed = execInfo.map(_.memoryUsed).sum
     val diskUsed = execInfo.map(_.diskUsed).sum
-    val totalDuration = execInfo.map(_.totalDuration).sum
     val totalInputBytes = execInfo.map(_.totalInputBytes).sum
     val totalShuffleRead = execInfo.map(_.totalShuffleRead).sum
     val totalShuffleWrite = execInfo.map(_.totalShuffleWrite).sum
@@ -192,13 +188,13 @@ private[ui] class ExecutorsPage(
       <td sorttable_customkey={diskUsed.toString}>
         {Utils.bytesToString(diskUsed)}
       </td>
-      <td>{execInfo.map(_.activeTasks).sum}</td>
-      <td>{execInfo.map(_.failedTasks).sum}</td>
-      <td>{execInfo.map(_.completedTasks).sum}</td>
-      <td>{execInfo.map(_.totalTasks).sum}</td>
-      <td sorttable_customkey={totalDuration.toString}>
-        {Utils.msDurationToString(totalDuration)}
-      </td>
+      {taskData(execInfo.map(_.maxTasks).sum,
+        execInfo.map(_.activeTasks).sum,
+        execInfo.map(_.failedTasks).sum,
+        execInfo.map(_.completedTasks).sum,
+        execInfo.map(_.totalTasks).sum,
+        execInfo.map(_.totalDuration).sum,
+        execInfo.map(_.totalGCTime).sum)}
       <td sorttable_customkey={totalInputBytes.toString}>
         {Utils.bytesToString(totalInputBytes)}
       </td>
@@ -219,7 +215,7 @@ private[ui] class ExecutorsPage(
         <th>Failed Tasks</th>
         <th>Complete Tasks</th>
         <th>Total Tasks</th>
-        <th>Task Time</th>
+        <th data-toggle="tooltip" title={ToolTips.TASK_TIME}>Task Time (GC Time)</th>
         <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th>
         <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th>
         <th>
@@ -233,6 +229,70 @@ private[ui] class ExecutorsPage(
       </tbody>
     </table>
   }
+
+  private def taskData(
+      maxTasks: Int,
+      activeTasks: Int,
+      failedTasks: Int,
+      completedTasks: Int,
+      totalTasks: Int,
+      totalDuration: Long,
+      totalGCTime: Long):
+    Seq[Node] = {
+    // Determine Color Opacity from 0.5-1
+    // activeTasks range from 0 to maxTasks
+    val activeTasksAlpha =
+      if (maxTasks > 0) {
+        (activeTasks.toDouble / maxTasks) * 0.5 + 0.5
+      } else {
+        1
+      }
+    // failedTasks range max at 10% failure, alpha max = 1
+    val failedTasksAlpha =
+      if (totalTasks > 0) {
+        math.min(10 * failedTasks.toDouble / totalTasks, 1) * 0.5 + 0.5
+      } else {
+        1
+      }
+    // totalDuration range from 0 to 50% GC time, alpha max = 1
+    val totalDurationAlpha =
+      if (totalDuration > 0) {
+        math.min(totalGCTime.toDouble / totalDuration + 0.5, 1)
+      } else {
+        1
+      }
+
+    val tableData =
+      <td style={
+        if (activeTasks > 0) {
+          "background:hsla(240, 100%, 50%, " + activeTasksAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>{activeTasks}</td>
+      <td style={
+        if (failedTasks > 0) {
+          "background:hsla(0, 100%, 50%, " + failedTasksAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>{failedTasks}</td>
+      <td>{completedTasks}</td>
+      <td>{totalTasks}</td>
+      <td sorttable_customkey={totalDuration.toString} style={
+        // Red if GC time over GCTimePercent of total time
+        if (totalGCTime > GCTimePercent * totalDuration) {
+          "background:hsla(0, 100%, 50%, " + totalDurationAlpha + ");color:white"
+        } else {
+          ""
+        }
+      }>
+        {Utils.msDurationToString(totalDuration)}
+        ({Utils.msDurationToString(totalGCTime)})
+      </td>;
+
+    tableData
+  }
 }
 
 private[spark] object ExecutorsPage {
@@ -245,11 +305,13 @@ private[spark] object ExecutorsPage {
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
+    val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
     val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
     val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
     val totalTasks = activeTasks + failedTasks + completedTasks
     val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
+    val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
     val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
@@ -261,11 +323,13 @@ private[spark] object ExecutorsPage {
       rddBlocks,
       memUsed,
       diskUsed,
+      maxTasks,
       activeTasks,
       failedTasks,
       completedTasks,
       totalTasks,
       totalDuration,
+      totalGCTime,
       totalInputBytes,
       totalShuffleRead,
       totalShuffleWrite,
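To make the opacity math in taskData above concrete, here is a small worked example
with made-up numbers (4 of 8 task slots busy, 2 of 40 tasks failed, 1s of GC in 8s of
task time); the formulas are the ones from the diff, the values are only illustrative:

    // Illustrative inputs only; formulas mirror taskData above.
    val maxTasks = 8
    val activeTasks = 4
    val totalTasks = 40
    val failedTasks = 2
    val totalDuration = 8000L  // ms of task time
    val totalGCTime = 1000L    // ms spent in GC

    val activeTasksAlpha = (activeTasks.toDouble / maxTasks) * 0.5 + 0.5                    // 0.75
    val failedTasksAlpha = math.min(10 * failedTasks.toDouble / totalTasks, 1) * 0.5 + 0.5  // 0.75
    val totalDurationAlpha = math.min(totalGCTime.toDouble / totalDuration + 0.5, 1)        // 0.625
    // GC is 12.5% of task time, which exceeds GCTimePercent (10%), so the cell is shaded red.

So an executor at half its task slots and at half of the 10% failure cap lands in the
middle of the 0.5-1 opacity range for both the blue and red cells.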
http://git-wip-us.apache.org/repos/asf/spark/blob/c037d254/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 160d7a4..a9e926b 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.exec
 
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{ExceptionFailure, Resubmitted, SparkContext}
+import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
@@ -43,11 +43,14 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
  */
 @DeveloperApi
-class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
+class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
+    extends SparkListener {
+  val executorToTasksMax = HashMap[String, Int]()
   val executorToTasksActive = HashMap[String, Int]()
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()
   val executorToDuration = HashMap[String, Long]()
+  val executorToJvmGCTime = HashMap[String, Long]()
   val executorToInputBytes = HashMap[String, Long]()
   val executorToInputRecords = HashMap[String, Long]()
   val executorToOutputBytes = HashMap[String, Long]()
@@ -62,6 +65,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
     executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+    executorToTasksMax(eid) =
+      executorAdded.executorInfo.totalCores / conf.getInt("spark.task.cpus", 1)
     executorIdToData(eid) = ExecutorUIData(executorAdded.time)
   }
@@ -131,6 +136,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
             executorToShuffleWrite(eid) =
               executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
           }
+          executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
         }
       }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c037d254/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index cb622e1..94f8aea 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -4,11 +4,13 @@
   "rddBlocks" : 8,
   "memoryUsed" : 28000128,
   "diskUsed" : 0,
+  "maxTasks" : 0,
   "activeTasks" : 0,
   "failedTasks" : 1,
   "completedTasks" : 31,
   "totalTasks" : 32,
   "totalDuration" : 8820,
+  "totalGCTime" : 352,
   "totalInputBytes" : 28000288,
   "totalShuffleRead" : 0,
   "totalShuffleWrite" : 13180,
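For reference, the maxTasks value recorded in onExecutorAdded in the ExecutorsTab.scala
change above is simply the executor's task slot count. A quick illustration with made-up
numbers (the variable names are only stand-ins for the fields used in the listener):

    // Illustrative only: an 8-core executor with spark.task.cpus = 2 can run
    // at most 4 tasks at once, so executorToTasksMax(eid) would be 4.
    val totalCores = 8                    // executorAdded.executorInfo.totalCores
    val taskCpus = 2                      // conf.getInt("spark.task.cpus", 1)
    val maxTasks = totalCores / taskCpus  // 4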
http://git-wip-us.apache.org/repos/asf/spark/blob/c037d254/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c65fae4..501456b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -127,6 +127,9 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore$"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockManager"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.ExternalBlockStore")
+      ) ++ Seq(
+        // SPARK-12149 Added new fields to ExecutorSummary
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
       ) ++
       // SPARK-12665 Remove deprecated and unused classes
       Seq(
@@ -301,6 +304,9 @@ object MimaExcludes {
         // SPARK-3580 Add getNumPartitions method to JavaRDD
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.api.java.JavaRDDLike.getNumPartitions")
+      ) ++ Seq(
+        // SPARK-12149 Added new fields to ExecutorSummary
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorSummary.this")
       ) ++
       // SPARK-11314: YARN backend moved to yarn sub-module and MiMA complains even though it's a
       // private class.