[ https://issues.apache.org/jira/browse/SPARK-30590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021792#comment-17021792 ]
Daniel Mantovani edited comment on SPARK-30590 at 1/23/20 5:48 AM:
-------------------------------------------------------------------

[~hyukjin.kwon] You tried with 5 parameters, which works; try with 6 to get the exception:

{code:scala}
scala> df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5),fooAgg(6)).show
org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS foo_agg_6#141];;
'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS foo_agg_6#141]
{code}
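An editorial note, not stated in this ticket: the five-versus-six behavior is consistent with the Dataset API defining typed select overloads only up to five TypedColumns; a sixth argument makes the call resolve to the untyped select(cols: Column*) overload, where the Aggregator-backed columns are left unresolved. A minimal sketch of that boundary, assuming the df and fooAgg definitions from the issue below:

{code:scala}
// Editorial sketch, not from the ticket. Dataset defines typed overloads
//   select[U1](c1: TypedColumn[T, U1]) ... select[U1, U2, U3, U4, U5](...),
// so five TypedColumns stay on the typed path:
val five = df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5)) // works

// There is no typed overload of arity six, so the call below resolves to the
// untyped select(cols: Column*); analysis runs eagerly inside select (see the
// Dataset.select frame in the stack trace) and throws the AnalysisException:
val six = df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5), fooAgg(6))
{code}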
> can't use more than five type-safe user-defined aggregations in a select
> statement
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-30590
>                 URL: https://issues.apache.org/jira/browse/SPARK-30590
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.3, 2.3.4, 2.4.4
>            Reporter: Daniel Mantovani
>            Priority: Major
>
> How to reproduce:
> {code:scala}
> val df = Seq((1,2,3,4,5,6)).toDF("a","b","c","d","e","f")
> import org.apache.spark.sql.expressions.Aggregator
> import org.apache.spark.sql.Encoder
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.Row
> case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] {
>   def zero: Int = s
>   def reduce(b: Int, r: Row): Int = b + r.getAs[Int](0)
>   def merge(b1: Int, b2: Int): Int = b1 + b2
>   def finish(b: Int): Int = b
>   def bufferEncoder: Encoder[Int] = Encoders.scalaInt
>   def outputEncoder: Encoder[Int] = Encoders.scalaInt
> }
> val fooAgg = (i: Int) => FooAgg(i).toColumn.name(s"foo_agg_$i")
>
> scala> df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5)).show
> +---------+---------+---------+---------+---------+
> |foo_agg_1|foo_agg_2|foo_agg_3|foo_agg_4|foo_agg_5|
> +---------+---------+---------+---------+---------+
> |        3|        5|        7|        9|       11|
> +---------+---------+---------+---------+---------+
> {code}
> With 6 arguments we get an error:
> {code:scala}
> scala> df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5),fooAgg(6)).show
> org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS foo_agg_6#141];;
> 'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS foo_agg_6#141]
> +- Project [_1#6 AS a#13, _2#7 AS b#14, _3#8 AS c#15, _4#9 AS d#16, _5#10 AS e#17, _6#11 AS F#18]
>    +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
>   at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>   at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3412)
>   at org.apache.spark.sql.Dataset.select(Dataset.scala:1340)
>   ... 50 elided
> {code}
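A possible workaround, sketched here as an editorial suggestion rather than anything proposed or verified in this ticket: keep each select call at no more than five typed columns, then combine the one-row results with crossJoin:

{code:scala}
// Hypothetical workaround sketch; assumes df and fooAgg as defined in the issue above.
// Each select stays within the five-TypedColumn limit, so both calls analyze
// cleanly; the two single-row results are then combined into one six-column row.
val firstFive = df.select(fooAgg(1), fooAgg(2), fooAgg(3), fooAgg(4), fooAgg(5))
val sixth = df.select(fooAgg(6))
firstFive.crossJoin(sixth).show()
{code}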