GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12612#discussion_r61370129
  
    --- Diff: core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---
    @@ -175,124 +172,143 @@ class TaskMetrics private[spark] () extends Serializable {
       }
     
       // Only used for test
    -  private[spark] val testAccum =
    -    sys.props.get("spark.testing").map(_ => TaskMetrics.createLongAccum(TEST_ACCUM))
    -
    -  @transient private[spark] lazy val internalAccums: Seq[Accumulable[_, _]] = {
    -    val in = inputMetrics
    -    val out = outputMetrics
    -    val sr = shuffleReadMetrics
    -    val sw = shuffleWriteMetrics
    -    Seq(_executorDeserializeTime, _executorRunTime, _resultSize, _jvmGCTime,
    -      _resultSerializationTime, _memoryBytesSpilled, _diskBytesSpilled, _peakExecutionMemory,
    -      _updatedBlockStatuses, sr._remoteBlocksFetched, sr._localBlocksFetched, sr._remoteBytesRead,
    -      sr._localBytesRead, sr._fetchWaitTime, sr._recordsRead, sw._bytesWritten, sw._recordsWritten,
    -      sw._writeTime, in._bytesRead, in._recordsRead, out._bytesWritten, out._recordsWritten) ++
    -      testAccum
    -  }
    +  private[spark] val testAccum = sys.props.get("spark.testing").map(_ => new LongAccumulator)
    +
    +
    +  import InternalAccumulator._
    +  @transient private[spark] lazy val nameToAccums = LinkedHashMap(
    +    EXECUTOR_DESERIALIZE_TIME -> _executorDeserializeTime,
    +    EXECUTOR_RUN_TIME -> _executorRunTime,
    +    RESULT_SIZE -> _resultSize,
    +    JVM_GC_TIME -> _jvmGCTime,
    +    RESULT_SERIALIZATION_TIME -> _resultSerializationTime,
    +    MEMORY_BYTES_SPILLED -> _memoryBytesSpilled,
    +    DISK_BYTES_SPILLED -> _diskBytesSpilled,
    +    PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
    +    UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
    +    shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
    +    shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
    +    shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
    +    shuffleRead.LOCAL_BYTES_READ -> shuffleReadMetrics._localBytesRead,
    +    shuffleRead.FETCH_WAIT_TIME -> shuffleReadMetrics._fetchWaitTime,
    +    shuffleRead.RECORDS_READ -> shuffleReadMetrics._recordsRead,
    +    shuffleWrite.BYTES_WRITTEN -> shuffleWriteMetrics._bytesWritten,
    +    shuffleWrite.RECORDS_WRITTEN -> shuffleWriteMetrics._recordsWritten,
    +    shuffleWrite.WRITE_TIME -> shuffleWriteMetrics._writeTime,
    +    input.BYTES_READ -> inputMetrics._bytesRead,
    +    input.RECORDS_READ -> inputMetrics._recordsRead,
    +    output.BYTES_WRITTEN -> outputMetrics._bytesWritten,
    +    output.RECORDS_WRITTEN -> outputMetrics._recordsWritten
    +  ) ++ testAccum.map(TEST_ACCUM -> _)
    +
    +  @transient private[spark] lazy val internalAccums: Seq[NewAccumulator[_, _]] =
    +    nameToAccums.values.toIndexedSeq
     
       /* ========================== *
        |        OTHER THINGS        |
        * ========================== */
     
    -  private[spark] def registerForCleanup(sc: SparkContext): Unit = {
    -    internalAccums.foreach { accum =>
    -      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accum))
    +  private[spark] def register(sc: SparkContext): Unit = {
    +    nameToAccums.foreach {
    +      case (name, acc) => acc.register(sc, name = Some(name), countFailedValues = true)
         }
       }
     
       /**
        * External accumulators registered with this task.
        */
    -  @transient private lazy val externalAccums = new ArrayBuffer[Accumulable[_, _]]
    +  @transient private lazy val externalAccums = new ArrayBuffer[NewAccumulator[_, _]]
     
    -  private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
    +  private[spark] def registerAccumulator(a: NewAccumulator[_, _]): Unit = {
         externalAccums += a
       }
     
    -  /**
    -   * Return the latest updates of accumulators in this task.
    -   *
    -   * The [[AccumulableInfo.update]] field is always defined and the [[AccumulableInfo.value]]
    -   * field is always empty, since this represents the partial updates recorded in this task,
    -   * not the aggregated value across multiple tasks.
    -   */
    -  def accumulatorUpdates(): Seq[AccumulableInfo] = {
    -    (internalAccums ++ externalAccums).map { a => a.toInfo(Some(a.localValue), None) }
    -  }
    +  private[spark] def accumulators(): Seq[NewAccumulator[_, _]] = internalAccums ++ externalAccums
     }
     
    -/**
    - * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners.
    - * Its purpose is to obviate the need for the driver to reconstruct the original accumulators,
    - * which might have been garbage-collected. See SPARK-13407 for more details.
    - *
    - * Instances of this class should be considered read-only and users should not call `inc*()` or
    - * `set*()` methods. While we could override the setter methods to throw
    - * UnsupportedOperationException, we choose not to do so because the overrides would quickly become
    - * out-of-date when new metrics are added.
    - */
    -private[spark] class ListenerTaskMetrics(accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics {
    -
    -  override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
    -
    -  override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
    -    throw new UnsupportedOperationException("This TaskMetrics is read-only")
    -  }
    -}
     
     private[spark] object TaskMetrics extends Logging {
    +  import InternalAccumulator._
     
       /**
        * Create an empty task metrics that doesn't register its accumulators.
        */
       def empty: TaskMetrics = {
    -    val metrics = new TaskMetrics
    -    metrics.internalAccums.foreach(acc => Accumulators.remove(acc.id))
    -    metrics
    +    val tm = new TaskMetrics
    +    tm.nameToAccums.foreach { case (name, acc) =>
    +      acc.metadata = AccumulatorMetadata(AccumulatorContext.newId(), Some(name), true)
    +    }
    +    tm
    +  }
    +
    +  def registered: TaskMetrics = {
    +    val tm = empty
    +    tm.internalAccums.foreach(AccumulatorContext.register)
    +    tm
       }
     
       /**
    -   * Create a new accumulator representing an internal task metric.
    +   * Construct a [[TaskMetrics]] object from a list of [[AccumulableInfo]], called on driver only.
    +   * The returned [[TaskMetrics]] is only used to get some internal metrics, we don't need to take
    +   * care of external accumulator info passed in.
        */
    -  private def newMetric[T](
    -      initialValue: T,
    -      name: String,
    -      param: AccumulatorParam[T]): Accumulator[T] = {
    -    new Accumulator[T](initialValue, param, Some(name), countFailedValues = true)
    +  def fromAccumulatorInfos(infos: Seq[AccumulableInfo]): TaskMetrics = {
    +    val tm = new TaskMetrics
    +    infos.filter(info => info.name.isDefined && info.update.isDefined).foreach { info =>
    +      val name = info.name.get
    +      val value = info.update.get
    +      if (name == UPDATED_BLOCK_STATUSES) {
    +        tm.setUpdatedBlockStatuses(value.asInstanceOf[Seq[(BlockId, BlockStatus)]])
    +      } else {
    +        tm.nameToAccums.get(name).foreach(
    +          _.asInstanceOf[LongAccumulator].setValue(value.asInstanceOf[Long])
    +        )
    +      }
    +    }
    +    tm
       }
     
    -  def createLongAccum(name: String): Accumulator[Long] = {
    -    newMetric(0L, name, LongAccumulatorParam)
    -  }
    +  /**
    +   * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
    +   */
    +  def fromAccumulators(accums: Seq[NewAccumulator[_, _]]): TaskMetrics = {
    +    val tm = new TaskMetrics
    +    val (internalAccums, externalAccums) =
    +      accums.partition(a => a.name.isDefined && tm.nameToAccums.contains(a.name.get))
    +
    +    internalAccums.foreach { acc =>
    +      val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[NewAccumulator[Any, Any]]
    +      tmAcc.metadata = acc.metadata
    +      tmAcc.merge(acc.asInstanceOf[NewAccumulator[Any, Any]])
    +    }
     
    -  def createIntAccum(name: String): Accumulator[Int] = {
    -    newMetric(0, name, IntAccumulatorParam)
    +    tm.externalAccums ++= externalAccums
    +    tm
       }
    +}
    +
    +
    +private[spark] class BlockStatusesAccumulator
    --- End diff ---
    
    Isn't this just the `ListAccumulator`?
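
    For context on the question: `BlockStatusesAccumulator` in this patch just collects `(BlockId, BlockStatus)` pairs, which is the same shape as a generic "list" accumulator specialized to one element type. Below is a minimal sketch of that idea, written against a simplified accumulator contract rather than the exact `NewAccumulator` API in this PR; the trait and the `add`/`merge`/`value`/`reset` method names are assumptions for illustration only.

        import scala.collection.mutable.ArrayBuffer

        // Simplified stand-in for the accumulator contract; the real
        // NewAccumulator API in this PR may differ in shape and method names.
        trait SimpleAccumulator[IN, OUT] {
          def add(v: IN): Unit
          def merge(other: SimpleAccumulator[IN, OUT]): Unit
          def value: OUT
          def reset(): Unit
        }

        // A generic list accumulator: collects every added element in order.
        class ListAccumulator[T] extends SimpleAccumulator[T, Seq[T]] {
          private val buf = new ArrayBuffer[T]

          override def add(v: T): Unit = buf += v
          override def merge(other: SimpleAccumulator[T, Seq[T]]): Unit = buf ++= other.value
          override def value: Seq[T] = buf.toList  // immutable snapshot of current contents
          override def reset(): Unit = buf.clear()
        }

        // With such a class available, a dedicated BlockStatusesAccumulator would
        // reduce to a specialization of it, e.g. ListAccumulator[(BlockId, BlockStatus)].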


