GitHub user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10835#discussion_r51062164
  
    --- Diff: core/src/main/scala/org/apache/spark/InternalAccumulator.scala ---
    @@ -17,42 +17,193 @@
     
     package org.apache.spark
     
    +import org.apache.spark.storage.{BlockId, BlockStatus}
     
    -// This is moved to its own file because many more things will be added to it in SPARK-10620.
    +
    +/**
    + * A collection of fields and methods concerned with internal accumulators that represent
    + * task level metrics.
    + */
     private[spark] object InternalAccumulator {
    -  val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
    -  val TEST_ACCUMULATOR = "testAccumulator"
    -
    -  // For testing only.
    -  // This needs to be a def since we don't want to reuse the same accumulator across stages.
    -  private def maybeTestAccumulator: Option[Accumulator[Long]] = {
    -    if (sys.props.contains("spark.testing")) {
    -      Some(new Accumulator(
    -        0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
    -    } else {
    -      None
    +
    +  import AccumulatorParam._
    +
    +  // Prefixes used in names of internal task level metrics
    +  val METRICS_PREFIX = "internal.metrics."
    +  val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read."
    +  val SHUFFLE_WRITE_METRICS_PREFIX = METRICS_PREFIX + "shuffle.write."
    +  val OUTPUT_METRICS_PREFIX = METRICS_PREFIX + "output."
    +  val INPUT_METRICS_PREFIX = METRICS_PREFIX + "input."
    +
    +  // Names of internal task level metrics
    +  val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime"
    +  val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime"
    +  val RESULT_SIZE = METRICS_PREFIX + "resultSize"
    +  val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime"
    +  val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime"
    +  val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled"
    +  val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
    +  val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
    +  val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
    +  val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"
    +
    +  // scalastyle:off
    +
    +  // Names of shuffle read metrics
    +  object shuffleRead {
    +    val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "remoteBlocksFetched"
    +    val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "localBlocksFetched"
    +    val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead"
    +    val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead"
    +    val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime"
    +    val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead"
    +  }
    +
    +  // Names of shuffle write metrics
    +  object shuffleWrite {
    +    val BYTES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "bytesWritten"
    +    val RECORDS_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "recordsWritten"
    +    val WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "writeTime"
    +  }
    +
    +  // Names of output metrics
    +  object output {
    +    val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod"
    +    val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten"
    +    val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten"
    +  }
    +
    +  // Names of input metrics
    +  object input {
    +    val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod"
    +    val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead"
    +    val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead"
    +  }
    +
    +  // scalastyle:on
    +
    +  /**
    +   * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]].
    +   */
    +  def create(name: String): Accumulator[_] = {
    +    assert(name.startsWith(METRICS_PREFIX),
    +      s"internal accumulator name must start with '$METRICS_PREFIX': 
$name")
    +    getParam(name) match {
    +      case p @ LongAccumulatorParam => newMetric[Long](0L, name, p)
    +      case p @ IntAccumulatorParam => newMetric[Int](0, name, p)
    +      case p @ StringAccumulatorParam => newMetric[String]("", name, p)
    +      case p @ UpdatedBlockStatusesAccumulatorParam =>
    +        newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p)
    +      case p => throw new IllegalArgumentException(
    +        s"unsupported accumulator param '${p.getClass.getSimpleName}' for 
metric '$name'.")
    +    }
    +  }
    +
    +  /**
    +   * Get the [[AccumulatorParam]] associated with the internal metric name,
    +   * which must begin with [[METRICS_PREFIX]].
    +   */
    +  def getParam(name: String): AccumulatorParam[_] = {
    +    assert(name.startsWith(METRICS_PREFIX),
    +      s"internal accumulator name must start with '$METRICS_PREFIX': 
$name")
    +    name match {
    +      case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam
    +      case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam
    +      case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam
    +      case input.READ_METHOD => StringAccumulatorParam
    +      case output.WRITE_METHOD => StringAccumulatorParam
    +      case _ => LongAccumulatorParam
         }
       }
     
       /**
        * Accumulators for tracking internal metrics.
    +   */
    +  def create(): Seq[Accumulator[_]] = {
    --- End diff ---
    
    Oops, looks like I missed this. I will address it in https://github.com/apache/spark/pull/10958
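    
    For context, a rough sketch of how these helpers are meant to fit together
    (my own illustration, not part of the diff; InternalAccumulator is
    private[spark], so this only compiles from within the org.apache.spark
    package):
    
        import org.apache.spark.AccumulatorParam._
    
        // Every internal metric name lives under the "internal.metrics."
        // namespace, so create() can dispatch on the name alone.
        val runTime = InternalAccumulator.create(InternalAccumulator.EXECUTOR_RUN_TIME)
    
        // getParam defaults to Long, with a handful of Int / String /
        // block-status exceptions.
        assert(InternalAccumulator.getParam(
          InternalAccumulator.EXECUTOR_RUN_TIME) == LongAccumulatorParam)
        assert(InternalAccumulator.getParam(
          InternalAccumulator.shuffleRead.LOCAL_BLOCKS_FETCHED) == IntAccumulatorParam)
    
        // Names outside the namespace fail the prefix assertion:
        // InternalAccumulator.create("myCustomMetric")  // AssertionError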

