Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160004385
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
    @@ -110,107 +114,240 @@ private[spark] class AppStatusStore(
         if (details) stageWithDetails(stage) else stage
       }
     
    +  def taskCount(stageId: Int, stageAttemptId: Int): Long = {
    +    store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
    +  }
    +
    +  def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, Long] = {
    +    store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
    +  }
    +
    +  /**
    +   * Calculates a summary of the task metrics for the given stage attempt, returning the
    +   * requested quantiles for the recorded metrics.
    +   *
    +   * This method can be expensive if the requested quantiles are not cached; the method
    +   * will only cache certain quantiles (every 0.05 step), so it's recommended to stick to
    +   * those to avoid expensive scans of all task data.
    +   */
       def taskSummary(
           stageId: Int,
           stageAttemptId: Int,
    -      quantiles: Array[Double]): v1.TaskMetricDistributions = {
    -
    -    val stage = Array(stageId, stageAttemptId)
    -
    -    val rawMetrics = store.view(classOf[TaskDataWrapper])
    -      .index("stage")
    -      .first(stage)
    -      .last(stage)
    -      .asScala
    -      .flatMap(_.info.taskMetrics)
    -      .toList
    -      .view
    -
    -    def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] =
    -      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
    -
    -    // We need to do a lot of similar munging to nested metrics here.  For each one,
    -    // we want (a) extract the values for nested metrics (b) make a distribution for each metric
    -    // (c) shove the distribution into the right field in our return type and (d) only return
    -    // a result if the option is defined for any of the tasks.  MetricHelper is a little util
    -    // to make it a little easier to deal w/ all of the nested options.  Mostly it lets us just
    -    // implement one "build" method, which just builds the quantiles for each field.
    -
    -    val inputMetrics =
    -      new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics
    -
    -        def build: v1.InputMetricDistributions = new v1.InputMetricDistributions(
    -          bytesRead = submetricQuantiles(_.bytesRead),
    -          recordsRead = submetricQuantiles(_.recordsRead)
    -        )
    -      }.build
    -
    -    val outputMetrics =
    -      new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics
    -
    -        def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions(
    -          bytesWritten = submetricQuantiles(_.bytesWritten),
    -          recordsWritten = submetricQuantiles(_.recordsWritten)
    -        )
    -      }.build
    -
    -    val shuffleReadMetrics =
    -      new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics,
    -        quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics =
    -          raw.shuffleReadMetrics
    -
    -        def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions(
    -          readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead },
    -          readRecords = submetricQuantiles(_.recordsRead),
    -          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
    -          remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
    -          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
    -          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
    -          totalBlocksFetched = submetricQuantiles { s =>
    -            s.localBlocksFetched + s.remoteBlocksFetched
    -          },
    -          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
    -        )
    -      }.build
    -
    -    val shuffleWriteMetrics =
    -      new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics,
    -        quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics =
    -          raw.shuffleWriteMetrics
    -
    -        def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions(
    -          writeBytes = submetricQuantiles(_.bytesWritten),
    -          writeRecords = submetricQuantiles(_.recordsWritten),
    -          writeTime = submetricQuantiles(_.writeTime)
    -        )
    -      }.build
    -
    -    new v1.TaskMetricDistributions(
    +      unsortedQuantiles: Array[Double]): Option[v1.TaskMetricDistributions] = {
    +    val stageKey = Array(stageId, stageAttemptId)
    +    val quantiles = unsortedQuantiles.sorted
    +
    +    // We don't know how many tasks remain in the store that actually have metrics. So scan one
    +    // metric and count how many valid tasks there are. Use skip() instead of next() since it's
    +    // cheaper for disk stores (avoids deserialization).
    +    val count = {
    +      Utils.tryWithResource(
    +        store.view(classOf[TaskDataWrapper])
    +          .parent(stageKey)
    +          .index(TaskIndexNames.EXEC_RUN_TIME)
    +          .first(0L)
    +          .closeableIterator()
    +      ) { it =>
    +        var _count = 0L
    +        while (it.hasNext()) {
    +          _count += 1
    +          it.skip(1)
    +        }
    +        _count
    +      }
    +    }
    +
    +    if (count <= 0) {
    +      return None
    +    }
    +
    +    // Find out which quantiles are already cached. The data in the store must match the expected
    +    // task count to be considered, otherwise it will be re-scanned and overwritten.
    +    val cachedQuantiles = quantiles.filter(shouldCacheQuantile).flatMap { q =>
    +      val qkey = Array(stageId, stageAttemptId, quantileToString(q))
    +      asOption(store.read(classOf[CachedQuantile], qkey)).filter(_.taskCount == count)
    +    }
    +
    +    // If there are no missing quantiles, return the data. Otherwise, just compute everything
    +    // to make the code simpler.
    +    if (cachedQuantiles.size == quantiles.size) {
    +      def toValues(fn: CachedQuantile => Double): IndexedSeq[Double] = {
    +        cachedQuantiles.map(fn).toIndexedSeq
    +      }
    +
    +      val distributions = new v1.TaskMetricDistributions(
    +        quantiles = quantiles,
    +        executorDeserializeTime = toValues(_.executorDeserializeTime),
    +        executorDeserializeCpuTime = toValues(_.executorDeserializeCpuTime),
    +        executorRunTime = toValues(_.executorRunTime),
    +        executorCpuTime = toValues(_.executorCpuTime),
    +        resultSize = toValues(_.resultSize),
    +        jvmGcTime = toValues(_.jvmGcTime),
    +        resultSerializationTime = toValues(_.resultSerializationTime),
    +        gettingResultTime = toValues(_.gettingResultTime),
    +        schedulerDelay = toValues(_.schedulerDelay),
    +        peakExecutionMemory = toValues(_.peakExecutionMemory),
    +        memoryBytesSpilled = toValues(_.memoryBytesSpilled),
    +        diskBytesSpilled = toValues(_.diskBytesSpilled),
    +        inputMetrics = new v1.InputMetricDistributions(
    +          toValues(_.bytesRead),
    +          toValues(_.recordsRead)),
    +        outputMetrics = new v1.OutputMetricDistributions(
    +          toValues(_.bytesWritten),
    +          toValues(_.recordsWritten)),
    +        shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
    +          toValues(_.shuffleReadBytes),
    +          toValues(_.shuffleRecordsRead),
    +          toValues(_.shuffleRemoteBlocksFetched),
    +          toValues(_.shuffleLocalBlocksFetched),
    +          toValues(_.shuffleFetchWaitTime),
    +          toValues(_.shuffleRemoteBytesRead),
    +          toValues(_.shuffleRemoteBytesReadToDisk),
    +          toValues(_.shuffleTotalBlocksFetched)),
    +        shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
    +          toValues(_.shuffleWriteBytes),
    +          toValues(_.shuffleWriteRecords),
    +          toValues(_.shuffleWriteTime)))
    +
    +      return Some(distributions)
    +    }
    +
    +    // Compute quantiles by scanning the tasks in the store. This is not really stable for live
    +    // stages (e.g. the number of recorded tasks may change while this code is running), but should
    +    // stabilize once the stage finishes. It's also slow, especially with disk stores.
    +    val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
    +
    +    def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
    --- End diff ---
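
    (Context for the non-cached path above: once tasks are read in a metric's sorted
    order, each exact quantile reduces to picking the element at index
    min((q * count).toLong, count - 1). A minimal standalone sketch of that selection
    rule, with `values` as a stand-in for one metric's recorded values, not the
    actual store API:)

        // Hypothetical illustration of the selection-by-index rule, plain Scala.
        def quantilesByIndex(values: Seq[Long], quantiles: Seq[Double]): IndexedSeq[Double] = {
          val sorted = values.sorted
          val count = sorted.size
          quantiles.map { q =>
            // Same index rule as the diff: min((q * count).toLong, count - 1).
            val idx = math.min((q * count).toLong, count - 1L).toInt
            sorted(idx).toDouble
          }.toIndexedSeq
        }

        // quantilesByIndex(Seq(5L, 1L, 9L, 3L, 7L), Seq(0.0, 0.5, 1.0))
        // => Vector(1.0, 5.0, 9.0)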
    
    Funny you mention that, my first implementation actually used t-digest. But 
it's a bit slow for this purpose (the unreleased version at the time was 
faster) and uses quite a bit of memory, so I gave up trying that for now. It 
also changes the computed values (since they'd be approximations), which 
requires changing all the golden files (argh).
    
    There's some code for approximate quantiles in the sql module that could 
potentially be refactored.
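
    (That sql-module code is, as far as I know, the Greenwald-Khanna implementation
    in `org.apache.spark.sql.catalyst.util.QuantileSummaries`, exposed publicly
    through `DataFrameStatFunctions.approxQuantile`. For illustration, the public
    API looks like this:)

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().appName("quantile-demo").master("local[*]").getOrCreate()
        import spark.implicits._

        val df = (1 to 1000).map(_.toDouble).toDF("runTime")
        // approxQuantile(column, probabilities, relativeError); relativeError = 0.0 is exact.
        val Array(p50, p95) = df.stat.approxQuantile("runTime", Array(0.5, 0.95), 0.01)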

