[ 
https://issues.apache.org/jira/browse/SPARK-30590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Mantovani updated SPARK-30590:
-------------------------------------
    Description: 
 How to reproduce:
{code:scala}
val df = Seq((1,2,3,4,5,6)).toDF("a","b","c","d","e","f")

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Row

case class FooAgg(s:Int) extends Aggregator[Row, Int, Int] {
  def zero:Int = s
  def reduce(b: Int, r: Row): Int = b + r.getAs[Int](0)
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(b: Int): Int = b
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

val fooAgg = (i:Int) => FooAgg(i).toColumn.name(s"foo_agg_$i")

scala> df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5)).show
+---------+---------+---------+---------+---------+
|foo_agg_1|foo_agg_2|foo_agg_3|foo_agg_4|foo_agg_5|
+---------+---------+---------+---------+---------+
|        3|        5|        7|        9|       11|
+---------+---------+---------+---------+---------+

{code}
With 6 arguments we have error:
{code:scala}
scala> 
df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5),fooAgg(6)).show

org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate 
[fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, 
assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, 
IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, 
None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as 
int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS 
foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS 
value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS 
value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), 
None, None, None, input[0, int, false] AS value#129, 
assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, 
IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, 
None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as 
int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS 
foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS 
value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS 
value#138, IntegerType, IntegerType, false) AS foo_agg_6#141];;
'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS 
value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS 
value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), 
None, None, None, input[0, int, false] AS value#119, 
assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, 
IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, 
None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as 
int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS 
foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS 
value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS 
value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), 
None, None, None, input[0, int, false] AS value#134, 
assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, 
IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, 
None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as 
int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS 
foo_agg_6#141]
+- Project [_1#6 AS a#13, _2#7 AS b#14, _3#8 AS c#15, _4#9 AS d#16, _5#10 AS 
e#17, _6#11 AS F#18]
 +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]

at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
 at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
 at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
 at 
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)

 at 
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
 at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
 at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
 at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3412)
 at org.apache.spark.sql.Dataset.select(Dataset.scala:1340)
 ... 50 elided
{code}
 

 

 

  was:
 How to reproduce:
{code:scala}
val df = Seq((1,2,3,4,5,6)).toDF("a","b","c","d","e","f")

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders

import org.apache.spark.sql.Rowcase class FooAgg(s:Int) extends Aggregator[Row, 
Int, Int] {
  def zero:Int = s
  def reduce(b: Int, r: Row): Int = b + r.getAs[Int](0)
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(b: Int): Int = b
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

val fooAgg = (i:Int) => FooAgg(i).toColumn.name(s"foo_agg_$i")

scala> df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5)).show
+---------+---------+---------+---------+---------+
|foo_agg_1|foo_agg_2|foo_agg_3|foo_agg_4|foo_agg_5|
+---------+---------+---------+---------+---------+
|        3|        5|        7|        9|       11|
+---------+---------+---------+---------+---------+

{code}
With 6 arguments we have error:
{code:scala}
scala> 
df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5),fooAgg(6)).show

org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate 
[fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, 
assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, 
IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, 
None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 as 
int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) AS 
foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS 
value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS 
value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, fooagg(FooAgg(4), 
None, None, None, input[0, int, false] AS value#129, 
assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, 
IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, 
None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 as 
int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) AS 
foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS 
value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS 
value#138, IntegerType, IntegerType, false) AS foo_agg_6#141];;
'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS 
value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS 
value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), 
None, None, None, input[0, int, false] AS value#119, 
assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, 
IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, 
None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 as 
int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) AS 
foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS 
value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS 
value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), 
None, None, None, input[0, int, false] AS value#134, 
assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, 
IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, 
None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 as 
int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) AS 
foo_agg_6#141]
+- Project [_1#6 AS a#13, _2#7 AS b#14, _3#8 AS c#15, _4#9 AS d#16, _5#10 AS 
e#17, _6#11 AS F#18]
 +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]

at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
 at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
 at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
 at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
 at 
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)

 at 
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
 at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
 at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
 at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3412)
 at org.apache.spark.sql.Dataset.select(Dataset.scala:1340)
 ... 50 elided
{code}
 

 

 


> can't use more than five type-safe user-defined aggregation in select 
> statement
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-30590
>                 URL: https://issues.apache.org/jira/browse/SPARK-30590
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.3, 2.3.4, 2.4.4
>            Reporter: Daniel Mantovani
>            Priority: Major
>
>  How to reproduce:
> {code:scala}
> val df = Seq((1,2,3,4,5,6)).toDF("a","b","c","d","e","f")
> import org.apache.spark.sql.expressions.Aggregator
> import org.apache.spark.sql.Encoder
> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.Row
> case class FooAgg(s:Int) extends Aggregator[Row, Int, Int] {
>   def zero:Int = s
>   def reduce(b: Int, r: Row): Int = b + r.getAs[Int](0)
>   def merge(b1: Int, b2: Int): Int = b1 + b2
>   def finish(b: Int): Int = b
>   def bufferEncoder: Encoder[Int] = Encoders.scalaInt
>   def outputEncoder: Encoder[Int] = Encoders.scalaInt
> }
> val fooAgg = (i:Int) => FooAgg(i).toColumn.name(s"foo_agg_$i")
> scala> df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5)).show
> +---------+---------+---------+---------+---------+
> |foo_agg_1|foo_agg_2|foo_agg_3|foo_agg_4|foo_agg_5|
> +---------+---------+---------+---------+---------+
> |        3|        5|        7|        9|       11|
> +---------+---------+---------+---------+---------+
> {code}
> With 6 arguments we have error:
> {code:scala}
> scala> 
> df.select(fooAgg(1),fooAgg(2),fooAgg(3),fooAgg(4),fooAgg(5),fooAgg(6)).show
> org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate 
> [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS value#114, 
> assertnotnull(cast(value#114 as int)), input[0, int, false] AS value#113, 
> IntegerType, IntegerType, false) AS foo_agg_1#116, fooagg(FooAgg(2), None, 
> None, None, input[0, int, false] AS value#119, assertnotnull(cast(value#119 
> as int)), input[0, int, false] AS value#118, IntegerType, IntegerType, false) 
> AS foo_agg_2#121, fooagg(FooAgg(3), None, None, None, input[0, int, false] AS 
> value#124, assertnotnull(cast(value#124 as int)), input[0, int, false] AS 
> value#123, IntegerType, IntegerType, false) AS foo_agg_3#126, 
> fooagg(FooAgg(4), None, None, None, input[0, int, false] AS value#129, 
> assertnotnull(cast(value#129 as int)), input[0, int, false] AS value#128, 
> IntegerType, IntegerType, false) AS foo_agg_4#131, fooagg(FooAgg(5), None, 
> None, None, input[0, int, false] AS value#134, assertnotnull(cast(value#134 
> as int)), input[0, int, false] AS value#133, IntegerType, IntegerType, false) 
> AS foo_agg_5#136, fooagg(FooAgg(6), None, None, None, input[0, int, false] AS 
> value#139, assertnotnull(cast(value#139 as int)), input[0, int, false] AS 
> value#138, IntegerType, IntegerType, false) AS foo_agg_6#141];;
> 'Aggregate [fooagg(FooAgg(1), None, None, None, input[0, int, false] AS 
> value#114, assertnotnull(cast(value#114 as int)), input[0, int, false] AS 
> value#113, IntegerType, IntegerType, false) AS foo_agg_1#116, 
> fooagg(FooAgg(2), None, None, None, input[0, int, false] AS value#119, 
> assertnotnull(cast(value#119 as int)), input[0, int, false] AS value#118, 
> IntegerType, IntegerType, false) AS foo_agg_2#121, fooagg(FooAgg(3), None, 
> None, None, input[0, int, false] AS value#124, assertnotnull(cast(value#124 
> as int)), input[0, int, false] AS value#123, IntegerType, IntegerType, false) 
> AS foo_agg_3#126, fooagg(FooAgg(4), None, None, None, input[0, int, false] AS 
> value#129, assertnotnull(cast(value#129 as int)), input[0, int, false] AS 
> value#128, IntegerType, IntegerType, false) AS foo_agg_4#131, 
> fooagg(FooAgg(5), None, None, None, input[0, int, false] AS value#134, 
> assertnotnull(cast(value#134 as int)), input[0, int, false] AS value#133, 
> IntegerType, IntegerType, false) AS foo_agg_5#136, fooagg(FooAgg(6), None, 
> None, None, input[0, int, false] AS value#139, assertnotnull(cast(value#139 
> as int)), input[0, int, false] AS value#138, IntegerType, IntegerType, false) 
> AS foo_agg_6#141]
> +- Project [_1#6 AS a#13, _2#7 AS b#14, _3#8 AS c#15, _4#9 AS d#16, _5#10 AS 
> e#17, _6#11 AS F#18]
>  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:43)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:95)
>  at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:431)
>  at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$3.apply(CheckAnalysis.scala:430)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>  at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:430)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
>  at 
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
>  at 
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
>  at 
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
>  at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
>  at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3412)
>  at org.apache.spark.sql.Dataset.select(Dataset.scala:1340)
>  ... 50 elided
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to