This is an automated email from the ASF dual-hosted git repository. sarutak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 546e39c5dab [SPARK-44490][WEBUI] Remove unused `TaskPagedTable` in StagePage 546e39c5dab is described below commit 546e39c5dabc1111243ab81b6238dc893d9993e0 Author: sychen <syc...@ctrip.com> AuthorDate: Tue Aug 1 15:37:27 2023 +0900 [SPARK-44490][WEBUI] Remove unused `TaskPagedTable` in StagePage ### What changes were proposed in this pull request? Remove `TaskPagedTable` ### Why are the changes needed? In [SPARK-21809](https://issues.apache.org/jira/browse/SPARK-21809), we introduced `stagespage-template.html` to show the running status of a stage. Since then, `TaskPagedTable` has no longer been used, but many PRs still update the related code. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? local test Closes #42085 from cxzl25/SPARK-44490. Authored-by: sychen <syc...@ctrip.com> Signed-off-by: Kousuke Saruta <saru...@apache.org> --- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 301 +-------------------- .../scala/org/apache/spark/ui/StagePageSuite.scala | 12 +- 2 files changed, 13 insertions(+), 300 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 02aece6e50a..d50ccdadff5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -17,17 +17,12 @@ package org.apache.spark.ui.jobs -import java.net.URLEncoder -import java.nio.charset.StandardCharsets.UTF_8 import java.util.Date -import java.util.concurrent.TimeUnit import javax.servlet.http.HttpServletRequest -import scala.collection.mutable.{HashMap, HashSet} +import scala.collection.mutable.HashSet import scala.xml.{Node, Unparsed} -import org.apache.commons.text.StringEscapeUtils - import org.apache.spark.internal.config.UI._ import org.apache.spark.scheduler.TaskLocality import org.apache.spark.status._ @@ -209,32 
+204,20 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We val dagViz = UIUtils.showDagVizForStage(stageId, stageGraph) val currentTime = System.currentTimeMillis() - val taskTable = try { - val _taskTable = new TaskPagedTable( - stageData, - UIUtils.prependBaseUri(request, parent.basePath) + - s"/stages/stage/?id=${stageId}&attempt=${stageAttemptId}", - pageSize = taskPageSize, - sortColumn = taskSortColumn, - desc = taskSortDesc, - store = parent.store - ) - _taskTable - } catch { - case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) => - null - } val content = summary ++ dagViz ++ <div id="showAdditionalMetrics"></div> ++ makeTimeline( // Only show the tasks in the table - Option(taskTable).map({ taskPagedTable => + () => { val from = (eventTimelineTaskPage - 1) * eventTimelineTaskPageSize - val to = taskPagedTable.dataSource.dataSize.min( - eventTimelineTaskPage * eventTimelineTaskPageSize) - taskPagedTable.dataSource.sliceData(from, to)}).getOrElse(Nil), currentTime, + val dataSize = store.taskCount(stageData.stageId, stageData.attemptId).toInt + val to = dataSize.min(eventTimelineTaskPage * eventTimelineTaskPageSize) + val sliceData = store.taskList(stageData.stageId, stageData.attemptId, from, to - from, + indexName(taskSortColumn), !taskSortDesc) + sliceData + }, currentTime, eventTimelineTaskPage, eventTimelineTaskPageSize, eventTimelineTotalPages, stageId, stageAttemptId, totalTasks) ++ <div id="parent-container"> @@ -246,8 +229,8 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We } - def makeTimeline( - tasks: Seq[TaskData], + private def makeTimeline( + tasksFunc: () => Seq[TaskData], currentTime: Long, page: Int, pageSize: Int, @@ -258,6 +241,8 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We if (!TIMELINE_ENABLED) return Seq.empty[Node] + val tasks = tasksFunc() + val executorsSet = new HashSet[(String, String)] var minLaunchTime = 
Long.MaxValue var maxFinishTime = Long.MinValue @@ -453,268 +438,6 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We } -private[ui] class TaskDataSource( - stage: StageData, - pageSize: Int, - sortColumn: String, - desc: Boolean, - store: AppStatusStore) extends PagedDataSource[TaskData](pageSize) { - import ApiHelper._ - - // Keep an internal cache of executor log maps so that long task lists render faster. - private val executorIdToLogs = new HashMap[String, Map[String, String]]() - - private var _tasksToShow: Seq[TaskData] = null - - override def dataSize: Int = store.taskCount(stage.stageId, stage.attemptId).toInt - - override def sliceData(from: Int, to: Int): Seq[TaskData] = { - if (_tasksToShow == null) { - _tasksToShow = store.taskList(stage.stageId, stage.attemptId, from, to - from, - indexName(sortColumn), !desc) - } - _tasksToShow - } - - def executorLogs(id: String): Map[String, String] = { - executorIdToLogs.getOrElseUpdate(id, - store.asOption(store.executorSummary(id)).map(_.executorLogs).getOrElse(Map.empty)) - } - -} - -private[ui] class TaskPagedTable( - stage: StageData, - basePath: String, - pageSize: Int, - sortColumn: String, - desc: Boolean, - store: AppStatusStore) extends PagedTable[TaskData] { - - import ApiHelper._ - - private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) - - override def tableId: String = "task-table" - - override def tableCssClass: String = - "table table-bordered table-sm table-striped table-head-clickable" - - override def pageSizeFormField: String = "task.pageSize" - - override def pageNumberFormField: String = "task.page" - - override val dataSource: TaskDataSource = new TaskDataSource( - stage, - pageSize, - sortColumn, - desc, - store) - - override def pageLink(page: Int): String = { - basePath + - s"&$pageNumberFormField=$page" + - s"&task.sort=$encodedSortColumn" + - s"&task.desc=$desc" + - s"&$pageSizeFormField=$pageSize" - } - - override def 
goButtonFormPath: String = s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc" - - def headers: Seq[Node] = { - import ApiHelper._ - - val taskHeadersAndCssClasses: Seq[(String, String)] = - Seq( - (HEADER_TASK_INDEX, ""), (HEADER_ID, ""), (HEADER_ATTEMPT, ""), (HEADER_STATUS, ""), - (HEADER_LOCALITY, ""), (HEADER_EXECUTOR, ""), (HEADER_HOST, ""), (HEADER_LAUNCH_TIME, ""), - (HEADER_DURATION, ""), (HEADER_SCHEDULER_DELAY, TaskDetailsClassNames.SCHEDULER_DELAY), - (HEADER_DESER_TIME, TaskDetailsClassNames.TASK_DESERIALIZATION_TIME), - (HEADER_GC_TIME, ""), - (HEADER_SER_TIME, TaskDetailsClassNames.RESULT_SERIALIZATION_TIME), - (HEADER_GETTING_RESULT_TIME, TaskDetailsClassNames.GETTING_RESULT_TIME), - (HEADER_PEAK_MEM, TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++ - {if (hasAccumulators(stage)) Seq((HEADER_ACCUMULATORS, "")) else Nil} ++ - {if (hasInput(stage)) Seq((HEADER_INPUT_SIZE, "")) else Nil} ++ - {if (hasOutput(stage)) Seq((HEADER_OUTPUT_SIZE, "")) else Nil} ++ - {if (hasShuffleRead(stage)) { - Seq((HEADER_SHUFFLE_READ_FETCH_WAIT_TIME, - TaskDetailsClassNames.SHUFFLE_READ_FETCH_WAIT_TIME), - (HEADER_SHUFFLE_TOTAL_READS, ""), - (HEADER_SHUFFLE_REMOTE_READS, TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE)) - } else { - Nil - }} ++ - {if (hasShuffleWrite(stage)) { - Seq((HEADER_SHUFFLE_WRITE_TIME, ""), (HEADER_SHUFFLE_WRITE_SIZE, "")) - } else { - Nil - }} ++ - {if (hasBytesSpilled(stage)) { - Seq((HEADER_MEM_SPILL, ""), (HEADER_DISK_SPILL, "")) - } else { - Nil - }} ++ - Seq((HEADER_ERROR, "")) - - if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) { - throw new IllegalArgumentException(s"Unknown column: $sortColumn") - } - - val headerRow: Seq[Node] = { - taskHeadersAndCssClasses.map { case (header, cssClass) => - if (header == sortColumn) { - val headerLink = Unparsed( - basePath + - s"&task.sort=${URLEncoder.encode(header, UTF_8.name())}" + - s"&task.desc=${!desc}" + - s"&task.pageSize=$pageSize") - val arrow = if (desc) "▾" else "▴" // 
UP or DOWN - <th class={cssClass}> - <a href={headerLink}> - {header} - <span> {Unparsed(arrow)}</span> - </a> - </th> - } else { - val headerLink = Unparsed( - basePath + - s"&task.sort=${URLEncoder.encode(header, UTF_8.name())}" + - s"&task.pageSize=$pageSize") - <th class={cssClass}> - <a href={headerLink}> - {header} - </a> - </th> - } - } - } - <thead>{headerRow}</thead> - } - - def row(task: TaskData): Seq[Node] = { - def formatDuration(value: Option[Long], hideZero: Boolean = false): String = { - value.map { v => - if (v > 0 || !hideZero) UIUtils.formatDuration(v) else "" - }.getOrElse("") - } - - def formatBytes(value: Option[Long]): String = { - Utils.bytesToString(value.getOrElse(0L)) - } - - <tr> - <td>{task.index}</td> - <td>{task.taskId}</td> - <td>{if (task.speculative) s"${task.attempt} (speculative)" else task.attempt.toString}</td> - <td>{task.status}</td> - <td>{task.taskLocality}</td> - <td>{task.executorId}</td> - <td> - <div style="float: left">{task.host}</div> - <div style="float: right"> - { - dataSource.executorLogs(task.executorId).map { - case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div> - } - } - </div> - </td> - <td>{UIUtils.formatDate(task.launchTime)}</td> - <td>{formatDuration(task.taskMetrics.map(_.executorRunTime))}</td> - <td class={TaskDetailsClassNames.SCHEDULER_DELAY}> - {UIUtils.formatDuration(AppStatusUtils.schedulerDelay(task))} - </td> - <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}> - {formatDuration(task.taskMetrics.map(_.executorDeserializeTime))} - </td> - <td> - {formatDuration(task.taskMetrics.map(_.jvmGcTime), hideZero = true)} - </td> - <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}> - {formatDuration(task.taskMetrics.map(_.resultSerializationTime))} - </td> - <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}> - {UIUtils.formatDuration(AppStatusUtils.gettingResultTime(task))} - </td> - <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}> - 
{formatBytes(task.taskMetrics.map(_.peakExecutionMemory))} - </td> - {if (hasAccumulators(stage)) { - <td>{accumulatorsInfo(task)}</td> - }} - {if (hasInput(stage)) { - <td>{ - metricInfo(task) { m => - val bytesRead = Utils.bytesToString(m.inputMetrics.bytesRead) - val records = m.inputMetrics.recordsRead - Unparsed(s"$bytesRead / $records") - } - }</td> - }} - {if (hasOutput(stage)) { - <td>{ - metricInfo(task) { m => - val bytesWritten = Utils.bytesToString(m.outputMetrics.bytesWritten) - val records = m.outputMetrics.recordsWritten - Unparsed(s"$bytesWritten / $records") - } - }</td> - }} - {if (hasShuffleRead(stage)) { - <td class={TaskDetailsClassNames.SHUFFLE_READ_FETCH_WAIT_TIME}> - {formatDuration(task.taskMetrics.map(_.shuffleReadMetrics.fetchWaitTime))} - </td> - <td>{ - metricInfo(task) { m => - val bytesRead = Utils.bytesToString(totalBytesRead(m.shuffleReadMetrics)) - val records = m.shuffleReadMetrics.recordsRead - Unparsed(s"$bytesRead / $records") - } - }</td> - <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}> - {formatBytes(task.taskMetrics.map(_.shuffleReadMetrics.remoteBytesRead))} - </td> - }} - {if (hasShuffleWrite(stage)) { - <td>{ - formatDuration( - task.taskMetrics.map { m => - TimeUnit.NANOSECONDS.toMillis(m.shuffleWriteMetrics.writeTime) - }, - hideZero = true) - }</td> - <td>{ - metricInfo(task) { m => - val bytesWritten = Utils.bytesToString(m.shuffleWriteMetrics.bytesWritten) - val records = m.shuffleWriteMetrics.recordsWritten - Unparsed(s"$bytesWritten / $records") - } - }</td> - }} - {if (hasBytesSpilled(stage)) { - <td>{formatBytes(task.taskMetrics.map(_.memoryBytesSpilled))}</td> - <td>{formatBytes(task.taskMetrics.map(_.diskBytesSpilled))}</td> - }} - {UIUtils.errorMessageCell(task.errorMessage.getOrElse(""))} - </tr> - } - - private def accumulatorsInfo(task: TaskData): Seq[Node] = { - task.accumulatorUpdates.flatMap { acc => - if (acc.name != null && acc.update.isDefined) { - 
Unparsed(StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")) ++ <br /> - } else { - Nil - } - } - } - - private def metricInfo(task: TaskData)(fn: TaskMetrics => Seq[Node]): Seq[Node] = { - task.taskMetrics.map(fn).getOrElse(Nil) - } -} - private[spark] object ApiHelper { val HEADER_ID = "ID" diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 3108dca5faf..6565b9b50a9 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.resource.ResourceProfile import org.apache.spark.scheduler._ import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, StageData, StageStatus} -import org.apache.spark.ui.jobs.{ApiHelper, StagePage, StagesTab, TaskPagedTable} +import org.apache.spark.ui.jobs.{StagePage, StagesTab} class StagePageSuite extends SparkFunSuite with LocalSparkContext { @@ -110,16 +110,6 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { isShufflePushEnabled = false, shuffleMergersCount = 0 ) - val taskTable = new TaskPagedTable( - stageData, - basePath = "/a/b/c", - pageSize = 10, - sortColumn = "Index", - desc = false, - store = statusStore - ) - val columnNames = (taskTable.headers \ "th" \ "a").map(_.child(1).text).toSet - assert(columnNames === ApiHelper.COLUMN_TO_INDEX.keySet) } finally { statusStore.close() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org