This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 60caa92  [SPARK-26666][SQL] Support DSv2 overwrite and dynamic 
partition overwrite.
60caa92 is described below

commit 60caa92deaf6941f58da82dcc0962ebf3a598ced
Author: Ryan Blue <b...@apache.org>
AuthorDate: Mon Feb 18 13:16:28 2019 +0800

    [SPARK-26666][SQL] Support DSv2 overwrite and dynamic partition overwrite.
    
    ## What changes were proposed in this pull request?
    
    This adds two logical plans that implement the ReplaceData operation from 
the [logical plans 
SPIP](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5a987801#heading=h.m45webtwxf2d).
 These two plans will be used to implement Spark's `INSERT OVERWRITE` behavior 
for v2.
    
    Specific changes:
    * Add `SupportsTruncate`, `SupportsOverwrite`, and 
`SupportsDynamicOverwrite` to DSv2 write API
    * Add `OverwriteByExpression` and `OverwritePartitionsDynamic` plans 
(logical and physical)
    * Add new plans to DSv2 write validation rule `ResolveOutputRelation`
    * Refactor `WriteToDataSourceV2Exec` into trait used by all DSv2 write exec 
nodes
    
    ## How was this patch tested?
    
    * The v2 analysis suite has been updated to validate the new overwrite plans
    * The analysis suite for `OverwriteByExpression` checks that the delete 
expression is resolved using the table's columns
    * Existing tests validate that overwrite exec plan works
    * Updated existing v2 test because schema is used to validate overwrite
    
    Closes #23606 from rdblue/SPARK-26666-add-overwrite.
    
    Authored-by: Ryan Blue <b...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  27 ++-
 .../plans/logical/basicLogicalOperators.scala      |  69 +++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |   2 +-
 .../analysis/DataSourceV2AnalysisSuite.scala       | 191 +++++++++++++++------
 .../sources/v2/reader/SupportsPushDownFilters.java |   3 +
 .../v2/writer/SupportsDynamicOverwrite.java        |  37 ++++
 .../sql/sources/v2/writer/SupportsOverwrite.java   |  45 +++++
 .../sql/sources/v2/writer/SupportsTruncate.java    |  32 ++++
 .../org/apache/spark/sql/DataFrameWriter.scala     |  54 +++---
 .../execution/datasources/DataSourceStrategy.scala |   6 +
 .../datasources/v2/DataSourceV2Implicits.scala     |  49 ++++++
 .../datasources/v2/DataSourceV2Relation.scala      |  24 +--
 .../datasources/v2/DataSourceV2Strategy.scala      |  35 ++--
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 135 ++++++++++++++-
 .../org/apache/spark/sql/sources/filters.scala     |  26 ++-
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   |   8 +-
 16 files changed, 613 insertions(+), 130 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 793c337..42904c5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -978,6 +978,11 @@ class Analyzer(
       case a @ Aggregate(groupingExprs, aggExprs, appendColumns: 
AppendColumns) =>
         a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
+      case o: OverwriteByExpression if !o.outputResolved =>
+        // do not resolve expression attributes until the query attributes are 
resolved against the
+        // table by ResolveOutputRelation. that rule will alias the attributes 
to the table's names.
+        o
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve 
${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
@@ -2246,7 +2251,7 @@ class Analyzer(
   object ResolveOutputRelation extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators 
{
       case append @ AppendData(table, query, isByName)
-          if table.resolved && query.resolved && !append.resolved =>
+          if table.resolved && query.resolved && !append.outputResolved =>
         val projection = resolveOutputColumns(table.name, table.output, query, 
isByName)
 
         if (projection != query) {
@@ -2254,6 +2259,26 @@ class Analyzer(
         } else {
           append
         }
+
+      case overwrite @ OverwriteByExpression(table, _, query, isByName)
+          if table.resolved && query.resolved && !overwrite.outputResolved =>
+        val projection = resolveOutputColumns(table.name, table.output, query, 
isByName)
+
+        if (projection != query) {
+          overwrite.copy(query = projection)
+        } else {
+          overwrite
+        }
+
+      case overwrite @ OverwritePartitionsDynamic(table, query, isByName)
+          if table.resolved && query.resolved && !overwrite.outputResolved =>
+        val projection = resolveOutputColumns(table.name, table.output, query, 
isByName)
+
+        if (projection != query) {
+          overwrite.copy(query = projection)
+        } else {
+          overwrite
+        }
     }
 
     def resolveOutputColumns(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 639d68f..f7f701c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -365,16 +365,17 @@ case class Join(
 }
 
 /**
- * Append data to an existing table.
+ * Base trait for DataSourceV2 write commands
  */
-case class AppendData(
-    table: NamedRelation,
-    query: LogicalPlan,
-    isByName: Boolean) extends LogicalPlan {
+trait V2WriteCommand extends Command {
+  def table: NamedRelation
+  def query: LogicalPlan
+
   override def children: Seq[LogicalPlan] = Seq(query)
-  override def output: Seq[Attribute] = Seq.empty
 
-  override lazy val resolved: Boolean = {
+  override lazy val resolved: Boolean = outputResolved
+
+  def outputResolved: Boolean = {
     table.resolved && query.resolved && query.output.size == table.output.size 
&&
         query.output.zip(table.output).forall {
           case (inAttr, outAttr) =>
@@ -386,17 +387,67 @@ case class AppendData(
   }
 }
 
+/**
+ * Append data to an existing table.
+ */
+case class AppendData(
+    table: NamedRelation,
+    query: LogicalPlan,
+    isByName: Boolean) extends V2WriteCommand
+
 object AppendData {
   def byName(table: NamedRelation, df: LogicalPlan): AppendData = {
-    new AppendData(table, df, true)
+    new AppendData(table, df, isByName = true)
   }
 
   def byPosition(table: NamedRelation, query: LogicalPlan): AppendData = {
-    new AppendData(table, query, false)
+    new AppendData(table, query, isByName = false)
   }
 }
 
 /**
+ * Overwrite data matching a filter in an existing table.
+ */
+case class OverwriteByExpression(
+    table: NamedRelation,
+    deleteExpr: Expression,
+    query: LogicalPlan,
+    isByName: Boolean) extends V2WriteCommand {
+  override lazy val resolved: Boolean = outputResolved && deleteExpr.resolved
+}
+
+object OverwriteByExpression {
+  def byName(
+      table: NamedRelation, df: LogicalPlan, deleteExpr: Expression): 
OverwriteByExpression = {
+    OverwriteByExpression(table, deleteExpr, df, isByName = true)
+  }
+
+  def byPosition(
+      table: NamedRelation, query: LogicalPlan, deleteExpr: Expression): 
OverwriteByExpression = {
+    OverwriteByExpression(table, deleteExpr, query, isByName = false)
+  }
+}
+
+/**
+ * Dynamically overwrite partitions in an existing table.
+ */
+case class OverwritePartitionsDynamic(
+    table: NamedRelation,
+    query: LogicalPlan,
+    isByName: Boolean) extends V2WriteCommand
+
+object OverwritePartitionsDynamic {
+  def byName(table: NamedRelation, df: LogicalPlan): 
OverwritePartitionsDynamic = {
+    OverwritePartitionsDynamic(table, df, isByName = true)
+  }
+
+  def byPosition(table: NamedRelation, query: LogicalPlan): 
OverwritePartitionsDynamic = {
+    OverwritePartitionsDynamic(table, query, isByName = false)
+  }
+}
+
+
+/**
  * Insert some data into a table. Note that this plan is unresolved and has to 
be replaced by the
  * concrete implementations during analysis.
  *
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d285e00..0b7b67e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1452,7 +1452,7 @@ object SQLConf {
       " register class names for which data source V2 write paths are 
disabled. Writes from these" +
       " sources will fall back to the V1 sources.")
     .stringConf
-    .createWithDefault("")
+    .createWithDefault("orc")
 
   val DISABLED_V2_STREAMING_WRITERS = 
buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class 
names for which" +
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 6c899b6..0c48548 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -19,15 +19,92 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Cast, UpCast}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LeafNode, 
LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Cast, Expression, LessThanOrEqual, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LeafNode, 
LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Project}
 import org.apache.spark.sql.types.{DoubleType, FloatType, StructField, 
StructType}
 
+class V2AppendDataAnalysisSuite extends DataSourceV2AnalysisSuite {
+  override def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan = 
{
+    AppendData.byName(table, query)
+  }
+
+  override def byPosition(table: NamedRelation, query: LogicalPlan): 
LogicalPlan = {
+    AppendData.byPosition(table, query)
+  }
+}
+
+class V2OverwritePartitionsDynamicAnalysisSuite extends 
DataSourceV2AnalysisSuite {
+  override def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan = 
{
+    OverwritePartitionsDynamic.byName(table, query)
+  }
+
+  override def byPosition(table: NamedRelation, query: LogicalPlan): 
LogicalPlan = {
+    OverwritePartitionsDynamic.byPosition(table, query)
+  }
+}
+
+class V2OverwriteByExpressionAnalysisSuite extends DataSourceV2AnalysisSuite {
+  override def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan = 
{
+    OverwriteByExpression.byName(table, query, Literal(true))
+  }
+
+  override def byPosition(table: NamedRelation, query: LogicalPlan): 
LogicalPlan = {
+    OverwriteByExpression.byPosition(table, query, Literal(true))
+  }
+
+  test("delete expression is resolved using table fields") {
+    val table = TestRelation(StructType(Seq(
+      StructField("x", DoubleType, nullable = false),
+      StructField("y", DoubleType))).toAttributes)
+
+    val query = TestRelation(StructType(Seq(
+      StructField("a", DoubleType, nullable = false),
+      StructField("b", DoubleType))).toAttributes)
+
+    val a = query.output.head
+    val b = query.output.last
+    val x = table.output.head
+
+    val parsedPlan = OverwriteByExpression.byPosition(table, query,
+      LessThanOrEqual(UnresolvedAttribute(Seq("x")), Literal(15.0d)))
+
+    val expectedPlan = OverwriteByExpression.byPosition(table,
+      Project(Seq(
+        Alias(Cast(a, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
+        Alias(Cast(b, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
+        query),
+      LessThanOrEqual(
+        AttributeReference("x", DoubleType, nullable = false)(x.exprId),
+        Literal(15.0d)))
+
+    assertNotResolved(parsedPlan)
+    checkAnalysis(parsedPlan, expectedPlan)
+    assertResolved(expectedPlan)
+  }
+
+  test("delete expression is not resolved using query fields") {
+    val xRequiredTable = TestRelation(StructType(Seq(
+      StructField("x", DoubleType, nullable = false),
+      StructField("y", DoubleType))).toAttributes)
+
+    val query = TestRelation(StructType(Seq(
+      StructField("a", DoubleType, nullable = false),
+      StructField("b", DoubleType))).toAttributes)
+
+    // the write is resolved (checked above). this test plan is not because of 
the expression.
+    val parsedPlan = OverwriteByExpression.byPosition(xRequiredTable, query,
+      LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
+
+    assertNotResolved(parsedPlan)
+    assertAnalysisError(parsedPlan, Seq("cannot resolve", "`a`", "given input 
columns", "x, y"))
+  }
+}
+
 case class TestRelation(output: Seq[AttributeReference]) extends LeafNode with 
NamedRelation {
   override def name: String = "table-name"
 }
 
-class DataSourceV2AnalysisSuite extends AnalysisTest {
+abstract class DataSourceV2AnalysisSuite extends AnalysisTest {
   val table = TestRelation(StructType(Seq(
     StructField("x", FloatType),
     StructField("y", FloatType))).toAttributes)
@@ -40,21 +117,25 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     StructField("x", DoubleType),
     StructField("y", DoubleType))).toAttributes)
 
-  test("Append.byName: basic behavior") {
+  def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan
+
+  def byPosition(table: NamedRelation, query: LogicalPlan): LogicalPlan
+
+  test("byName: basic behavior") {
     val query = TestRelation(table.schema.toAttributes)
 
-    val parsedPlan = AppendData.byName(table, query)
+    val parsedPlan = byName(table, query)
 
     checkAnalysis(parsedPlan, parsedPlan)
     assertResolved(parsedPlan)
   }
 
-  test("Append.byName: does not match by position") {
+  test("byName: does not match by position") {
     val query = TestRelation(StructType(Seq(
       StructField("a", FloatType),
       StructField("b", FloatType))).toAttributes)
 
-    val parsedPlan = AppendData.byName(table, query)
+    val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -62,12 +143,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Cannot find data for output column", "'x'", "'y'"))
   }
 
-  test("Append.byName: case sensitive column resolution") {
+  test("byName: case sensitive column resolution") {
     val query = TestRelation(StructType(Seq(
       StructField("X", FloatType), // doesn't match case!
       StructField("y", FloatType))).toAttributes)
 
-    val parsedPlan = AppendData.byName(table, query)
+    val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -76,7 +157,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       caseSensitive = true)
   }
 
-  test("Append.byName: case insensitive column resolution") {
+  test("byName: case insensitive column resolution") {
     val query = TestRelation(StructType(Seq(
       StructField("X", FloatType), // doesn't match case!
       StructField("y", FloatType))).toAttributes)
@@ -84,8 +165,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     val X = query.output.head
     val y = query.output.last
 
-    val parsedPlan = AppendData.byName(table, query)
-    val expectedPlan = AppendData.byName(table,
+    val parsedPlan = byName(table, query)
+    val expectedPlan = byName(table,
       Project(Seq(
         Alias(Cast(toLower(X), FloatType, Some(conf.sessionLocalTimeZone)), 
"x")(),
         Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
@@ -96,7 +177,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     assertResolved(expectedPlan)
   }
 
-  test("Append.byName: data columns are reordered by name") {
+  test("byName: data columns are reordered by name") {
     // out of order
     val query = TestRelation(StructType(Seq(
       StructField("y", FloatType),
@@ -105,8 +186,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     val y = query.output.head
     val x = query.output.last
 
-    val parsedPlan = AppendData.byName(table, query)
-    val expectedPlan = AppendData.byName(table,
+    val parsedPlan = byName(table, query)
+    val expectedPlan = byName(table,
       Project(Seq(
         Alias(Cast(x, FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
         Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
@@ -117,26 +198,26 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     assertResolved(expectedPlan)
   }
 
-  test("Append.byName: fail nullable data written to required columns") {
-    val parsedPlan = AppendData.byName(requiredTable, table)
+  test("byName: fail nullable data written to required columns") {
+    val parsedPlan = byName(requiredTable, table)
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
       "Cannot write incompatible data to table", "'table-name'",
       "Cannot write nullable values to non-null column", "'x'", "'y'"))
   }
 
-  test("Append.byName: allow required data written to nullable columns") {
-    val parsedPlan = AppendData.byName(table, requiredTable)
+  test("byName: allow required data written to nullable columns") {
+    val parsedPlan = byName(table, requiredTable)
     assertResolved(parsedPlan)
     checkAnalysis(parsedPlan, parsedPlan)
   }
 
-  test("Append.byName: missing required columns cause failure and are 
identified by name") {
+  test("byName: missing required columns cause failure and are identified by 
name") {
     // missing required field x
     val query = TestRelation(StructType(Seq(
       StructField("y", FloatType, nullable = false))).toAttributes)
 
-    val parsedPlan = AppendData.byName(requiredTable, query)
+    val parsedPlan = byName(requiredTable, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -144,12 +225,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Cannot find data for output column", "'x'"))
   }
 
-  test("Append.byName: missing optional columns cause failure and are 
identified by name") {
+  test("byName: missing optional columns cause failure and are identified by 
name") {
     // missing optional field x
     val query = TestRelation(StructType(Seq(
       StructField("y", FloatType))).toAttributes)
 
-    val parsedPlan = AppendData.byName(table, query)
+    val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -157,8 +238,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Cannot find data for output column", "'x'"))
   }
 
-  test("Append.byName: fail canWrite check") {
-    val parsedPlan = AppendData.byName(table, widerTable)
+  test("byName: fail canWrite check") {
+    val parsedPlan = byName(table, widerTable)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -166,12 +247,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType"))
   }
 
-  test("Append.byName: insert safe cast") {
+  test("byName: insert safe cast") {
     val x = table.output.head
     val y = table.output.last
 
-    val parsedPlan = AppendData.byName(widerTable, table)
-    val expectedPlan = AppendData.byName(widerTable,
+    val parsedPlan = byName(widerTable, table)
+    val expectedPlan = byName(widerTable,
       Project(Seq(
         Alias(Cast(x, DoubleType, Some(conf.sessionLocalTimeZone)), "x")(),
         Alias(Cast(y, DoubleType, Some(conf.sessionLocalTimeZone)), "y")()),
@@ -182,13 +263,13 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     assertResolved(expectedPlan)
   }
 
-  test("Append.byName: fail extra data fields") {
+  test("byName: fail extra data fields") {
     val query = TestRelation(StructType(Seq(
       StructField("x", FloatType),
       StructField("y", FloatType),
       StructField("z", FloatType))).toAttributes)
 
-    val parsedPlan = AppendData.byName(table, query)
+    val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -197,7 +278,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Data columns: 'x', 'y', 'z'"))
   }
 
-  test("Append.byName: multiple field errors are reported") {
+  test("byName: multiple field errors are reported") {
     val xRequiredTable = TestRelation(StructType(Seq(
       StructField("x", FloatType, nullable = false),
       StructField("y", DoubleType))).toAttributes)
@@ -206,7 +287,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       StructField("x", DoubleType),
       StructField("b", FloatType))).toAttributes)
 
-    val parsedPlan = AppendData.byName(xRequiredTable, query)
+    val parsedPlan = byName(xRequiredTable, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -216,7 +297,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Cannot find data for output column", "'y'"))
   }
 
-  test("Append.byPosition: basic behavior") {
+  test("byPosition: basic behavior") {
     val query = TestRelation(StructType(Seq(
       StructField("a", FloatType),
       StructField("b", FloatType))).toAttributes)
@@ -224,8 +305,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     val a = query.output.head
     val b = query.output.last
 
-    val parsedPlan = AppendData.byPosition(table, query)
-    val expectedPlan = AppendData.byPosition(table,
+    val parsedPlan = byPosition(table, query)
+    val expectedPlan = byPosition(table,
       Project(Seq(
         Alias(Cast(a, FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
         Alias(Cast(b, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
@@ -236,7 +317,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     assertResolved(expectedPlan)
   }
 
-  test("Append.byPosition: data columns are not reordered") {
+  test("byPosition: data columns are not reordered") {
     // out of order
     val query = TestRelation(StructType(Seq(
       StructField("y", FloatType),
@@ -245,8 +326,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     val y = query.output.head
     val x = query.output.last
 
-    val parsedPlan = AppendData.byPosition(table, query)
-    val expectedPlan = AppendData.byPosition(table,
+    val parsedPlan = byPosition(table, query)
+    val expectedPlan = byPosition(table,
       Project(Seq(
         Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
         Alias(Cast(x, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
@@ -257,26 +338,26 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     assertResolved(expectedPlan)
   }
 
-  test("Append.byPosition: fail nullable data written to required columns") {
-    val parsedPlan = AppendData.byPosition(requiredTable, table)
+  test("byPosition: fail nullable data written to required columns") {
+    val parsedPlan = byPosition(requiredTable, table)
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
       "Cannot write incompatible data to table", "'table-name'",
       "Cannot write nullable values to non-null column", "'x'", "'y'"))
   }
 
-  test("Append.byPosition: allow required data written to nullable columns") {
-    val parsedPlan = AppendData.byPosition(table, requiredTable)
+  test("byPosition: allow required data written to nullable columns") {
+    val parsedPlan = byPosition(table, requiredTable)
     assertResolved(parsedPlan)
     checkAnalysis(parsedPlan, parsedPlan)
   }
 
-  test("Append.byPosition: missing required columns cause failure") {
+  test("byPosition: missing required columns cause failure") {
     // missing optional field x
     val query = TestRelation(StructType(Seq(
       StructField("y", FloatType, nullable = false))).toAttributes)
 
-    val parsedPlan = AppendData.byPosition(requiredTable, query)
+    val parsedPlan = byPosition(requiredTable, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -285,12 +366,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Data columns: 'y'"))
   }
 
-  test("Append.byPosition: missing optional columns cause failure") {
+  test("byPosition: missing optional columns cause failure") {
     // missing optional field x
     val query = TestRelation(StructType(Seq(
       StructField("y", FloatType))).toAttributes)
 
-    val parsedPlan = AppendData.byPosition(table, query)
+    val parsedPlan = byPosition(table, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -299,12 +380,12 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Data columns: 'y'"))
   }
 
-  test("Append.byPosition: fail canWrite check") {
+  test("byPosition: fail canWrite check") {
     val widerTable = TestRelation(StructType(Seq(
       StructField("a", DoubleType),
       StructField("b", DoubleType))).toAttributes)
 
-    val parsedPlan = AppendData.byPosition(table, widerTable)
+    val parsedPlan = byPosition(table, widerTable)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -312,7 +393,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType"))
   }
 
-  test("Append.byPosition: insert safe cast") {
+  test("byPosition: insert safe cast") {
     val widerTable = TestRelation(StructType(Seq(
       StructField("a", DoubleType),
       StructField("b", DoubleType))).toAttributes)
@@ -320,8 +401,8 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     val x = table.output.head
     val y = table.output.last
 
-    val parsedPlan = AppendData.byPosition(widerTable, table)
-    val expectedPlan = AppendData.byPosition(widerTable,
+    val parsedPlan = byPosition(widerTable, table)
+    val expectedPlan = byPosition(widerTable,
       Project(Seq(
         Alias(Cast(x, DoubleType, Some(conf.sessionLocalTimeZone)), "a")(),
         Alias(Cast(y, DoubleType, Some(conf.sessionLocalTimeZone)), "b")()),
@@ -332,13 +413,13 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     assertResolved(expectedPlan)
   }
 
-  test("Append.byPosition: fail extra data fields") {
+  test("byPosition: fail extra data fields") {
     val query = TestRelation(StructType(Seq(
       StructField("a", FloatType),
       StructField("b", FloatType),
       StructField("c", FloatType))).toAttributes)
 
-    val parsedPlan = AppendData.byName(table, query)
+    val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
@@ -347,7 +428,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Data columns: 'a', 'b', 'c'"))
   }
 
-  test("Append.byPosition: multiple field errors are reported") {
+  test("byPosition: multiple field errors are reported") {
     val xRequiredTable = TestRelation(StructType(Seq(
       StructField("x", FloatType, nullable = false),
       StructField("y", DoubleType))).toAttributes)
@@ -356,7 +437,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
       StructField("x", DoubleType),
       StructField("b", FloatType))).toAttributes)
 
-    val parsedPlan = AppendData.byPosition(xRequiredTable, query)
+    val parsedPlan = byPosition(xRequiredTable, query)
 
     assertNotResolved(parsedPlan)
     assertAnalysisError(parsedPlan, Seq(
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index 296d3e4..f10fd88 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -29,6 +29,9 @@ public interface SupportsPushDownFilters extends ScanBuilder {
 
   /**
    * Pushes down filters, and returns filters that need to be evaluated after 
scanning.
+   * <p>
+   * Rows should be returned from the data source if and only if all of the 
filters match. That is,
+   * filters must be interpreted as ANDed together.
    */
   Filter[] pushFilters(Filter[] filters);
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsDynamicOverwrite.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsDynamicOverwrite.java
new file mode 100644
index 0000000..8058964
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsDynamicOverwrite.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+/**
+ * Write builder trait for tables that support dynamic partition overwrite.
+ * <p>
+ * A write that dynamically overwrites partitions removes all existing data in 
each logical
+ * partition for which the write will commit new data. Any existing logical 
partition for which the
+ * write does not contain data will remain unchanged.
+ * <p>
+ * This is provided to implement SQL compatible with Hive table operations but 
is not recommended.
+ * Instead, use the {@link SupportsOverwrite overwrite by filter API} to 
explicitly replace data.
+ */
+public interface SupportsDynamicOverwrite extends WriteBuilder {
+  /**
+   * Configures a write to dynamically replace partitions with data committed 
in the write.
+   *
+   * @return this write builder for method chaining
+   */
+  WriteBuilder overwriteDynamicPartitions();
+}
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java
new file mode 100644
index 0000000..b443b3c3
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsOverwrite.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.sql.sources.AlwaysTrue$;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * Write builder trait for tables that support overwrite by filter.
+ * <p>
+ * Overwriting data by filter will delete any data that matches the filter and 
replace it with data
+ * that is committed in the write.
+ */
+public interface SupportsOverwrite extends WriteBuilder, SupportsTruncate {
+  /**
+   * Configures a write to replace data matching the filters with data 
committed in the write.
+   * <p>
+   * Rows must be deleted from the data source if and only if all of the 
filters match. That is,
+   * filters must be interpreted as ANDed together.
+   *
+   * @param filters filters used to match data to overwrite
+   * @return this write builder for method chaining
+   */
+  WriteBuilder overwrite(Filter[] filters);
+
+  @Override
+  default WriteBuilder truncate() {
+    return overwrite(new Filter[] { AlwaysTrue$.MODULE$ });
+  }
+}
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsTruncate.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsTruncate.java
new file mode 100644
index 0000000..69c2ba5
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/SupportsTruncate.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+/**
+ * Write builder trait for tables that support truncation.
+ * <p>
+ * Truncation removes all data in a table and replaces it with data that is 
committed in the write.
+ */
+public interface SupportsTruncate extends WriteBuilder {
+  /**
+   * Configures a write to replace all existing data with data committed in 
the write.
+   *
+   * @return this write builder for method chaining
+   */
+  WriteBuilder truncate();
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e5f9473..4508281 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,8 @@ import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, 
InsertIntoTable, LogicalPlan, OverwriteByExpression}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, 
LogicalRelation}
@@ -264,29 +265,38 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
       val dsOptions = new DataSourceOptions(options.asJava)
       provider.getTable(dsOptions) match {
         case table: SupportsBatchWrite =>
-          if (mode == SaveMode.Append) {
-            val relation = DataSourceV2Relation.create(table, options)
-            runCommand(df.sparkSession, "save") {
-              AppendData.byName(relation, df.logicalPlan)
-            }
-          } else {
-            val writeBuilder = table.newWriteBuilder(dsOptions)
-              .withQueryId(UUID.randomUUID().toString)
-              .withInputDataSchema(df.logicalPlan.schema)
-            writeBuilder match {
-              case s: SupportsSaveMode =>
-                val write = s.mode(mode).buildForBatch()
-                // It can only return null with `SupportsSaveMode`. We can 
clean it up after
-                // removing `SupportsSaveMode`.
-                if (write != null) {
-                  runCommand(df.sparkSession, "save") {
-                    WriteToDataSourceV2(write, df.logicalPlan)
+          lazy val relation = DataSourceV2Relation.create(table, options)
+          mode match {
+            case SaveMode.Append =>
+              runCommand(df.sparkSession, "save") {
+                AppendData.byName(relation, df.logicalPlan)
+              }
+
+            case SaveMode.Overwrite =>
+              // truncate the table
+              runCommand(df.sparkSession, "save") {
+                OverwriteByExpression.byName(relation, df.logicalPlan, 
Literal(true))
+              }
+
+            case _ =>
+              table.newWriteBuilder(dsOptions) match {
+                case writeBuilder: SupportsSaveMode =>
+                  val write = writeBuilder.mode(mode)
+                      .withQueryId(UUID.randomUUID().toString)
+                      .withInputDataSchema(df.logicalPlan.schema)
+                      .buildForBatch()
+                  // It can only return null with `SupportsSaveMode`. We can 
clean it up after
+                  // removing `SupportsSaveMode`.
+                  if (write != null) {
+                    runCommand(df.sparkSession, "save") {
+                      WriteToDataSourceV2(write, df.logicalPlan)
+                    }
                   }
-                }
 
-              case _ => throw new AnalysisException(
-                s"data source ${table.name} does not support SaveMode $mode")
-            }
+                case _ =>
+                  throw new AnalysisException(
+                    s"data source ${table.name} does not support SaveMode 
$mode")
+              }
           }
 
         // Streaming also uses the data source V2 API. So it may be that the 
data source implements
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 273cc3b..b73dc30 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -529,6 +529,12 @@ object DataSourceStrategy {
       case expressions.Contains(a: Attribute, Literal(v: UTF8String, 
StringType)) =>
         Some(sources.StringContains(a.name, v.toString))
 
+      case expressions.Literal(true, BooleanType) =>
+        Some(sources.AlwaysTrue)
+
+      case expressions.Literal(false, BooleanType) =>
+        Some(sources.AlwaysFalse)
+
       case _ => None
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
new file mode 100644
index 0000000..c8542bf
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, 
SupportsBatchWrite, Table}
+
+object DataSourceV2Implicits {
+  implicit class TableHelper(table: Table) {
+    def asBatchReadable: SupportsBatchRead = {
+      table match {
+        case support: SupportsBatchRead =>
+          support
+        case _ =>
+          throw new AnalysisException(s"Table does not support batch reads: 
${table.name}")
+      }
+    }
+
+    def asBatchWritable: SupportsBatchWrite = {
+      table match {
+        case support: SupportsBatchWrite =>
+          support
+        case _ =>
+          throw new AnalysisException(s"Table does not support batch writes: 
${table.name}")
+      }
+    }
+  }
+
+  implicit class OptionsHelper(options: Map[String, String]) {
+    def toDataSourceOptions: DataSourceOptions = new 
DataSourceOptions(options.asJava)
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 47cf26d..5367778 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -17,11 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.util.UUID
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
@@ -30,7 +25,6 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, 
SparkDataStream}
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.types.StructType
 
 /**
  * A logical plan representing a data source v2 table.
@@ -45,26 +39,16 @@ case class DataSourceV2Relation(
     options: Map[String, String])
   extends LeafNode with MultiInstanceRelation with NamedRelation {
 
+  import DataSourceV2Implicits._
+
   override def name: String = table.name()
 
   override def simpleString(maxFields: Int): String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
   }
 
-  def newScanBuilder(): ScanBuilder = table match {
-    case s: SupportsBatchRead =>
-      val dsOptions = new DataSourceOptions(options.asJava)
-      s.newScanBuilder(dsOptions)
-    case _ => throw new AnalysisException(s"Table is not readable: 
${table.name()}")
-  }
-
-  def newWriteBuilder(schema: StructType): WriteBuilder = table match {
-    case s: SupportsBatchWrite =>
-      val dsOptions = new DataSourceOptions(options.asJava)
-      s.newWriteBuilder(dsOptions)
-        .withQueryId(UUID.randomUUID().toString)
-        .withInputDataSchema(schema)
-    case _ => throw new AnalysisException(s"Table is not writable: 
${table.name()}")
+  def newScanBuilder(): ScanBuilder = {
+    table.asBatchReadable.newScanBuilder(options.toDataSourceOptions)
   }
 
   override def computeStats(): Statistics = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index d6d17d6..55d7b0a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -19,18 +19,18 @@ package org.apache.spark.sql.execution.datasources.v2
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, SubqueryExpression}
+import org.apache.spark.sql.{AnalysisException, Strategy}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, 
Repartition}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, 
OverwriteByExpression, OverwritePartitionsDynamic, Repartition}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
+import org.apache.spark.sql.sources
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, 
MicroBatchStream}
-import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
 
-object DataSourceV2Strategy extends Strategy {
+object DataSourceV2Strategy extends Strategy with PredicateHelper {
 
   /**
    * Pushes down filters to the data source reader
@@ -100,6 +100,7 @@ object DataSourceV2Strategy extends Strategy {
     }
   }
 
+  import DataSourceV2Implicits._
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
@@ -146,14 +147,22 @@ object DataSourceV2Strategy extends Strategy {
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 
     case AppendData(r: DataSourceV2Relation, query, _) =>
-      val writeBuilder = r.newWriteBuilder(query.schema)
-      writeBuilder match {
-        case s: SupportsSaveMode =>
-          val write = s.mode(SaveMode.Append).buildForBatch()
-          assert(write != null)
-          WriteToDataSourceV2Exec(write, planLater(query)) :: Nil
-        case _ => throw new AnalysisException(s"data source ${r.name} does not 
support SaveMode")
-      }
+      AppendDataExec(
+        r.table.asBatchWritable, r.options.toDataSourceOptions, 
planLater(query)) :: Nil
+
+    case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) 
=>
+      // fail if any filter cannot be converted. correctness depends on 
removing all matching data.
+      val filters = splitConjunctivePredicates(deleteExpr).map {
+        filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
+          throw new AnalysisException(s"Cannot translate expression to source 
filter: $filter"))
+      }.toArray
+
+      OverwriteByExpressionExec(
+        r.table.asBatchWritable, filters, r.options.toDataSourceOptions, 
planLater(query)) :: Nil
+
+    case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) =>
+      OverwritePartitionsDynamicExec(r.table.asBatchWritable,
+        r.options.toDataSourceOptions, planLater(query)) :: Nil
 
     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 50c5e4f..d7cb245 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -17,17 +17,22 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import java.util.UUID
+
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite}
+import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, 
SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, 
SupportsTruncate, WriteBuilder, WriterCommitMessage}
 import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
@@ -42,17 +47,137 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, 
query: LogicalPlan)
 }
 
 /**
- * The physical plan for writing data into data source v2.
+ * Physical plan node for append into a v2 table.
+ *
+ * Rows in the output data set are appended.
+ */
+case class AppendDataExec(
+    table: SupportsBatchWrite,
+    writeOptions: DataSourceOptions,
+    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val batchWrite = newWriteBuilder() match {
+      case builder: SupportsSaveMode =>
+        builder.mode(SaveMode.Append).buildForBatch()
+
+      case builder =>
+        builder.buildForBatch()
+    }
+    doWrite(batchWrite)
+  }
+}
+
+/**
+ * Physical plan node for overwrite into a v2 table.
+ *
+ * Overwrites data in a table matched by a set of filters. Rows matching all 
of the filters will be
+ * deleted and rows in the output data set are appended.
+ *
+ * This plan is used to implement SaveMode.Overwrite. The behavior of 
SaveMode.Overwrite is to
+ * truncate the table -- delete all rows -- and append the output data set. 
This uses the filter
+ * AlwaysTrue to delete all rows.
  */
-case class WriteToDataSourceV2Exec(batchWrite: BatchWrite, query: SparkPlan)
-  extends UnaryExecNode {
+case class OverwriteByExpressionExec(
+    table: SupportsBatchWrite,
+    deleteWhere: Array[Filter],
+    writeOptions: DataSourceOptions,
+    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+
+  private def isTruncate(filters: Array[Filter]): Boolean = {
+    filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val batchWrite = newWriteBuilder() match {
+      case builder: SupportsTruncate if isTruncate(deleteWhere) =>
+        builder.truncate().buildForBatch()
+
+      case builder: SupportsSaveMode if isTruncate(deleteWhere) =>
+        builder.mode(SaveMode.Overwrite).buildForBatch()
+
+      case builder: SupportsOverwrite =>
+        builder.overwrite(deleteWhere).buildForBatch()
+
+      case _ =>
+        throw new SparkException(s"Table does not support dynamic partition 
overwrite: $table")
+    }
+
+    doWrite(batchWrite)
+  }
+}
+
+/**
+ * Physical plan node for dynamic partition overwrite into a v2 table.
+ *
+ * Dynamic partition overwrite is the behavior of Hive INSERT OVERWRITE ... 
PARTITION queries, and
+ * Spark INSERT OVERWRITE queries when 
spark.sql.sources.partitionOverwriteMode=dynamic. Each
+ * partition in the output data set replaces the corresponding existing 
partition in the table or
+ * creates a new partition. Existing partitions for which there is no data in 
the output data set
+ * are not modified.
+ */
+case class OverwritePartitionsDynamicExec(
+    table: SupportsBatchWrite,
+    writeOptions: DataSourceOptions,
+    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val batchWrite = newWriteBuilder() match {
+      case builder: SupportsDynamicOverwrite =>
+        builder.overwriteDynamicPartitions().buildForBatch()
+
+      case builder: SupportsSaveMode =>
+        builder.mode(SaveMode.Overwrite).buildForBatch()
+
+      case _ =>
+        throw new SparkException(s"Table does not support dynamic partition 
overwrite: $table")
+    }
+
+    doWrite(batchWrite)
+  }
+}
+
+case class WriteToDataSourceV2Exec(
+    batchWrite: BatchWrite,
+    query: SparkPlan
+  ) extends V2TableWriteExec {
+
+  import DataSourceV2Implicits._
+
+  def writeOptions: DataSourceOptions = Map.empty[String, 
String].toDataSourceOptions
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    doWrite(batchWrite)
+  }
+}
+
+/**
+ * Helper for physical plans that build batch writes.
+ */
+trait BatchWriteHelper {
+  def table: SupportsBatchWrite
+  def query: SparkPlan
+  def writeOptions: DataSourceOptions
+
+  def newWriteBuilder(): WriteBuilder = {
+    table.newWriteBuilder(writeOptions)
+        .withInputDataSchema(query.schema)
+        .withQueryId(UUID.randomUUID().toString)
+  }
+}
+
+/**
+ * The base physical plan for writing data into data source v2.
+ */
+trait V2TableWriteExec extends UnaryExecNode {
+  def query: SparkPlan
 
   var commitProgress: Option[StreamWriterCommitProgress] = None
 
   override def child: SparkPlan = query
   override def output: Seq[Attribute] = Nil
 
-  override protected def doExecute(): RDD[InternalRow] = {
+  protected def doWrite(batchWrite: BatchWrite): RDD[InternalRow] = {
     val writerFactory = batchWrite.createBatchWriterFactory()
     val useCommitCoordinator = batchWrite.useCommitCoordinator
     val rdd = query.execute()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 3f941cc..a1ab55a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.annotation.Stable
+import org.apache.spark.annotation.{Evolving, Stable}
 
 
////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines all the filters that we can push down to the data sources.
@@ -218,3 +218,27 @@ case class StringEndsWith(attribute: String, value: 
String) extends Filter {
 case class StringContains(attribute: String, value: String) extends Filter {
   override def references: Array[String] = Array(attribute)
 }
+
+/**
+ * A filter that always evaluates to `true`.
+ */
+@Evolving
+case class AlwaysTrue() extends Filter {
+  override def references: Array[String] = Array.empty
+}
+
+@Evolving
+object AlwaysTrue extends AlwaysTrue {
+}
+
+/**
+ * A filter that always evaluates to `false`.
+ */
+@Evolving
+case class AlwaysFalse() extends Filter {
+  override def references: Array[String] = Array.empty
+}
+
+@Evolving
+object AlwaysFalse extends AlwaysFalse {
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 511fdfe..6b5c45e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -351,19 +351,21 @@ class DataSourceV2Suite extends QueryTest with 
SharedSQLContext {
     }
   }
 
-  test("SPARK-25700: do not read schema when writing in other modes except 
append mode") {
+  test("SPARK-25700: do not read schema when writing in other modes except 
append and overwrite") {
     withTempPath { file =>
       val cls = classOf[SimpleWriteOnlyDataSource]
       val path = file.getCanonicalPath
       val df = spark.range(5).select('id as 'i, -'id as 'j)
       // non-append mode should not throw exception, as they don't access 
schema.
       df.write.format(cls.getName).option("path", path).mode("error").save()
-      df.write.format(cls.getName).option("path", 
path).mode("overwrite").save()
       df.write.format(cls.getName).option("path", path).mode("ignore").save()
-      // append mode will access schema and should throw exception.
+      // append and overwrite modes will access the schema and should throw 
exception.
       intercept[SchemaReadAttemptException] {
         df.write.format(cls.getName).option("path", path).mode("append").save()
       }
+      intercept[SchemaReadAttemptException] {
+        df.write.format(cls.getName).option("path", 
path).mode("overwrite").save()
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to