Re: trouble using Aggregator with DataFrame

2016-02-17 Thread Koert Kuipers
https://issues.apache.org/jira/browse/SPARK-13363

On Wed, Feb 17, 2016 at 3:02 PM, Michael Armbrust wrote:

> Glad you like it :)
>
> This sounds like a bug, and we should fix it as we merge DataFrame /
> Dataset for 2.0.  Could you open a JIRA targeted at 2.0?
>
> On Wed, Feb 17, 2016 at 2:22 PM, Koert Kuipers  wrote:
>
>> First of all I wanted to say that I am very happy to see
>> org.apache.spark.sql.expressions.Aggregator; it is a neat API, especially
>> when compared to the UDAF/AggregateFunction stuff.
>>
>> Its doc comment says: "A base class for user-defined aggregations, which
>> can be used in [[DataFrame]] and [[Dataset]]."
>>
>> It works well with Dataset/GroupedDataset, but I am having no luck using
>> it with DataFrame/GroupedData. Does anyone have an example of how to use
>> it with a DataFrame?
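>>
>> For reference, the working Dataset path; a minimal sketch, assuming the
>> 1.6 GroupedDataset API and sqlContext.implicits._ in scope:
>>
>> import org.apache.spark.sql.expressions.Aggregator
>>
>> val tupleSum = new Aggregator[(Int, Int), Int, Int] {
>>   def zero: Int = 0
>>   def reduce(b: Int, a: (Int, Int)): Int = b + a._2 // fold in the value side of the pair
>>   def merge(b1: Int, b2: Int): Int = b1 + b2        // combine partial sums
>>   def finish(b: Int): Int = b                       // return the final result
>> }.toColumn
>>
>> val ds = Seq((1, 1), (2, 2), (3, 3)).toDS()
>> ds.groupBy(_._1).agg(tupleSum).show()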
>>
>> In particular I would like to use it with this method in GroupedData:
>>   def agg(expr: Column, exprs: Column*): DataFrame
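>>
>> (That method works fine with built-in aggregate expressions; a minimal
>> sketch, assuming the df defined further down:
>>   import org.apache.spark.sql.functions.sum
>>   df.groupBy("k").agg(sum(df("v"))).show()
>> so the question is just how to plug an Aggregator into it.)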
>>
>> Clearly it should be possible, since GroupedDataset uses that very same
>> method to do the work:
>>   private def agg(exprs: Column*): DataFrame =
>>     groupedData.agg(withEncoder(exprs.head), exprs.tail.map(withEncoder): _*)
>>
>> The trick seems to be the wrapping in withEncoder, which is private. I
>> tried to do something like it myself, spending my usual daily 30 mins
>> getting around private restrictions in Spark on this, but I had no luck
>> since it uses more private stuff on TypedColumn. Also this column/catalyst
>> stuff makes me instantly sleepy, so I didn't try too hard.
>>
>> Anyhow, my attempt at using it with a DataFrame:
>>
>> // assuming SqlAggregator is an alias, e.g.:
>> // import org.apache.spark.sql.expressions.{Aggregator => SqlAggregator}
>> val simpleSum = new SqlAggregator[Int, Int, Int] {
>>   def zero: Int = 0                     // the initial value
>>   def reduce(b: Int, a: Int) = b + a    // add an element to the running total
>>   def merge(b1: Int, b2: Int) = b1 + b2 // merge intermediate values
>>   def finish(b: Int) = b                // return the final result
>> }.toColumn
>>
>> val df = sc.makeRDD(1 to 3).map(i => (i, i)).toDF("k", "v")
>> df.groupBy("k").agg(simpleSum).show
>>
>> And the resulting error:
>> org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate
>> [k#104], [k#104,($anon$3(),mode=Complete,isDistinct=false) AS sum#106];
>>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:46)
>>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:241)
>>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:122)
>>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:46)
>>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>>   at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
>>   at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:49)
>>
>> best, koert

