Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12612#discussion_r60852232
  
    --- Diff: core/src/main/scala/org/apache/spark/Accumulator.scala ---
    @@ -17,121 +17,281 @@
     
     package org.apache.spark
     
    +import java.{lang => jl}
    +import java.io.{ObjectInputStream, ObjectOutputStream, Serializable}
     import java.util.concurrent.atomic.AtomicLong
     import javax.annotation.concurrent.GuardedBy
     
    -import scala.collection.mutable
    -import scala.ref.WeakReference
    +import scala.collection.generic.Growable
    +import scala.reflect.ClassTag
     
    -import org.apache.spark.internal.Logging
    -import org.apache.spark.storage.{BlockId, BlockStatus}
    +import org.apache.spark.scheduler.AccumulableInfo
    +import org.apache.spark.serializer.JavaSerializer
    +import org.apache.spark.util.Utils
     
     
    -/**
    - * A simpler value of [[Accumulable]] where the result type being accumulated is the same
    - * as the types of elements being merged, i.e. variables that are only "added" to through an
    - * associative and commutative operation and can therefore be efficiently supported in parallel.
    - * They can be used to implement counters (as in MapReduce) or sums. Spark natively supports
    - * accumulators of numeric value types, and programmers can add support for new types.
    - *
    - * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
    - * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
    - * However, they cannot read its value. Only the driver program can read the accumulator's value,
    - * using its value method.
    - *
    - * The interpreter session below shows an accumulator being used to add up the elements of an array:
    - *
    - * {{{
    - * scala> val accum = sc.accumulator(0)
    - * accum: spark.Accumulator[Int] = 0
    - *
    - * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    - * ...
    - * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    - *
    - * scala> accum.value
    - * res2: Int = 10
    - * }}}
    - *
    - * @param initialValue initial value of accumulator
    - * @param param helper object defining how to add elements of type `T`
    - * @param name human-readable name associated with this accumulator
    - * @param countFailedValues whether to accumulate values from failed tasks
    - * @tparam T result type
    - */
    -class Accumulator[T] private[spark] (
    -    // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
    -    @transient private val initialValue: T,
    -    param: AccumulatorParam[T],
    -    name: Option[String] = None,
    -    countFailedValues: Boolean = false)
    -  extends Accumulable[T, T](initialValue, param, name, countFailedValues)
    -
    -
    -// TODO: The multi-thread support in accumulators is kind of lame; check
    -// if there's a more intuitive way of doing it right
    -private[spark] object Accumulators extends Logging {
    +abstract class Accumulator[IN, OUT](
    +    val name: Option[String],
    +    private[spark] val countFailedValues: Boolean) extends Serializable {
    +  private[spark] val id = AccumulatorContext.newId()
    +  private[this] var atDriverSide = true
    +
    +  private[spark] def register(sc: SparkContext): Unit = {
    +    if (isRegistered) {
    +      throw new UnsupportedOperationException("Cannot register an Accumulator twice.")
    +    }
    +    AccumulatorContext.register(this)
    +    sc.cleaner.foreach(_.registerAccumulatorForCleanup(this))
    +  }
    +
    +  final def isRegistered: Boolean = AccumulatorContext.originals.containsKey(id)
    +
    +  def initialize(): Unit = {}
    +
    +  def add(v: IN): Unit
    +
    +  def +=(v: IN): Unit = add(v)
    +
    +  def merge(other: OUT): Unit
    --- End diff --
    
    `DAGScheduler` will collect the accumulator outputs from executors and aggregate them, so we need the `merge` method to operate on `OUT` directly.
    
    Actually this implies that we have to make the intermediate type the same as the output type, e.g. an average accumulator can't implement `merge`.
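
    A minimal sketch (plain Scala, not the classes in this PR, using the made-up name `BrokenAverage`) of what an average accumulator runs into when `merge(other: OUT)` takes the output type and `OUT` is the finished average:

    ```scala
    // Intermediate state is a (sum, count) pair, but the output type is Double.
    class BrokenAverage {
      private var sum = 0.0
      private var count = 0L

      def add(v: Double): Unit = { sum += v; count += 1 }

      // merge only receives the other side's final Double, so the count needed
      // for a correct weighted merge is already lost; it cannot be implemented.
      def merge(otherAverage: Double): Unit =
        throw new UnsupportedOperationException("cannot merge averages without counts")

      def value: Double = if (count == 0) 0.0 else sum / count
    }
    ```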
    
    One way to fix it: we should send around the intermediate value, not the final output, between executors and the driver.
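
    For illustration, a hedged sketch of that fix (names are illustrative, not this PR's API): make the merged and shipped state the intermediate `(sum, count)` pair, and only derive the output when `value` is read on the driver:

    ```scala
    // Intermediate state travels with the accumulator; the average is computed lazily.
    class AverageAccumulator extends Serializable {
      private var sum = 0.0
      private var count = 0L

      def add(v: Double): Unit = { sum += v; count += 1 }

      // merge operates on another accumulator carrying intermediate state,
      // so the driver can combine partial results from executors correctly.
      def merge(other: AverageAccumulator): Unit = {
        sum += other.sum
        count += other.count
      }

      // The output type (the average) is only derived at read time on the driver.
      def value: Double = if (count == 0) 0.0 else sum / count
    }
    ```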

