This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 546e39c5dab [SPARK-44490][WEBUI] Remove unused `TaskPagedTable` in StagePage
546e39c5dab is described below

commit 546e39c5dabc1111243ab81b6238dc893d9993e0
Author: sychen <syc...@ctrip.com>
AuthorDate: Tue Aug 1 15:37:27 2023 +0900

    [SPARK-44490][WEBUI] Remove unused `TaskPagedTable` in StagePage
    
    ### What changes were proposed in this pull request?
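    Remove the unused `TaskPagedTable` class and its `TaskDataSource` from `StagePage`. The event timeline now fetches its page of tasks directly from `AppStatusStore` instead of going through `TaskPagedTable`.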
    
    ### Why are the changes needed?
    In [SPARK-21809](https://issues.apache.org/jira/browse/SPARK-21809), we introduced `stagespage-template.html` to show the running status of a stage.
    Since then, `TaskPagedTable` has no longer been rendered; it was only used to slice tasks for the event timeline. Yet many PRs still update its code.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
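    Tested locally. The `StagePageSuite` assertion that compared the `TaskPagedTable` column headers with `ApiHelper.COLUMN_TO_INDEX` was removed along with the class.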
    
    Closes #42085 from cxzl25/SPARK-44490.
    
    Authored-by: sychen <syc...@ctrip.com>
    Signed-off-by: Kousuke Saruta <saru...@apache.org>
---
 .../scala/org/apache/spark/ui/jobs/StagePage.scala | 301 +--------------------
 .../scala/org/apache/spark/ui/StagePageSuite.scala |  12 +-
 2 files changed, 13 insertions(+), 300 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 02aece6e50a..d50ccdadff5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -17,17 +17,12 @@
 
 package org.apache.spark.ui.jobs
 
-import java.net.URLEncoder
-import java.nio.charset.StandardCharsets.UTF_8
 import java.util.Date
-import java.util.concurrent.TimeUnit
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.HashSet
 import scala.xml.{Node, Unparsed}
 
-import org.apache.commons.text.StringEscapeUtils
-
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.scheduler.TaskLocality
 import org.apache.spark.status._
@@ -209,32 +204,20 @@ private[ui] class StagePage(parent: StagesTab, store: 
AppStatusStore) extends We
     val dagViz = UIUtils.showDagVizForStage(stageId, stageGraph)
 
     val currentTime = System.currentTimeMillis()
-    val taskTable = try {
-      val _taskTable = new TaskPagedTable(
-        stageData,
-        UIUtils.prependBaseUri(request, parent.basePath) +
-          s"/stages/stage/?id=${stageId}&attempt=${stageAttemptId}",
-        pageSize = taskPageSize,
-        sortColumn = taskSortColumn,
-        desc = taskSortDesc,
-        store = parent.store
-      )
-      _taskTable
-    } catch {
-      case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) 
=>
-        null
-    }
 
     val content =
       summary ++
       dagViz ++ <div id="showAdditionalMetrics"></div> ++
       makeTimeline(
         // Only show the tasks in the table
-        Option(taskTable).map({ taskPagedTable =>
+        () => {
           val from = (eventTimelineTaskPage - 1) * eventTimelineTaskPageSize
-          val to = taskPagedTable.dataSource.dataSize.min(
-            eventTimelineTaskPage * eventTimelineTaskPageSize)
-          taskPagedTable.dataSource.sliceData(from, to)}).getOrElse(Nil), 
currentTime,
+          val dataSize = store.taskCount(stageData.stageId, 
stageData.attemptId).toInt
+          val to = dataSize.min(eventTimelineTaskPage * 
eventTimelineTaskPageSize)
+          val sliceData = store.taskList(stageData.stageId, 
stageData.attemptId, from, to - from,
+            indexName(taskSortColumn), !taskSortDesc)
+          sliceData
+        }, currentTime,
         eventTimelineTaskPage, eventTimelineTaskPageSize, 
eventTimelineTotalPages, stageId,
         stageAttemptId, totalTasks) ++
         <div id="parent-container">
@@ -246,8 +229,8 @@ private[ui] class StagePage(parent: StagesTab, store: 
AppStatusStore) extends We
 
   }
 
-  def makeTimeline(
-      tasks: Seq[TaskData],
+  private def makeTimeline(
+      tasksFunc: () => Seq[TaskData],
       currentTime: Long,
       page: Int,
       pageSize: Int,
@@ -258,6 +241,8 @@ private[ui] class StagePage(parent: StagesTab, store: 
AppStatusStore) extends We
 
     if (!TIMELINE_ENABLED) return Seq.empty[Node]
 
+    val tasks = tasksFunc()
+
     val executorsSet = new HashSet[(String, String)]
     var minLaunchTime = Long.MaxValue
     var maxFinishTime = Long.MinValue
@@ -453,268 +438,6 @@ private[ui] class StagePage(parent: StagesTab, store: 
AppStatusStore) extends We
 
 }
 
-private[ui] class TaskDataSource(
-    stage: StageData,
-    pageSize: Int,
-    sortColumn: String,
-    desc: Boolean,
-    store: AppStatusStore) extends PagedDataSource[TaskData](pageSize) {
-  import ApiHelper._
-
-  // Keep an internal cache of executor log maps so that long task lists 
render faster.
-  private val executorIdToLogs = new HashMap[String, Map[String, String]]()
-
-  private var _tasksToShow: Seq[TaskData] = null
-
-  override def dataSize: Int = store.taskCount(stage.stageId, 
stage.attemptId).toInt
-
-  override def sliceData(from: Int, to: Int): Seq[TaskData] = {
-    if (_tasksToShow == null) {
-      _tasksToShow = store.taskList(stage.stageId, stage.attemptId, from, to - 
from,
-        indexName(sortColumn), !desc)
-    }
-    _tasksToShow
-  }
-
-  def executorLogs(id: String): Map[String, String] = {
-    executorIdToLogs.getOrElseUpdate(id,
-      
store.asOption(store.executorSummary(id)).map(_.executorLogs).getOrElse(Map.empty))
-  }
-
-}
-
-private[ui] class TaskPagedTable(
-    stage: StageData,
-    basePath: String,
-    pageSize: Int,
-    sortColumn: String,
-    desc: Boolean,
-    store: AppStatusStore) extends PagedTable[TaskData] {
-
-  import ApiHelper._
-
-  private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
-
-  override def tableId: String = "task-table"
-
-  override def tableCssClass: String =
-    "table table-bordered table-sm table-striped table-head-clickable"
-
-  override def pageSizeFormField: String = "task.pageSize"
-
-  override def pageNumberFormField: String = "task.page"
-
-  override val dataSource: TaskDataSource = new TaskDataSource(
-    stage,
-    pageSize,
-    sortColumn,
-    desc,
-    store)
-
-  override def pageLink(page: Int): String = {
-    basePath +
-      s"&$pageNumberFormField=$page" +
-      s"&task.sort=$encodedSortColumn" +
-      s"&task.desc=$desc" +
-      s"&$pageSizeFormField=$pageSize"
-  }
-
-  override def goButtonFormPath: String = 
s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc"
-
-  def headers: Seq[Node] = {
-    import ApiHelper._
-
-    val taskHeadersAndCssClasses: Seq[(String, String)] =
-      Seq(
-        (HEADER_TASK_INDEX, ""), (HEADER_ID, ""), (HEADER_ATTEMPT, ""), 
(HEADER_STATUS, ""),
-        (HEADER_LOCALITY, ""), (HEADER_EXECUTOR, ""), (HEADER_HOST, ""), 
(HEADER_LAUNCH_TIME, ""),
-        (HEADER_DURATION, ""), (HEADER_SCHEDULER_DELAY, 
TaskDetailsClassNames.SCHEDULER_DELAY),
-        (HEADER_DESER_TIME, TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
-        (HEADER_GC_TIME, ""),
-        (HEADER_SER_TIME, TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
-        (HEADER_GETTING_RESULT_TIME, 
TaskDetailsClassNames.GETTING_RESULT_TIME),
-        (HEADER_PEAK_MEM, TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
-        {if (hasAccumulators(stage)) Seq((HEADER_ACCUMULATORS, "")) else Nil} 
++
-        {if (hasInput(stage)) Seq((HEADER_INPUT_SIZE, "")) else Nil} ++
-        {if (hasOutput(stage)) Seq((HEADER_OUTPUT_SIZE, "")) else Nil} ++
-        {if (hasShuffleRead(stage)) {
-          Seq((HEADER_SHUFFLE_READ_FETCH_WAIT_TIME,
-            TaskDetailsClassNames.SHUFFLE_READ_FETCH_WAIT_TIME),
-            (HEADER_SHUFFLE_TOTAL_READS, ""),
-            (HEADER_SHUFFLE_REMOTE_READS, 
TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
-        } else {
-          Nil
-        }} ++
-        {if (hasShuffleWrite(stage)) {
-          Seq((HEADER_SHUFFLE_WRITE_TIME, ""), (HEADER_SHUFFLE_WRITE_SIZE, ""))
-        } else {
-          Nil
-        }} ++
-        {if (hasBytesSpilled(stage)) {
-          Seq((HEADER_MEM_SPILL, ""), (HEADER_DISK_SPILL, ""))
-        } else {
-          Nil
-        }} ++
-        Seq((HEADER_ERROR, ""))
-
-    if (!taskHeadersAndCssClasses.map(_._1).contains(sortColumn)) {
-      throw new IllegalArgumentException(s"Unknown column: $sortColumn")
-    }
-
-    val headerRow: Seq[Node] = {
-      taskHeadersAndCssClasses.map { case (header, cssClass) =>
-        if (header == sortColumn) {
-          val headerLink = Unparsed(
-            basePath +
-              s"&task.sort=${URLEncoder.encode(header, UTF_8.name())}" +
-              s"&task.desc=${!desc}" +
-              s"&task.pageSize=$pageSize")
-          val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
-          <th class={cssClass}>
-            <a href={headerLink}>
-              {header}
-              <span>&nbsp;{Unparsed(arrow)}</span>
-            </a>
-          </th>
-        } else {
-          val headerLink = Unparsed(
-            basePath +
-              s"&task.sort=${URLEncoder.encode(header, UTF_8.name())}" +
-              s"&task.pageSize=$pageSize")
-          <th class={cssClass}>
-            <a href={headerLink}>
-              {header}
-            </a>
-          </th>
-        }
-      }
-    }
-    <thead>{headerRow}</thead>
-  }
-
-  def row(task: TaskData): Seq[Node] = {
-    def formatDuration(value: Option[Long], hideZero: Boolean = false): String 
= {
-      value.map { v =>
-        if (v > 0 || !hideZero) UIUtils.formatDuration(v) else ""
-      }.getOrElse("")
-    }
-
-    def formatBytes(value: Option[Long]): String = {
-      Utils.bytesToString(value.getOrElse(0L))
-    }
-
-    <tr>
-      <td>{task.index}</td>
-      <td>{task.taskId}</td>
-      <td>{if (task.speculative) s"${task.attempt} (speculative)" else 
task.attempt.toString}</td>
-      <td>{task.status}</td>
-      <td>{task.taskLocality}</td>
-      <td>{task.executorId}</td>
-      <td>
-        <div style="float: left">{task.host}</div>
-        <div style="float: right">
-        {
-          dataSource.executorLogs(task.executorId).map {
-            case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
-          }
-        }
-        </div>
-      </td>
-      <td>{UIUtils.formatDate(task.launchTime)}</td>
-      <td>{formatDuration(task.taskMetrics.map(_.executorRunTime))}</td>
-      <td class={TaskDetailsClassNames.SCHEDULER_DELAY}>
-        {UIUtils.formatDuration(AppStatusUtils.schedulerDelay(task))}
-      </td>
-      <td class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
-        {formatDuration(task.taskMetrics.map(_.executorDeserializeTime))}
-      </td>
-      <td>
-        {formatDuration(task.taskMetrics.map(_.jvmGcTime), hideZero = true)}
-      </td>
-      <td class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
-        {formatDuration(task.taskMetrics.map(_.resultSerializationTime))}
-      </td>
-      <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
-        {UIUtils.formatDuration(AppStatusUtils.gettingResultTime(task))}
-      </td>
-      <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-        {formatBytes(task.taskMetrics.map(_.peakExecutionMemory))}
-      </td>
-      {if (hasAccumulators(stage)) {
-        <td>{accumulatorsInfo(task)}</td>
-      }}
-      {if (hasInput(stage)) {
-        <td>{
-          metricInfo(task) { m =>
-            val bytesRead = Utils.bytesToString(m.inputMetrics.bytesRead)
-            val records = m.inputMetrics.recordsRead
-            Unparsed(s"$bytesRead / $records")
-          }
-        }</td>
-      }}
-      {if (hasOutput(stage)) {
-        <td>{
-          metricInfo(task) { m =>
-            val bytesWritten = 
Utils.bytesToString(m.outputMetrics.bytesWritten)
-            val records = m.outputMetrics.recordsWritten
-            Unparsed(s"$bytesWritten / $records")
-          }
-        }</td>
-      }}
-      {if (hasShuffleRead(stage)) {
-        <td class={TaskDetailsClassNames.SHUFFLE_READ_FETCH_WAIT_TIME}>
-          
{formatDuration(task.taskMetrics.map(_.shuffleReadMetrics.fetchWaitTime))}
-        </td>
-        <td>{
-          metricInfo(task) { m =>
-            val bytesRead = 
Utils.bytesToString(totalBytesRead(m.shuffleReadMetrics))
-            val records = m.shuffleReadMetrics.recordsRead
-            Unparsed(s"$bytesRead / $records")
-          }
-        }</td>
-        <td class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
-          
{formatBytes(task.taskMetrics.map(_.shuffleReadMetrics.remoteBytesRead))}
-        </td>
-      }}
-      {if (hasShuffleWrite(stage)) {
-        <td>{
-          formatDuration(
-            task.taskMetrics.map { m =>
-              TimeUnit.NANOSECONDS.toMillis(m.shuffleWriteMetrics.writeTime)
-            },
-            hideZero = true)
-        }</td>
-        <td>{
-          metricInfo(task) { m =>
-            val bytesWritten = 
Utils.bytesToString(m.shuffleWriteMetrics.bytesWritten)
-            val records = m.shuffleWriteMetrics.recordsWritten
-            Unparsed(s"$bytesWritten / $records")
-          }
-        }</td>
-      }}
-      {if (hasBytesSpilled(stage)) {
-        <td>{formatBytes(task.taskMetrics.map(_.memoryBytesSpilled))}</td>
-        <td>{formatBytes(task.taskMetrics.map(_.diskBytesSpilled))}</td>
-      }}
-      {UIUtils.errorMessageCell(task.errorMessage.getOrElse(""))}
-    </tr>
-  }
-
-  private def accumulatorsInfo(task: TaskData): Seq[Node] = {
-    task.accumulatorUpdates.flatMap { acc =>
-      if (acc.name != null && acc.update.isDefined) {
-        Unparsed(StringEscapeUtils.escapeHtml4(s"${acc.name}: 
${acc.update.get}")) ++ <br />
-      } else {
-        Nil
-      }
-    }
-  }
-
-  private def metricInfo(task: TaskData)(fn: TaskMetrics => Seq[Node]): 
Seq[Node] = {
-    task.taskMetrics.map(fn).getOrElse(Nil)
-  }
-}
-
 private[spark] object ApiHelper {
 
   val HEADER_ID = "ID"
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 3108dca5faf..6565b9b50a9 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.{AccumulableInfo => UIAccumulableInfo, 
StageData, StageStatus}
-import org.apache.spark.ui.jobs.{ApiHelper, StagePage, StagesTab, 
TaskPagedTable}
+import org.apache.spark.ui.jobs.{StagePage, StagesTab}
 
 class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -110,16 +110,6 @@ class StagePageSuite extends SparkFunSuite with 
LocalSparkContext {
         isShufflePushEnabled = false,
         shuffleMergersCount = 0
       )
-      val taskTable = new TaskPagedTable(
-        stageData,
-        basePath = "/a/b/c",
-        pageSize = 10,
-        sortColumn = "Index",
-        desc = false,
-        store = statusStore
-      )
-      val columnNames = (taskTable.headers \ "th" \ 
"a").map(_.child(1).text).toSet
-      assert(columnNames === ApiHelper.COLUMN_TO_INDEX.keySet)
     } finally {
       statusStore.close()
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to