This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7db85642600 [SPARK-46273][SQL] Support INSERT INTO/OVERWRITE using DSv2 sources
7db85642600 is described below

commit 7db85642600b1e3b39ca11e41d4e3e0bf1c8962b
Author: allisonwang-db <allison.w...@databricks.com>
AuthorDate: Mon Dec 11 10:32:48 2023 -0800

    [SPARK-46273][SQL] Support INSERT INTO/OVERWRITE using DSv2 sources
    
    ### What changes were proposed in this pull request?
    
    This PR adds test cases for INSERT INTO and INSERT OVERWRITE queries with DSv2 sources.
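    
    For example, the added suite exercises statements of this shape against a DSv2 source (a minimal sketch; `SupportsExternalMetadataDataSource` is the test-only source defined in the diff below):
    
    ```scala
    // Create a table backed by the DSv2 test source, then insert rows
    // positionally, by column list, and with INSERT OVERWRITE.
    val cls = classOf[SupportsExternalMetadataDataSource]
    sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName}")
    sql("INSERT INTO test VALUES (1, 2)")          // positional insert
    sql("INSERT INTO test(y, x) VALUES (3, 2)")    // insert by column list
    sql("INSERT OVERWRITE test VALUES (5, 6)")     // replaces existing rows
    ```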
    
    ### Why are the changes needed?
    
    To improve test coverage of INSERT INTO and INSERT OVERWRITE queries with DSv2 sources.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit tests
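    
    For example, these cases can be run in isolation with something like the following (assuming a standard Spark development setup):
    
    ```
    build/sbt "sql/testOnly org.apache.spark.sql.connector.DataSourceV2Suite -- -z SPARK-46273"
    ```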
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44213 from allisonwang-db/spark-46273-dsv2-insert.
    
    Authored-by: allisonwang-db <allison.w...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/errors/QueryCompilationErrors.scala  |   4 +-
 .../datasources/v2/TableCapabilityCheck.scala      |   2 +-
 .../spark/sql/connector/DataSourceV2Suite.scala    | 127 ++++++++++++++++++++-
 3 files changed, 129 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index b7e10dc194a..1195e9dd78d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -926,8 +926,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
     unsupportedTableOperationError(table.name(), "truncate in batch mode")
   }
 
-  def unsupportedOverwriteByFilterInBatchModeError(table: Table): Throwable = {
-    unsupportedTableOperationError(table.name(), "overwrite by filter in batch mode")
+  def unsupportedOverwriteByFilterInBatchModeError(name: String): Throwable = {
+    unsupportedTableOperationError(name, "overwrite by filter in batch mode")
   }
 
   def catalogOperationNotSupported(catalog: CatalogPlugin, operation: String): Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index a3afaa36ab9..b1a93addc80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -63,7 +63,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
           case _ =>
             if (!supportsBatchWrite(r.table) || !r.table.supports(OVERWRITE_BY_FILTER)) {
               throw QueryCompilationErrors.unsupportedOverwriteByFilterInBatchModeError(
-                r.table)
+                r.name)
             }
         }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index f2e518e8acc..6e365e1d605 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.connector
 
 import java.io.File
+import java.util
 import java.util.OptionalLong
 
 import test.org.apache.spark.sql.connector._
@@ -723,8 +724,116 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS
       }
     }
   }
-}
 
+  test("SPARK-46273: insert into") {
+    val cls = classOf[SupportsExternalMetadataDataSource]
+    withTable("test") {
+      sql(
+        s"""
+           |CREATE TABLE test (x INT, y INT) USING ${cls.getName}
+           |""".stripMargin)
+      sql("INSERT INTO test VALUES (1, 2)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2)))
+      // Insert by name
+      sql("INSERT INTO test(y, x) VALUES (3, 2)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3)))
+      // Values can be cast automatically
+      sql("INSERT INTO test(y, x) VALUES (4L, 3L)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3, 4)))
+      // Insert values by name
+      sql("INSERT INTO test BY NAME VALUES (5, 4) t(y, x)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3, 4), Row(4, 5)))
+      // Missing columns
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT INTO test VALUES (4)")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`test`",
+          "tableColumns" -> "`x`, `y`",
+          "dataColumns" -> "`col1`"
+        )
+      )
+      // Duplicate columns
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT INTO test(x, x) VALUES (4, 5)")
+        },
+        errorClass = "COLUMN_ALREADY_EXISTS",
+        parameters = Map("columnName" -> "`x`"))
+    }
+  }
+
+  test("SPARK-46273: insert overwrite") {
+    val cls = classOf[SupportsExternalMetadataDataSource]
+    withTable("test") {
+      sql(
+        s"""
+           |CREATE TABLE test USING ${cls.getName}
+           |AS VALUES (0, 1), (1, 2) t(x, y)
+           |""".stripMargin)
+      sql(
+        s"""
+           |INSERT OVERWRITE test VALUES (2, 3), (3, 4)
+           |""".stripMargin)
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3), Row(3, 4)))
+      // Insert overwrite by name
+      sql("INSERT OVERWRITE test(y, x) VALUES (3, 2)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3)))
+      // Values can be cast automatically
+      sql("INSERT OVERWRITE test(y, x) VALUES (4L, 3L)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(3, 4)))
+    }
+  }
+
+  test("SPARK-46273: insert into with partition") {
+    val cls = classOf[SupportsExternalMetadataDataSource]
+    withTable("test") {
+      sql(s"CREATE TABLE test(x INT, y INT) USING ${cls.getName} PARTITIONED 
BY (x, y)")
+      sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
+      sql("INSERT INTO test PARTITION(x = 2, y) VALUES (3)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3)))
+      sql("INSERT INTO test PARTITION(y, x = 3) VALUES (4)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3, 4)))
+      sql("INSERT INTO test PARTITION(x, y) VALUES (4, 5)")
+      checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3, 4), Row(4, 5)))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT INTO test PARTITION(z = 1) VALUES (2)")
+        },
+        errorClass = "NON_PARTITION_COLUMN",
+        parameters = Map("columnName" -> "`z`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT INTO test PARTITION(x, y = 1) VALUES (2, 3)")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`test`",
+          "tableColumns" -> "`x`, `y`",
+          "dataColumns" -> "`col1`, `y`, `col2`")
+      )
+    }
+  }
+
+  test("SPARK-46273: insert overwrite with partition") {
+    val cls = classOf[SupportsExternalMetadataDataSource]
+    withTable("test") {
+      sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName} PARTITIONED 
BY (x, y)")
+      sql("INSERT INTO test PARTITION(x = 1) VALUES (2)")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT OVERWRITE test PARTITION(x = 1) VALUES (5)")
+        },
+        errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`test`",
+          "operation" -> "overwrite by filter in batch mode")
+      )
+    }
+  }
+}
 
 case class RangeInputPartition(start: Int, end: Int) extends InputPartition
 
@@ -1227,6 +1336,22 @@ class SimpleWriteOnlyDataSource extends SimpleWritableDataSource {
 
 class SupportsExternalMetadataDataSource extends SimpleWritableDataSource {
   override def supportsExternalMetadata(): Boolean = true
+
+  class TestTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      options: CaseInsensitiveStringMap) extends MyTable(options) {
+    override def schema(): StructType = schema
+
+    override def partitioning(): Array[Transform] = partitioning
+  }
+
+  override def getTable(
+      schema: StructType,
+      partitioning: Array[Transform],
+      properties: util.Map[String, String]): Table = {
+    new TestTable(schema, partitioning, new CaseInsensitiveStringMap(properties))
+  }
 }
 
 class SupportsExternalMetadataWritableDataSource extends SimpleWritableDataSource {

