[ https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-14654:
------------------------------------

    Assignee:     (was: Apache Spark)

New accumulator API
-------------------

                Key: SPARK-14654
                URL: https://issues.apache.org/jira/browse/SPARK-14654
            Project: Spark
         Issue Type: Sub-task
         Components: SQL
           Reporter: Reynold Xin

The current accumulator API has a few problems:

1. Its type hierarchy is very complicated, with Accumulator, Accumulable, AccumulatorParam, AccumulableParam, etc.
2. The intermediate buffer type must be the same as the output type, so there is no way to define an accumulator that computes averages.
3. It is very difficult to specialize the methods, leading to excessive boxing and making accumulators a poor fit for metrics that change for every record.
4. There is no single coherent API that works for both Java and Scala.

This is a proposed new API that addresses all of the above. In this new API:

1. There is only a single user-facing class (Accumulator).
2. The intermediate value is stored in the accumulator itself and can be different from the output type.
3. Concrete implementations can provide their own specialized methods.
4. It is designed to work for both Java and Scala.

{code}
abstract class Accumulator[IN, OUT] extends Serializable {

  def isRegistered: Boolean = ...

  def register(metadata: AccumulatorMetadata): Unit = ...

  def metadata: AccumulatorMetadata = ...

  def reset(): Unit

  def add(v: IN): Unit

  def merge(other: Accumulator[IN, OUT]): Unit

  def value: OUT

  def localValue: OUT = value

  final def registerAccumulatorOnExecutor(): Unit = {
    // Automatically register the accumulator when it is deserialized with the task closure.
    // This is for external accumulators and internal ones that do not represent task level
    // metrics, e.g. internal SQL metrics, which are per-operator.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      taskContext.registerAccumulator(this)
    }
  }

  // Called by Java when deserializing an object
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    registerAccumulatorOnExecutor()
  }
}
{code}

Metadata, provided by Spark after registration:

{code}
class AccumulatorMetadata(
    val id: Long,
    val name: Option[String],
    val countFailedValues: Boolean)
  extends Serializable
{code}

and an implementation that also offers specialized getters and setters:

{code}
class LongAccumulator extends Accumulator[jl.Long, jl.Long] {

  private[this] var _sum = 0L

  override def reset(): Unit = _sum = 0L

  override def add(v: jl.Long): Unit = {
    _sum += v
  }

  override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
    case o: LongAccumulator => _sum += o.sum
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: jl.Long = _sum

  def sum: Long = _sum
}
{code}

and SparkContext...

{code}
class SparkContext {
  ...
  def newLongAccumulator(): LongAccumulator

  def newLongAccumulator(name: String): LongAccumulator

  def newLongAccumulator(name: String, dedup: Boolean): LongAccumulator

  def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
  ...
}
{code}
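As an illustration of point 2 above (the intermediate state can differ from the output type), here is a minimal sketch of an accumulator that computes an average under this proposal. The class name, getters, and the registration call at the bottom are illustrative assumptions, not part of the proposal itself:

{code}
import java.{lang => jl}

// Sketch only: an accumulator whose intermediate state (sum and count) differs
// from its output type (the average, as a java.lang.Double). All names here are
// illustrative and assume the Accumulator[IN, OUT] class sketched above.
class AverageAccumulator extends Accumulator[jl.Long, jl.Double] {
  private[this] var _sum = 0L
  private[this] var _count = 0L

  override def reset(): Unit = {
    _sum = 0L
    _count = 0L
  }

  override def add(v: jl.Long): Unit = {
    _sum += v
    _count += 1
  }

  override def merge(other: Accumulator[jl.Long, jl.Double]): Unit = other match {
    case o: AverageAccumulator =>
      _sum += o.sum
      _count += o.count
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // The output is derived from the intermediate state only when it is read.
  override def value: jl.Double = if (_count == 0) 0.0 else _sum.toDouble / _count

  def sum: Long = _sum
  def count: Long = _count
}

// Assumed usage, given the proposed SparkContext.registerAccumulator above:
// val avg = sc.registerAccumulator(new AverageAccumulator)
{code}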
To use it:

{code}
val acc = sc.newLongAccumulator()
sc.parallelize(1 to 1000).map { i =>
  acc.add(1)
  i
}
{code}

A work-in-progress prototype is here:
https://github.com/rxin/spark/tree/accumulator-refactor
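For completeness, a hedged sketch of the end-to-end flow under this proposal: the map above is lazy, so tasks only call add() once an action runs, and the merged result is then readable on the driver. The count() action and the add(1L) literal are illustrative assumptions, not part of the proposal:

{code}
// Sketch only, assuming the proposed API above: transformations are lazy, so
// the accumulator is only updated when an action runs; the driver then reads
// the merged value.
val acc = sc.newLongAccumulator()
val data = sc.parallelize(1 to 1000).map { i =>
  acc.add(1L)   // each task adds to its local, deserialized copy
  i
}
data.count()         // triggers the job; task-side copies are merged back
println(acc.value)   // merged result on the driver, e.g. 1000
{code}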