This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b676a2d9a67 [SPARK-43979][SQL][FOLLOW-UP] Simplify metrics plan should replace nodes by new attributes
b676a2d9a67 is described below

commit b676a2d9a67c121c563979f93605d1215473ae32
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Sun Jun 18 13:45:46 2023 -0700

    [SPARK-43979][SQL][FOLLOW-UP] Simplify metrics plan should replace nodes by new attributes
    
    ### What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/41475 introduced a fix that removes the extra alias-only Project which could otherwise cause a same-metrics mismatch over the query plan. However, to make the fix more robust, when we drop the extra Project we also need to remap its output attributes to the child's output wherever they are referenced.
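
    A minimal sketch of the pattern this check has to handle, mirroring the existing test below (assumes a live spark session; the names are illustrative, not part of this patch):

    ```scala
    import org.apache.spark.sql.functions.count

    // observe() wraps the plan in a CollectMetrics node carrying "metric".
    val df = spark.range(5).toDF("id").observe("metric", count("*"))

    // Self-join deduplication re-aliases one side's output, inserting an
    // alias-only Project above one copy of the CollectMetrics subtree. The
    // duplicate-metrics check must recognize both copies as the same plan,
    // which is why the alias-only Project is stripped and its output
    // attributes remapped to the child's.
    df.crossJoin(df).collect()
    ```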
    
    ### Why are the changes needed?
    
    Enhances the fix to cover more test cases.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    UT (a new unit test in SQLQuerySuite).
    
    Closes #41620 from amaliujia/fix_json_followup.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/catalyst/analysis/CheckAnalysis.scala      |  6 +++---
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala  | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index d3dc9a75dd5..e47966f1e27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1081,7 +1081,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
    * duplicates metric definition.
    */
   private def simplifyPlanForCollectedMetrics(plan: LogicalPlan): LogicalPlan = {
-    plan.resolveOperatorsDown {
+    plan.resolveOperatorsUpWithNewOutput {
       case p: Project if p.projectList.size == p.child.output.size =>
         val assignExprIdOnly = p.projectList.zip(p.child.output).forall {
           case (left: Alias, right: Attribute) =>
@@ -1089,9 +1089,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
           case _ => false
         }
         if (assignExprIdOnly) {
-          p.child
+          (p.child, p.output.zip(p.child.output))
         } else {
-          p
+          (p, Nil)
         }
     }
   }
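
For readers unfamiliar with the helper used above: resolveOperatorsUpWithNewOutput
expects each matched case to return the replacement node together with
(old attribute, new attribute) pairs, and the framework then rewrites references
to the old attributes in the enclosing operators, so dropping the alias-only
Project cannot leave dangling references. A simplified sketch of the rule shape
(isAliasOnly is hypothetical shorthand for the projectList check in the hunk
above, not a real Catalyst helper):

    import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

    // Hypothetical shorthand: every project item merely re-aliases the
    // corresponding child attribute (same value, new expression ID).
    def isAliasOnly(p: Project): Boolean =
      p.projectList.size == p.child.output.size &&
        p.projectList.zip(p.child.output).forall {
          case (a: Alias, attr: Attribute) => a.child.semanticEquals(attr)
          case _ => false
        }

    // Each case returns the replacement node plus (oldAttr, newAttr) pairs.
    val rule: PartialFunction[LogicalPlan, (LogicalPlan, Seq[(Attribute, Attribute)])] = {
      case p: Project if isAliasOnly(p) =>
        (p.child, p.output.zip(p.child.output)) // remap p's attrs to the child's
    }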
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 7b4a4a52a85..381c7714402 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4682,6 +4682,22 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       """
     ).observe("my_event", count("*"))
     df1.crossJoin(df1)
+
+    val df2 = spark.sql(
+      """
+      WITH t1 AS (
+      SELECT customer_id, age, row_number() OVER(PARTITION BY customer_id ORDER BY age ASC) rn
+      FROM tmp_view)
+      SELECT customer_id, age FROM t1 WHERE rn = 1
+     """.stripMargin
+    ).observe("my_event2", count("*")).as("df2")
+
+    val df3 = spark.range(1, 5).toDF("id").withColumn("zaak_id", lit(1))
+      .withColumn("targetid", lit(2)).as("df3")
+    val df4 = spark.range(1, 5).toDF("id").withColumn("zaak_id", lit(2)).as("df4")
+    val df5 = df4.join(df2, col("df4.id") === col("df2.customer_id"), "inner")
+    val df6 = df3.join(df2, col("df3.zaak_id") === col("df2.customer_id"), "outer")
+    df5.crossJoin(df6)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
