Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160332614
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusStore.scala ---
    @@ -110,107 +114,238 @@ private[spark] class AppStatusStore(
         if (details) stageWithDetails(stage) else stage
       }
     
    +  def taskCount(stageId: Int, stageAttemptId: Int): Long = {
    +    store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
    +  }
    +
    +  def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, Long] = {
    +    store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
    +  }
    +
    +  /**
    +   * Calculates a summary of the task metrics for the given stage attempt, returning the
    +   * requested quantiles for the recorded metrics.
    +   *
    +   * This method can be expensive if the requested quantiles are not cached; the method
    +   * will only cache certain quantiles (every 0.05 step), so it's recommended to stick to
    +   * those to avoid expensive scans of all task data.
    +   */
       def taskSummary(
           stageId: Int,
           stageAttemptId: Int,
    -      quantiles: Array[Double]): v1.TaskMetricDistributions = {
    -
    -    val stage = Array(stageId, stageAttemptId)
    -
    -    val rawMetrics = store.view(classOf[TaskDataWrapper])
    -      .index("stage")
    -      .first(stage)
    -      .last(stage)
    -      .asScala
    -      .flatMap(_.info.taskMetrics)
    -      .toList
    -      .view
    -
    -    def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] =
    -      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
    -
    -    // We need to do a lot of similar munging to nested metrics here.  For each one,
    -    // we want (a) extract the values for nested metrics (b) make a distribution for each metric
    -    // (c) shove the distribution into the right field in our return type and (d) only return
    -    // a result if the option is defined for any of the tasks.  MetricHelper is a little util
    -    // to make it a little easier to deal w/ all of the nested options.  Mostly it lets us just
    -    // implement one "build" method, which just builds the quantiles for each field.
    -
    -    val inputMetrics =
    -      new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics
    -
    -        def build: v1.InputMetricDistributions = new v1.InputMetricDistributions(
    -          bytesRead = submetricQuantiles(_.bytesRead),
    -          recordsRead = submetricQuantiles(_.recordsRead)
    -        )
    -      }.build
    -
    -    val outputMetrics =
    -      new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics
    -
    -        def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions(
    -          bytesWritten = submetricQuantiles(_.bytesWritten),
    -          recordsWritten = submetricQuantiles(_.recordsWritten)
    -        )
    -      }.build
    -
    -    val shuffleReadMetrics =
    -      new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics,
    -        quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics =
    -          raw.shuffleReadMetrics
    -
    -        def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions(
    -          readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead },
    -          readRecords = submetricQuantiles(_.recordsRead),
    -          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
    -          remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
    -          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
    -          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
    -          totalBlocksFetched = submetricQuantiles { s =>
    -            s.localBlocksFetched + s.remoteBlocksFetched
    -          },
    -          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
    -        )
    -      }.build
    -
    -    val shuffleWriteMetrics =
    -      new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics,
    -        quantiles) {
    -        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics =
    -          raw.shuffleWriteMetrics
    -
    -        def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions(
    -          writeBytes = submetricQuantiles(_.bytesWritten),
    -          writeRecords = submetricQuantiles(_.recordsWritten),
    -          writeTime = submetricQuantiles(_.writeTime)
    -        )
    -      }.build
    -
    -    new v1.TaskMetricDistributions(
    +      unsortedQuantiles: Array[Double]): Option[v1.TaskMetricDistributions] = {
    +    val stageKey = Array(stageId, stageAttemptId)
    +    val quantiles = unsortedQuantiles.sorted
    +
    +    // We don't know how many tasks remain in the store that actually have metrics. So scan one
    +    // metric and count how many valid tasks there are. Use skip() instead of next() since it's
    +    // cheaper for disk stores (avoids deserialization).
    +    val count = {
    +      Utils.tryWithResource(
    +        store.view(classOf[TaskDataWrapper])
    +          .parent(stageKey)
    +          .index(TaskIndexNames.EXEC_RUN_TIME)
    +          .first(0L)
    +          .closeableIterator()
    +      ) { it =>
    +        var _count = 0L
    +        while (it.hasNext()) {
    +          _count += 1
    +          it.skip(1)
    +        }
    +        _count
    +      }
    +    }
    +
    +    if (count <= 0) {
    +      return None
    +    }
    +
    +    // Find out which quantiles are already cached. The data in the store must match the expected
    +    // task count to be considered, otherwise it will be re-scanned and overwritten.
    +    val cachedQuantiles = quantiles.filter(shouldCacheQuantile).flatMap { q =>
    +      val qkey = Array(stageId, stageAttemptId, quantileToString(q))
    +      asOption(store.read(classOf[CachedQuantile], qkey)).filter(_.taskCount == count)
    +    }
    +
    +    // If there are no missing quantiles, return the data. Otherwise, just compute everything
    +    // to make the code simpler.
    +    if (cachedQuantiles.size == quantiles.size) {
    +      def toValues(fn: CachedQuantile => Double): IndexedSeq[Double] = cachedQuantiles.map(fn)
    +
    +      val distributions = new v1.TaskMetricDistributions(
    +        quantiles = quantiles,
    +        executorDeserializeTime = toValues(_.executorDeserializeTime),
    +        executorDeserializeCpuTime = toValues(_.executorDeserializeCpuTime),
    +        executorRunTime = toValues(_.executorRunTime),
    +        executorCpuTime = toValues(_.executorCpuTime),
    +        resultSize = toValues(_.resultSize),
    +        jvmGcTime = toValues(_.jvmGcTime),
    +        resultSerializationTime = toValues(_.resultSerializationTime),
    +        gettingResultTime = toValues(_.gettingResultTime),
    +        schedulerDelay = toValues(_.schedulerDelay),
    +        peakExecutionMemory = toValues(_.peakExecutionMemory),
    +        memoryBytesSpilled = toValues(_.memoryBytesSpilled),
    +        diskBytesSpilled = toValues(_.diskBytesSpilled),
    +        inputMetrics = new v1.InputMetricDistributions(
    +          toValues(_.bytesRead),
    +          toValues(_.recordsRead)),
    +        outputMetrics = new v1.OutputMetricDistributions(
    +          toValues(_.bytesWritten),
    +          toValues(_.recordsWritten)),
    +        shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
    +          toValues(_.shuffleReadBytes),
    +          toValues(_.shuffleRecordsRead),
    +          toValues(_.shuffleRemoteBlocksFetched),
    +          toValues(_.shuffleLocalBlocksFetched),
    +          toValues(_.shuffleFetchWaitTime),
    +          toValues(_.shuffleRemoteBytesRead),
    +          toValues(_.shuffleRemoteBytesReadToDisk),
    +          toValues(_.shuffleTotalBlocksFetched)),
    +        shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
    +          toValues(_.shuffleWriteBytes),
    +          toValues(_.shuffleWriteRecords),
    +          toValues(_.shuffleWriteTime)))
    +
    +      return Some(distributions)
    +    }
    +
    +    // Compute quantiles by scanning the tasks in the store. This is not really stable for live
    +    // stages (e.g. the number of recorded tasks may change while this code is running), but should
    +    // stabilize once the stage finishes. It's also slow, especially with disk stores.
    +    val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
    +
    +    def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
    +      Utils.tryWithResource(
    +        store.view(classOf[TaskDataWrapper])
    +          .parent(stageKey)
    +          .index(index)
    +          .first(0L)
    +          .closeableIterator()
    +      ) { it =>
    +        var last = Double.NaN
    +        var currentIdx = -1L
    +        indices.map { idx =>
    +          if (idx == currentIdx) {
    +            last
    +          } else {
    +            val diff = idx - currentIdx
    +            currentIdx = idx
    +            if (it.skip(diff - 1)) {
    +              last = fn(it.next()).toDouble
    +              last
    +            } else {
    +              Double.NaN
    +            }
    +          }
    +        }.toIndexedSeq
    +      }
    +    }
    +
    +    val computedQuantiles = new v1.TaskMetricDistributions(
           quantiles = quantiles,
    -      executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
    -      executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime),
    -      executorRunTime = metricQuantiles(_.executorRunTime),
    -      executorCpuTime = metricQuantiles(_.executorCpuTime),
    -      resultSize = metricQuantiles(_.resultSize),
    -      jvmGcTime = metricQuantiles(_.jvmGcTime),
    -      resultSerializationTime = metricQuantiles(_.resultSerializationTime),
    -      memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled),
    -      diskBytesSpilled = metricQuantiles(_.diskBytesSpilled),
    -      inputMetrics = inputMetrics,
    -      outputMetrics = outputMetrics,
    -      shuffleReadMetrics = shuffleReadMetrics,
    -      shuffleWriteMetrics = shuffleWriteMetrics
    -    )
    +      executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t =>
    +        t.executorDeserializeTime
    +      },
    +      executorDeserializeCpuTime = scanTasks(TaskIndexNames.DESER_CPU_TIME) { t =>
    +        t.executorDeserializeCpuTime
    +      },
    +      executorRunTime = scanTasks(TaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime },
    +      executorCpuTime = scanTasks(TaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime },
    +      resultSize = scanTasks(TaskIndexNames.RESULT_SIZE) { t => t.resultSize },
    +      jvmGcTime = scanTasks(TaskIndexNames.GC_TIME) { t => t.jvmGcTime },
    +      resultSerializationTime = scanTasks(TaskIndexNames.SER_TIME) { t =>
    +        t.resultSerializationTime
    +      },
    +      gettingResultTime = scanTasks(TaskIndexNames.GETTING_RESULT_TIME) { t =>
    +        t.gettingResultTime
    +      },
    +      schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay },
    +      peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => t.peakExecutionMemory },
    +      memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled },
    +      diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled },
    +      inputMetrics = new v1.InputMetricDistributions(
    +        scanTasks(TaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead },
    +        scanTasks(TaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }),
    +      outputMetrics = new v1.OutputMetricDistributions(
    +        scanTasks(TaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten },
    +        scanTasks(TaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }),
    +      shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
    +        scanTasks(TaskIndexNames.SHUFFLE_TOTAL_READS) { m =>
    +          m.shuffleLocalBytesRead + m.shuffleRemoteBytesRead
    +        },
    +        scanTasks(TaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead },
    +        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => t.shuffleRemoteBlocksFetched },
    +        scanTasks(TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched },
    +        scanTasks(TaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime },
    +        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead },
    +        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t =>
    +          t.shuffleRemoteBytesReadToDisk
    +        },
    +        scanTasks(TaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m =>
    +          m.shuffleLocalBlocksFetched + m.shuffleRemoteBlocksFetched
    +        }),
    +      shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
    +        scanTasks(TaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten },
    +        scanTasks(TaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten },
    +        scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime }))
    +
    +    // Go through the computed quantiles and cache the values that match the caching criteria.
    +    computedQuantiles.quantiles.zipWithIndex
    --- End diff ---
    
    I think it makes sense to cache them for the disk store, but it may be overkill for the in-memory store.
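    
    A minimal sketch of that suggestion (not part of this diff): gate the write-back of `CachedQuantile` entries on the type of the underlying store. `KVStore` and `InMemoryStore` are the existing classes in `org.apache.spark.util.kvstore`; the helper name below is hypothetical, not something this PR defines.
    
        import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
    
        // Hypothetical guard: persisting computed quantiles only pays off when the
        // backing KVStore is disk-based, where re-scanning every task row is
        // expensive. An InMemoryStore can recompute quantiles cheaply, so skip the
        // write-back there and save the extra storage and write calls.
        def shouldWriteCachedQuantiles(store: KVStore): Boolean =
          !store.isInstanceOf[InMemoryStore]
    
    Either way, the existing `taskCount` filter on cached entries would still invalidate stale data; this guard would only decide whether the cache is written at all.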

