[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-24 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255840#comment-15255840
 ] 

holdenk commented on SPARK-14654:
-

Ah yes, in DAGScheduler right now we cast it to [Any, Any] when we get it from 
the map, so I suppose the type safety would be lost anyway and we would just be 
hiding the cast from the developer, which probably isn't worth it.
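
For context, a minimal sketch of the erasure in question (hypothetical code, not the actual DAGScheduler implementation): once accumulators of different types sit in one collection keyed by id, retrieving one forces a cast back to Accumulator[Any, Any], so the compiler can no longer check the update type.

{code}
import scala.collection.mutable

// Hypothetical sketch, not the actual DAGScheduler code: accumulators of different
// IN/OUT types are kept in a single collection, so their type parameters are erased here.
val accums = mutable.Map.empty[Long, Accumulator[_, _]]

def applyUpdate(id: Long, update: Any): Unit = {
  // The cast hides the lost type information; a mismatched update only fails at runtime.
  val acc = accums(id).asInstanceOf[Accumulator[Any, Any]]
  acc.add(update)
}
{code}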

> New accumulator API
> ---
>
> Key: SPARK-14654
> URL: https://issues.apache.org/jira/browse/SPARK-14654
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> The current accumulator API has a few problems:
> 1. Its type hierarchy is very complicated, with Accumulator, Accumulable, AccumulatorParam, AccumulableParam, etc.
> 2. The intermediate buffer type must be the same as the output type, so there is no way to define an accumulator that computes averages.
> 3. It is very difficult to specialize the methods, leading to excessive boxing and making accumulators bad for metrics that change for each record.
> 4. There is not a single coherent API that works for both Java and Scala.
> This is a proposed new API that addresses all of the above. In this new API:
> 1. There is only a single user-facing class (Accumulator).
> 2. The intermediate value is stored in the accumulator itself and can be different from the output type.
> 3. Concrete implementations can provide their own specialized methods.
> 4. It is designed to work for both Java and Scala.
> {code}
> abstract class Accumulator[IN, OUT] extends Serializable {
>   def isRegistered: Boolean = ...
>   def register(metadata: AccumulatorMetadata): Unit = ...
>   def metadata: AccumulatorMetadata = ...
>   def reset(): Unit
>   def add(v: IN): Unit
>   def merge(other: Accumulator[IN, OUT]): Unit
>   def value: OUT
>   def localValue: OUT = value
>
>   final def registerAccumulatorOnExecutor(): Unit = {
>     // Automatically register the accumulator when it is deserialized with the task closure.
>     // This is for external accumulators and internal ones that do not represent task-level
>     // metrics, e.g. internal SQL metrics, which are per-operator.
>     val taskContext = TaskContext.get()
>     if (taskContext != null) {
>       taskContext.registerAccumulator(this)
>     }
>   }
>
>   // Called by Java when deserializing an object
>   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
>     in.defaultReadObject()
>     registerAccumulatorOnExecutor()
>   }
> }
> {code}
> Metadata, provided by Spark after registration:
> {code}
> class AccumulatorMetadata(
>   val id: Long,
>   val name: Option[String],
>   val countFailedValues: Boolean
> ) extends Serializable
> {code}
> and an implementation that also offers specialized getters and setters
> {code}
> class LongAccumulator extends Accumulator[jl.Long, jl.Long] {
>   private[this] var _sum = 0L
>   override def reset(): Unit = _sum = 0L
>   override def add(v: jl.Long): Unit = {
>     _sum += v
>   }
>   override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
>     case o: LongAccumulator => _sum += o.sum
>     case _ => throw new UnsupportedOperationException(
>       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
>   }
>   override def value: jl.Long = _sum
>   def sum: Long = _sum
> }
> {code}
> and SparkContext...
> {code}
> class SparkContext {
>   ...
>   def newLongAccumulator(): LongAccumulator
>   def newLongAccumulator(name: String): LongAccumulator
>   def newLongAccumulator(name: String, dedup: Boolean): LongAccumulator
>   def registerAccumulator[IN, OUT](acc: Accumulator[IN, OUT]): Accumulator[IN, OUT]
>   ...
> }
> {code}
> To use it ...
> {code}
> val acc = sc.newLongAccumulator()
> sc.parallelize(1 to 1000).map { i =>
>   acc.add(1)
>   i
> }
> {code}
> A work-in-progress prototype here: 
> https://github.com/rxin/spark/tree/accumulator-refactor






[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-24 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255820#comment-15255820
 ] 

Reynold Xin commented on SPARK-14654:
-

But Holden, the call sites in Spark are already erasing the types, because the 
accumulators are put into a collection.





[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-24 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255819#comment-15255819
 ] 

holdenk commented on SPARK-14654:
-

Even if merge isn't supposed to be called by the end user, it would be nice for 
it to be type safe - we would also be replacing a conditional, a use of 
reflection, or a cast that we expect developers to remember to write with a type 
parameter. By adding this I'd argue we reduce the complexity/cost of the API.




[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255363#comment-15255363
 ] 

Reynold Xin commented on SPARK-14654:
-

I see. The merge function isn't supposed to be called by the end user. 

Adding type parameters is not free -- actually nothing we add is free. 
We need to consider how much gain it brings. In this case I think the gain is 
minimal, if any.





[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255359#comment-15255359
 ] 

holdenk commented on SPARK-14654:
-

So ACC isn't the internal buffer type; rather, it's the type of the Accumulator 
itself. This just replaces the runtime exception thrown when someone tries to 
merge two incompatible Accumulators with a compile-time check.
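
To make the trade-off concrete, a small hedged sketch (LongMaxAccumulator is hypothetical; both classes share IN = OUT = jl.Long):

{code}
// Hypothetical: both LongAccumulator and LongMaxAccumulator extend Accumulator[jl.Long, jl.Long].
val sumAcc = new LongAccumulator
val maxAcc = new LongMaxAccumulator

// With def merge(other: Accumulator[IN, OUT]): Unit this compiles and only
// fails at runtime, inside the match:
sumAcc.merge(maxAcc)   // throws UnsupportedOperationException

// With the suggested ACC parameter, i.e. Accumulator[IN, ACC, OUT] and
// def merge(other: ACC): Unit, where
//   class LongAccumulator extends Accumulator[jl.Long, LongAccumulator, jl.Long],
// the same call does not compile: LongMaxAccumulator is not a LongAccumulator.
{code}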




[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255357#comment-15255357
 ] 

Reynold Xin commented on SPARK-14654:
-

No, it can't be private[spark] because it needs to be implemented. I also don't 
see why we'd need to expose the internal buffer type, since it is strictly an 
implementation detail of the accumulators.




[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255356#comment-15255356
 ] 

holdenk commented on SPARK-14654:
-

Since we're talking about average accumulators anyway - what would the return 
type of a Long average accumulator's value function be?

Also, would it maybe make sense to make the merge function 
{code}private[spark]{code}? And/or maybe change {code}abstract class 
Accumulator[IN, OUT] extends Serializable{code} to {code}abstract class 
Accumulator[IN, ACC, OUT] extends Serializable{code}, have merge take ACC, 
e.g. {code}def merge(other: ACC): Unit{code}, and then do 
{code}class LongAccumulator extends Accumulator[jl.Long, LongAccumulator, jl.Long]{code}?
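
Spelled out, a minimal self-contained sketch of what this ACC variant could look like (illustrative only; the method bodies are filled in here and are not part of the proposal in the description at the top of the thread):

{code}
import java.{lang => jl}

// Illustrative sketch of the suggested variant; bodies are filled in for completeness.
// ACC is the concrete accumulator type itself, so merge only accepts the same class.
abstract class Accumulator[IN, ACC, OUT] extends Serializable {
  def reset(): Unit
  def add(v: IN): Unit
  def merge(other: ACC): Unit
  def value: OUT
}

class LongAccumulator extends Accumulator[jl.Long, LongAccumulator, jl.Long] {
  private[this] var _sum = 0L
  override def reset(): Unit = _sum = 0L
  override def add(v: jl.Long): Unit = _sum += v
  // No runtime match or UnsupportedOperationException needed: the compiler
  // rejects merging anything other than another LongAccumulator.
  override def merge(other: LongAccumulator): Unit = _sum += other.sum
  override def value: jl.Long = _sum
  def sum: Long = _sum
}
{code}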




[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255355#comment-15255355
 ] 

holdenk commented on SPARK-14654:
-

If we're only going to do Long and Double for the easy creation on the 
SparkContext, then I can certainly see why it wouldn't be worth the headache of 
using reflection to avoid duplicating boilerplate code between types. I didn't 
intend to suggest that the only way to create accumulators would be through the 
reflection-based API, just that it would be used in place of the individual 
convenience functions on the SparkContext (users would still have the ability to 
construct custom Accumulators and register them with registerAccumulator).
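
For reference, a hedged sketch of that manual path under the proposed API (MaxLongAccumulator is a hypothetical example class, not part of the proposal):

{code}
import java.{lang => jl}

// Hypothetical custom accumulator built directly against the proposed base class.
class MaxLongAccumulator extends Accumulator[jl.Long, jl.Long] {
  private[this] var _max = Long.MinValue
  override def reset(): Unit = _max = Long.MinValue
  override def add(v: jl.Long): Unit = _max = math.max(_max, v)
  override def merge(other: Accumulator[jl.Long, jl.Long]): Unit = other match {
    case o: MaxLongAccumulator => _max = math.max(_max, o.max)
    case _ => throw new UnsupportedOperationException("incompatible accumulator")
  }
  override def value: jl.Long = _max
  def max: Long = _max
}

// Constructed by hand and registered explicitly, rather than via a newXAccumulator helper.
val maxAcc = new MaxLongAccumulator
sc.registerAccumulator(maxAcc)
{code}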




[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255312#comment-15255312
 ] 

Reynold Xin commented on SPARK-14654:
-

I don't get what you are trying to accomplish. It seems like you enjoy the 
cuteness of reflection. With your proposal:

1. Specialization won't work, which is a big part of this new API.

2. It is less obvious what the return types should be.

3. It is strictly less type safe, and app developers won't know what the 
accepted input types are.

4. It is unclear what the semantics are when "1" is passed in as the initial value 
rather than "0".

5. We would need to implement all the primitive types, which I don't think makes 
sense. In my prototype only double and long are implemented. I don't see why we 
should implement all the primitive types. Why have a "byte" accumulator when 
the long one captures almost all the use cases? How often would having a 
"Boolean" accumulator make sense?

You are keeping almost all the issues with the existing API.






[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255291#comment-15255291
 ] 

holdenk commented on SPARK-14654:
-

You wouldn't know if you want a counter or an average, but the same applies to 
the function `newLongAccumulator`, so you give them a counter, and if they want a 
different combiner function they subclass the `Accumulator` to implement it 
(or, if we wanted to offer averages easily, you could add a flag or a separate 
newAverageAccumulator call and provide standard average implementations for the 
built-in classes). If the user passes a 1, it means the accumulator starts with a 
value of 1. To me it just feels a little clunky to have newXAccumulator 
\forall X in {default supported types} - but it is clearer at compile time, so I 
can see why it might be a better fit.

If we do end up adding a lot of newXAccumulator methods to the API, I think we 
should consider either grouping them in the API docs or moving them to a separate 
class.




[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255169#comment-15255169
 ] 

Reynold Xin commented on SPARK-14654:
-

How do you know for double whether we'd want a counter or an average? Also, 
specifying an initial value really makes no sense. What if the user passes in a 1? 
What does that mean?
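
For what it's worth, a minimal sketch (DoubleAverageAccumulator is a hypothetical name, not part of the proposal) of how the proposed API lets an average accumulator keep an internal buffer different from its output type (point 2 of the description):

{code}
import java.{lang => jl}

// Hypothetical sketch of an average-style accumulator under the proposed API:
// the internal buffer (a sum and a count) differs from the output type (the mean).
class DoubleAverageAccumulator extends Accumulator[jl.Double, jl.Double] {
  private[this] var _sum = 0.0
  private[this] var _count = 0L

  override def reset(): Unit = { _sum = 0.0; _count = 0L }

  override def add(v: jl.Double): Unit = { _sum += v; _count += 1 }

  override def merge(other: Accumulator[jl.Double, jl.Double]): Unit = other match {
    case o: DoubleAverageAccumulator => _sum += o.sum; _count += o.count
    case _ => throw new UnsupportedOperationException("incompatible accumulator")
  }

  // The reported value is the running mean, not what is buffered internally.
  override def value: jl.Double = if (_count == 0) 0.0 else _sum / _count

  def sum: Double = _sum
  def count: Long = _count
}
{code}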





[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-23 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255163#comment-15255163
 ] 

holdenk commented on SPARK-14654:
-

[~rxin] it's true, but compared to the currently proposed new API we could use it 
for the built-in types with standard accumulation, in place of having things 
like `newLongAccumulator`, `newIntAccumulator`, and `newDoubleAccumulator`, 
which are similarly constrained (custom accumulators would still be supported 
by constructing them manually).




[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-22 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255159#comment-15255159
 ] 

Reynold Xin commented on SPARK-14654:
-

That only works for the built-in ones, and it can't work if there is more than one 
accumulator type that supports a given value, e.g. double for both double sum and 
avg.





[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-22 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255123#comment-15255123
 ] 

holdenk commented on SPARK-14654:
-

Having given it a bit of thought on the flight: is there a reason why we don't 
want to use reflection in the Java API (e.g. something like 
https://github.com/holdenk/spark/tree/alternative-java-scala-acc-api )? It 
seems like it would make it even simpler.
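
For the record, a purely speculative sketch of the shape such a reflection/initial-value based factory might take (this is a guess at the idea under discussion, not the code in the linked branch; DoubleAccumulator is assumed to exist alongside LongAccumulator):

{code}
import java.{lang => jl}

// Speculative sketch only - dispatches on the runtime class of the initial value.
def newAccumulator[T](initialValue: T): Accumulator[T, T] = {
  val acc = initialValue match {
    case _: jl.Long   => new LongAccumulator
    case _: jl.Double => new DoubleAccumulator
    case other => throw new UnsupportedOperationException(
      s"No built-in accumulator for ${other.getClass.getName}")
  }
  // The cast is where the static type information is lost - one of the objections raised above.
  val typed = acc.asInstanceOf[Accumulator[T, T]]
  typed.add(initialValue)   // "starts with a value of 1" if the caller passes 1
  typed
}
{code}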







[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-22 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254053#comment-15254053
 ] 

Apache Spark commented on SPARK-14654:
--

User 'cloud-fan' has created a pull request for this issue:
https://github.com/apache/spark/pull/12612







[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-21 Thread holdenk (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15253000#comment-15253000
 ] 

holdenk commented on SPARK-14654:
-

Looks like a nice, clean API. Out of interest, what is `dedup` for? Is that the new planned name for data-property accumulators?
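
A hedged reading of the quoted signature, since its semantics are exactly what is being asked here: presumably dedup would ask Spark to count a task's contribution at most once (e.g. across retried or speculative attempts), which is what the data-property accumulator work aims for. Treat the snippet below as an interpretation, not a confirmed behaviour.
{code}
// Interpretation only -- the meaning of `dedup` is the open question in this comment.
val exactOnceRecords = sc.newLongAccumulator(name = "records", dedup = true)     // counted once per partition, even on recompute?
val perAttemptRecords = sc.newLongAccumulator(name = "attempts", dedup = false)  // counted on every task attempt?
{code}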







[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-15 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15242540#comment-15242540
 ] 

Reynold Xin commented on SPARK-14654:
-

cc [~holdenk] and [~imranr]







[jira] [Commented] (SPARK-14654) New accumulator API

2016-04-15 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15242536#comment-15242536
 ] 

Reynold Xin commented on SPARK-14654:
-

Note that one challenge is that we most likely don't want to break the existing accumulator API, meaning we'd need to name this something else or put it in a different package, or both.
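
One way to picture the "different name or different package" option (a hedged sketch; nothing here is decided): the legacy class keeps its fully qualified name and entry points, while the new API is reached only through new methods, so existing user code keeps compiling.
{code}
// Coexistence sketch: the old API stays as-is, the new one is purely additive.
val legacy = sc.accumulator(0)        // existing org.apache.spark.Accumulator[Int], unchanged
val modern = sc.newLongAccumulator()  // proposed new entry point returning the new LongAccumulator
{code}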




