[spark] branch branch-3.0 updated: [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query

2020-11-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1e2984b  [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve 
its delete condition based on the table relation not the input query
1e2984b is described below

commit 1e2984bc9525d38ddd31ade48f5caf83213fc853
Author: Wenchen Fan 
AuthorDate: Wed Nov 11 13:50:39 2020 +

[SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete 
condition based on the table relation not the input query

backport https://github.com/apache/spark/pull/30318 to 3.0

Closes #30328 from cloud-fan/backport.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   |  9 ++++-----
 .../spark/sql/catalyst/plans/logical/v2Commands.scala   |  3 ++-
 .../catalyst/analysis/DataSourceV2AnalysisSuite.scala   | 17 ++++++++++++-----
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f30ba3..7795d70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1363,11 +1363,10 @@ class Analyzer(
   case a @ Aggregate(groupingExprs, aggExprs, appendColumns: 
AppendColumns) =>
 a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
-  case o: OverwriteByExpression
-  if !(o.table.resolved && o.query.resolved && o.outputResolved) =>
-// do not resolve expression attributes until the query attributes are 
resolved against the
-// table by ResolveOutputRelation. that rule will alias the attributes 
to the table's names.
-o
+  case o: OverwriteByExpression if o.table.resolved =>
+// The delete condition of `OverwriteByExpression` will be passed to 
the table
+// implementation and should be resolved based on the table schema.
+o.copy(deleteExpr = resolveExpressionBottomUp(o.deleteExpr, o.table))
 
   case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
 if !m.resolved && targetTable.resolved && sourceTable.resolved =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 35874a9..9077f7a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, 
UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
ColumnChange}
@@ -89,6 +89,7 @@ case class OverwriteByExpression(
   override lazy val resolved: Boolean = {
 table.resolved && query.resolved && outputResolved && deleteExpr.resolved
   }
+  override def inputSet: AttributeSet = AttributeSet(table.output)
 }
 
 object OverwriteByExpression {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 5aa2b98..3810434 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -588,9 +588,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
 Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
 Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
 query),
-  LessThanOrEqual(
-AttributeReference("x", DoubleType, nullable = false)(x.exprId),
-Literal(15.0d)))
+  LessThanOrEqual(x, Literal(15.0d)))
 
 assertNotResolved(parsedPlan)
 checkAnalysis(parsedPlan, expectedPlan)
@@ -598,7 +596,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
   }
 
   protected def testNotResolvedOverwriteByExpression(): Unit = {
-

[spark] branch branch-3.0 updated: [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query

2020-11-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1e2984b  [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve 
its delete condition based on the table relation not the input query
1e2984b is described below

commit 1e2984bc9525d38ddd31ade48f5caf83213fc853
Author: Wenchen Fan 
AuthorDate: Wed Nov 11 13:50:39 2020 +

[SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete 
condition based on the table relation not the input query

backport https://github.com/apache/spark/pull/30318 to 3.0

Closes #30328 from cloud-fan/backport.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   |  9 ++++-----
 .../spark/sql/catalyst/plans/logical/v2Commands.scala   |  3 ++-
 .../catalyst/analysis/DataSourceV2AnalysisSuite.scala   | 17 ++++++++++++-----
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f30ba3..7795d70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1363,11 +1363,10 @@ class Analyzer(
   case a @ Aggregate(groupingExprs, aggExprs, appendColumns: 
AppendColumns) =>
 a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
-  case o: OverwriteByExpression
-  if !(o.table.resolved && o.query.resolved && o.outputResolved) =>
-// do not resolve expression attributes until the query attributes are 
resolved against the
-// table by ResolveOutputRelation. that rule will alias the attributes 
to the table's names.
-o
+  case o: OverwriteByExpression if o.table.resolved =>
+// The delete condition of `OverwriteByExpression` will be passed to 
the table
+// implementation and should be resolved based on the table schema.
+o.copy(deleteExpr = resolveExpressionBottomUp(o.deleteExpr, o.table))
 
   case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
 if !m.resolved && targetTable.resolved && sourceTable.resolved =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 35874a9..9077f7a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, 
UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
ColumnChange}
@@ -89,6 +89,7 @@ case class OverwriteByExpression(
   override lazy val resolved: Boolean = {
 table.resolved && query.resolved && outputResolved && deleteExpr.resolved
   }
+  override def inputSet: AttributeSet = AttributeSet(table.output)
 }
 
 object OverwriteByExpression {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 5aa2b98..3810434 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -588,9 +588,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
 Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
 Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
 query),
-  LessThanOrEqual(
-AttributeReference("x", DoubleType, nullable = false)(x.exprId),
-Literal(15.0d)))
+  LessThanOrEqual(x, Literal(15.0d)))
 
 assertNotResolved(parsedPlan)
 checkAnalysis(parsedPlan, expectedPlan)
@@ -598,7 +596,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
   }
 
   protected def testNotResolvedOverwriteByExpression(): Unit = {
-

[spark] branch branch-3.0 updated: [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query

2020-11-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1e2984b  [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve 
its delete condition based on the table relation not the input query
1e2984b is described below

commit 1e2984bc9525d38ddd31ade48f5caf83213fc853
Author: Wenchen Fan 
AuthorDate: Wed Nov 11 13:50:39 2020 +

[SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete 
condition based on the table relation not the input query

backport https://github.com/apache/spark/pull/30318 to 3.0

Closes #30328 from cloud-fan/backport.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   |  9 ++++-----
 .../spark/sql/catalyst/plans/logical/v2Commands.scala   |  3 ++-
 .../catalyst/analysis/DataSourceV2AnalysisSuite.scala   | 17 ++++++++++++-----
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f30ba3..7795d70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1363,11 +1363,10 @@ class Analyzer(
   case a @ Aggregate(groupingExprs, aggExprs, appendColumns: 
AppendColumns) =>
 a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
-  case o: OverwriteByExpression
-  if !(o.table.resolved && o.query.resolved && o.outputResolved) =>
-// do not resolve expression attributes until the query attributes are 
resolved against the
-// table by ResolveOutputRelation. that rule will alias the attributes 
to the table's names.
-o
+  case o: OverwriteByExpression if o.table.resolved =>
+// The delete condition of `OverwriteByExpression` will be passed to 
the table
+// implementation and should be resolved based on the table schema.
+o.copy(deleteExpr = resolveExpressionBottomUp(o.deleteExpr, o.table))
 
   case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
 if !m.resolved && targetTable.resolved && sourceTable.resolved =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 35874a9..9077f7a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, 
UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
ColumnChange}
@@ -89,6 +89,7 @@ case class OverwriteByExpression(
   override lazy val resolved: Boolean = {
 table.resolved && query.resolved && outputResolved && deleteExpr.resolved
   }
+  override def inputSet: AttributeSet = AttributeSet(table.output)
 }
 
 object OverwriteByExpression {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 5aa2b98..3810434 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -588,9 +588,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
 Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
 Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
 query),
-  LessThanOrEqual(
-AttributeReference("x", DoubleType, nullable = false)(x.exprId),
-Literal(15.0d)))
+  LessThanOrEqual(x, Literal(15.0d)))
 
 assertNotResolved(parsedPlan)
 checkAnalysis(parsedPlan, expectedPlan)
@@ -598,7 +596,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
   }
 
   protected def testNotResolvedOverwriteByExpression(): Unit = {
-

[spark] branch branch-3.0 updated: [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query

2020-11-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1e2984b  [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve 
its delete condition based on the table relation not the input query
1e2984b is described below

commit 1e2984bc9525d38ddd31ade48f5caf83213fc853
Author: Wenchen Fan 
AuthorDate: Wed Nov 11 13:50:39 2020 +

[SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete 
condition based on the table relation not the input query

backport https://github.com/apache/spark/pull/30318 to 3.0

Closes #30328 from cloud-fan/backport.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   |  9 ++++-----
 .../spark/sql/catalyst/plans/logical/v2Commands.scala   |  3 ++-
 .../catalyst/analysis/DataSourceV2AnalysisSuite.scala   | 17 ++++++++++++-----
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f30ba3..7795d70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1363,11 +1363,10 @@ class Analyzer(
   case a @ Aggregate(groupingExprs, aggExprs, appendColumns: 
AppendColumns) =>
 a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
-  case o: OverwriteByExpression
-  if !(o.table.resolved && o.query.resolved && o.outputResolved) =>
-// do not resolve expression attributes until the query attributes are 
resolved against the
-// table by ResolveOutputRelation. that rule will alias the attributes 
to the table's names.
-o
+  case o: OverwriteByExpression if o.table.resolved =>
+// The delete condition of `OverwriteByExpression` will be passed to 
the table
+// implementation and should be resolved based on the table schema.
+o.copy(deleteExpr = resolveExpressionBottomUp(o.deleteExpr, o.table))
 
   case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
 if !m.resolved && targetTable.resolved && sourceTable.resolved =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 35874a9..9077f7a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, 
UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
ColumnChange}
@@ -89,6 +89,7 @@ case class OverwriteByExpression(
   override lazy val resolved: Boolean = {
 table.resolved && query.resolved && outputResolved && deleteExpr.resolved
   }
+  override def inputSet: AttributeSet = AttributeSet(table.output)
 }
 
 object OverwriteByExpression {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 5aa2b98..3810434 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -588,9 +588,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
 Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
 Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
 query),
-  LessThanOrEqual(
-AttributeReference("x", DoubleType, nullable = false)(x.exprId),
-Literal(15.0d)))
+  LessThanOrEqual(x, Literal(15.0d)))
 
 assertNotResolved(parsedPlan)
 checkAnalysis(parsedPlan, expectedPlan)
@@ -598,7 +596,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
   }
 
   protected def testNotResolvedOverwriteByExpression(): Unit = {
-

[spark] branch branch-3.0 updated: [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete condition based on the table relation not the input query

2020-11-11 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1e2984b  [SPARK-33412][SQL][3.0] OverwriteByExpression should resolve 
its delete condition based on the table relation not the input query
1e2984b is described below

commit 1e2984bc9525d38ddd31ade48f5caf83213fc853
Author: Wenchen Fan 
AuthorDate: Wed Nov 11 13:50:39 2020 +

[SPARK-33412][SQL][3.0] OverwriteByExpression should resolve its delete 
condition based on the table relation not the input query

backport https://github.com/apache/spark/pull/30318 to 3.0

Closes #30328 from cloud-fan/backport.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   |  9 ++++-----
 .../spark/sql/catalyst/plans/logical/v2Commands.scala   |  3 ++-
 .../catalyst/analysis/DataSourceV2AnalysisSuite.scala   | 17 ++++++++++++-----
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f30ba3..7795d70 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1363,11 +1363,10 @@ class Analyzer(
   case a @ Aggregate(groupingExprs, aggExprs, appendColumns: 
AppendColumns) =>
 a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
-  case o: OverwriteByExpression
-  if !(o.table.resolved && o.query.resolved && o.outputResolved) =>
-// do not resolve expression attributes until the query attributes are 
resolved against the
-// table by ResolveOutputRelation. that rule will alias the attributes 
to the table's names.
-o
+  case o: OverwriteByExpression if o.table.resolved =>
+// The delete condition of `OverwriteByExpression` will be passed to 
the table
+// implementation and should be resolved based on the table schema.
+o.copy(deleteExpr = resolveExpressionBottomUp(o.deleteExpr, o.table))
 
   case m @ MergeIntoTable(targetTable, sourceTable, _, _, _)
 if !m.resolved && targetTable.resolved && sourceTable.resolved =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 35874a9..9077f7a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, 
UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, 
ColumnChange}
@@ -89,6 +89,7 @@ case class OverwriteByExpression(
   override lazy val resolved: Boolean = {
 table.resolved && query.resolved && outputResolved && deleteExpr.resolved
   }
+  override def inputSet: AttributeSet = AttributeSet(table.output)
 }
 
 object OverwriteByExpression {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 5aa2b98..3810434 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -588,9 +588,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
 Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
 Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
 query),
-  LessThanOrEqual(
-AttributeReference("x", DoubleType, nullable = false)(x.exprId),
-Literal(15.0d)))
+  LessThanOrEqual(x, Literal(15.0d)))
 
 assertNotResolved(parsedPlan)
 checkAnalysis(parsedPlan, expectedPlan)
@@ -598,7 +596,7 @@ abstract class DataSourceV2AnalysisBaseSuite extends 
AnalysisTest {
   }
 
   protected def testNotResolvedOverwriteByExpression(): Unit = {
-