[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15255312#comment-15255312
 ] 

Reynold Xin edited comment on SPARK-14654 at 4/23/16 6:02 PM:
--------------------------------------------------------------

I don't get what you are trying to accomplish. It seems like you enjoy the 
cuteness of reflection. With your proposal:

1. Specialization won't work, which is a big part of this new API.

2. It is less obvious what the return types should be.

3. It is strictly less type safe, and app developers won't know what the 
accepted input types are.

4. It is unclear what the semantics are when "1" is passed in as initial value 
rather than "0".

5. We would need to implement all the primitive types, which I don't think makes 
sense. In my prototype only double and long are implemented. I don't see why we 
should implement all the primitive types. Why have a "byte" accumulator when 
the long one captures almost all the use cases? How often would having a 
"Boolean" accumulator make sense?

Your proposal keeps almost all the issues of the existing API. In the new API, 
users know whether they want an avg or a long accumulator, because each has 
its own functions.
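
As a rough illustration of that last point, here is a minimal sketch of how an 
average accumulator could sit next to a long one under the proposed shape. 
SketchAccumulator is a stand-in defined only so the snippet compiles without 
Spark, and AvgAccumulator, sum, count and avg are hypothetical names; none of 
this is taken from the prototype.

```scala
import java.{lang => jl}

// Hypothetical stand-in for the proposed base class, trimmed to what the sketch needs.
abstract class SketchAccumulator[IN, OUT] extends Serializable {
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: SketchAccumulator[IN, OUT]): Unit
  def value: OUT
}

// The intermediate state (sum and count) differs from the output (the mean).
class AvgAccumulator extends SketchAccumulator[jl.Double, jl.Double] {
  private var _sum = 0.0
  private var _count = 0L

  override def reset(): Unit = { _sum = 0.0; _count = 0L }

  // Generic entry point, usable from Java; takes a boxed value.
  override def add(v: jl.Double): Unit = { _sum += v.doubleValue; _count += 1 }

  // Specialized entry point: takes a primitive, so callers do not box per record.
  def add(v: Double): Unit = { _sum += v; _count += 1 }

  override def merge(other: SketchAccumulator[jl.Double, jl.Double]): Unit =
    other match {
      case o: AvgAccumulator =>
        _sum += o.sum
        _count += o.count
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }

  def sum: Double = _sum
  def count: Long = _count
  // NaN when nothing has been added yet (a choice made for this sketch).
  def avg: Double = if (_count == 0L) Double.NaN else _sum / _count
  override def value: jl.Double = jl.Double.valueOf(avg)
}
```

Because the average lives in its own class with its own getters, the caller 
picks the behavior by picking the type, with no reflection involved.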







> New accumulator API
> -------------------
>
>                 Key: SPARK-14654
>                 URL: https://issues.apache.org/jira/browse/SPARK-14654
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Reynold Xin
>
> The current accumulator API has a few problems:
> 1. Its type hierarchy is very complicated, with Accumulator, Accumulable, 
> AccumulatorParam, AccumulableParam, etc.
> 2. The intermediate buffer type must be the same as the output type, so there 
> is no way to define an accumulator that computes averages.
> 3. It is very difficult to specialize the methods, leading to excessive 
> boxing and making accumulators bad for metrics that change for each record.
> 4. There is not a single coherent API that works for both Java and Scala.
> This is a proposed new API that addresses all of the above. In this new API:
> 1. There is only a single class (Accumulator) that is user facing
> 2. The intermediate value is stored in the accumulator itself and can be 
> different from the output type.
> 3. Concrete implementations can provide their own specialized methods.
> 4. Designed to work for both Java and Scala.
> {code}
> abstract class Accumulator[IN, OUT] extends Serializable {
>   def isRegistered: Boolean = ...
>   def register(metadata: AccumulatorMetadata): Unit = ...
>   def metadata: AccumulatorMetadata = ...
>   def reset(): Unit
>   def add(v: IN): Unit
>   def merge(other: Accumulator[IN, OUT]): Unit
>   def value: OUT
>   def localValue: OUT = value
>   final def registerAccumulatorOnExecutor(): Unit = {
>     // Automatically register the accumulator when it is deserialized with the task closure.
>     // This is for external accumulators and internal ones that do not represent task-level
>     // metrics, e.g. internal SQL metrics, which are per-operator.
>     val taskContext = TaskContext.get()
>     if (taskContext != null) {
>       taskContext.registerAccumulator(this)
>     }
>   }
>   // Called by Java when deserializing an object
>   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
>     in.defaultReadObject()
>     registerAccumulatorOnExecutor()
>   }
> }
> {code}
> Metadata, provided by Spark after registration:
> {code}
> class AccumulatorMetadata(
>   val id: Long,
>   val name: Option[String],
>   val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and an implementation that also offers specialized getters and setters
> {code}
> import java.{lang => jl}
>
> class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
>   private[this] var _sum = 0L
>   override def reset(): Unit = _sum = 0L
>   override def add(v: jl.Long): Unit = {
>     _sum += v
>   }
>   override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
>     case o: LongAccumulator => _sum += o.sum
>     case _ => throw new UnsupportedOperationException(
>       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
>   }
>   override def value: jl.Long = _sum
>   def sum: Long = _sum
> }
> {code}
> and SparkContext...
> {code}
> class SparkContext {
>   ...
>   def newLongAccumulator(): LongAccumulator
>   def newLongAccumulator(name: String): LongAccumulator
>   def newLongAccumulator(name: String, dedup: Boolean): LongAccumulator
>   def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
>   ...
> }
> {code}
> To use it ...
> {code}
> val acc = sc.newLongAccumulator()
> sc.parallelize(1 to 1000).map { i =>
>   acc.add(1)
>   i
> }.count()  // map is lazy; an action is needed before any acc.add call runs
> {code}
> A work-in-progress prototype here: 
> https://github.com/rxin/spark/tree/accumulator-refactor
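
The description does not spell out how per-task copies are combined. The 
following is a minimal, Spark-free sketch under the assumption that every task 
updates its own zeroed copy and the driver folds the copies together with 
merge. SketchLongAccumulator and MergeSketch are hypothetical names used only 
for illustration, and the three ranges stand in for partitions.

```scala
// Hypothetical, Spark-free stand-in for the LongAccumulator in the description.
final class SketchLongAccumulator extends Serializable {
  private var _sum = 0L
  def reset(): Unit = _sum = 0L
  def add(v: Long): Unit = _sum += v // primitive argument, no boxing
  def merge(other: SketchLongAccumulator): Unit = _sum += other.sum
  def sum: Long = _sum
}

object MergeSketch {
  // Simulates three tasks, each counting its own records, then a driver-side merge.
  def run(): Long = {
    val driverSide = new SketchLongAccumulator
    val partitions = Seq(1 to 300, 301 to 700, 701 to 1000)
    val taskCopies = partitions.map { part =>
      val copy = new SketchLongAccumulator // each task starts from zero
      part.foreach(_ => copy.add(1L))
      copy
    }
    taskCopies.foreach(c => driverSide.merge(c))
    driverSide.sum
  }
}
```

Starting every copy from zero is what keeps the merged total well defined; a 
non-zero initial value would be counted once per copy.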


