This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 6da60bf [SPARK-33362][SQL] skipSchemaResolution should still require query to be resolved 6da60bf is described below commit 6da60bfcc22dee9498ea3a27d6a9f3eedbf1a8f2 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Thu Nov 5 09:23:41 2020 -0800 [SPARK-33362][SQL] skipSchemaResolution should still require query to be resolved ### What changes were proposed in this pull request? Fix a small bug in `V2WriteCommand.resolved`. It should always require the `table` and `query` to be resolved. ### Why are the changes needed? To prevent potential bugs where we skip resolving the input query. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? a new test Closes #30265 from cloud-fan/ds-minor-2. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit 26ea417b1448d679fdc777705ee2f99f4e741ef3) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- .../sql/catalyst/plans/logical/v2Commands.scala | 26 ++++++++++++---------- .../analysis/DataSourceV2AnalysisSuite.scala | 9 ++++++++ 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2c9681b..2f30ba3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1363,7 +1363,8 @@ class Analyzer( case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) => a.mapExpressions(resolveExpressionTopDown(_, appendColumns)) - case o: OverwriteByExpression if !o.outputResolved => + case o: OverwriteByExpression + if !(o.table.resolved && o.query.resolved && o.outputResolved) => // do not 
resolve expression attributes until the query attributes are resolved against the // table by ResolveOutputRelation. that rule will alias the attributes to the table's names. o diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index d2830a9..35874a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -35,20 +35,20 @@ trait V2WriteCommand extends Command { override def children: Seq[LogicalPlan] = Seq(query) - override lazy val resolved: Boolean = outputResolved + override lazy val resolved: Boolean = table.resolved && query.resolved && outputResolved def outputResolved: Boolean = { + assert(table.resolved && query.resolved, + "`outputResolved` can only be called when `table` and `query` are both resolved.") // If the table doesn't require schema match, we don't need to resolve the output columns. 
- table.skipSchemaResolution || { - table.resolved && query.resolved && query.output.size == table.output.size && - query.output.zip(table.output).forall { - case (inAttr, outAttr) => - // names and types must match, nullability must be compatible - inAttr.name == outAttr.name && - DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) && - (outAttr.nullable || !inAttr.nullable) - } - } + table.skipSchemaResolution || (query.output.size == table.output.size && + query.output.zip(table.output).forall { + case (inAttr, outAttr) => + // names and types must match, nullability must be compatible + inAttr.name == outAttr.name && + DataType.equalsIgnoreCompatibleNullability(inAttr.dataType, outAttr.dataType) && + (outAttr.nullable || !inAttr.nullable) + }) } } @@ -86,7 +86,9 @@ case class OverwriteByExpression( query: LogicalPlan, writeOptions: Map[String, String], isByName: Boolean) extends V2WriteCommand { - override lazy val resolved: Boolean = outputResolved && deleteExpr.resolved + override lazy val resolved: Boolean = { + table.resolved && query.resolved && outputResolved && deleteExpr.resolved + } } object OverwriteByExpression { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index e466d55..5aa2b98 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -230,6 +230,15 @@ abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest { def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan + test("skipSchemaResolution should still require query to be resolved") { + val table = TestRelationAcceptAnySchema(StructType(Seq( + StructField("a", FloatType), + StructField("b", DoubleType))).toAttributes) + 
val query = UnresolvedRelation(Seq("t")) + val parsedPlan = byName(table, query) + assertNotResolved(parsedPlan) + } + test("byName: basic behavior") { val query = TestRelation(table.schema.toAttributes) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org