[ https://issues.apache.org/jira/browse/SPARK-13363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-13363:
------------------------------------

    Assignee:     (was: Apache Spark)

> Aggregator not working with DataFrame
> -------------------------------------
>
>                 Key: SPARK-13363
>                 URL: https://issues.apache.org/jira/browse/SPARK-13363
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: koert kuipers
>            Priority: Blocker
>
> The doc comment on org.apache.spark.sql.expressions.Aggregator says: "A base class for user-defined aggregations, which can be used in [[DataFrame]] and [[Dataset]]."
> It works well with Dataset/GroupedDataset, but I have had no luck using it with DataFrame/GroupedData. Does anyone have an example of how to use it with a DataFrame?
> In particular, I would like to use it with this method on GroupedData:
> {noformat}
> def agg(expr: Column, exprs: Column*): DataFrame
> {noformat}
> It should clearly be possible, since GroupedDataset uses that very same method to do the work:
> {noformat}
> private def agg(exprs: Column*): DataFrame =
>   groupedData.agg(withEncoder(exprs.head), exprs.tail.map(withEncoder): _*)
> {noformat}
> The trick appears to be the wrapping in withEncoder, which is private. I tried to do something like it myself, but had no luck, since it uses more private members of TypedColumn.
> Anyhow, my attempt at using it with a DataFrame:
> {noformat}
> val simpleSum = new SqlAggregator[Int, Int, Int] {
>   def zero: Int = 0                       // The initial value.
>   def reduce(b: Int, a: Int) = b + a      // Add an element to the running total.
>   def merge(b1: Int, b2: Int) = b1 + b2   // Merge intermediate values.
>   def finish(b: Int) = b                  // Return the final result.
> }.toColumn
>
> val df = sc.makeRDD(1 to 3).map(i => (i, i)).toDF("k", "v")
> df.groupBy("k").agg(simpleSum).show
> {noformat}
> And the resulting error:
> {noformat}
> org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [k#104], [k#104,($anon$3(),mode=Complete,isDistinct=false) AS sum#106];
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:46)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:241)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:122)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:46)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
>   at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
>   at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:49)
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
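As background for the snippet in the report: the four methods of the Aggregator contract amount to a distributed fold. The sketch below is a plain-Scala model of what `simpleSum` computes for one group (no Spark dependency; `SimpleSum` and `aggregateGroup` are illustrative names, not Spark API, and the partition layout is assumed for the example):

```scala
// Plain-Scala model of the Aggregator contract from the report above:
// zero/reduce fold the values within a partition, merge combines the
// partial buffers across partitions, finish maps the buffer to output.
object SimpleSum {
  def zero: Int = 0                        // The initial value.
  def reduce(b: Int, a: Int): Int = b + a  // Add an element to the running total.
  def merge(b1: Int, b2: Int): Int = b1 + b2  // Merge intermediate values.
  def finish(b: Int): Int = b              // Return the final result.
}

// Aggregate one group whose values are split across partitions.
def aggregateGroup(partitions: Seq[Seq[Int]]): Int = {
  val partials = partitions.map(_.foldLeft(SimpleSum.zero)(SimpleSum.reduce))
  SimpleSum.finish(partials.foldLeft(SimpleSum.zero)(SimpleSum.merge))
}

println(aggregateGroup(Seq(Seq(1, 2), Seq(3)))) // prints 6
```

This illustrates only the intended semantics of the column; the bug itself is that the analyzer rejects the typed aggregate expression when it is used through the untyped GroupedData.agg path.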