This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new b8204d1b89e  [SPARK-43979][SQL][FOLLOWUP] `transformUpWithNewOutput` should only be used with new outputs
b8204d1b89e is described below

commit b8204d1b89eea0c32e5269fb155651157e2c96e8
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Aug 9 19:16:37 2023 +0800

    [SPARK-43979][SQL][FOLLOWUP] `transformUpWithNewOutput` should only be used with new outputs

    ### What changes were proposed in this pull request?

    This is a followup of https://github.com/apache/spark/pull/41475 . It's risky to use `transformUpWithNewOutput` with existing attribute ids. If the plan contains duplicated attribute ids somewhere, we will hit conflicting attributes and an assertion error will be thrown by `QueryPlan#transformUpWithNewOutput`.

    This PR takes a different approach. We canonicalize the plan first and then remove the alias-only project. Then we don't need `transformUpWithNewOutput` anymore, as all attribute ids are normalized in the canonicalized plan.

    ### Why are the changes needed?

    Fix potential bugs.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests.

    Closes #42408 from cloud-fan/collect-metrics.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit 4f25df1dc25cc4f002107821cc67e35c1fe0e42c)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index e198fd58953..848749f9e3b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1071,13 +1071,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
     def check(plan: LogicalPlan): Unit = plan.foreach { node =>
       node match {
         case metrics @ CollectMetrics(name, _, _) =>
-          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics)
+          val simplifiedMetrics = simplifyPlanForCollectedMetrics(metrics.canonicalized)
           metricsMap.get(name) match {
             case Some(other) =>
-              val simplifiedOther = simplifyPlanForCollectedMetrics(other)
+              val simplifiedOther = simplifyPlanForCollectedMetrics(other.canonicalized)
               // Exact duplicates are allowed. They can be the result
               // of a CTE that is used multiple times or a self join.
-              if (!simplifiedMetrics.sameResult(simplifiedOther)) {
+              if (simplifiedMetrics != simplifiedOther) {
                 failAnalysis(
                   errorClass = "DUPLICATED_METRICS_NAME",
                   messageParameters = Map("metricName" -> name))
@@ -1102,7 +1102,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
    * duplicates metric definition.
   */
  private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = {
-    plan.transformUpWithNewOutput {
+    plan.resolveOperators {
      case p: Project if p.projectList.size == p.child.output.size =>
        val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
          case (left: Alias, right: Attribute) =>
@@ -1110,9 +1110,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
          case _ => false
        }
        if (assignExprIdOnly) {
-          (p.child, p.output.zip(p.child.output))
+          p.child
        } else {
-          (p, Nil)
+          p
        }
    }
  }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
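
The switch from `sameResult` to plain `!=` relies on the inputs already being canonicalized: `QueryPlan.sameResult` compares canonicalized plans, and canonicalization normalizes attribute ids. A minimal sketch of that equivalence, assuming a local SparkSession and an arbitrary query chosen only for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Illustration-only session; any SparkSession works.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Two analyses of the same query produce plans that differ only in expression ids.
val p1 = spark.range(10).select($"id" + 1).queryExecution.analyzed
val p2 = spark.range(10).select($"id" + 1).queryExecution.analyzed

p1 == p2                               // false: each plan carries fresh expression ids
p1.canonicalized == p2.canonicalized   // true: canonicalization normalizes the ids
p1.sameResult(p2)                      // true: sameResult is the canonicalized comparison
```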
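
The check being tightened here backs `Dataset.observe`, which plants `CollectMetrics` nodes: exact duplicates of the same observed plan (a reused CTE or a self join) stay legal, while reusing a metric name on a different plan fails with `DUPLICATED_METRICS_NAME`. A rough sketch of that user-facing behavior, assuming a local session; the column and metric names are made up:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val observed = Seq(1, 2, 3).toDF("v").observe("my_metrics", count(lit(1)).as("rows"))

// A self join duplicates the observed plan exactly; after canonicalization the two
// CollectMetrics nodes compare equal, so analysis succeeds.
observed.join(observed, "v").collect()

// Reusing the metric name on a different plan within one query should fail
// analysis with the DUPLICATED_METRICS_NAME error class.
val other = Seq(4, 5).toDF("v").observe("my_metrics", count(lit(1)).as("rows"))
// observed.union(other).collect()   // expected to raise AnalysisException
```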