Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10835#discussion_r50757347
  
    --- Diff: core/src/main/scala/org/apache/spark/InternalAccumulator.scala ---
    @@ -17,42 +17,193 @@
     
     package org.apache.spark
     
    +import org.apache.spark.storage.{BlockId, BlockStatus}
     
    -// This is moved to its own file because many more things will be added to 
it in SPARK-10620.
    +
    +/**
    + * A collection of fields and methods concerned with internal accumulators 
that represent
    + * task level metrics.
    + */
     private[spark] object InternalAccumulator {
    -  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
    -  val TEST_ACCUMULATOR = "testAccumulator"
    -
    -  // For testing only.
    -  // This needs to be a def since we don't want to reuse the same 
accumulator across stages.
    -  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
    -    if (sys.props.contains("spark.testing")) {
    -      Some(new Accumulator(
    -        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), 
internal = true))
    -    } else {
    -      None
    +
    +  import AccumulatorParam._
    +
    +  // Prefixes used in names of internal task level metrics
    +  val METRICS_PREFIX = "internal.metrics."
    +  val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read."
    +  val SHUFFLE_WRITE_METRICS_PREFIX = METRICS_PREFIX + "shuffle.write."
    +  val OUTPUT_METRICS_PREFIX = METRICS_PREFIX + "output."
    +  val INPUT_METRICS_PREFIX = METRICS_PREFIX + "input."
    +
    +  // Names of internal task level metrics
    +  val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + 
"executorDeserializeTime"
    +  val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime"
    +  val RESULT_SIZE = METRICS_PREFIX + "resultSize"
    +  val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime"
    +  val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + 
"resultSerializationTime"
    +  val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
    +  val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
    +  val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
    +  val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
    +  val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"
    +
    +  // scalastyle:off
    +
    +  // Names of shuffle read metrics
    +  object shuffleRead {
    +    val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + 
"remoteBlocksFetched"
    +    val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + 
"localBlocksFetched"
    +    val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead"
    +    val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
    +    val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
    +    val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
    +  }
    +
    +  // Names of shuffle write metrics
    +  object shuffleWrite {
    +    val BYTES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "bytesWritten"
    +    val RECORDS_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "recordsWritten"
    +    val WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "writeTime"
    +  }
    +
    +  // Names of output metrics
    +  object output {
    +    val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod"
    +    val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten"
    +    val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten"
    +  }
    +
    +  // Names of input metrics
    +  object input {
    +    val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod"
    +    val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead"
    +    val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead"
    +  }
    +
    +  // scalastyle:on
    +
    +  /**
    +   * Create an internal [[Accumulator]] by name, which must begin with 
[[METRICS_PREFIX]].
    +   */
    +  def create(name: String): Accumulator[_] = {
    +    assert(name.startsWith(METRICS_PREFIX),
    +      s"internal accumulator name must start with '$METRICS_PREFIX': 
$name")
    +    getParam(name) match {
    +      case p @ LongAccumulatorParam => newMetric[Long](0L, name, p)
    +      case p @ IntAccumulatorParam => newMetric[Int](0, name, p)
    +      case p @ StringAccumulatorParam => newMetric[String]("", name, p)
    +      case p @ UpdatedBlockStatusesAccumulatorParam =>
    +        newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p)
    +      case p => throw new IllegalArgumentException(
    +        s"unsupported accumulator param '${p.getClass.getSimpleName}' for 
metric '$name'.")
    +    }
    +  }
    +
    +  /**
    +   * Get the [[AccumulatorParam]] associated with the internal metric name,
    +   * which must begin with [[METRICS_PREFIX]].
    +   */
    +  def getParam(name: String): AccumulatorParam[_] = {
    +    assert(name.startsWith(METRICS_PREFIX),
    +      s"internal accumulator name must start with '$METRICS_PREFIX': 
$name")
    +    name match {
    +      case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
    +      case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
    +      case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
    +      case input.READ_METHOD => StringAccumulatorParam
    +      case output.WRITE_METHOD => StringAccumulatorParam
    +      case _ => LongAccumulatorParam
         }
       }
     
       /**
        * Accumulators for tracking internal metrics.
    +   */
    +  def create(): Seq[Accumulator[_]] = {
    +    Seq[String](
    +      EXECUTOR_DESERIALIZE_TIME,
    +      EXECUTOR_RUN_TIME,
    +      RESULT_SIZE,
    +      JVM_GC_TIME,
    +      RESULT_SERIALIZATION_TIME,
    +      MEMORY_BYTES_SPILLED,
    +      DISK_BYTES_SPILLED,
    +      PEAK_EXECUTION_MEMORY,
    +      UPDATED_BLOCK_STATUSES).map(create) ++
    +      createShuffleReadAccums() ++
    +      createShuffleWriteAccums() ++
    +      createInputAccums() ++
    +      createOutputAccums() ++
    +      sys.props.get("spark.testing").map(_ => create(TEST_ACCUM)).toSeq
    +  }
    +
    +  /**
    +   * Accumulators for tracking shuffle read metrics.
    +   */
    +  def createShuffleReadAccums(): Seq[Accumulator[_]] = {
    +    Seq[String](
    +      shuffleRead.REMOTE_BLOCKS_FETCHED,
    +      shuffleRead.LOCAL_BLOCKS_FETCHED,
    +      shuffleRead.REMOTE_BYTES_READ,
    +      shuffleRead.LOCAL_BYTES_READ,
    +      shuffleRead.FETCH_WAIT_TIME,
    +      shuffleRead.RECORDS_READ).map(create)
    +  }
    +
    +  /**
    +   * Accumulators for tracking shuffle write metrics.
    +   */
    +  def createShuffleWriteAccums(): Seq[Accumulator[_]] = {
    +    Seq[String](
    +      shuffleWrite.BYTES_WRITTEN,
    +      shuffleWrite.RECORDS_WRITTEN,
    +      shuffleWrite.WRITE_TIME).map(create)
    +  }
    +
    +  /**
    +   * Accumulators for tracking input metrics.
    +   */
    +  def createInputAccums(): Seq[Accumulator[_]] = {
    +    Seq[String](
    +      input.READ_METHOD,
    +      input.BYTES_READ,
    +      input.RECORDS_READ).map(create)
    +  }
    +
    +  /**
    +   * Accumulators for tracking output metrics.
    +   */
    +  def createOutputAccums(): Seq[Accumulator[_]] = {
    +    Seq[String](
    +      output.WRITE_METHOD,
    +      output.BYTES_WRITTEN,
    +      output.RECORDS_WRITTEN).map(create)
    +  }
    +
    +  /**
    +   * Accumulators for tracking internal metrics.
        *
        * These accumulators are created with the stage such that all tasks in 
the stage will
        * add to the same set of accumulators. We do this to report the 
distribution of accumulator
        * values across all tasks within each stage.
        */
    -  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
    -    val internalAccumulators = Seq(
    -      // Execution memory refers to the memory used by internal data 
structures created
    -      // during shuffles, aggregations and joins. The value of this 
accumulator should be
    -      // approximately the sum of the peak sizes across all such data 
structures created
    -      // in this task. For SQL jobs, this only tracks all unsafe operators 
and ExternalSort.
    -      new Accumulator(
    -        0L, AccumulatorParam.LongAccumulatorParam, 
Some(PEAK_EXECUTION_MEMORY), internal = true)
    -    ) ++ maybeTestAccumulator.toSeq
    -    internalAccumulators.foreach { accumulator =>
    -      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
    +  def create(sc: SparkContext): Seq[Accumulator[_]] = {
    +    val accums = create()
    +    accums.foreach { accum =>
    +      Accumulators.register(accum)
    +      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
    --- End diff --
    
    Also, `sc.cleaner` can be disabled, so we need to check for that here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to