This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 715b43428913 [SPARK-46763] Fix assertion failure in ReplaceDeduplicateWithAggregate for duplicate attributes 715b43428913 is described below commit 715b43428913d6a631f8f9043baac751b88cb5d4 Author: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com> AuthorDate: Tue Jan 23 17:15:30 2024 +0800 [SPARK-46763] Fix assertion failure in ReplaceDeduplicateWithAggregate for duplicate attributes ### What changes were proposed in this pull request? - Updated the `ReplaceDeduplicateWithAggregate` implementation to reuse aliases generated for an attribute. - Added a unit test to ensure scenarios with duplicate non-grouping keys are correctly optimized. ### Why are the changes needed? - `ReplaceDeduplicateWithAggregate` replaces `Deduplicate` with an `Aggregate` operator with grouping expressions for the deduplication keys and aggregate expressions for the non-grouping keys (to preserve the output schema and keep the non-grouping columns). - For non-grouping key `a#X`, it generates an aggregate expression of the form `first(a#X, false) AS a#Y` - In case the non-grouping keys have a repeated attribute (with the same name and exprId), the existing logic would generate two different aggregate expressions, each with a different exprId. - This then leads to a duplicate rewrite attributes error (in `transformUpWithNewOutput`) when transforming the remaining tree. - For example, for the query ``` Project [a#0, b#1] +- Deduplicate [b#1] +- Project [a#0, a#0, b#1] +- LocalRelation <empty>, [a#0, b#1] ``` the existing logic would transform it to ``` Project [a#3, b#1] +- Aggregate [b#1], [first(a#0, false) AS a#3, first(a#0, false) AS a#5, b#1] +- Project [a#0, a#0, b#1] +- LocalRelation <empty>, [a#0, b#1] ``` with the aggregate mapping having two entries `a#0 -> a#3, a#0 -> a#5`. 
The correct transformation would be ``` Project [a#3, b#1] +- Aggregate [b#1], [first(a#0, false) AS a#3, first(a#0, false) AS a#3, b#1] +- Project [a#0, a#0, b#1] +- LocalRelation <empty>, [a#0, b#1] ``` with the aggregate mapping having only one entry `a#0 -> a#3`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test in `ReplaceOperatorSuite`. ### Was this patch authored or co-authored using generative AI tooling? No Closes #44835 from nikhilsheoran-db/SPARK-46763. Authored-by: Nikhil Sheoran <125331115+nikhilsheoran...@users.noreply.github.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 6 ++++- .../catalyst/optimizer/ReplaceOperatorSuite.scala | 31 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 4186c8c1db91..46d3043df3eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -2192,11 +2192,15 @@ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput { case d @ Deduplicate(keys, child) if !child.isStreaming => val keyExprIds = keys.map(_.exprId) + val generatedAliasesMap = new mutable.HashMap[Attribute, Alias](); val aggCols = child.output.map { attr => if (keyExprIds.contains(attr.exprId)) { attr } else { - Alias(new First(attr).toAggregateExpression(), attr.name)() + // Keep track of the generated aliases to avoid generating multiple aliases + // for the same attribute (in case the attribute is duplicated) + generatedAliasesMap.getOrElseUpdate(attr, + Alias(new First(attr).toAggregateExpression(), attr.name)()) } } // 
SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala index 5d81e96a8e58..cb9577e050d0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala @@ -265,4 +265,35 @@ class ReplaceOperatorSuite extends PlanTest { Join(basePlan, otherPlan, LeftAnti, Option(condition), JoinHint.NONE)).analyze comparePlans(result, correctAnswer) } + + test("SPARK-46763: ReplaceDeduplicateWithAggregate non-grouping keys with duplicate attributes") { + val a = $"a".int + val b = $"b".int + val first_a = Alias(new First(a).toAggregateExpression(), a.name)() + + val query = Project( + projectList = Seq(a, b), + Deduplicate( + keys = Seq(b), + child = Project( + projectList = Seq(a, a, b), + child = LocalRelation(Seq(a, b)) + ) + ) + ).analyze + + val result = Optimize.execute(query) + val correctAnswer = Project( + projectList = Seq(first_a.toAttribute, b), + Aggregate( + Seq(b), + Seq(first_a, first_a, b), + Project( + projectList = Seq(a, a, b), + child = LocalRelation(Seq(a, b)) + ) + ) + ).analyze + comparePlans(result, correctAnswer) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org