This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8fde8bd68ae [SPARK-42084][SQL] Avoid leaking the qualified-access-only restriction 8fde8bd68ae is described below commit 8fde8bd68ae51757be29f4b586556eb25b3aa2b7 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Wed Jan 18 18:43:55 2023 +0800 [SPARK-42084][SQL] Avoid leaking the qualified-access-only restriction ### What changes were proposed in this pull request? This is a better fix than https://github.com/apache/spark/pull/39077 and https://github.com/apache/spark/pull/38862 The special attribute metadata `__qualified_access_only` is very risky, as it breaks normal column resolution. The aforementioned 2 PRs remove the restriction in `SubqueryAlias` and `Alias`, but it's not good enough as we may forget to do the same thing for new logical plans/expressions in the future. It's also problematic if advanced users manipulate logical plans and expressions directly, when there is no `SubqueryAlias` and `Alias` to remove the restriction. To be safe, we should only apply this restriction when resolving join hidden columns, which means the plan node right above `Project(Join(using or natural join))`. This PR simply removes the restriction when a column is resolved from a sequence of `Attributes`, or from star expansion, and also when adding the `Project` hidden columns to its output. This makes sure that the qualified-access-only restriction will not be leaked to normal column resolution, but only to metadata column resolution. ### Why are the changes needed? To make the join hidden column feature more robust ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests Closes #39596 from cloud-fan/join. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 9 ++++++--- .../spark/sql/catalyst/analysis/unresolved.scala | 6 ++++++ .../sql/catalyst/expressions/namedExpressions.scala | 7 ++----- .../spark/sql/catalyst/expressions/package.scala | 7 ++++++- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- .../catalyst/plans/logical/basicLogicalOperators.scala | 10 +--------- .../org/apache/spark/sql/catalyst/util/package.scala | 18 ++++++++++++------ 7 files changed, 34 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d6b68a45e77..ba2c2759e2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -194,7 +194,10 @@ class Analyzer(override val catalogManager: CatalogManager) override protected def isPlanIntegral( previousPlan: LogicalPlan, currentPlan: LogicalPlan): Boolean = { - !Utils.isTesting || LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) + import org.apache.spark.sql.catalyst.util._ + !Utils.isTesting || (LogicalPlanIntegrity.checkIfExprIdsAreGloballyUnique(currentPlan) && + (!LogicalPlanIntegrity.canGetOutputAttrs(currentPlan) || + !currentPlan.output.exists(_.qualifiedAccessOnly))) } override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts) @@ -984,7 +987,6 @@ class Analyzer(override val catalogManager: CatalogManager) * projecting away metadata columns prematurely. 
*/ object AddMetadataColumns extends Rule[LogicalPlan] { - import org.apache.spark.sql.catalyst.util._ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning( @@ -1039,7 +1041,8 @@ class Analyzer(override val catalogManager: CatalogManager) s.withMetadataColumns() case p: Project if p.metadataOutput.exists(a => requiredAttrIds.contains(a.exprId)) => val newProj = p.copy( - projectList = p.projectList ++ p.metadataOutput, + // Do not leak the qualified-access-only restriction to normal plan outputs. + projectList = p.projectList ++ p.metadataOutput.map(_.markAsAllowAnyAccess()), child = addMetadataCol(p.child, requiredAttrIds)) newProj.copyTagsFrom(p) newProj diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 4a4028dc4c4..5e20f12747b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -425,6 +425,12 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu // If there is a table specified, use hidden input attributes as well val hiddenOutput = input.metadataOutput.filter(_.qualifiedAccessOnly) + // Remove the qualified-access-only restriction immediately. The expanded attributes will be + // put in a logical plan node and becomes normal attributes. They can still keep the special + // attribute metadata to indicate that they are from metadata columns, but they should not + // keep any restrictions that may break column resolution for normal attributes. + // See SPARK-42084 for more details. 
+ .map(_.markAsAllowAnyAccess()) val expandedAttributes = (hiddenOutput ++ input.output).filter( matchedQualifier(_, target.get, resolver)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index ed820c80561..d18cfea1629 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.catalyst.trees.TreePattern import org.apache.spark.sql.catalyst.trees.TreePattern._ -import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY} import org.apache.spark.sql.types._ import org.apache.spark.util.collection.BitSet import org.apache.spark.util.collection.ImmutableBitSet @@ -190,10 +190,7 @@ case class Alias(child: Expression, name: String)( override def toAttribute: Attribute = { if (resolved) { - val a = AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier) - // Alias has its own qualifier. It doesn't make sense to still restrict the hidden columns - // of natural/using join to be accessed by qualified name only. 
- if (a.qualifiedAccessOnly) a.markAsAllowAnyAccess() else a + AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier) } else { UnresolvedAttribute.quoted(name) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index 44813ac7b61..74f0875c285 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -337,7 +337,12 @@ package object expressions { } def name = UnresolvedAttribute(nameParts).name - prunedCandidates match { + // We may have resolved the attributes from metadata columns. The resolved attributes will be + // put in a logical plan node and becomes normal attributes. They can still keep the special + // attribute metadata to indicate that they are from metadata columns, but they should not + // keep any restrictions that may break column resolution for normal attributes. + // See SPARK-42084 for more details. + prunedCandidates.map(_.markAsAllowAnyAccess()) match { case Seq(a) if nestedFields.nonEmpty => // One match, but we also need to extract the requested nested field. 
// The foldLeft adds ExtractValues for every remaining parts of the identifier, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 7640d9234c7..d3df6f0dd98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -211,7 +211,7 @@ abstract class OrderPreservingUnaryNode extends UnaryNode { object LogicalPlanIntegrity { - private def canGetOutputAttrs(p: LogicalPlan): Boolean = { + def canGetOutputAttrs(p: LogicalPlan): Boolean = { p.resolved && !p.expressions.exists { e => e.exists { // We cannot call `output` in plans with a `ScalarSubquery` expr having no column, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 586e344df5e..343fa3517c6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1664,15 +1664,7 @@ case class SubqueryAlias( override def output: Seq[Attribute] = { val qualifierList = identifier.qualifier :+ alias - child.output.map { attr => - // `SubqueryAlias` sets a new qualifier for its output columns. It doesn't make sense to still - // restrict the hidden columns of natural/using join to be accessed by qualified name only. 
- if (attr.qualifiedAccessOnly) { - attr.markAsAllowAnyAccess().withQualifier(qualifierList) - } else { - attr.withQualifier(qualifierList) - } - } + child.output.map(_.withQualifier(qualifierList)) } override def metadataOutput: Seq[Attribute] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 1fcd3f7662b..6466afac619 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -213,11 +213,17 @@ package object util extends Logging { .build() ) - def markAsAllowAnyAccess(): Attribute = attr.withMetadata( - new MetadataBuilder() - .withMetadata(attr.metadata) - .remove(QUALIFIED_ACCESS_ONLY) - .build() - ) + def markAsAllowAnyAccess(): Attribute = { + if (qualifiedAccessOnly) { + attr.withMetadata( + new MetadataBuilder() + .withMetadata(attr.metadata) + .remove(QUALIFIED_ACCESS_ONLY) + .build() + ) + } else { + attr + } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org