This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7db85642600  [SPARK-46273][SQL] Support INSERT INTO/OVERWRITE using DSv2 sources
7db85642600 is described below

commit 7db85642600b1e3b39ca11e41d4e3e0bf1c8962b
Author: allisonwang-db <allison.w...@databricks.com>
AuthorDate: Mon Dec 11 10:32:48 2023 -0800

    [SPARK-46273][SQL] Support INSERT INTO/OVERWRITE using DSv2 sources

    ### What changes were proposed in this pull request?

    This PR adds test cases for INSERT INTO and INSERT OVERWRITE queries with DSv2 sources.

    ### Why are the changes needed?

    To improve test coverage

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    New unit tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #44213 from allisonwang-db/spark-46273-dsv2-insert.

    Authored-by: allisonwang-db <allison.w...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
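For context, from the SQL side the paths covered by the new tests look roughly as follows. This is an illustrative sketch, not part of the commit: it assumes an active SparkSession `spark`, and the table name `tbl` and the inserted values are made up; the statements mirror the added tests.

    // Illustrative only: `spark` is an active SparkSession; the DSv2 source
    // referenced below is the test class extended by this commit.
    val src = "org.apache.spark.sql.connector.SupportsExternalMetadataDataSource"
    spark.sql(s"CREATE TABLE tbl (x INT, y INT) USING $src")
    spark.sql("INSERT INTO tbl VALUES (1, 2)")                  // positional
    spark.sql("INSERT INTO tbl (y, x) VALUES (3, 2)")           // by column list
    spark.sql("INSERT INTO tbl BY NAME VALUES (5, 4) t(y, x)")  // by name
    spark.sql("INSERT OVERWRITE tbl VALUES (9, 9)")             // replace all rows
    spark.sql("SELECT * FROM tbl").show()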
+ test("SPARK-46273: insert into") { + val cls = classOf[SupportsExternalMetadataDataSource] + withTable("test") { + sql( + s""" + |CREATE TABLE test (x INT, y INT) USING ${cls.getName} + |""".stripMargin) + sql("INSERT INTO test VALUES (1, 2)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2))) + // Insert by name + sql("INSERT INTO test(y, x) VALUES (3, 2)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3))) + // Can be casted automatically + sql("INSERT INTO test(y, x) VALUES (4L, 3L)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3, 4))) + // Insert values by name + sql("INSERT INTO test BY NAME VALUES (5, 4) t(y, x)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3, 4), Row(4, 5))) + // Missing columns + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT INTO test VALUES (4)") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`test`", + "tableColumns" -> "`x`, `y`", + "dataColumns" -> "`col1`" + ) + ) + // Duplicate columns + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT INTO test(x, x) VALUES (4, 5)") + }, + errorClass = "COLUMN_ALREADY_EXISTS", + parameters = Map("columnName" -> "`x`")) + } + } + + test("SPARK-46273: insert overwrite") { + val cls = classOf[SupportsExternalMetadataDataSource] + withTable("test") { + sql( + s""" + |CREATE TABLE test USING ${cls.getName} + |AS VALUES (0, 1), (1, 2) t(x, y) + |""".stripMargin) + sql( + s""" + |INSERT OVERWRITE test VALUES (2, 3), (3, 4) + |""".stripMargin) + checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3), Row(3, 4))) + // Insert overwrite by name + sql("INSERT OVERWRITE test(y, x) VALUES (3, 2)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(2, 3))) + // Can be casted automatically + sql("INSERT OVERWRITE test(y, x) VALUES (4L, 3L)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(3, 4))) + } + } + + test("SPARK-46273: insert into with partition") { + val cls = classOf[SupportsExternalMetadataDataSource] + withTable("test") { + sql(s"CREATE TABLE test(x INT, y INT) USING ${cls.getName} PARTITIONED BY (x, y)") + sql("INSERT INTO test PARTITION(x = 1) VALUES (2)") + sql("INSERT INTO test PARTITION(x = 2, y) VALUES (3)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3))) + sql("INSERT INTO test PARTITION(y, x = 3) VALUES (4)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3, 4))) + sql("INSERT INTO test PARTITION(x, y) VALUES (4, 5)") + checkAnswer(sql("SELECT * FROM test"), Seq(Row(1, 2), Row(2, 3), Row(3, 4), Row(4, 5))) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO test PARTITION(z = 1) VALUES (2)") + }, + errorClass = "NON_PARTITION_COLUMN", + parameters = Map("columnName" -> "`z`")) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO test PARTITION(x, y = 1) VALUES (2, 3)") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`test`", + "tableColumns" -> "`x`, `y`", + "dataColumns" -> "`col1`, `y`, `col2`") + ) + } + } + + test("SPARK-46273: insert overwrite with partition") { + val cls = classOf[SupportsExternalMetadataDataSource] + withTable("test") { + sql(s"CREATE TABLE test (x INT, y INT) USING ${cls.getName} PARTITIONED BY (x, y)") + sql("INSERT INTO test PARTITION(x = 1) VALUES (2)") + checkError( + exception = 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org