This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fde8bd68ae [SPARK-42084][SQL] Avoid leaking the qualified-access-only 
restriction
8fde8bd68ae is described below

commit 8fde8bd68ae51757be29f4b586556eb25b3aa2b7
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Jan 18 18:43:55 2023 +0800

    [SPARK-42084][SQL] Avoid leaking the qualified-access-only restriction
    
    ### What changes were proposed in this pull request?
    
    This is a better fix than https://github.com/apache/spark/pull/39077 and 
https://github.com/apache/spark/pull/38862
    
    The special attribute metadata `__qualified_access_only` is very risky, as 
it breaks normal column resolution. The aforementioned 2 PRs remove the 
restriction in `SubqueryAlias` and `Alias`, but it's not good enough as we may 
forget to do the same thing for new logical plans/expressions in the future. 
It's also problematic if advanced users manipulate logical plans and 
expressions directly, when there is no `SubqueryAlias` or `Alias` to remove 
the restriction.
    
    To be safe, we should only apply this restriction when resolving join 
hidden columns, which means the plan node right above `Project(Join(using or 
natural join))`. This PR simply removes the restriction when a column is 
resolved from a sequence of `Attributes`, or from star expansion, and also when 
adding the `Project` hidden columns to its output. This makes sure that the 
qualified-access-only restriction will not leak into normal column 
resolution, but only into metadata column resolution.
    
    ### Why are the changes needed?
    
    To make the join hidden column feature more robust
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #39596 from cloud-fan/join.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala  |  9 ++++++---
 .../spark/sql/catalyst/analysis/unresolved.scala       |  6 ++++++
 .../sql/catalyst/expressions/namedExpressions.scala    |  7 ++-----
 .../spark/sql/catalyst/expressions/package.scala       |  7 ++++++-
 .../spark/sql/catalyst/plans/logical/LogicalPlan.scala |  2 +-
 .../catalyst/plans/logical/basicLogicalOperators.scala | 10 +---------
 .../org/apache/spark/sql/catalyst/util/package.scala   | 18 ++++++++++++------
 7 files changed, 34 insertions(+), 25 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d6b68a45e77..ba2c2759e2d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -194,7 +194,10 @@ class Analyzer(override val catalogManager: CatalogManager)
   override protected def isPlanIntegral(
       previousPlan: LogicalPlan,
       currentPlan: LogicalPlan): Boolean = {
-    !Utils.isTesting || 
LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan)
+    import org.apache.spark.sql.catalyst.util._
+    !Utils.isTesting || 
(LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) &&
+      (!LogicalPlanIntegrity.canGetOutputAttrs(currentPlan) ||
+        !currentPlan.output.exists(_.qualifiedAccessOnly)))
   }
 
   override def isView(nameParts: Seq[String]): Boolean = 
v1SessionCatalog.isView(nameParts)
@@ -984,7 +987,6 @@ class Analyzer(override val catalogManager: CatalogManager)
    * projecting away metadata columns prematurely.
    */
   object AddMetadataColumns extends Rule[LogicalPlan] {
-
     import org.apache.spark.sql.catalyst.util._
 
     def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDownWithPruning(
@@ -1039,7 +1041,8 @@ class Analyzer(override val catalogManager: 
CatalogManager)
         s.withMetadataColumns()
       case p: Project if p.metadataOutput.exists(a => 
requiredAttrIds.contains(a.exprId)) =>
         val newProj = p.copy(
-          projectList = p.projectList ++ p.metadataOutput,
+          // Do not leak the qualified-access-only restriction to normal plan 
outputs.
+          projectList = p.projectList ++ 
p.metadataOutput.map(_.markAsAllowAnyAccess()),
           child = addMetadataCol(p.child, requiredAttrIds))
         newProj.copyTagsFrom(p)
         newProj
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 4a4028dc4c4..5e20f12747b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -425,6 +425,12 @@ case class UnresolvedStar(target: Option[Seq[String]]) 
extends Star with Unevalu
 
     // If there is a table specified, use hidden input attributes as well
     val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly)
+      // Remove the qualified-access-only restriction immediately. The 
expanded attributes will be
+      // put in a logical plan node and become normal attributes. They can 
still keep the special
+      // attribute metadata to indicate that they are from metadata columns, 
but they should not
+      // keep any restrictions that may break column resolution for normal 
attributes.
+      // See SPARK-42084 for more details.
+      .map(_.markAsAllowAnyAccess())
     val expandedAttributes = (hiddenOutput ++ input.output).filter(
       matchedQualifier(_, target.get, resolver))
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index ed820c80561..d18cfea1629 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.catalyst.trees.TreePattern
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, 
METADATA_COL_ATTR_KEY}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.collection.ImmutableBitSet
@@ -190,10 +190,7 @@ case class Alias(child: Expression, name: String)(
 
   override def toAttribute: Attribute = {
     if (resolved) {
-      val a = AttributeReference(name, child.dataType, child.nullable, 
metadata)(exprId, qualifier)
-      // Alias has its own qualifier. It doesn't make sense to still restrict 
the hidden columns
-      // of natural/using join to be accessed by qualified name only.
-      if (a.qualifiedAccessOnly) a.markAsAllowAnyAccess() else a
+      AttributeReference(name, child.dataType, child.nullable, 
metadata)(exprId, qualifier)
     } else {
       UnresolvedAttribute.quoted(name)
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 44813ac7b61..74f0875c285 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -337,7 +337,12 @@ package object expressions  {
       }
 
       def name = UnresolvedAttribute(nameParts).name
-      prunedCandidates match {
+      // We may have resolved the attributes from metadata columns. The 
resolved attributes will be
+      // put in a logical plan node and become normal attributes. They can 
still keep the special
+      // attribute metadata to indicate that they are from metadata columns, 
but they should not
+      // keep any restrictions that may break column resolution for normal 
attributes.
+      // See SPARK-42084 for more details.
+      prunedCandidates.map(_.markAsAllowAnyAccess()) match {
         case Seq(a) if nestedFields.nonEmpty =>
           // One match, but we also need to extract the requested nested field.
           // The foldLeft adds ExtractValues for every remaining parts of the 
identifier,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 7640d9234c7..d3df6f0dd98 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -211,7 +211,7 @@ abstract class OrderPreservingUnaryNode extends UnaryNode {
 
 object LogicalPlanIntegrity {
 
-  private def canGetOutputAttrs(p: LogicalPlan): Boolean = {
+  def canGetOutputAttrs(p: LogicalPlan): Boolean = {
     p.resolved && !p.expressions.exists { e =>
       e.exists {
         // We cannot call `output` in plans with a `ScalarSubquery` expr 
having no column,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 586e344df5e..343fa3517c6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1664,15 +1664,7 @@ case class SubqueryAlias(
 
   override def output: Seq[Attribute] = {
     val qualifierList = identifier.qualifier :+ alias
-    child.output.map { attr =>
-      // `SubqueryAlias` sets a new qualifier for its output columns. It 
doesn't make sense to still
-      // restrict the hidden columns of natural/using join to be accessed by 
qualified name only.
-      if (attr.qualifiedAccessOnly) {
-        attr.markAsAllowAnyAccess().withQualifier(qualifierList)
-      } else {
-        attr.withQualifier(qualifierList)
-      }
-    }
+    child.output.map(_.withQualifier(qualifierList))
   }
 
   override def metadataOutput: Seq[Attribute] = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 1fcd3f7662b..6466afac619 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -213,11 +213,17 @@ package object util extends Logging {
         .build()
     )
 
-    def markAsAllowAnyAccess(): Attribute = attr.withMetadata(
-      new MetadataBuilder()
-        .withMetadata(attr.metadata)
-        .remove(QUALIFIED_ACCESS_ONLY)
-        .build()
-    )
+    def markAsAllowAnyAccess(): Attribute = {
+      if (qualifiedAccessOnly) {
+        attr.withMetadata(
+          new MetadataBuilder()
+            .withMetadata(attr.metadata)
+            .remove(QUALIFIED_ACCESS_ONLY)
+            .build()
+        )
+      } else {
+        attr
+      }
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to