[ https://issues.apache.org/jira/browse/SPARK-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16871798#comment-16871798 ]
Josh Rosen commented on SPARK-16474:
------------------------------------

I just ran into this same issue. The problem here is that {{RelationalGroupedDataset}} only supports custom aggregate expressions which are defined over {{Row}}s; you can see this from the code at [https://github.com/apache/spark/blob/67042e90e763f6a2716b942c3f887e94813889ce/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala#L225].

The solution here is to use the typed {{KeyValueGroupedDataset}} API via {{groupByKey()}}.

Unfortunately there's not a great way to provide better error messages here: {{Aggregator}} doesn't require {{TypeTag}} or {{ClassTag}} for its input type, so we lack a reliable mechanism to detect and fail fast when we're passed an aggregate over non-Rows.

> Global Aggregation doesn't seem to work at all
> -----------------------------------------------
>
> Key: SPARK-16474
> URL: https://issues.apache.org/jira/browse/SPARK-16474
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 1.6.2, 2.0.0
> Reporter: Amit Sela
> Priority: Major
>
> Executing a global aggregation (not grouped by key) fails.
> Take the following code for example:
> {code}
> val session = SparkSession.builder()
>   .appName("TestGlobalAggregator")
>   .master("local[*]")
>   .getOrCreate()
> import session.implicits._
> val ds1 = List(1, 2, 3).toDS
> val ds2 = ds1.agg(
>   new Aggregator[Int, Int, Int]{
>     def zero: Int = 0
>     def reduce(b: Int, a: Int): Int = b + a
>     def merge(b1: Int, b2: Int): Int = b1 + b2
>     def finish(reduction: Int): Int = reduction
>     def bufferEncoder: Encoder[Int] = implicitly[Encoder[Int]]
>     def outputEncoder: Encoder[Int] = implicitly[Encoder[Int]]
>   }.toColumn)
> ds2.printSchema
> ds2.show
> {code}
> I would expect the result to be 6, but instead I get the following exception:
> {noformat}
> java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.Integer
>   at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> .........
> {noformat}
> Trying the same code on DataFrames in 1.6.2 results in:
> {noformat}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [(anon$1(),mode=Complete,isDistinct=false) AS anon$1()#8];
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
> ..........
> {noformat}
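The workaround described in the comment can be sketched as follows. This is a minimal sketch assuming Spark 2.x or later with spark-sql on the classpath; the names `IntSum`, `TypedAggregationWorkaround`, `globalSum`, and `sumByParity` are hypothetical and chosen only for illustration. For the global (ungrouped) case it swaps in the typed `Dataset.select(TypedColumn)` overload rather than `ds.agg(...)`, because `Dataset.agg` delegates to `groupBy().agg(...)` and therefore to the untyped `RelationalGroupedDataset`, which hands the `Aggregator` `Row`s. For the keyed case it uses `groupByKey` and `KeyValueGroupedDataset.agg`, as the comment suggests; grouping by `x % 2` is an arbitrary example key.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Same summing logic as the anonymous Aggregator in the report, typed over Int inputs.
object IntSum extends Aggregator[Int, Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: Int): Int = b + a
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

// Hypothetical helper object for illustration only.
object TypedAggregationWorkaround {
  // Global aggregation: select with a TypedColumn keeps the input type as Int.
  // ds.agg(...) would instead route through RelationalGroupedDataset and pass Rows.
  def globalSum(ds: Dataset[Int]): Int =
    ds.select(IntSum.toColumn).head()

  // Keyed aggregation through the typed KeyValueGroupedDataset API.
  // The key encoder is passed explicitly; the x % 2 key is an arbitrary example.
  def sumByParity(ds: Dataset[Int]): Array[(Int, Int)] =
    ds.groupByKey(x => x % 2)(Encoders.scalaInt)
      .agg(IntSum.toColumn)
      .collect()
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TypedAggregationWorkaround")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val ds = List(1, 2, 3).toDS()
    println(globalSum(ds))           // 6
    sumByParity(ds).foreach(println) // (0,2) then (1,4)
    spark.stop()
  }
}
```

The typed `select` path is the smallest change to the reporter's snippet; `groupByKey` is the route to take when the aggregation actually needs a key.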