This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 1e2984b [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query 1e2984b is described below commit 1e2984bc9525d38ddd31ade48f5caf83213fc853 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Wed Nov 11 13:50:39 2020 +0000 [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query backport https://github.com/apache/spark/pull/30318 to 3.0 Closes #30328 from cloud-fan/backport. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 9 ++++----- .../spark/sql/catalyst/plans/logical/v2Commands.scala | 3 ++- .../catalyst/analysis/DataSourceV2AnalysisSuite.scala | 17 ++++++++++++----- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2f30ba3..7795d70 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1363,11 +1363,10 @@ class Analyzer( case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) => a.mapExpressions(resolveExpressionTopDown(_, appendColumns)) - case o: OverwriteByExpression - if !(o.table.resolved && o.query.resolved && o.outputResolved) => - // do not resolve expression attributes until the query attributes are resolved against the - // table by ResolveOutputRelation. that rule will alias the attributes to the table's names. - o + case o: OverwriteByExpression if o.table.resolved => + // The delete condition of `OverwriteByExpression` will be passed to the table + // implementation and should be resolved based on the table schema. + o.copy(deleteExpr = resolveExpressionBottomUp(o.deleteExpr, o.table)) case m @ MergeIntoTable(targetTable, sourceTable, _, _, _) if !m.resolved && targetTable.resolved && sourceTable.resolved => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 35874a9..9077f7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.analysis.{NamedRelation, UnresolvedException} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable} import org.apache.spark.sql.catalyst.plans.DescribeTableSchema import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange} @@ -89,6 +89,7 @@ case class OverwriteByExpression( override lazy val resolved: Boolean = { table.resolved && query.resolved && outputResolved && deleteExpr.resolved } + override def inputSet: AttributeSet = AttributeSet(table.output) } object OverwriteByExpression { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index 5aa2b98..3810434 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -588,9 +588,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest { Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(), Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()), query), - LessThanOrEqual( - AttributeReference("x", DoubleType, nullable = false)(x.exprId), - Literal(15.0d))) + LessThanOrEqual(x, Literal(15.0d))) assertNotResolved(parsedPlan) checkAnalysis(parsedPlan, expectedPlan) @@ -598,7 +596,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest { } protected def testNotResolvedOverwriteByExpression(): Unit = { - val xRequiredTable = TestRelation(StructType(Seq( + val table = TestRelation(StructType(Seq( StructField("x", DoubleType, nullable = false), StructField("y", DoubleType))).toAttributes) @@ -607,10 +605,19 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest { StructField("b", DoubleType))).toAttributes) // the write is resolved (checked above). this test plan is not because of the expression. - val parsedPlan = OverwriteByExpression.byPosition(xRequiredTable, query, + val parsedPlan = OverwriteByExpression.byPosition(table, query, LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d))) assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq("cannot resolve", "`a`", "given input columns", "x, y")) + + val tableAcceptAnySchema = TestRelationAcceptAnySchema(StructType(Seq( + StructField("x", DoubleType, nullable = false), + StructField("y", DoubleType))).toAttributes) + + val parsedPlan2 = OverwriteByExpression.byPosition(tableAcceptAnySchema, query, + LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d))) + assertNotResolved(parsedPlan2) + assertAnalysisError(parsedPlan2, Seq("cannot resolve", "`a`", "given input columns", "x, y")) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org