Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r159992789
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
    @@ -110,107 +114,240 @@ private[spark] class AppStatusStore(
         if (details) stageWithDetails(stage) else stage
       }
     
    +  def taskCount(stageId: Int, stageAttemptId: Int): Long = {
    +    store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
    +  }
    +
    +  def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, Long] = {
    +    store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
    +  }
    +
    +  /**
    +   * Calculates a summary of the task metrics for the given stage attempt, returning the
    +   * requested quantiles for the recorded metrics.
    +   *
    +   * This method can be expensive if the requested quantiles are not cached; the method
    +   * will only cache certain quantiles (every 0.05 step), so it's recommended to stick to
    +   * those to avoid expensive scans of all task data.
    +   */
       def taskSummary(
           stageId: Int,
           stageAttemptId: Int,
    -      quantiles: Array[Double]): v1.TaskMetricDistributions = {
    -
    -    val stage = Array(stageId, stageAttemptId)
    -
    -    val rawMetrics = store.view(classOf[TaskDataWrapper])
    -      .index("stage")
    -      .first(stage)
    -      .last(stage)
    -      .asScala
    -      .flatMap(_.info.taskMetrics)
    -      .toList
    -      .view
    -
    -    def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] =
    -      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
    -
    -    // We need to do a lot of similar munging to nested metrics here.  For each one,
    -    // we want (a) extract the values for nested metrics (b) make a distribution for each metric
    -    // (c) shove the distribution into the right field in our return type and (d) only return
    -    // a result if the option is defined for any of the tasks.  MetricHelper is a little util
    -    // to make it a little easier to deal w/ all of the nested options.  Mostly it lets us just
    -    // implement one "build" method, which just builds the quantiles for each field.
    -
    -    val inputMetrics =
    -      new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics
    -
    -        def build: v1.InputMetricDistributions = new v1.InputMetricDistributions(
    -          bytesRead = submetricQuantiles(_.bytesRead),
    -          recordsRead = submetricQuantiles(_.recordsRead)
    -        )
    -      }.build
    -
    -    val outputMetrics =
    -      new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics
    -
    -        def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions(
    -          bytesWritten = submetricQuantiles(_.bytesWritten),
    -          recordsWritten = submetricQuantiles(_.recordsWritten)
    -        )
    -      }.build
    -
    -    val shuffleReadMetrics =
    -      new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics,
    -        quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics =
    -          raw.shuffleReadMetrics
    -
    -        def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions(
    -          readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead },
    -          readRecords = submetricQuantiles(_.recordsRead),
    -          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
    -          remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
    -          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
    -          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
    -          totalBlocksFetched = submetricQuantiles { s =>
    -            s.localBlocksFetched + s.remoteBlocksFetched
    -          },
    -          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
    -        )
    -      }.build
    -
    -    val shuffleWriteMetrics =
    -      new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics,
    -        quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics =
    -          raw.shuffleWriteMetrics
    -
    -        def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions(
    -          writeBytes = submetricQuantiles(_.bytesWritten),
    -          writeRecords = submetricQuantiles(_.recordsWritten),
    -          writeTime = submetricQuantiles(_.writeTime)
    -        )
    -      }.build
    -
    -    new v1.TaskMetricDistributions(
    +      unsortedQuantiles: Array[Double]): Option[v1.TaskMetricDistributions] = {
    +    val stageKey = Array(stageId, stageAttemptId)
    +    val quantiles = unsortedQuantiles.sorted
    +
    +    // We don't know how many tasks remain in the store that actually have metrics. So scan one
    +    // metric and count how many valid tasks there are. Use skip() instead of next() since it's
    +    // cheaper for disk stores (avoids deserialization).
    +    val count = {
    +      Utils.tryWithResource(
    +        store.view(classOf[TaskDataWrapper])
    +          .parent(stageKey)
    +          .index(TaskIndexNames.EXEC_RUN_TIME)
    +          .first(0L)
    +          .closeableIterator()
    +      ) { it =>
    +        var _count = 0L
    +        while (it.hasNext()) {
    +          _count += 1
    +          it.skip(1)
    +        }
    +        _count
    +      }
    +    }
    +
    +    if (count <= 0) {
    +      return None
    +    }
    +
    +    // Find out which quantiles are already cached. The data in the store must match the expected
    +    // task count to be considered, otherwise it will be re-scanned and overwritten.
    +    val cachedQuantiles = quantiles.filter(shouldCacheQuantile).flatMap { q =>
    +      val qkey = Array(stageId, stageAttemptId, quantileToString(q))
    +      asOption(store.read(classOf[CachedQuantile], qkey)).filter(_.taskCount == count)
    +    }
    +
    +    // If there are no missing quantiles, return the data. Otherwise, just compute everything
    +    // to make the code simpler.
    +    if (cachedQuantiles.size == quantiles.size) {
    +      def toValues(fn: CachedQuantile => Double): IndexedSeq[Double] = {
    +        cachedQuantiles.map(fn).toIndexedSeq
    +      }
    +
    +      val distributions = new v1.TaskMetricDistributions(
    +        quantiles = quantiles,
    +        executorDeserializeTime = toValues(_.executorDeserializeTime),
    +        executorDeserializeCpuTime = toValues(_.executorDeserializeCpuTime),
    +        executorRunTime = toValues(_.executorRunTime),
    +        executorCpuTime = toValues(_.executorCpuTime),
    +        resultSize = toValues(_.resultSize),
    +        jvmGcTime = toValues(_.jvmGcTime),
    +        resultSerializationTime = toValues(_.resultSerializationTime),
    +        gettingResultTime = toValues(_.gettingResultTime),
    +        schedulerDelay = toValues(_.schedulerDelay),
    +        peakExecutionMemory = toValues(_.peakExecutionMemory),
    +        memoryBytesSpilled = toValues(_.memoryBytesSpilled),
    +        diskBytesSpilled = toValues(_.diskBytesSpilled),
    +        inputMetrics = new v1.InputMetricDistributions(
    +          toValues(_.bytesRead),
    +          toValues(_.recordsRead)),
    +        outputMetrics = new v1.OutputMetricDistributions(
    +          toValues(_.bytesWritten),
    +          toValues(_.recordsWritten)),
    +        shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
    +          toValues(_.shuffleReadBytes),
    +          toValues(_.shuffleRecordsRead),
    +          toValues(_.shuffleRemoteBlocksFetched),
    +          toValues(_.shuffleLocalBlocksFetched),
    +          toValues(_.shuffleFetchWaitTime),
    +          toValues(_.shuffleRemoteBytesRead),
    +          toValues(_.shuffleRemoteBytesReadToDisk),
    +          toValues(_.shuffleTotalBlocksFetched)),
    +        shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
    +          toValues(_.shuffleWriteBytes),
    +          toValues(_.shuffleWriteRecords),
    +          toValues(_.shuffleWriteTime)))
    +
    +      return Some(distributions)
    +    }
    +
    +    // Compute quantiles by scanning the tasks in the store. This is not really stable for live
    +    // stages (e.g. the number of recorded tasks may change while this code is running), but should
    +    // stabilize once the stage finishes. It's also slow, especially with disk stores.
    +    val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
    +
    +    def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
    --- End diff ---
    
    aside: an alternative would be to use a data structure that supports online approximate quantile computation. t-digest is the one I'm familiar with; perhaps there are others. But let's leave exploring that for the future.


---
