maropu commented on a change in pull request #30144: URL: https://github.com/apache/spark/pull/30144#discussion_r606314039
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ########## @@ -632,9 +632,10 @@ class Analyzer(override val catalogManager: CatalogManager) if (resolvedInfo.nonEmpty) { val (extraAggExprs, resolvedHavingCond) = resolvedInfo.get val newChild = h.child match { - case Aggregate(Seq(gs: GroupingSet), aggregateExpressions, child) => + case Aggregate( + GroupingAnalytics(selectedGroupByExprs, _, groupByExprs), aggregateExpressions, child) => Review comment: nit: ``` case Aggregate(GroupingAnalytics(selectedGroupByExprs, _, groupByExprs), aggregateExpressions, child) => constructAggregate( ``` ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ########## @@ -658,16 +659,18 @@ class Analyzer(override val catalogManager: CatalogManager) // CUBE/ROLLUP/GROUPING SETS. This also replace grouping()/grouping_id() in resolved // Filter/Sort. def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { - case h @ UnresolvedHaving(_, agg @ Aggregate(Seq(gs: GroupingSet), aggregateExpressions, _)) - if agg.childrenResolved && (gs.groupByExprs ++ aggregateExpressions).forall(_.resolved) => + case h @ UnresolvedHaving(_, agg @ Aggregate( + GroupingAnalytics(_, _, groupByExprs), aggregateExpressions, _)) + if agg.childrenResolved && (groupByExprs ++ aggregateExpressions).forall(_.resolved) => tryResolveHavingCondition(h) case a if !a.childrenResolved => a // be sure all of the children are resolved. // Ensure group by expressions and aggregate expressions have been resolved. 
- case Aggregate(Seq(gs: GroupingSet), aggregateExpressions, child) - if (gs.groupByExprs ++ aggregateExpressions).forall(_.resolved) => - constructAggregate(gs.selectedGroupByExprs, gs.groupByExprs, aggregateExpressions, child) + case Aggregate( + GroupingAnalytics(selectedGroupByExprs, _, groupByExprs), aggregateExpressions, child) Review comment: nit: ` case Aggregate(GroupingAnalytics(selectedGroupByExprs, _, groupByExprs), aggExprs, child)` ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala ########## @@ -212,3 +212,25 @@ object GroupingID { if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType } } + + +object GroupingAnalytics { + def unapply(exprs: Seq[Expression]) + : Option[(Seq[Seq[Expression]], Seq[Seq[Expression]], Seq[Expression])] = { + val (groupingSetExprs, others) = exprs.partition(_.isInstanceOf[GroupingSet]) + if (groupingSetExprs.isEmpty) { + None + } else { + val groupingSets = groupingSetExprs.map(_.asInstanceOf[GroupingSet]) + val groups = groupingSets.flatMap(_.groupByExprs) ++ others + val unMergedSelectedGroupByExprs = groupingSets.map(_.selectedGroupByExprs) Review comment: nit: `unMerged` -> `unmerged`? ########## File path: docs/sql-ref-syntax-qry-select-groupby.md ########## @@ -23,8 +23,9 @@ license: | The `GROUP BY` clause is used to group the rows based on a set of specified grouping expressions and compute aggregations on the group of rows based on one or more specified aggregate functions. Spark also supports advanced aggregations to do multiple -aggregations for the same input record set via `GROUPING SETS`, `CUBE`, `ROLLUP` clauses. -When a FILTER clause is attached to an aggregate function, only the matching rows are passed to that function. +aggregations for the same input record set via `GROUPING SETS`, `CUBE`, `ROLLUP` clauses, also spark support partial grouping Review comment: ``` the group of rows based on one or more specified aggregate functions. 
Spark also supports advanced aggregations to do multiple aggregations for the same input record set via `GROUPING SETS`, `CUBE`, `ROLLUP` clauses. These grouping analytics clauses can be specified with regular grouping expressions (partial grouping analytics) and the different grouping analytics clauses can be specified together (concatenated groupings). When a FILTER clause is attached to an aggregate function, only the matching ``` ? ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala ########## @@ -212,3 +212,25 @@ object GroupingID { if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType } } + + Review comment: nit: remove this blank line. ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ########## @@ -658,16 +659,18 @@ class Analyzer(override val catalogManager: CatalogManager) // CUBE/ROLLUP/GROUPING SETS. This also replace grouping()/grouping_id() in resolved // Filter/Sort. 
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { - case h @ UnresolvedHaving(_, agg @ Aggregate(Seq(gs: GroupingSet), aggregateExpressions, _)) - if agg.childrenResolved && (gs.groupByExprs ++ aggregateExpressions).forall(_.resolved) => + case h @ UnresolvedHaving(_, agg @ Aggregate( + GroupingAnalytics(_, _, groupByExprs), aggregateExpressions, _)) + if agg.childrenResolved && (groupByExprs ++ aggregateExpressions).forall(_.resolved) => Review comment: nit: ``` GroupingAnalytics(_, _, groupByExprs), aggregateExpressions, _)) if agg.childrenResolved && (groupByExprs ++ aggregateExpressions).forall(_.resolved) => ``` ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala ########## @@ -212,3 +212,25 @@ object GroupingID { if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType } } + + +object GroupingAnalytics { + def unapply(exprs: Seq[Expression]) + : Option[(Seq[Seq[Expression]], Seq[Seq[Expression]], Seq[Expression])] = { + val (groupingSetExprs, others) = exprs.partition(_.isInstanceOf[GroupingSet]) + if (groupingSetExprs.isEmpty) { + None + } else { + val groupingSets = groupingSetExprs.map(_.asInstanceOf[GroupingSet]) + val groups = groupingSets.flatMap(_.groupByExprs) ++ others + val unMergedSelectedGroupByExprs = groupingSets.map(_.selectedGroupByExprs) + val selectedGroupByExprs = unMergedSelectedGroupByExprs.tail + .foldRight(unMergedSelectedGroupByExprs.head) { (x, y) => + for (a <- x; b <- y) yield b ++ a + }.map { groupByExprs => + (others ++ groupByExprs).distinct + } Review comment: nit: indents ``` }.map { groupByExprs => (others ++ groupByExprs).distinct } ``` ########## File path: sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out ########## @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 37 +-- Number of queries: 45 Review comment: NOTE: I've checked that the output results are the same as the PostgreSQL
ones. ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala ########## @@ -212,3 +212,25 @@ object GroupingID { if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType } } + + +object GroupingAnalytics { + def unapply(exprs: Seq[Expression]) + : Option[(Seq[Seq[Expression]], Seq[Seq[Expression]], Seq[Expression])] = { + val (groupingSetExprs, others) = exprs.partition(_.isInstanceOf[GroupingSet]) + if (groupingSetExprs.isEmpty) { + None + } else { + val groupingSets = groupingSetExprs.map(_.asInstanceOf[GroupingSet]) + val groups = groupingSets.flatMap(_.groupByExprs) ++ others + val unMergedSelectedGroupByExprs = groupingSets.map(_.selectedGroupByExprs) + val selectedGroupByExprs = unMergedSelectedGroupByExprs.tail + .foldRight(unMergedSelectedGroupByExprs.head) { (x, y) => Review comment: nit: `foldRight` -> `foldLeft`? (I think most code use `foldLeft` if both `foldLeft`/`foldRight` can work) ########## File path: sql/core/src/test/resources/sql-tests/inputs/group-analytics.sql ########## @@ -59,4 +59,12 @@ SELECT course, year FROM courseSales GROUP BY CUBE(course, year) ORDER BY groupi -- Aliases in SELECT could be used in ROLLUP/CUBE/GROUPING SETS SELECT a + b AS k1, b AS k2, SUM(a - b) FROM testData GROUP BY CUBE(k1, k2); SELECT a + b AS k, b, SUM(a - b) FROM testData GROUP BY ROLLUP(k, b); -SELECT a + b, b AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k) +SELECT a + b, b AS k, SUM(a - b) FROM testData GROUP BY a + b, k GROUPING SETS(k); + +-- GROUP BY use mixed Separate columns and CUBE/ROLLUP +SELECT a, b, count(1) FROM testData GROUP BY a, b, CUBE(a, b); +SELECT a, b, count(1) FROM testData GROUP BY a, b, ROLLUP(a, b); +SELECT a, b, count(1) FROM testData GROUP BY CUBE(a, b), ROLLUP(a, b); +SELECT a, b, count(1) FROM testData GROUP BY a, CUBE(a, b), ROLLUP(b); +SELECT a, b, count(1) FROM testData GROUP BY CUBE(a, b), ROLLUP(a, b) GROUPING SETS(a, b); Review comment: It looks 
like an invalid query. ########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala ########## @@ -212,3 +212,25 @@ object GroupingID { if (SQLConf.get.integerGroupingIdEnabled) IntegerType else LongType } } + + +object GroupingAnalytics { + def unapply(exprs: Seq[Expression]) + : Option[(Seq[Seq[Expression]], Seq[Seq[Expression]], Seq[Expression])] = { + val (groupingSetExprs, others) = exprs.partition(_.isInstanceOf[GroupingSet]) + if (groupingSetExprs.isEmpty) { + None + } else { + val groupingSets = groupingSetExprs.map(_.asInstanceOf[GroupingSet]) + val groups = groupingSets.flatMap(_.groupByExprs) ++ others + val unMergedSelectedGroupByExprs = groupingSets.map(_.selectedGroupByExprs) + val selectedGroupByExprs = unMergedSelectedGroupByExprs.tail + .foldRight(unMergedSelectedGroupByExprs.head) { (x, y) => + for (a <- x; b <- y) yield b ++ a Review comment: nit: `b ++ a` -> `a ++ b` for a natural order. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org