Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/9409#discussion_r44213709 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Utils.scala --- @@ -54,10 +54,14 @@ object Utils { mode = aggregate.Complete, isDistinct = false) - // We do not support multiple COUNT DISTINCT columns for now. - case expressions.CountDistinct(children) if children.length == 1 => + case expressions.CountDistinct(children) => + val child = if (children.size > 1) { + DropAnyNull(CreateStruct(children)) --- End diff -- @yhuai if we combine this with the distinct rewriting rule, it will add a ```struct``` to the groupBy clause of the first aggregate. This is currently not allowed in the new UDAF path, so it'll fall back to the old path. For example: val data2 = Seq[(Integer, Integer, Integer)]( (1, 10, -10), (null, -60, 60), (1, 30, -30), (1, 30, 30), (2, 1, 1), (null, -10, 10), (2, -1, null), (2, 1, 1), (2, null, 1), (null, 100, -10), (3, null, 3), (null, null, null), (3, null, null)).toDF("key", "value1", "value2") data2.registerTempTable("agg2") val q = sql( """ |SELECT | key, | count(distinct value1), | count(distinct value2), | count(distinct value1, value2) |FROM agg2 |GROUP BY key """.stripMargin) This will create the following physical plan: == Physical Plan == TungstenAggregate(key=[key#3], functions=[(count(if ((gid#44 = 1)) attributereference#45 else null),mode=Final,isDistinct=false),(count(if ((gid#44 = 3)) attributereference#47 else null),mode=Final,isDistinct=false),(count(if ((gid#44 = 2)) dropanynull#46 else null),mode=Final,isDistinct=false)], output=[key#3,_c1#32L,_c2#33L,_c3#34L]) TungstenExchange(Shuffle without coordinator) hashpartitioning(key#3,200), None TungstenAggregate(key=[key#3], functions=[(count(if ((gid#44 = 1)) attributereference#45 else null),mode=Partial,isDistinct=false),(count(if ((gid#44 = 3)) attributereference#47 else null),mode=Partial,isDistinct=false),(count(if 
((gid#44 = 2)) dropanynull#46 else null),mode=Partial,isDistinct=false)], output=[key#3,count#49L,count#53L,count#51L]) Aggregate false, [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44], [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44] ConvertToSafe TungstenExchange(Shuffle without coordinator) hashpartitioning(key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44,200), None ConvertToUnsafe Aggregate true, [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44], [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44] !Expand [List(key#3, value1#4, null, null, 1),List(key#3, null, dropanynull(struct(value1#4,value2#5)), null, 2),List(key#3, null, null, value2#5, 3)], [key#3,attributereference#45,dropanynull#46,attributereference#47,gid#44] LocalTableScan [key#3,value1#4,value2#5], [[1,10,-10],[null,-60,60],[1,30,-30],[1,30,30],[2,1,1],[null,-10,10],[2,-1,null],[2,1,1],[2,null,1],[null,100,-10],[3,null,3],[null,null,null],[3,null,null]] Is it possible to add support for fixed-width structs as group-by expressions in the new aggregation path?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org