[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-07-03 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15360841#comment-15360841
 ] 

Apache Spark commented on SPARK-15598:
--------------------------------------

User 'techaddict' has created a pull request for this issue:
https://github.com/apache/spark/pull/13347

> Change Aggregator.zero to Aggregator.init
> -----------------------------------------
>
> Key: SPARK-15598
> URL: https://issues.apache.org/jira/browse/SPARK-15598
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Reynold Xin
>
> org.apache.spark.sql.expressions.Aggregator currently requires defining the 
> zero value for an aggregator. This is actually a limitation making it 
> difficult to implement APIs such as reduce. In reduce (or reduceByKey), a 
> single associative and commutative reduce function is specified by the user, 
> and there is no definition of zero value.
> A small tweak to the API is to change zero to init, taking an input, similar 
> to the following:
> {code}
> abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
>   def init(a: IN): BUF
>   def reduce(b: BUF, a: IN): BUF
>   def merge(b1: BUF, b2: BUF): BUF
>   def finish(reduction: BUF): OUT
> }
> {code}
> Then reduce can be implemented using:
> {code}
> // given a user-supplied reduce function f: (T, T) => T
> new Aggregator[T, T, T] {
>   override def init(a: T): T = a
>   override def reduce(b: T, a: T): T = f(b, a)
>   override def merge(b1: T, b2: T): T = f(b1, b2)
>   override def finish(reduction: T): T = reduction
> }
> {code}
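For context, here is a self-contained sketch of how the proposed contract could be exercised, written against a standalone mirror of the API above (plain Scala, no Spark dependency; the sum aggregator and the partitioned driver loop are illustrative assumptions, not part of the proposal):
{code}
// standalone mirror of the proposed API, for illustration only
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  def init(a: IN): BUF
  def reduce(b: BUF, a: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}

// example aggregator: sum of Ints
val sum = new Aggregator[Int, Int, Int] {
  override def init(a: Int): Int = a
  override def reduce(b: Int, a: Int): Int = b + a
  override def merge(b1: Int, b2: Int): Int = b1 + b2
  override def finish(reduction: Int): Int = reduction
}

// per-partition evaluation: seed the buffer via init on the first element,
// fold reduce over the rest, then merge the partition buffers and finish
val partitions = Seq(Seq(1, 2, 3), Seq(4, 5))
val buffers = partitions.map(p => p.tail.foldLeft(sum.init(p.head))(sum.reduce _))
val result = sum.finish(buffers.reduce(sum.merge _))  // 15
{code}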






[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-05-30 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15306885#comment-15306885
 ] 

Reynold Xin commented on SPARK-15598:
-------------------------------------

That's a good point. Basically you either have the zero value or not.





[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-05-28 Thread koert kuipers (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15305651#comment-15305651
 ] 

koert kuipers commented on SPARK-15598:
---------------------------------------

how will this change impact usage like this:
{noformat}
val customSummer =  new Aggregator[Data, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Data): Int = b + a.i
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(r: Int): Int = r
}.toColumn

val ds: Dataset[Data] = ...
val aggregated = ds.select(customSummer)
{noformat}
what if ds is empty? currently it will return zero
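To make the empty-input concern concrete, here is a plain-Scala sketch of the assumed semantics (not Spark's actual execution path): a zero-based fold is well-defined on empty input, while an init-based one has nothing to seed the buffer with:
{noformat}
// zero-based: an empty fold is well-defined and yields the zero
List.empty[Int].foldLeft(0)(_ + _)  // 0

// init-based: no first element exists, so some "no value" signal is needed
val rows = List.empty[Int]
val result: Option[Int] =
  if (rows.isEmpty) None
  else Some(rows.tail.foldLeft(rows.head)(_ + _))
{noformat}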




[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-05-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303528#comment-15303528
 ] 

Reynold Xin commented on SPARK-15598:
-------------------------------------

Yup I think we'd need to refactor the internals.





[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-05-26 Thread koert kuipers (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303418#comment-15303418
 ] 

koert kuipers commented on SPARK-15598:
---------------------------------------

the reason i ask is that if you plan to do:
{noformat}
inputs.foldLeft(aggregator.init(inputs.head))(aggregator.reduce _)
{noformat}
then i think your implementation of reduce using an Aggregator will not work, 
since the first element gets added twice.
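for example, with summation the double count is easy to see (an illustrative plain-Scala snippet, not Spark code):
{noformat}
val inputs = List(1, 2, 3)
inputs.foldLeft(inputs.head)(_ + _)       // 7: the head is counted twice
inputs.tail.foldLeft(inputs.head)(_ + _)  // 6: the intended result
{noformat}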

but looking at the code for TypedAggregateExpression and DeclarativeAggregate 
the alternative involves serious changes... 





[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-05-26 Thread koert kuipers (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303396#comment-15303396
 ] 

koert kuipers commented on SPARK-15598:
---------------------------------------

just to be clear, your intention is to use it roughly as follows per partition, 
illustrated with a list:
{noformat}
val inputs: List[IN] = ...
inputs.tail.foldLeft(aggregator.init(inputs.head))(aggregator.reduce _)
{noformat}
or alternatively, since reduce is redundant (though perhaps less efficient):
{noformat}
inputs.map(aggregator.init).reduce(aggregator.merge)
{noformat}
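both formulations should agree for a lawful aggregator; the difference is that the map-then-merge form creates one buffer per element instead of one per partition. a sketch (assuming an Aggregator[Int, Int, Int] named sum that adds Ints):
{noformat}
val inputs = List(1, 2, 3, 4)
val viaReduce = inputs.tail.foldLeft(sum.init(inputs.head))(sum.reduce _)  // 10, one buffer per partition
val viaMerge  = inputs.map(sum.init _).reduce(sum.merge _)                 // 10, one buffer per element
{noformat}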




[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-05-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303374#comment-15303374
 ] 

Apache Spark commented on SPARK-15598:
--------------------------------------

User 'techaddict' has created a pull request for this issue:
https://github.com/apache/spark/pull/13347




[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-05-26 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303383#comment-15303383
 ] 

Reynold Xin commented on SPARK-15598:
-------------------------------------

That separation is kept for performance reasons. Otherwise we would have to
invoke two function calls for each record.







[jira] [Commented] (SPARK-15598) Change Aggregator.zero to Aggregator.init

2016-05-26 Thread koert kuipers (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-15598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15303375#comment-15303375
 ] 

koert kuipers commented on SPARK-15598:
---------------------------------------

this makes a lot of sense, and is consistent with algebird's Aggregator (while 
the current implementation is more like algebird's MonoidAggregator).

do you still need the reduce method? you could provide a reasonable default as:
def reduce(b: BUF, a: IN): BUF = merge(b, init(a))
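a sketch of what that default could look like (assuming the proposed signatures; overriding reduce then remains purely a per-record optimization):
{code}
abstract class Aggregator[-IN, BUF, OUT] extends Serializable {
  def init(a: IN): BUF
  // reasonable default: lift the input into a buffer, then merge;
  // it costs two calls per record, so hot paths can still override it
  def reduce(b: BUF, a: IN): BUF = merge(b, init(a))
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
}
{code}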

