This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd864a085f2 [SPARK-41660][SQL] Only propagate metadata columns if they 
are used
bd864a085f2 is described below

commit bd864a085f2764b8ccdfe67ffaf7400b6f44f717
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Dec 21 21:07:34 2022 +0800

    [SPARK-41660][SQL] Only propagate metadata columns if they are used
    
    ### What changes were proposed in this pull request?
    
    Ideally it's OK to always propagate metadata columns, as column pruning 
will kick in later and prune them away if they are not used. However, it may 
cause problems in cases like CTE. https://github.com/apache/spark/pull/39081 
fixed such a bug.
    
    This PR only propagates metadata columns if they are used, to keep the 
analyzed plan simple and reliable.
    
    ### Why are the changes needed?
    
    To avoid potential bugs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new tests
    
    Closes #39152 from cloud-fan/follow.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   | 15 +++++++++------
 .../spark/sql/connector/MetadataColumnSuite.scala       | 17 +++++++++++++++++
 2 files changed, 26 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e959e7208a4..c21ff7bd90f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -978,7 +978,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         if (metaCols.isEmpty) {
           node
         } else {
-          val newNode = addMetadataCol(node)
+          val newNode = addMetadataCol(node, attr => metaCols.exists(_.exprId 
== attr.exprId))
           // We should not change the output schema of the plan. We should 
project away the extra
           // metadata columns if necessary.
           if (newNode.sameOutput(node)) {
@@ -1012,15 +1012,18 @@ class Analyzer(override val catalogManager: 
CatalogManager)
       })
     }
 
-    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
-      case s: ExposesMetadataColumns => s.withMetadataColumns()
-      case p: Project =>
+    private def addMetadataCol(
+        plan: LogicalPlan,
+        isRequired: Attribute => Boolean): LogicalPlan = plan match {
+      case s: ExposesMetadataColumns if s.metadataOutput.exists(isRequired) =>
+        s.withMetadataColumns()
+      case p: Project if p.metadataOutput.exists(isRequired) =>
         val newProj = p.copy(
           projectList = p.projectList ++ p.metadataOutput,
-          child = addMetadataCol(p.child))
+          child = addMetadataCol(p.child, isRequired))
         newProj.copyTagsFrom(p)
         newProj
-      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
+      case _ => plan.withNewChildren(plan.children.map(addMetadataCol(_, 
isRequired)))
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index 8454b9f85ec..9abf0fd59e6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.struct
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
@@ -232,4 +233,20 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       )
     }
   }
+
+  test("SPARK-41660: only propagate metadata columns if they are used") {
+    withTable(tbl) {
+      prepareTable()
+      val df = sql(s"SELECT t2.id FROM $tbl t1 JOIN $tbl t2 USING (id)")
+      val scans = df.logicalPlan.collect {
+        case d: DataSourceV2Relation => d
+      }
+      assert(scans.length == 2)
+      scans.foreach { scan =>
+        // The query only access join hidden columns, and scan nodes should 
not expose its metadata
+        // columns.
+        assert(scan.output.map(_.name) == Seq("id", "data"))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to