Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/10835#discussion_r50802781 --- Diff: core/src/main/scala/org/apache/spark/InternalAccumulator.scala --- @@ -17,42 +17,193 @@ package org.apache.spark +import org.apache.spark.storage.{BlockId, BlockStatus} -// This is moved to its own file because many more things will be added to it in SPARK-10620. + +/** + * A collection of fields and methods concerned with internal accumulators that represent + * task level metrics. + */ private[spark] object InternalAccumulator { - val PEAK_EXECUTION_MEMORY = "peakExecutionMemory" - val TEST_ACCUMULATOR = "testAccumulator" - - // For testing only. - // This needs to be a def since we don't want to reuse the same accumulator across stages. - private def maybeTestAccumulator: Option[Accumulator[Long]] = { - if (sys.props.contains("spark.testing")) { - Some(new Accumulator( - 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true)) - } else { - None + + import AccumulatorParam._ + + // Prefixes used in names of internal task level metrics + val METRICS_PREFIX = "internal.metrics." + val SHUFFLE_READ_METRICS_PREFIX = METRICS_PREFIX + "shuffle.read." + val SHUFFLE_WRITE_METRICS_PREFIX = METRICS_PREFIX + "shuffle.write." + val OUTPUT_METRICS_PREFIX = METRICS_PREFIX + "output." + val INPUT_METRICS_PREFIX = METRICS_PREFIX + "input." + + // Names of internal task level metrics + val EXECUTOR_DESERIALIZE_TIME = METRICS_PREFIX + "executorDeserializeTime" + val EXECUTOR_RUN_TIME = METRICS_PREFIX + "executorRunTime" + val RESULT_SIZE = METRICS_PREFIX + "resultSize" + val JVM_GC_TIME = METRICS_PREFIX + "jvmGCTime" + val RESULT_SERIALIZATION_TIME = METRICS_PREFIX + "resultSerializationTime" + val MEMORY_BYTES_SPILLED = METRICS_PREFIX + "memoryBytesSpilled" + val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled" + val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory" + val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses" + val TEST_ACCUM = METRICS_PREFIX + "testAccumulator" + + // scalastyle:off + + // Names of shuffle read metrics + object shuffleRead { + val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "remoteBlocksFetched" + val LOCAL_BLOCKS_FETCHED = SHUFFLE_READ_METRICS_PREFIX + "localBlocksFetched" + val REMOTE_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "remoteBytesRead" + val LOCAL_BYTES_READ = SHUFFLE_READ_METRICS_PREFIX + "localBytesRead" + val FETCH_WAIT_TIME = SHUFFLE_READ_METRICS_PREFIX + "fetchWaitTime" + val RECORDS_READ = SHUFFLE_READ_METRICS_PREFIX + "recordsRead" + } + + // Names of shuffle write metrics + object shuffleWrite { + val BYTES_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "bytesWritten" + val RECORDS_WRITTEN = SHUFFLE_WRITE_METRICS_PREFIX + "recordsWritten" + val WRITE_TIME = SHUFFLE_WRITE_METRICS_PREFIX + "writeTime" + } + + // Names of output metrics + object output { + val WRITE_METHOD = OUTPUT_METRICS_PREFIX + "writeMethod" + val BYTES_WRITTEN = OUTPUT_METRICS_PREFIX + "bytesWritten" + val RECORDS_WRITTEN = OUTPUT_METRICS_PREFIX + "recordsWritten" + } + + // Names of input metrics + object input { + val READ_METHOD = INPUT_METRICS_PREFIX + "readMethod" + val BYTES_READ = INPUT_METRICS_PREFIX + "bytesRead" + val RECORDS_READ = INPUT_METRICS_PREFIX + "recordsRead" + } + + // scalastyle:on + + /** + * Create an internal [[Accumulator]] by name, which must begin with [[METRICS_PREFIX]]. + */ + def create(name: String): Accumulator[_] = { + assert(name.startsWith(METRICS_PREFIX), + s"internal accumulator name must start with '$METRICS_PREFIX': $name") + getParam(name) match { + case p @ LongAccumulatorParam => newMetric[Long](0L, name, p) + case p @ IntAccumulatorParam => newMetric[Int](0, name, p) + case p @ StringAccumulatorParam => newMetric[String]("", name, p) + case p @ UpdatedBlockStatusesAccumulatorParam => + newMetric[Seq[(BlockId, BlockStatus)]](Seq(), name, p) + case p => throw new IllegalArgumentException( + s"unsupported accumulator param '${p.getClass.getSimpleName}' for metric '$name'.") + } + } + + /** + * Get the [[AccumulatorParam]] associated with the internal metric name, + * which must begin with [[METRICS_PREFIX]]. + */ + def getParam(name: String): AccumulatorParam[_] = { + assert(name.startsWith(METRICS_PREFIX), + s"internal accumulator name must start with '$METRICS_PREFIX': $name") + name match { + case UPDATED_BLOCK_STATUSES => UpdatedBlockStatusesAccumulatorParam + case shuffleRead.LOCAL_BLOCKS_FETCHED => IntAccumulatorParam + case shuffleRead.REMOTE_BLOCKS_FETCHED => IntAccumulatorParam + case input.READ_METHOD => StringAccumulatorParam + case output.WRITE_METHOD => StringAccumulatorParam + case _ => LongAccumulatorParam } } /** * Accumulators for tracking internal metrics. + */ + def create(): Seq[Accumulator[_]] = { --- End diff -- I'd also be cool with renaming this from `create` to `createAll` or `createAccumulators` or something plural, since right now I suppose that `InternalAccumulator.create()` might be mistook for a factory method which returns a single instance of `InternalAccumulator`. This is a really minor nit, though.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org