This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b10a0eceb4 [GLUTEN-11106][VL] Spark 3.5 / Delta 3.3: Add
DeltaInsertIntoTableSuite, DeltaDDLSuite (#11107)
b10a0eceb4 is described below
commit b10a0eceb471fd186181fa8c6e94f058b5e27296
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Nov 21 10:30:28 2025 +0000
[GLUTEN-11106][VL] Spark 3.5 / Delta 3.3: Add DeltaInsertIntoTableSuite,
DeltaDDLSuite (#11107)
---
.../org/apache/spark/sql/delta/DeltaDDLSuite.scala | 686 +++++++
.../sql/delta/DeltaInsertIntoTableSuite.scala | 2107 ++++++++++++++++++++
.../sql/delta/DeltaInsertIntoTableSuiteShims.scala | 23 +
.../org/apache/spark/sql/delta/DeltaSuite.scala | 9 +-
.../apache/spark/sql/delta/FakeFileSystem.scala | 32 +
.../apache/spark/sql/delta/UpdateSuiteBase.scala | 5 +-
.../spark/sql/delta/test/DeltaSQLCommandTest.scala | 3 +-
7 files changed, 2857 insertions(+), 8 deletions(-)
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
new file mode 100644
index 0000000000..dcb7d7317e
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.schema.InvariantViolationException
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType,
StructType}
+
+// scalastyle:off import.ordering.noEmptyLine
+import org.apache.hadoop.fs.UnsupportedFileSystemException
+
+import scala.collection.JavaConverters._
+
+class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession with
DeltaSQLCommandTest {
+
+ override protected def verifyNullabilityFailure(exception:
AnalysisException): Unit = {
+ exception.getMessage.contains("Cannot change nullable column to
non-nullable")
+ }
+
+ test("protocol-related properties are not considered during duplicate table
creation") {
+ def createTable(tableName: String, location: String): Unit = {
+ sql(
+ s"""
+ |CREATE TABLE $tableName (id INT, val STRING)
+ |USING DELTA
+ |LOCATION '$location'
+ |TBLPROPERTIES (
+ | 'delta.columnMapping.mode' = 'name',
+ | 'delta.minReaderVersion' = '2',
+ | 'delta.minWriterVersion' = '5'
+ |)""".stripMargin
+ )
+ }
+ withTempDir {
+ dir =>
+ val table1 = "t1"
+ val table2 = "t2"
+ withTable(table1, table2) {
+ withSQLConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true")
{
+ createTable(table1, dir.getCanonicalPath)
+ createTable(table2, dir.getCanonicalPath)
+ val catalogTable1 =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(table1))
+ val catalogTable2 =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(table2))
+ assert(catalogTable1.properties("delta.columnMapping.mode") ==
"name")
+ assert(catalogTable2.properties("delta.columnMapping.mode") ==
"name")
+ }
+ }
+ }
+ }
+
+ test("table creation with ambiguous paths only allowed with legacy flag") {
+ // ambiguous paths not allowed
+ withTempDir {
+ foo =>
+ withTempDir {
+ bar =>
+ val fooPath = foo.getCanonicalPath()
+ val barPath = bar.getCanonicalPath()
+ val e = intercept[AnalysisException] {
+ sql(s"CREATE TABLE delta.`$fooPath`(id LONG) USING delta
LOCATION '$barPath'")
+ }
+
assert(e.message.contains("legacy.allowAmbiguousPathsInCreateTable"))
+ }
+ }
+
+ // allowed with legacy flag
+ withTempDir {
+ foo =>
+ withTempDir {
+ bar =>
+ val fooPath = foo.getCanonicalPath()
+ val barPath = bar.getCanonicalPath()
+ withSQLConf(DeltaSQLConf.DELTA_LEGACY_ALLOW_AMBIGUOUS_PATHS.key ->
"true") {
+ sql(s"CREATE TABLE delta.`$fooPath`(id LONG) USING delta
LOCATION '$barPath'")
+ assert(io.delta.tables.DeltaTable.isDeltaTable(fooPath))
+ assert(!io.delta.tables.DeltaTable.isDeltaTable(barPath))
+ }
+ }
+ }
+
+ // allowed if paths are the same
+ withTempDir {
+ foo =>
+ val fooPath = foo.getCanonicalPath()
+ sql(s"CREATE TABLE delta.`$fooPath`(id LONG) USING delta LOCATION
'$fooPath'")
+ assert(io.delta.tables.DeltaTable.isDeltaTable(fooPath))
+ }
+ }
+
+ test("append table when column name with special chars") {
+ withTable("t") {
+ val schema = new StructType().add("a`b", "int")
+ val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
+ df.write.format("delta").saveAsTable("t")
+ df.write.format("delta").mode("append").saveAsTable("t")
+ assert(spark.table("t").collect().isEmpty)
+ }
+ }
+
+ test("CREATE TABLE with OPTIONS") {
+ withTempPath {
+ path =>
+ spark.range(10).write.format("delta").save(path.getCanonicalPath)
+ withTable("t") {
+ def createTableWithOptions(simulateUC: Boolean): Unit = {
+ sql(s"""
+ |CREATE TABLE t USING delta LOCATION
'fake://${path.getCanonicalPath}'
+ |${if (simulateUC) "TBLPROPERTIES (test.simulateUC=true)"
else ""}
+ |OPTIONS (
+ | fs.fake.impl='${classOf[FakeFileSystem].getName}',
+ | fs.fake.impl.disable.cache=true)
+ |""".stripMargin)
+ }
+
intercept[UnsupportedFileSystemException](createTableWithOptions(false))
+ createTableWithOptions(true)
+ }
+ }
+ }
+}
+
+class DeltaDDLNameColumnMappingSuite extends DeltaDDLSuite with
DeltaColumnMappingEnableNameMode {
+
+ override protected def runOnlyTests = Seq(
+ "create table with NOT NULL - check violation through file writing",
+ "ALTER TABLE CHANGE COLUMN with nullability change in struct type -
relaxed"
+ )
+}
+
+abstract class DeltaDDLTestBase extends QueryTest with DeltaSQLTestUtils {
+ import testImplicits._
+
+ protected def verifyDescribeTable(tblName: String): Unit = {
+ val res = sql(s"DESCRIBE TABLE $tblName").collect()
+ assert(res.takeRight(2).map(_.getString(0)) === Seq("name", "dept"))
+ }
+
+ protected def verifyNullabilityFailure(exception: AnalysisException): Unit
+
+ protected def getDeltaLog(tableLocation: String): DeltaLog = {
+ DeltaLog.forTable(spark, tableLocation)
+ }
+
+ testQuietly("create table with NOT NULL - check violation through file
writing") {
+ withTempDir {
+ dir =>
+ withTable("delta_test") {
+ sql(s"""
+ |CREATE TABLE delta_test(a LONG, b String NOT NULL)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+ val expectedSchema = new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", StringType, nullable = false)
+ assert(spark.table("delta_test").schema === expectedSchema)
+
+ val table =
spark.sessionState.catalog.getTableMetadata(TableIdentifier("delta_test"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ Seq((1L, "a"))
+ .toDF("a", "b")
+ .write
+ .format("delta")
+ .mode("append")
+ .save(table.location.toString)
+ val read = spark.read.format("delta").load(table.location.toString)
+ checkAnswer(read, Seq(Row(1L, "a")))
+
+ intercept[InvariantViolationException] {
+ Seq((2L, null))
+ .toDF("a", "b")
+ .write
+ .format("delta")
+ .mode("append")
+ .save(table.location.getPath)
+ }
+ }
+ }
+ }
+
+ test("ALTER TABLE ADD COLUMNS with NOT NULL - not supported") {
+ withTempDir {
+ dir =>
+ val tableName = "delta_test_add_not_null"
+ withTable(tableName) {
+ sql(s"""
+ |CREATE TABLE $tableName(a LONG)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+
+ val expectedSchema = new StructType().add("a", LongType, nullable =
true)
+ assert(spark.table(tableName).schema === expectedSchema)
+
+ val e = intercept[AnalysisException] {
+ sql(s"""
+ |ALTER TABLE $tableName
+ |ADD COLUMNS (b String NOT NULL, c Int)""".stripMargin)
+ }
+ val msg = "`NOT NULL in ALTER TABLE ADD COLUMNS` is not supported
for Delta tables"
+ assert(e.getMessage.contains(msg))
+ }
+ }
+ }
+
+ test("ALTER TABLE CHANGE COLUMN from nullable to NOT NULL - not supported") {
+ withTempDir {
+ dir =>
+ val tableName = "delta_test_from_nullable_to_not_null"
+ withTable(tableName) {
+ sql(s"""
+ |CREATE TABLE $tableName(a LONG, b String)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+
+ val expectedSchema = new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", StringType, nullable = true)
+ assert(spark.table(tableName).schema === expectedSchema)
+
+ val e = intercept[AnalysisException] {
+ sql(s"""
+ |ALTER TABLE $tableName
+ |CHANGE COLUMN b b String NOT NULL""".stripMargin)
+ }
+ verifyNullabilityFailure(e)
+ }
+ }
+ }
+
+ test("ALTER TABLE CHANGE COLUMN from NOT NULL to nullable") {
+ withTempDir {
+ dir =>
+ val tableName = "delta_test_not_null_to_nullable"
+ withTable(tableName) {
+ sql(s"""
+ |CREATE TABLE $tableName(a LONG NOT NULL, b String)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+
+ val expectedSchema = new StructType()
+ .add("a", LongType, nullable = false)
+ .add("b", StringType, nullable = true)
+ assert(spark.table(tableName).schema === expectedSchema)
+
+ sql(s"INSERT INTO $tableName SELECT 1, 'a'")
+ checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1L, "a")))
+
+ sql(s"""
+ |ALTER TABLE $tableName
+ |ALTER COLUMN a DROP NOT NULL""".stripMargin)
+ val expectedSchema2 = new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", StringType, nullable = true)
+ assert(spark.table(tableName).schema === expectedSchema2)
+
+ sql(s"INSERT INTO $tableName SELECT NULL, 'b'")
+ checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1L, "a"),
Row(null, "b")))
+ }
+ }
+ }
+
+ testQuietly("create table with NOT NULL - check violation through SQL") {
+ withTempDir {
+ dir =>
+ withTable("delta_test") {
+ sql(s"""
+ |CREATE TABLE delta_test(a LONG, b String NOT NULL)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+ val expectedSchema = new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", StringType, nullable = false)
+ assert(spark.table("delta_test").schema === expectedSchema)
+
+ sql("INSERT INTO delta_test SELECT 1, 'a'")
+ checkAnswer(sql("SELECT * FROM delta_test"), Seq(Row(1L, "a")))
+
+ val e = intercept[InvariantViolationException] {
+ sql("INSERT INTO delta_test VALUES (2, null)")
+ }
+ if (!e.getMessage.contains("nullable values to non-null column")) {
+ verifyInvariantViolationException(e)
+ }
+ }
+ }
+ }
+
+ testQuietly("create table with NOT NULL in struct type - check violation") {
+ withTempDir {
+ dir =>
+ withTable("delta_test") {
+ sql(s"""
+ |CREATE TABLE delta_test
+ |(x struct<a: LONG, b: String NOT NULL>, y LONG)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+ val expectedSchema = new StructType()
+ .add(
+ "x",
+ new StructType()
+ .add("a", LongType, nullable = true)
+ .add("b", StringType, nullable = false))
+ .add("y", LongType, nullable = true)
+ assert(spark.table("delta_test").schema === expectedSchema)
+
+ sql("INSERT INTO delta_test SELECT (1, 'a'), 1")
+ checkAnswer(sql("SELECT * FROM delta_test"), Seq(Row(Row(1L, "a"),
1)))
+
+ val table =
spark.sessionState.catalog.getTableMetadata(TableIdentifier("delta_test"))
+ assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+ val schema = new StructType()
+ .add(
+ "x",
+ new StructType()
+ .add("a", "bigint")
+ .add("b", "string"))
+ .add("y", "bigint")
+ val e = intercept[InvariantViolationException] {
+ spark
+ .createDataFrame(
+ Seq(Row(Row(2L, null), 2L)).asJava,
+ schema
+ )
+ .write
+ .format("delta")
+ .mode("append")
+ .save(table.location.getPath)
+ }
+ verifyInvariantViolationException(e)
+ }
+ }
+ }
+
+ test("ALTER TABLE ADD COLUMNS with NOT NULL in struct type - not supported")
{
+ withTempDir {
+ dir =>
+ val tableName = "delta_test_not_null_struct"
+ withTable(tableName) {
+ sql(s"""
+ |CREATE TABLE $tableName
+ |(y LONG)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+ val expectedSchema = new StructType()
+ .add("y", LongType, nullable = true)
+ assert(spark.table(tableName).schema === expectedSchema)
+
+ val e = intercept[AnalysisException] {
+ sql(s"""
+ |ALTER TABLE $tableName
+ |ADD COLUMNS (x struct<a: LONG, b: String NOT NULL>, z
INT)""".stripMargin)
+ }
+ val msg = "Operation not allowed: " +
+ "`NOT NULL in ALTER TABLE ADD COLUMNS` is not supported for Delta
tables"
+ assert(e.getMessage.contains(msg))
+ }
+ }
+ }
+
+ test("ALTER TABLE ADD COLUMNS to table with existing NOT NULL fields") {
+ withTempDir {
+ dir =>
+ val tableName = "delta_test_existing_not_null"
+ withTable(tableName) {
+ sql(s"""
+ |CREATE TABLE $tableName
+ |(y LONG NOT NULL)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+ val expectedSchema = new StructType()
+ .add("y", LongType, nullable = false)
+ assert(spark.table(tableName).schema === expectedSchema)
+
+ sql(s"""
+ |ALTER TABLE $tableName
+ |ADD COLUMNS (x struct<a: LONG, b: String>, z
INT)""".stripMargin)
+ val expectedSchema2 = new StructType()
+ .add("y", LongType, nullable = false)
+ .add(
+ "x",
+ new StructType()
+ .add("a", LongType)
+ .add("b", StringType))
+ .add("z", IntegerType)
+ assert(spark.table(tableName).schema === expectedSchema2)
+ }
+ }
+ }
+
+ /**
+ * Covers adding and changing a nested field using the ALTER TABLE command.
+ * @param initialColumnType
+ * Type of the single column used to create the initial test table.
+ * @param fieldToAdd
+ * Tuple (name, type) of the nested field to add and change.
+ * @param updatedColumnType
+ * Expected type of the single column after adding the nested field.
+ */
+ def testAlterTableNestedFields(testName: String)(
+ initialColumnType: String,
+ fieldToAdd: (String, String),
+ updatedColumnType: String): Unit = {
+ // Remove spaces in test name so we can re-use it as a unique table name.
+ val tableName = testName.replaceAll(" ", "")
+ test(s"ALTER TABLE ADD/CHANGE COLUMNS - nested $testName") {
+ withTempDir {
+ dir =>
+ withTable(tableName) {
+ sql(s"""
+ |CREATE TABLE $tableName (data $initialColumnType)
+ |USING delta
+ |TBLPROPERTIES (${DeltaConfigs.COLUMN_MAPPING_MODE.key} =
'name')
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+
+ val expectedInitialType =
initialColumnType.filterNot(_.isWhitespace)
+ val expectedUpdatedType =
updatedColumnType.filterNot(_.isWhitespace)
+ val fieldName = s"data.${fieldToAdd._1}"
+ val fieldType = fieldToAdd._2
+
+ def columnType: DataFrame =
+ sql(s"DESCRIBE TABLE $tableName")
+ .where("col_name = 'data'")
+ .select("data_type")
+ checkAnswer(columnType, Row(expectedInitialType))
+
+ sql(s"ALTER TABLE $tableName ADD COLUMNS ($fieldName $fieldType)")
+ checkAnswer(columnType, Row(expectedUpdatedType))
+
+ sql(s"ALTER TABLE $tableName CHANGE COLUMN $fieldName TYPE
$fieldType")
+ checkAnswer(columnType, Row(expectedUpdatedType))
+ }
+ }
+ }
+ }
+
+ testAlterTableNestedFields("struct in map key")(
+ initialColumnType = "map<struct<a: int>, int>",
+ fieldToAdd = "key.b" -> "string",
+ updatedColumnType = "map<struct<a: int, b: string>, int>")
+
+ testAlterTableNestedFields("struct in map value")(
+ initialColumnType = "map<int, struct<a: int>>",
+ fieldToAdd = "value.b" -> "string",
+ updatedColumnType = "map<int, struct<a: int, b: string>>")
+
+ testAlterTableNestedFields("struct in array")(
+ initialColumnType = "array<struct<a: int>>",
+ fieldToAdd = "element.b" -> "string",
+ updatedColumnType = "array<struct<a: int, b: string>>")
+
+ testAlterTableNestedFields("struct in nested map keys")(
+ initialColumnType = "map<map<struct<a: int>, int>, int>",
+ fieldToAdd = "key.key.b" -> "string",
+ updatedColumnType = "map<map<struct<a: int, b: string>, int>, int>"
+ )
+
+ testAlterTableNestedFields("struct in nested map values")(
+ initialColumnType = "map<int, map<int, struct<a: int>>>",
+ fieldToAdd = "value.value.b" -> "string",
+ updatedColumnType = "map<int, map<int, struct<a: int, b: string>>>"
+ )
+
+ testAlterTableNestedFields("struct in nested arrays")(
+ initialColumnType = "array<array<struct<a: int>>>",
+ fieldToAdd = "element.element.b" -> "string",
+ updatedColumnType = "array<array<struct<a: int, b: string>>>")
+
+ testAlterTableNestedFields("struct in nested array and map")(
+ initialColumnType = "array<map<int, struct<a: int>>>",
+ fieldToAdd = "element.value.b" -> "string",
+ updatedColumnType = "array<map<int, struct<a: int, b: string>>>"
+ )
+
+ testAlterTableNestedFields("struct in nested map key and array")(
+ initialColumnType = "map<array<struct<a: int>>, int>",
+ fieldToAdd = "key.element.b" -> "string",
+ updatedColumnType = "map<array<struct<a: int, b: string>>, int>"
+ )
+
+ testAlterTableNestedFields("struct in nested map value and array")(
+ initialColumnType = "map<int, array<struct<a: int>>>",
+ fieldToAdd = "value.element.b" -> "string",
+ updatedColumnType = "map<int, array<struct<a: int, b: string>>>"
+ )
+
+ test("ALTER TABLE CHANGE COLUMN with nullability change in struct type - not
supported") {
+ withTempDir {
+ dir =>
+ val tableName = "not_supported_delta_test"
+ withTable(tableName) {
+ sql(s"""
+ |CREATE TABLE $tableName
+ |(x struct<a: LONG, b: String>, y LONG)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+ val expectedSchema = new StructType()
+ .add(
+ "x",
+ new StructType()
+ .add("a", LongType)
+ .add("b", StringType))
+ .add("y", LongType, nullable = true)
+ assert(spark.table(tableName).schema === expectedSchema)
+
+ val e1 = intercept[AnalysisException] {
+ sql(s"""
+ |ALTER TABLE $tableName
+ |CHANGE COLUMN x x struct<a: LONG, b: String NOT
NULL>""".stripMargin)
+ }
+ assert(e1.getMessage.contains("Cannot update"))
+ val e2 = intercept[AnalysisException] {
+ sql(s"""
+ |ALTER TABLE $tableName
+ |CHANGE COLUMN x.b b String NOT NULL""".stripMargin) //
this syntax may change
+ }
+ verifyNullabilityFailure(e2)
+ }
+ }
+ }
+
+ test("ALTER TABLE CHANGE COLUMN with nullability change in struct type -
relaxed") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ withTempDir {
+ dir =>
+ val tblName = "delta_test2"
+ withTable(tblName) {
+ sql(s"""
+ |CREATE TABLE $tblName
+ |(x struct<a: LONG, b: String NOT NULL> NOT NULL, y LONG)
+ |USING delta
+ |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+ val expectedSchema = new StructType()
+ .add(
+ "x",
+ new StructType()
+ .add("a", LongType)
+ .add("b", StringType, nullable = false),
+ nullable = false)
+ .add("y", LongType)
+ assert(spark.table(tblName).schema === expectedSchema)
+ sql(s"INSERT INTO $tblName SELECT (1, 'a'), 1")
+ checkAnswer(sql(s"SELECT * FROM $tblName"), Seq(Row(Row(1L, "a"),
1)))
+
+ sql(s"""
+ |ALTER TABLE $tblName
+ |ALTER COLUMN x.b DROP NOT NULL""".stripMargin) // relax
nullability
+ sql(s"INSERT INTO $tblName SELECT (2, null), null")
+ checkAnswer(
+ sql(s"SELECT * FROM $tblName"),
+ Seq(Row(Row(1L, "a"), 1), Row(Row(2L, null), null)))
+
+ sql(s"""
+ |ALTER TABLE $tblName
+ |ALTER COLUMN x DROP NOT NULL""".stripMargin)
+ sql(s"INSERT INTO $tblName SELECT null, 3")
+ checkAnswer(
+ sql(s"SELECT * FROM $tblName"),
+ Seq(Row(Row(1L, "a"), 1), Row(Row(2L, null), null), Row(null,
3)))
+ }
+ }
+ }
+ }
+
+ private def verifyInvariantViolationException(e:
InvariantViolationException): Unit = {
+ if (e == null) {
+ fail("Didn't receive a InvariantViolationException.")
+ }
+ assert(e.getMessage.contains("NOT NULL constraint violated for column"))
+ }
+
+ test("ALTER TABLE RENAME TO") {
+ withTable("tbl", "newTbl") {
+ sql(s"""
+ |CREATE TABLE tbl
+ |USING delta
+ |AS SELECT 1 as a, 'a' as b
+ """.stripMargin)
+ sql(s"ALTER TABLE tbl RENAME TO newTbl")
+ checkDatasetUnorderly(sql("SELECT * FROM newTbl").as[(Long, String)], 1L
-> "a")
+ }
+ }
+
+ /**
+ * Although Spark 3.2 adds the support for SHOW CREATE TABLE for v2 tables,
it doesn't work
+ * properly for Delta. For example, table properties, constraints and
generated columns are not
+ * showed properly.
+ *
+ * TODO Implement Delta's own ShowCreateTableCommand to show the Delta table
definition correctly
+ */
+ test("SHOW CREATE TABLE is not supported") {
+ withTable("delta_test") {
+ sql(s"""
+ |CREATE TABLE delta_test(a LONG, b String)
+ |USING delta
+ """.stripMargin)
+
+ val e = intercept[AnalysisException] {
+ sql("SHOW CREATE TABLE delta_test").collect()(0).getString(0)
+ }
+ assert(e.message.contains("`SHOW CREATE TABLE` is not supported for
Delta table"))
+ }
+
+ withTempDir {
+ dir =>
+ withTable("delta_test") {
+ val path = dir.getCanonicalPath()
+ sql(s"""
+ |CREATE TABLE delta_test(a LONG, b String)
+ |USING delta
+ |LOCATION '$path'
+ """.stripMargin)
+
+ val e = intercept[AnalysisException] {
+ sql("SHOW CREATE TABLE delta_test").collect()(0).getString(0)
+ }
+ assert(e.message.contains("`SHOW CREATE TABLE` is not supported for
Delta table"))
+ }
+ }
+ }
+
+ test("DESCRIBE TABLE for partitioned table") {
+ withTempDir {
+ dir =>
+ withTable("delta_test") {
+ val path = dir.getCanonicalPath()
+
+ val df =
+ Seq((1, "IT", "Alice"), (2, "CS", "Bob"), (3, "IT",
"Carol")).toDF("id", "dept", "name")
+ df.write.format("delta").partitionBy("name", "dept").save(path)
+
+ sql(s"CREATE TABLE delta_test USING delta LOCATION '$path'")
+
+ verifyDescribeTable("delta_test")
+ verifyDescribeTable(s"delta.`$path`")
+
+ assert(sql("DESCRIBE EXTENDED delta_test").collect().length > 0)
+ }
+ }
+ }
+
+ test("snapshot returned after a dropped managed table should be empty") {
+ withTable("delta_test") {
+ sql("CREATE TABLE delta_test USING delta AS SELECT 'foo' as a")
+ val tableLocation = sql("DESC DETAIL
delta_test").select("location").as[String].head()
+ val snapshotBefore = getDeltaLog(tableLocation).update()
+ sql("DROP TABLE delta_test")
+ val snapshotAfter = getDeltaLog(tableLocation).update()
+ assert(snapshotBefore ne snapshotAfter)
+ assert(snapshotAfter.version === -1)
+ }
+ }
+
+ test("snapshot returned after renaming a managed table should be empty") {
+ val oldTableName = "oldTableName"
+ val newTableName = "newTableName"
+ withTable(oldTableName, newTableName) {
+ sql(s"CREATE TABLE $oldTableName USING delta AS SELECT 'foo' as a")
+ val tableLocation = sql(s"DESC DETAIL
$oldTableName").select("location").as[String].head()
+ val snapshotBefore = getDeltaLog(tableLocation).update()
+ sql(s"ALTER TABLE $oldTableName RENAME TO $newTableName")
+ val snapshotAfter = getDeltaLog(tableLocation).update()
+ assert(snapshotBefore ne snapshotAfter)
+ assert(snapshotAfter.version === -1)
+ }
+ }
+
+}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
new file mode 100644
index 0000000000..c18e98f7f3
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
@@ -0,0 +1,2107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+// scalastyle:off import.ordering.noEmptyLine
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.parser.ParseException
+import
org.apache.spark.sql.delta.DeltaInsertIntoTableSuiteShims.{INSERT_INTO_TMP_VIEW_ERROR_MSG,
INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG}
+import org.apache.spark.sql.delta.schema.{InvariantViolationException,
SchemaUtils}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin,
DeltaSQLCommandTest}
+import org.apache.spark.sql.functions.{lit, struct}
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE,
PartitionOverwriteMode}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+import org.scalatest.BeforeAndAfter
+
+import java.io.File
+import java.util.TimeZone
+
+import scala.collection.JavaConverters._
+
+class DeltaInsertIntoSQLSuite
+ extends DeltaInsertIntoTestsWithTempViews(
+ supportsDynamicOverwrite = true,
+ includeSQLOnlyTests = true)
+ with DeltaSQLCommandTest
+ with DeltaExcludedBySparkVersionTestMixinShims {
+
+ override protected def doInsert(tableName: String, insert: DataFrame, mode:
SaveMode): Unit = {
+ val tmpView = "tmp_view"
+ withTempView(tmpView) {
+ insert.createOrReplaceTempView(tmpView)
+ val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+ sql(s"INSERT $overwrite TABLE $tableName SELECT * FROM $tmpView")
+ }
+ }
+
+ testSparkMasterOnly("Variant type") {
+ withTable("t") {
+ sql("CREATE TABLE t (id LONG, v VARIANT) USING delta")
+ sql("INSERT INTO t (id, v) VALUES (1, parse_json('{\"a\": 1}'))")
+ sql("INSERT INTO t (id, v) VALUES (2, parse_json('{\"b\": 2}'))")
+ sql("INSERT INTO t SELECT id, parse_json(cast(id as string)) v FROM
range(2)")
+
+ checkAnswer(
+ sql("select * from t").selectExpr("id", "to_json(v)"),
+ Seq(Row(1, "{\"a\":1}"), Row(2, "{\"b\":2}"), Row(0, "0"), Row(1,
"1")))
+ }
+ }
+
+ test("insert overwrite should work with selecting constants") {
+ withTable("t1") {
+ sql("CREATE TABLE t1 (a int, b int, c int) USING delta PARTITIONED BY
(b, c)")
+ sql("INSERT OVERWRITE TABLE t1 PARTITION (c=3) SELECT 1, 2")
+ checkAnswer(
+ sql("SELECT * FROM t1"),
+ Row(1, 2, 3) :: Nil
+ )
+ sql("INSERT OVERWRITE TABLE t1 PARTITION (b=2, c=3) SELECT 1")
+ checkAnswer(
+ sql("SELECT * FROM t1"),
+ Row(1, 2, 3) :: Nil
+ )
+ sql("INSERT OVERWRITE TABLE t1 PARTITION (b=2, c) SELECT 1, 3")
+ checkAnswer(
+ sql("SELECT * FROM t1"),
+ Row(1, 2, 3) :: Nil
+ )
+ }
+ }
+
+ test("insertInto: append by name") {
+ import testImplicits._
+ val t1 = "tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+ val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+ sql(s"INSERT INTO $t1(id, data) VALUES(1L, 'a')")
+ // Can be in a different order
+ sql(s"INSERT INTO $t1(data, id) VALUES('b', 2L)")
+ // Can be casted automatically
+ sql(s"INSERT INTO $t1(data, id) VALUES('c', 3)")
+ verifyTable(t1, df)
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key ->
"false") {
+ // Missing columns
+ assert(intercept[AnalysisException] {
+ sql(s"INSERT INTO $t1(data) VALUES(4)")
+ }.getMessage.contains("Column id is not specified in INSERT"))
+ // Missing columns with matching dataType
+ assert(intercept[AnalysisException] {
+ sql(s"INSERT INTO $t1(data) VALUES('b')")
+ }.getMessage.contains("Column id is not specified in INSERT"))
+ }
+ // Duplicate columns
+ assert(
+ intercept[AnalysisException](
+ sql(s"INSERT INTO $t1(data, data) VALUES(5)")).getMessage.nonEmpty)
+ }
+ }
+
+ test("insertInto: overwrite by name") {
+ import testImplicits._
+ val t1 = "tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+ sql(s"INSERT OVERWRITE $t1(id, data) VALUES(1L, 'a')")
+ verifyTable(t1, Seq((1L, "a")).toDF("id", "data"))
+ // Can be in a different order
+ sql(s"INSERT OVERWRITE $t1(data, id) VALUES('b', 2L)")
+ verifyTable(t1, Seq((2L, "b")).toDF("id", "data"))
+ // Can be casted automatically
+ sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 3)")
+ verifyTable(t1, Seq((3L, "c")).toDF("id", "data"))
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key ->
"false") {
+ // Missing columns
+ assert(intercept[AnalysisException] {
+ sql(s"INSERT OVERWRITE $t1(data) VALUES(4)")
+ }.getMessage.contains("Column id is not specified in INSERT"))
+ // Missing columns with matching datatype
+ assert(intercept[AnalysisException] {
+ sql(s"INSERT OVERWRITE $t1(data) VALUES(4L)")
+ }.getMessage.contains("Column id is not specified in INSERT"))
+ }
+ // Duplicate columns
+ assert(
+ intercept[AnalysisException](
+ sql(s"INSERT OVERWRITE $t1(data, data)
VALUES(5)")).getMessage.nonEmpty)
+ }
+ }
+
+ test("insertInto should throw an AnalysisError on name mismatch") {
+ def testInsertByNameError(targetSchema: String, expectedErrorClass:
String): Unit = {
+ val sourceTableName = "source"
+ val targetTableName = "target"
+ val format = "delta"
+ withTable(sourceTableName, targetTableName) {
+ sql(s"CREATE TABLE $sourceTableName (a int, b int) USING $format")
+ sql(s"CREATE TABLE $targetTableName $targetSchema USING $format")
+ val e = intercept[AnalysisException] {
+ sql(s"INSERT INTO $targetTableName BY NAME SELECT * FROM
$sourceTableName")
+ }
+ assert(e.getErrorClass === expectedErrorClass)
+ }
+ }
+
+ // NOTE: We use upper case in the target schema so that
needsSchemaAdjustmentByName returns
+ // true (due to case sensitivity) so that we call
resolveQueryColumnsByName and hit the right
+ // code path.
+
+ // when the number of columns does not match, throw an arity mismatch
error.
+ testInsertByNameError(
+ targetSchema = "(A int)",
+ expectedErrorClass =
"INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS")
+
+ // when the number of columns matches, but the names do not, throw a
missing column error.
+ testInsertByNameError(
+ targetSchema = "(A int, c int)",
+ expectedErrorClass = "DELTA_MISSING_COLUMN")
+ }
+
  // Dynamic-partition INSERT OVERWRITE with an explicit column list: columns may be
  // reordered, values are implicitly cast, and missing/duplicate columns are rejected.
  dynamicOverwriteTest("insertInto: dynamic overwrite by name") {
    import testImplicits._
    val t1 = "tbl"
    withTable(t1) {
      sql(
        s"CREATE TABLE $t1 (id bigint, data string, data2 string) " +
          s"USING $v2Format PARTITIONED BY (id)")
      sql(s"INSERT OVERWRITE $t1(id, data, data2) VALUES(1L, 'a', 'b')")
      verifyTable(t1, Seq((1L, "a", "b")).toDF("id", "data", "data2"))
      // Can be in a different order
      sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('b', 'd', 2L)")
      verifyTable(t1, Seq((1L, "a", "b"), (2L, "b", "d")).toDF("id", "data", "data2"))
      // Can be casted automatically
      sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('c', 'e', 1)")
      verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", "data2"))
      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
        // Missing columns
        assert(intercept[AnalysisException] {
          sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 1)")
        }.getMessage.contains("Column data2 is not specified in INSERT"))
        // Missing columns with matching datatype
        assert(intercept[AnalysisException] {
          sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 1L)")
        }.getMessage.contains("Column data2 is not specified in INSERT"))
      }
      // Duplicate columns
      assert(
        intercept[AnalysisException](
          sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")).getMessage.nonEmpty)
    }
  }
+
  // A column given a static value in the PARTITION clause must not also appear
  // in the user-specified insert column list.
  test("insertInto: static partition column name should not be used in the column list") {
    withTable("t") {
      sql(s"CREATE TABLE t(i STRING, c string) USING $v2Format PARTITIONED BY (c)")
      checkError(
        intercept[AnalysisException] {
          sql("INSERT OVERWRITE t PARTITION (c='1') (c) VALUES ('2')")
        },
        "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST",
        parameters = Map("staticName" -> "c")
      )
    }
  }
+
  // INSERT OVERWRITE with schema auto-merge enabled (see runInsertOverwrite) should evolve
  // an ARRAY<STRUCT<...>> column when the source struct has an extra field. Exercised both
  // by ordinal (empty column list) and by name.
  Seq(("ordinal", ""), ("name", "(id, col2, col)")).foreach {
    case (testName, values) =>
      test(s"INSERT OVERWRITE schema evolution works for array struct types - $testName") {
        val sourceSchema =
          "id INT, col2 STRING, col ARRAY<STRUCT<f1: STRING, f2: STRING, f3: DATE>>"
        val sourceRecord = "1, '2022-11-01', array(struct('s1', 's2', DATE'2022-11-01'))"
        val targetSchema = "id INT, col2 DATE, col ARRAY<STRUCT<f1: STRING, f2: STRING>>"
        val targetRecord = "1, DATE'2022-11-02', array(struct('t1', 't2'))"

        runInsertOverwrite(sourceSchema, sourceRecord, targetSchema, targetRecord) {
          (sourceTable, targetTable) =>
            sql(s"INSERT OVERWRITE $targetTable $values SELECT * FROM $sourceTable")

            // make sure table is still writeable
            sql(s"""INSERT INTO $targetTable VALUES (2, DATE'2022-11-02',
                   | array(struct('s3', 's4', DATE'2022-11-02')))""".stripMargin)
            sql(s"""INSERT INTO $targetTable VALUES (3, DATE'2022-11-03',
                   |array(struct('s5', 's6', NULL)))""".stripMargin)
            // Expected contents after the overwrite plus the two appends above.
            val df = spark.sql("""SELECT 1 as id, DATE'2022-11-01' as col2,
                                 | array(struct('s1', 's2', DATE'2022-11-01')) as col UNION
                                 | SELECT 2 as id, DATE'2022-11-02' as col2,
                                 | array(struct('s3', 's4', DATE'2022-11-02')) as col UNION
                                 | SELECT 3 as id, DATE'2022-11-03' as col2,
                                 | array(struct('s5', 's6', NULL)) as col""".stripMargin)
            verifyTable(targetTable, df)
        }
      }
  }
+
  // Same as the array<struct> case above, but with structs nested two levels deep inside
  // the array element (struct-of-struct), again by ordinal and by name.
  Seq(("ordinal", ""), ("name", "(id, col2, col)")).foreach {
    case (testName, values) =>
      test(s"INSERT OVERWRITE schema evolution works for array nested types - $testName") {
        val sourceSchema = "id INT, col2 STRING, " +
          "col ARRAY<STRUCT<f1: INT, f2: STRUCT<f21: STRING, f22: DATE>, f3: STRUCT<f31: STRING>>>"
        val sourceRecord = "1, '2022-11-01', " +
          "array(struct(1, struct('s1', DATE'2022-11-01'), struct('s1')))"
        val targetSchema = "id INT, col2 DATE, col ARRAY<STRUCT<f1: INT, f2: STRUCT<f21: STRING>>>"
        val targetRecord = "2, DATE'2022-11-02', array(struct(2, struct('s2')))"

        runInsertOverwrite(sourceSchema, sourceRecord, targetSchema, targetRecord) {
          (sourceTable, targetTable) =>
            sql(s"INSERT OVERWRITE $targetTable $values SELECT * FROM $sourceTable")

            // make sure table is still writeable
            sql(s"""INSERT INTO $targetTable VALUES (2, DATE'2022-11-02',
                   | array(struct(2, struct('s2', DATE'2022-11-02'), struct('s2'))))""".stripMargin)
            sql(s"""INSERT INTO $targetTable VALUES (3, DATE'2022-11-03',
                   | array(struct(3, struct('s3', NULL), struct(NULL))))""".stripMargin)
            // Expected contents after the overwrite plus the two appends above.
            val df = spark.sql(
              """SELECT 1 as id, DATE'2022-11-01' as col2,
                | array(struct(1, struct('s1', DATE'2022-11-01'), struct('s1'))) as col UNION
                | SELECT 2 as id, DATE'2022-11-02' as col2,
                | array(struct(2, struct('s2', DATE'2022-11-02'), struct('s2'))) as col UNION
                | SELECT 3 as id, DATE'2022-11-03' as col2,
                | array(struct(3, struct('s3', NULL), struct(NULL))) as col
                |""".stripMargin)
            verifyTable(targetTable, df)
        }
      }
  }
+
  // Schema evolution for complex map type
  // Appending a DataFrame whose map-value struct gained a field (and whose top-level
  // column was renamed) must fail without autoMerge and succeed with it, back-filling
  // the new nested field with NULL for pre-existing rows.
  test("insertInto schema evolution with map type - append mode: field renaming + new field") {
    withTable("map_schema_evolution") {
      val tableName = "map_schema_evolution"
      val initialSchema = StructType(
        Seq(
          StructField("key", IntegerType, nullable = false),
          StructField(
            "metrics",
            MapType(
              StringType,
              StructType(
                Seq(
                  StructField("id", IntegerType, nullable = false),
                  StructField("value", IntegerType, nullable = false)
                ))))
        ))

      val initialData = Seq(
        Row(1, Map("event" -> Row(1, 1)))
      )

      val initialRdd = spark.sparkContext.parallelize(initialData)
      val initialDf = spark.createDataFrame(initialRdd, initialSchema)

      // Write initial data
      initialDf.write
        .option("overwriteSchema", "true")
        .mode("overwrite")
        .format("delta")
        .saveAsTable(tableName)

      // Evolved schema with field renamed and additional field in map struct
      val evolvedSchema = StructType(
        Seq(
          StructField("renamed_key", IntegerType, nullable = false),
          StructField(
            "metrics",
            MapType(
              StringType,
              StructType(
                Seq(
                  StructField("id", IntegerType, nullable = false),
                  StructField("value", IntegerType, nullable = false),
                  StructField("comment", StringType, nullable = true)
                ))
            )
          )
        ))

      val evolvedData = Seq(
        Row(1, Map("event" -> Row(1, 1, "deprecated")))
      )

      val evolvedRdd = spark.sparkContext.parallelize(evolvedData)
      val evolvedDf = spark.createDataFrame(evolvedRdd, evolvedSchema)

      // insert data without schema evolution: expected to be rejected with a
      // schema-mismatch error.
      val err = intercept[AnalysisException] {
        evolvedDf.write
          .mode("append")
          .format("delta")
          .insertInto(tableName)
      }
      checkErrorMatchPVals(
        exception = err,
        "_LEGACY_ERROR_TEMP_DELTA_0007",
        parameters = Map(
          "message" -> "A schema mismatch detected when writing to the Delta table(.|\\n)*"
        )
      )

      // insert data with schema evolution: old row gets NULL for the new nested field.
      withSQLConf("spark.databricks.delta.schema.autoMerge.enabled" -> "true") {
        evolvedDf.write
          .mode("append")
          .format("delta")
          .insertInto(tableName)

        checkAnswer(
          spark.sql(s"SELECT * FROM $tableName"),
          Seq(
            Row(1, Map("event" -> Row(1, 1, null))),
            Row(1, Map("event" -> Row(1, 1, "deprecated")))
          ))
      }
    }
  }
+
  // Inserting from a source whose map-value struct has fewer nested fields than the
  // target must fail with a Delta arity-mismatch error.
  test("not enough column in source to insert in nested map types") {
    withTable("source", "target") {
      sql("""CREATE TABLE source (
            | id INT,
            | metrics MAP<STRING, STRUCT<id: INT, value: INT>>
            |) USING delta""".stripMargin)

      sql("""CREATE TABLE target (
            | id INT,
            | metrics MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>>
            |) USING delta""".stripMargin)

      sql("INSERT INTO source VALUES (1, map('event', struct(1, 1)))")

      val e = intercept[AnalysisException] {
        sql("INSERT INTO target SELECT * FROM source")
      }
      checkError(
        exception = e,
        "DELTA_INSERT_COLUMN_ARITY_MISMATCH",
        parameters = Map(
          "tableName" -> "spark_catalog.default.target",
          "columnName" -> "not enough nested fields in value",
          "numColumns" -> "3",
          "insertColumns" -> "2"
        )
      )
    }
  }
+
  // not enough nested fields in value
  // Source map-value struct has an extra nested field: rejected without schema
  // auto-migration, accepted with it.
  test("more columns in source to insert in nested map types") {
    withTable("source", "target") {
      sql("""CREATE TABLE source (
            | id INT,
            | metrics MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>>
            |) USING delta""".stripMargin)

      sql("""CREATE TABLE target (
            | id INT,
            | metrics MAP<STRING, STRUCT<id: INT, value: INT>>
            |) USING delta""".stripMargin)

      sql("INSERT INTO source VALUES (1, map('event', struct(1, 1, 'deprecated')))")

      val e = intercept[AnalysisException] {
        sql("INSERT INTO target SELECT * FROM source")
      }
      checkErrorMatchPVals(
        exception = e,
        "_LEGACY_ERROR_TEMP_DELTA_0007",
        parameters = Map(
          "message" -> "A schema mismatch detected when writing to the Delta table(.|\\n)*"
        )
      )

      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
        sql("INSERT INTO target SELECT * FROM source")
        // NOTE(review): this asserts on `source`, which the INSERT above does not modify,
        // so the check is vacuous; presumably `target` was intended — confirm against the
        // upstream Delta suite before changing.
        checkAnswer(
          spark.sql(s"SELECT * FROM source"),
          Seq(
            Row(1, Map("event" -> Row(1, 1, "deprecated")))
          ))
      }
    }
  }
+
  // Same extra-nested-field scenario, but with the struct nested inside a map of maps
  // (two levels deep).
  test("more columns in source to insert in nested 2-level deep map types") {
    withTable("source", "target") {
      sql("""CREATE TABLE source (
            | id INT,
            | metrics MAP<STRING, MAP<STRING, STRUCT<id: INT, value: INT, comment: STRING>>>
            |) USING delta""".stripMargin)

      sql("""CREATE TABLE target (
            | id INT,
            | metrics MAP<STRING, MAP<STRING, STRUCT<id: INT, value: INT>>>
            |) USING delta""".stripMargin)

      sql("""INSERT INTO source VALUES
            | (1, map('event', map('subEvent', struct(1, 1, 'deprecated'))))
          """.stripMargin)

      val e = intercept[AnalysisException] {
        sql("INSERT INTO target SELECT * FROM source")
      }
      checkErrorMatchPVals(
        exception = e,
        "_LEGACY_ERROR_TEMP_DELTA_0007",
        parameters = Map(
          "message" -> "A schema mismatch detected when writing to the Delta table(.|\\n)*"
        )
      )

      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
        sql("INSERT INTO target SELECT * FROM source")
        // NOTE(review): asserts on `source` (unmodified by the insert) rather than
        // `target`; looks vacuous — confirm against the upstream Delta suite.
        checkAnswer(
          spark.sql(s"SELECT * FROM source"),
          Seq(
            Row(1, Map("event" -> Map("subEvent" -> Row(1, 1, "deprecated"))))
          ))
      }
    }
  }
+
  // Map keys of a different but castable type (STRING -> INT) are implicitly cast
  // on insert without requiring schema evolution.
  test("insert map type with different data type in key") {
    withTable("source", "target") {
      sql("""CREATE TABLE source (
            | id INT,
            | metrics MAP<STRING, STRUCT<id: INT, value: INT>>
            |) USING delta""".stripMargin)

      sql("""CREATE TABLE target (
            | id INT,
            | metrics MAP<INT, STRUCT<id: INT, value: INT>>
            |) USING delta""".stripMargin)

      sql("INSERT INTO source VALUES (1, map('1', struct(2, 3)))")

      sql("INSERT INTO target SELECT * FROM source")

      checkAnswer(
        spark.sql("SELECT * FROM target"),
        Seq(
          Row(1, Map(1 -> Row(2, 3)))
        ))
    }
  }
+
  // Nested struct fields inside a map value with a different but castable type
  // (LONG -> INT) are implicitly cast on insert.
  test("insert map type with different data type in value") {
    withTable("source", "target") {
      sql("""CREATE TABLE source (
            | id INT,
            | metrics MAP<STRING, STRUCT<id: INT, value: LONG>>
            |) USING delta""".stripMargin)

      sql("""CREATE TABLE target (
            | id INT,
            | metrics MAP<STRING, STRUCT<id: INT, value: INT>>
            |) USING delta""".stripMargin)

      sql("INSERT INTO source VALUES (1, map('m1', struct(2, 3L)))")

      sql("INSERT INTO target SELECT * FROM source")

      checkAnswer(
        spark.sql("SELECT * FROM target"),
        Seq(
          Row(1, Map("m1" -> Row(2, 3)))
        ))
    }
  }
+
  /**
   * Creates a `source` and a `target` Delta table with the given schemas, seeds each with
   * one record, enables automatic schema merging, and then invokes `runAndVerify` with the
   * two table names so the caller can perform the insert under test and assert the outcome.
   *
   * @param sourceSchema column definitions for the source table
   * @param sourceRecord VALUES-list literal inserted into the source table
   * @param targetSchema column definitions for the target table
   * @param targetRecord VALUES-list literal inserted into the target table
   * @param runAndVerify callback receiving (sourceTableName, targetTableName)
   */
  def runInsertOverwrite(
      sourceSchema: String,
      sourceRecord: String,
      targetSchema: String,
      targetRecord: String)(runAndVerify: (String, String) => Unit): Unit = {
    val sourceTable = "source"
    val targetTable = "target"
    withTable(sourceTable) {
      withTable(targetTable) {
        withSQLConf("spark.databricks.delta.schema.autoMerge.enabled" -> "true") {
          // prepare source table
          sql(s"""CREATE TABLE $sourceTable ($sourceSchema)
                 | USING DELTA""".stripMargin)
          sql(s"INSERT INTO $sourceTable VALUES ($sourceRecord)")
          // prepare target table
          sql(s"""CREATE TABLE $targetTable ($targetSchema)
                 | USING DELTA""".stripMargin)
          sql(s"INSERT INTO $targetTable VALUES ($targetRecord)")
          runAndVerify(sourceTable, targetTable)
        }
      }
    }
  }
+}
+
/**
 * Runs the shared insert-into test battery through SQL, addressing the table by its
 * physical path (delta.`location`) instead of its catalog name.
 */
class DeltaInsertIntoSQLByPathSuite
  extends DeltaInsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = true)
  with DeltaSQLCommandTest {
  // Stages the input in a temp view, resolves the table's location from the catalog,
  // and issues INSERT INTO/OVERWRITE against the path.
  override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
    val tmpView = "tmp_view"
    withTempView(tmpView) {
      insert.createOrReplaceTempView(tmpView)
      val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
      val ident = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
      val catalogTable = spark.sessionState.catalog.getTableMetadata(ident)
      sql(s"INSERT $overwrite TABLE delta.`${catalogTable.location}` SELECT * FROM $tmpView")
    }
  }

  // Inserting by path must fail both for a missing directory and for a directory that
  // only contains an empty _delta_log (i.e. no valid Delta table).
  testQuietly("insertInto: cannot insert into a table that doesn't exist") {
    import testImplicits._
    Seq(SaveMode.Append, SaveMode.Overwrite).foreach {
      mode =>
        withTempDir {
          dir =>
            val t1 = s"delta.`${dir.getCanonicalPath}`"
            val tmpView = "tmp_view"
            withTempView(tmpView) {
              val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
              val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
              df.createOrReplaceTempView(tmpView)

              intercept[AnalysisException] {
                sql(s"INSERT $overwrite TABLE $t1 SELECT * FROM $tmpView")
              }

              assert(
                new File(dir, "_delta_log").mkdirs(),
                "Failed to create a _delta_log directory")
              intercept[AnalysisException] {
                sql(s"INSERT $overwrite TABLE $t1 SELECT * FROM $tmpView")
              }
            }
        }
    }
  }
}
+
/**
 * Runs the shared insert-into tests (including the temp-view variants) through the
 * DataFrameWriter.insertInto API rather than SQL.
 */
class DeltaInsertIntoDataFrameSuite
  extends DeltaInsertIntoTestsWithTempViews(
    supportsDynamicOverwrite = true,
    includeSQLOnlyTests = false)
  with DeltaSQLCommandTest {
  // A null mode leaves the DataFrameWriter's default save mode in effect.
  override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
    val dfw = insert.write.format(v2Format)
    if (mode != null) {
      dfw.mode(mode)
    }
    dfw.insertInto(tableName)
  }
}
+
/**
 * Runs the shared insert-into tests through DataFrameWriter.insertInto, addressing the
 * table by its physical path (delta.`location`) resolved from the catalog.
 */
class DeltaInsertIntoDataFrameByPathSuite
  extends DeltaInsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false)
  with DeltaSQLCommandTest {
  override protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode): Unit = {
    val dfw = insert.write.format(v2Format)
    if (mode != null) {
      dfw.mode(mode)
    }
    val ident = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
    val catalogTable = spark.sessionState.catalog.getTableMetadata(ident)
    dfw.insertInto(s"delta.`${catalogTable.location}`")
  }

  // Inserting by path into a nonexistent table must fail — both before and after an
  // empty _delta_log directory exists, and through DataFrameWriterV2 as well.
  testQuietly("insertInto: cannot insert into a table that doesn't exist") {
    import testImplicits._
    Seq(SaveMode.Append, SaveMode.Overwrite).foreach {
      mode =>
        withTempDir {
          dir =>
            val t1 = s"delta.`${dir.getCanonicalPath}`"
            val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")

            intercept[AnalysisException] {
              df.write.mode(mode).insertInto(t1)
            }

            assert(new File(dir, "_delta_log").mkdirs(), "Failed to create a _delta_log directory")
            intercept[AnalysisException] {
              df.write.mode(mode).insertInto(t1)
            }

            // Test DataFrameWriterV2 as well
            val dfW2 = df.writeTo(t1)
            if (mode == SaveMode.Append) {
              intercept[AnalysisException] {
                dfW2.append()
              }
            } else {
              intercept[AnalysisException] {
                dfW2.overwrite(lit(true))
              }
            }
        }
    }
  }
}
+
/**
 * Restricts the column-mapping variants of the insert suites to a representative
 * subset of tests instead of re-running the whole battery.
 */
trait DeltaInsertIntoColumnMappingSelectedTests extends DeltaColumnMappingSelectedTestMixin {
  override protected def runOnlyTests = Seq(
    "InsertInto: overwrite - mixed clause reordered - static mode",
    "InsertInto: overwrite - multiple static partitions - dynamic mode"
  )
}
+
// SQL insert tests under column mapping in name mode, plus one extra constant-select test.
class DeltaInsertIntoSQLNameColumnMappingSuite
  extends DeltaInsertIntoSQLSuite
  with DeltaColumnMappingEnableNameMode
  with DeltaInsertIntoColumnMappingSelectedTests {
  override protected def runOnlyTests: Seq[String] = super.runOnlyTests :+
    "insert overwrite should work with selecting constants"
}
+
// SQL-by-path insert tests under column mapping in name mode (selected subset only).
class DeltaInsertIntoSQLByPathNameColumnMappingSuite
  extends DeltaInsertIntoSQLByPathSuite
  with DeltaColumnMappingEnableNameMode
  with DeltaInsertIntoColumnMappingSelectedTests
+
// DataFrame insert tests under column mapping in name mode (selected subset only).
class DeltaInsertIntoDataFrameNameColumnMappingSuite
  extends DeltaInsertIntoDataFrameSuite
  with DeltaColumnMappingEnableNameMode
  with DeltaInsertIntoColumnMappingSelectedTests
+
// DataFrame-by-path insert tests under column mapping in name mode (selected subset only).
class DeltaInsertIntoDataFrameByPathNameColumnMappingSuite
  extends DeltaInsertIntoDataFrameByPathSuite
  with DeltaColumnMappingEnableNameMode
  with DeltaInsertIntoColumnMappingSelectedTests
+
/**
 * Extends the shared insert tests with scenarios where the insert target is a temp view
 * (SQL or DataFrame-created) defined on top of a real table. Depending on the view shape
 * and insert path, the operation either succeeds or fails with one of several known
 * error messages; both outcomes are accepted.
 */
abstract class DeltaInsertIntoTestsWithTempViews(
    supportsDynamicOverwrite: Boolean,
    includeSQLOnlyTests: Boolean)
  extends DeltaInsertIntoTests(supportsDynamicOverwrite, includeSQLOnlyTests)
  with DeltaTestUtilsForTempViews {
  // Registers one test per view definition `text`; the view "v" is created over table
  // "tbl" and the insert is attempted in both Append and Overwrite modes.
  protected def testComplexTempViews(name: String)(text: String, expectedResult: Seq[Row]): Unit = {
    testWithTempView(s"insertInto a temp view created on top of a table - $name") {
      isSQLTempView =>
        import testImplicits._
        val t1 = "tbl"
        sql(s"CREATE TABLE $t1 (key int, value int) USING $v2Format")
        Seq(SaveMode.Append, SaveMode.Overwrite).foreach {
          mode =>
            createTempViewFromSelect(text, isSQLTempView)
            val df = Seq((0, 3), (1, 2)).toDF("key", "value")
            try {
              doInsert("v", df, mode)
              checkAnswer(spark.table("v"), expectedResult)
            } catch {
              case e: AnalysisException =>
                // Accept any of the error messages different Spark versions / insert
                // paths produce when inserting into a temp view is not allowed.
                assert(
                  e.getMessage.contains(INSERT_INTO_TMP_VIEW_ERROR_MSG) ||
                    e.getMessage.contains("Inserting into an RDD-based table is not allowed") ||
                    e.getMessage.contains("Table default.v not found") ||
                    e.getMessage.contains("Table or view 'v' not found in database 'default'") ||
                    e.getMessage.contains("The table or view `default`.`v` cannot be found") ||
                    e.getMessage.contains(
                      "[UNSUPPORTED_INSERT.RDD_BASED] Can't insert into the target."))
            }
        }
    }
  }

  testComplexTempViews("basic")(
    "SELECT * FROM tbl",
    Seq(Row(0, 3), Row(1, 2))
  )

  testComplexTempViews("subset cols")(
    "SELECT key FROM tbl",
    Seq(Row(0), Row(1))
  )

  testComplexTempViews("superset cols")(
    "SELECT key, value, 1 FROM tbl",
    Seq(Row(0, 3, 1), Row(1, 2, 1))
  )

  testComplexTempViews("nontrivial projection")(
    "SELECT value as key, key as value FROM tbl",
    Seq(Row(3, 0), Row(2, 1))
  )

  testComplexTempViews("view with too many internal aliases")(
    "SELECT * FROM (SELECT * FROM tbl AS t1) AS t2",
    Seq(Row(0, 3), Row(1, 2))
  )

}
+
+class DeltaColumnDefaultsInsertSuite extends InsertIntoSQLOnlyTests with
DeltaSQLCommandTest {
+
+ import testImplicits._
+
+ override val supportsDynamicOverwrite = true
+ override val includeSQLOnlyTests = true
+
+ val tblPropertiesAllowDefaults =
+ """tblproperties (
+ | 'delta.feature.allowColumnDefaults' = 'enabled',
+ | 'delta.columnMapping.mode' = 'name'
+ |)""".stripMargin
+
  // Ignored in Gluten: Results mismatch.
  // Positive coverage for column DEFAULT values: defaults in VALUES lists, altering /
  // dropping defaults, INSERT OVERWRITE under both static and dynamic partition
  // overwrite modes, and defaults for every supported primitive type.
  ignore("Column DEFAULT value support with Delta Lake, positive tests") {
    Seq(
      PartitionOverwriteMode.STATIC.toString,
      PartitionOverwriteMode.DYNAMIC.toString
    ).foreach {
      partitionOverwriteMode =>
        withSQLConf(
          SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
          SQLConf.PARTITION_OVERWRITE_MODE.key -> partitionOverwriteMode,
          // Set these configs to allow writing test values like timestamps of Jan. 1, year 1, etc.
          SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.LEGACY.toString,
          SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> LegacyBehaviorPolicy.LEGACY.toString
        ) {
          withTable("t1", "t2", "t3", "t4") {
            // Positive tests:
            // Create some columns with default values and then insert into them.
            sql("create table t1(" +
              s"a int default 42, b boolean default true, c string default 'abc') using $v2Format " +
              s"partitioned by (a) $tblPropertiesAllowDefaults")
            sql("insert into t1 values (1, false, default)")
            sql("insert into t1 values (1, default, default)")
            sql("alter table t1 alter column c set default 'def'")
            sql("insert into t1 values (default, default, default)")
            sql("alter table t1 alter column c drop default")
            // Exercise INSERT INTO commands with VALUES lists mapping columns positionally.
            sql("insert into t1 values (default, default, default)")
            // Write the data in the table 't1' to new table 't4' and then perform an INSERT OVERWRITE
            // back to 't1' here, to exercise static and dynamic partition overwrites.
            sql(
              f"create table t4(a int, b boolean, c string) using $v2Format " +
                s"partitioned by (a) $tblPropertiesAllowDefaults")
            // Exercise INSERT INTO commands with SELECT queries mapping columns by name.
            sql("insert into t4(a, b, c) select a, b, c from t1")
            sql("insert overwrite table t1 select * from t4")
            checkAnswer(
              spark.table("t1"),
              Seq(
                Row(1, false, "abc"),
                Row(1, true, "abc"),
                Row(42, true, "def"),
                Row(42, true, null)
              ))
            // Insert default values with all supported types.
            sql(
              "create table t2(" +
                "s boolean default true, " +
                "t byte default cast(null as byte), " +
                "u short default cast(42 as short), " +
                "v float default 0, " +
                "w double default 0, " +
                "x date default date'0000', " +
                "y timestamp default timestamp'0000', " +
                "z decimal(5, 2) default 123.45," +
                "a1 bigint default 43," +
                "a2 smallint default cast(5 as smallint)," +
                s"a3 tinyint default cast(6 as tinyint)) using $v2Format " +
                tblPropertiesAllowDefaults)
            sql(
              "insert into t2 values (default, default, default, default, default, default, " +
                "default, default, default, default, default)")
            val result: Array[Row] = spark.table("t2").collect()
            assert(result.length == 1)
            val row: Row = result(0)
            assert(row.length == 11)
            assert(row(0) == true)
            assert(row(1) == null)
            assert(row(2) == 42)
            assert(row(3) == 0.0f)
            assert(row(4) == 0.0d)
            assert(row(5).toString == "0001-01-01")
            assert(row(6).toString == "0001-01-01 00:00:00.0")
            assert(row(7).toString == "123.45")
            assert(row(8) == 43L)
            assert(row(9) == 5)
            assert(row(10) == 6)
          }
          withTable("t3") {
            // Set a default value for a partitioning column.
            sql(
              s"create table t3(i boolean, s bigint, q int default 42) using $v2Format " +
                s"partitioned by (i) $tblPropertiesAllowDefaults")
            sql("alter table t3 alter column i set default true")
            sql("insert into t3(i, s, q) values (default, default, default)")
            checkAnswer(spark.table("t3"), Seq(Row(true, null, 42)))
            // Drop the column and add it again without the default. Querying the column now returns
            // NULL.
            sql("alter table t3 drop column q")
            sql("alter table t3 add column q int")
            checkAnswer(spark.table("t3"), Seq(Row(true, null, null)))
          }
        }
    }
  }
+
  // Negative coverage for column DEFAULT values: missing table feature, unsupported
  // ALTER ADD COLUMN with a default, unanalyzable/subquery/mis-typed default
  // expressions, defaults conflicting with constraints, and the feature flag off.
  test("Column DEFAULT value support with Delta Lake, negative tests") {
    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
      // The table feature is not enabled via TBLPROPERTIES.
      withTable("createTableWithDefaultFeatureNotEnabled") {
        checkError(
          intercept[DeltaAnalysisException] {
            sql(
              s"create table createTableWithDefaultFeatureNotEnabled(" +
                s"i boolean, s bigint, q int default 42) using $v2Format " +
                "partitioned by (i)")
          },
          "WRONG_COLUMN_DEFAULTS_FOR_DELTA_FEATURE_NOT_ENABLED",
          parameters = Map("commandType" -> "CREATE TABLE")
        )
      }
      withTable("alterTableSetDefaultFeatureNotEnabled") {
        sql(s"create table alterTableSetDefaultFeatureNotEnabled(a int) using $v2Format")
        checkError(
          intercept[DeltaAnalysisException] {
            sql("alter table alterTableSetDefaultFeatureNotEnabled alter column a set default 42")
          },
          "WRONG_COLUMN_DEFAULTS_FOR_DELTA_FEATURE_NOT_ENABLED",
          parameters = Map("commandType" -> "ALTER TABLE")
        )
      }
      // Adding a new column with a default value to an existing table is not allowed.
      withTable("alterTableTest") {
        sql(
          s"create table alterTableTest(i boolean, s bigint, q int default 42) using $v2Format " +
            s"partitioned by (i) $tblPropertiesAllowDefaults")
        checkError(
          intercept[DeltaAnalysisException] {
            sql("alter table alterTableTest add column z int default 42")
          },
          "WRONG_COLUMN_DEFAULTS_FOR_DELTA_ALTER_TABLE_ADD_COLUMN_NOT_SUPPORTED"
        )
      }
      // The default value fails to analyze.
      checkError(
        intercept[AnalysisException] {
          sql(
            s"create table t4 (s int default badvalue) using $v2Format " +
              s"$tblPropertiesAllowDefaults")
        },
        INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG,
        parameters =
          Map("statement" -> "CREATE TABLE", "colName" -> "`s`", "defaultValue" -> "badvalue")
      )

      // The default value analyzes to a table not in the catalog.
      // The error message reports that we failed to execute the command because subquery
      // expressions are not allowed in DEFAULT values.
      checkError(
        intercept[AnalysisException] {
          sql(
            s"create table t4 (s int default (select min(x) from badtable)) using $v2Format " +
              tblPropertiesAllowDefaults)
        },
        "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
        parameters = Map(
          "statement" -> "CREATE TABLE",
          "colName" -> "`s`",
          "defaultValue" -> "(select min(x) from badtable)")
      )
      // The default value has an explicit alias. It fails to evaluate when inlined into the
      // VALUES list at the INSERT INTO time.
      // The error message reports that we failed to execute the command because subquery
      // expressions are not allowed in DEFAULT values.
      checkError(
        intercept[AnalysisException] {
          sql(
            s"create table t4 (s int default (select 42 as alias)) using $v2Format " +
              tblPropertiesAllowDefaults)
        },
        "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
        parameters = Map(
          "statement" -> "CREATE TABLE",
          "colName" -> "`s`",
          "defaultValue" -> "(select 42 as alias)")
      )
      // The default value parses but the type is not coercible.
      checkError(
        intercept[AnalysisException] {
          sql(
            s"create table t4 (s bigint default false) " +
              s"using $v2Format $tblPropertiesAllowDefaults")
        },
        "INVALID_DEFAULT_VALUE.DATA_TYPE",
        parameters = Map(
          "statement" -> "CREATE TABLE",
          "colName" -> "`s`",
          "expectedType" -> "\"BIGINT\"",
          "actualType" -> "\"BOOLEAN\"",
          "defaultValue" -> "false")
      )
      // It is possible to create a table with NOT NULL constraint and a DEFAULT value of NULL.
      // However, future inserts into that table will fail.
      withTable("t4") {
        sql(
          s"create table t4(i boolean, s bigint, q int default null not null) using $v2Format " +
            s"partitioned by (i) $tblPropertiesAllowDefaults")
        // The InvariantViolationException is not a SparkThrowable, so just check we receive one.
        assert(intercept[InvariantViolationException] {
          sql("insert into t4 values (default, default, default)")
        }.getMessage.nonEmpty)
      }
      // It is possible to create a table with a check constraint and a DEFAULT value that does not
      // conform. However, future inserts into that table will fail.
      withTable("t4") {
        sql(
          s"create table t4(i boolean, s bigint, q int default 42) using $v2Format " +
            s"partitioned by (i) $tblPropertiesAllowDefaults")
        sql("alter table t4 add constraint smallq check (q < 10)")
        assert(intercept[InvariantViolationException] {
          sql("insert into t4 values (default, default, default)")
        }.getMessage.nonEmpty)
      }
    }
    // Column default values are disabled per configuration in general.
    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
      checkError(
        intercept[ParseException] {
          sql(
            s"create table t4 (s int default 41 + 1) using $v2Format " +
              tblPropertiesAllowDefaults)
        },
        "UNSUPPORTED_DEFAULT_VALUE.WITH_SUGGESTION",
        parameters = Map.empty,
        context = ExpectedContext(fragment = "s int default 41 + 1", start = 17, stop = 36)
      )
    }
  }
+
  // Verifies that SQL inserts relying on DEFAULTs and equivalent explicit DataFrame
  // writes land the same rows, then exercises mergeSchema-based evolution.
  test("Exercise column defaults with dataframe writes") {
    // There are three column types exercising various combinations of implicit and explicit
    // default column value references in the 'insert into' statements. Note these tests depend on
    // enabling the configuration to use NULLs for missing DEFAULT column values.
    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
      for (useDataFrames <- Seq(false, true)) {
        withTable("t1", "t2") {
          sql(
            s"create table t1(j int, s bigint default 42, x bigint default 43) using $v2Format " +
              tblPropertiesAllowDefaults)
          if (useDataFrames) {
            // Use 'saveAsTable' to exercise mapping columns by name. Note that we have to specify
            // values for all columns of the target table here whether we use 'saveAsTable' or
            // 'insertInto', since the DataFrame generates a LogicalPlan equivalent to a SQL INSERT
            // INTO command without any explicit user-specified column list. For example, if we
            // used Seq((1)).toDF("j", "s", "x").write.mode("append") here instead, it would
            // generate an unresolved LogicalPlan equivalent to the SQL query
            // "INSERT INTO t1 VALUES (1)". This would fail with an error reporting the VALUES
            // list is not long enough, since the analyzer would consider this equivalent to
            // "INSERT INTO t1 (j, s, x) VALUES (1)".
            Seq((1, 42L, 43L))
              .toDF("j", "s", "x")
              .write
              .mode("append")
              .format("delta")
              .saveAsTable("t1")
            Seq((2, 42L, 43L))
              .toDF("j", "s", "x")
              .write
              .mode("append")
              .format("delta")
              .saveAsTable("t1")
            Seq((3, 42L, 43L))
              .toDF("j", "s", "x")
              .write
              .mode("append")
              .format("delta")
              .saveAsTable("t1")
            Seq((4, 44L, 43L))
              .toDF("j", "s", "x")
              .write
              .mode("append")
              .format("delta")
              .saveAsTable("t1")
            Seq((5, 44L, 45L))
              .toDF("j", "s", "x")
              .write
              .mode("append")
              .format("delta")
              .saveAsTable("t1")
          } else {
            sql("insert into t1(j) values(1)")
            sql("insert into t1(j, s) values(2, default)")
            sql("insert into t1(j, s, x) values(3, default, default)")
            sql("insert into t1(j, s) values(4, 44)")
            sql("insert into t1(j, s, x) values(5, 44, 45)")
          }
          sql(
            s"create table t2(j int, s bigint default 42, x bigint default 43) using $v2Format " +
              tblPropertiesAllowDefaults)
          if (useDataFrames) {
            // Use 'insertInto' to exercise mapping columns positionally.
            spark.table("t1").where("j = 1").write.insertInto("t2")
            spark.table("t1").where("j = 2").write.insertInto("t2")
            spark.table("t1").where("j = 3").write.insertInto("t2")
            spark.table("t1").where("j = 4").write.insertInto("t2")
            spark.table("t1").where("j = 5").write.insertInto("t2")
          } else {
            sql("insert into t2(j) select j from t1 where j = 1")
            sql("insert into t2(j, s) select j, default from t1 where j = 2")
            sql("insert into t2(j, s, x) select j, default, default from t1 where j = 3")
            sql("insert into t2(j, s) select j, s from t1 where j = 4")
            sql("insert into t2(j, s, x) select j, s, 45L from t1 where j = 5")
          }
          checkAnswer(
            spark.table("t2"),
            Row(1, 42L, 43L) ::
              Row(2, 42L, 43L) ::
              Row(3, 42L, 43L) ::
              Row(4, 44L, 43L) ::
              Row(5, 44L, 45L) :: Nil)
          // Also exercise schema evolution with DataFrames.
          if (useDataFrames) {
            Seq((5, 44L, 45L, 46L))
              .toDF("j", "s", "x", "y")
              .write
              .mode("append")
              .format("delta")
              .option("mergeSchema", "true")
              .saveAsTable("t2")
            checkAnswer(
              spark.table("t2"),
              Row(1, 42L, 43L, null) ::
                Row(2, 42L, 43L, null) ::
                Row(3, 42L, 43L, null) ::
                Row(4, 44L, 43L, null) ::
                Row(5, 44L, 45L, null) ::
                Row(5, 44L, 45L, 46L) :: Nil)
          }
        }
      }
    }
  }
+
  // A replaceWhere predicate may reference the DEFAULT keyword; overwriting t2 with all
  // of t1 under such a predicate should leave both tables with identical contents.
  test("ReplaceWhere with column defaults with dataframe writes") {
    withTable("t1", "t2", "t3") {
      sql(
        s"create table t1(j int, s bigint default 42, x bigint default 43) using $v2Format " +
          tblPropertiesAllowDefaults)
      Seq((1, 42L, 43L)).toDF.write.insertInto("t1")
      Seq((2, 42L, 43L)).toDF.write.insertInto("t1")
      Seq((3, 42L, 43L)).toDF.write.insertInto("t1")
      Seq((4, 44L, 43L)).toDF.write.insertInto("t1")
      Seq((5, 44L, 45L)).toDF.write.insertInto("t1")
      spark
        .table("t1")
        .write
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", "j = default and s = default and x = default")
        .saveAsTable("t2")
      Seq("t1", "t2").foreach {
        t =>
          checkAnswer(
            spark.table(t),
            Row(1, 42L, 43L) ::
              Row(2, 42L, 43L) ::
              Row(3, 42L, 43L) ::
              Row(4, 44L, 43L) ::
              Row(5, 44L, 45L) :: Nil)
      }
    }
  }
+
+ test("DESCRIBE and SHOW CREATE TABLE with column defaults") {
+ withTable("t") {
+ spark.sql(
+ s"CREATE TABLE t (id bigint default 42) " +
+ s"using $v2Format $tblPropertiesAllowDefaults")
+ val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED t")
+ assert(
+ descriptionDf.schema.map(field => (field.name, field.dataType)) ===
Seq(
+ ("col_name", StringType),
+ ("data_type", StringType),
+ ("comment", StringType)))
+ QueryTest.checkAnswer(
+ descriptionDf.filter(
+ "!(col_name in ('Catalog', 'Created Time', 'Created By', 'Database',
" +
+ "'index', 'Is_managed_location', 'Location', 'Name', 'Owner',
'Partition Provider'," +
+ "'Provider', 'Table', 'Table Properties', 'Type', '_partition',
'Last Access', " +
+ "'Statistics', ''))"),
+ Seq(
+ Row("# Column Default Values", "", ""),
+ Row("# Detailed Table Information", "", ""),
+ Row("id", "bigint", "42"),
+ Row("id", "bigint", null)
+ )
+ )
+ }
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE t (
+ | a bigint NOT NULL,
+ | b bigint DEFAULT 42,
+ | c string DEFAULT 'abc, "def"' COMMENT 'comment'
+ |)
+ |USING parquet
+ |COMMENT 'This is a comment'
+ |$tblPropertiesAllowDefaults
+ """.stripMargin)
+ val currentCatalog =
spark.sessionState.catalogManager.currentCatalog.name()
+ QueryTest.checkAnswer(
+ sql("SHOW CREATE TABLE T"),
+ Seq(Row(s"""CREATE TABLE $currentCatalog.default.T (
+ | a BIGINT,
+ | b BIGINT DEFAULT 42,
+ | c STRING DEFAULT 'abc, "def"' COMMENT 'comment')
+ |USING parquet
+ |COMMENT 'This is a comment'
+ |TBLPROPERTIES (
+ | 'delta.columnMapping.mode' = 'name',
+ | 'delta.feature.allowColumnDefaults' = 'enabled')
+ |""".stripMargin))
+ )
+ }
+ }
+}
+
+/** These tests come from Apache Spark with some modifications to match Delta
behavior. */
+abstract class DeltaInsertIntoTests(
+ override protected val supportsDynamicOverwrite: Boolean,
+ override protected val includeSQLOnlyTests: Boolean)
+ extends InsertIntoSQLOnlyTests {
+
+ import testImplicits._
+
override def afterEach(): Unit = {
  // Drop every table left behind by the test so the shared session stays clean
  // for the next test case.
  for (table <- spark.catalog.listTables().collect()) {
    sql(s"drop table ${table.name}")
  }
  super.afterEach()
}
+
+ // START Apache Spark tests
+
+ /**
+ * Insert data into a table using the insertInto statement. Implementations
can be in SQL
+ * ("INSERT") or using the DataFrameWriter (`df.write.insertInto`).
Insertions will be by column
+ * ordinal and not by column name.
+ */
+ protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode
= null): Unit
+
test("insertInto: append") {
  // Plain append into a freshly created Delta table.
  val tableName = "tbl"
  sql(s"CREATE TABLE $tableName (id bigint, data string) USING $v2Format")
  val expected = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
  doInsert(tableName, expected)
  verifyTable(tableName, expected)
}
+
test("insertInto: append by position") {
  val tableName = "tbl"
  sql(s"CREATE TABLE $tableName (id bigint, data string) USING $v2Format")
  val expected = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
  // Column names are deliberately swapped in the source frame: insertInto
  // matches columns by ordinal position, not by name.
  val swappedNames = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")

  doInsert(tableName, swappedNames)
  verifyTable(tableName, expected)
}
+
test("insertInto: append cast automatically") {
  val tableName = "tbl"
  sql(s"CREATE TABLE $tableName (id bigint, data string) USING $v2Format")
  // id is INT in the dataframe but BIGINT in the table; the insert is
  // expected to widen it automatically.
  val intTyped = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
  doInsert(tableName, intTyped)
  verifyTable(tableName, intTyped)
}
+
+ test("insertInto: append partitioned table") {
+ val t1 = "tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+ doInsert(t1, df)
+ verifyTable(t1, df)
+ }
+ }
+
test("insertInto: overwrite non-partitioned table") {
  val tableName = "tbl"
  sql(s"CREATE TABLE $tableName (id bigint, data string) USING $v2Format")
  val initial = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
  val replacement = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data")
  doInsert(tableName, initial)
  // Overwrite on a non-partitioned table replaces the previous contents entirely.
  doInsert(tableName, replacement, SaveMode.Overwrite)
  verifyTable(tableName, replacement)
}
+
+ test("insertInto: overwrite partitioned table in static mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "tbl"
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+ doInsert(t1, init)
+
+ val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+ doInsert(t1, df, SaveMode.Overwrite)
+ verifyTable(t1, df)
+ }
+ }
+
+ test("insertInto: overwrite by position") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+ doInsert(t1, init)
+
+ val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+ doInsert(t1, dfr, SaveMode.Overwrite)
+
+ val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+ verifyTable(t1, df)
+ }
+ }
+ }
+
+ test("insertInto: overwrite cast automatically") {
+ val t1 = "tbl"
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+ val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+ val df2 = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data")
+ val df2c = Seq((4, "d"), (5, "e"), (6, "f")).toDF("id", "data")
+ doInsert(t1, df)
+ doInsert(t1, df2c, SaveMode.Overwrite)
+ verifyTable(t1, df2)
+ }
+
test("insertInto: fails when missing a column") {
  val tableName = "tbl"
  sql(s"CREATE TABLE $tableName (id bigint, data string, missing string) USING $v2Format")
  // Two source variants: one with exactly matching column types and one whose
  // id is INT and would need a cast. Both must fail because the 'missing'
  // column is not supplied.
  val exactTypes = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
  val needsCast = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
  Seq(exactTypes, needsCast).foreach { df =>
    val exc = intercept[AnalysisException] {
      doInsert(tableName, df)
    }
    // A failed insert must not leave partial data behind.
    verifyTable(tableName, Seq.empty[(Long, String, String)].toDF("id", "data", "missing"))
    assert(exc.getMessage.contains("not enough data columns"))
  }
}
+
+ test("insertInto: overwrite fails when missing a column") {
+ val t1 = "tbl"
+ sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING
$v2Format")
+ val df1 = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+ // mismatched datatype
+ val df2 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
+ for (df <- Seq(df1, df2)) {
+ val exc = intercept[AnalysisException] {
+ doInsert(t1, df, SaveMode.Overwrite)
+ }
+ verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data",
"missing"))
+ assert(exc.getMessage.contains("not enough data columns"))
+ }
+ }
+
+ // This behavior is specific to Delta
+ test("insertInto: fails when an extra column is present but can evolve
schema") {
+ val t1 = "tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+ val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit")
+ val exc = intercept[AnalysisException] {
+ doInsert(t1, df)
+ }
+
+ verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+ assert(exc.getMessage.contains(s"mergeSchema"))
+
+ withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
+ doInsert(t1, df)
+ }
+ verifyTable(t1, Seq((1L, "a", "mango")).toDF("id", "data", "fruit"))
+ }
+ }
+
+ test("insertInto: UTC timestamp partition values round trip across different
session TZ") {
+ val t1 = "utc_timestamp_partitioned_values"
+ withTable(t1) {
+ withTimeZone("UTC") {
+ sql(s"CREATE TABLE $t1 (data int, ts timestamp) USING delta
PARTITIONED BY (ts)")
+ sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15T04:00:00UTC')")
+ sql(s"INSERT INTO $t1 VALUES (2, timestamp'2024-06-15T4:00:00UTC+8')")
+ sql(s"INSERT INTO $t1 VALUES (3, timestamp'2024-06-15T5:00:00
UTC+01:00')")
+ sql(s"INSERT INTO $t1 VALUES (4,
timestamp'2024-06-16T5:00:00.123456UTC')")
+ sql(s"INSERT INTO $t1 VALUES (5, timestamp'1903-12-28T5:00:00')")
+ }
+
+ withTimeZone("GMT-8") {
+ val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+ val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+ val partitionColName =
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+ checkAnswer(
+ allFiles.select("partitionValues").orderBy("modificationTime"),
+ Seq(
+ Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z")),
+ Row(Map(partitionColName -> "2024-06-14T20:00:00.000000Z")),
+ Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z")),
+ Row(Map(partitionColName -> "2024-06-16T05:00:00.123456Z")),
+ Row(Map(partitionColName -> "1903-12-28T05:00:00.000000Z"))
+ )
+ )
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-15T4:00:00UTC+8'"),
+ Seq(Row(2)))
+
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-14T20:00:00UTC-08'"),
+ Seq(Row(1), Row(3)))
+
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'1903-12-27T21:00:00UTC-08'"),
+ Seq(Row(5)))
+
+ checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(4)))
+ }
+ }
+ }
+
+ test("insertInto: Non-UTC and UTC partition values round trip same session
TZ") {
+ val t1 = "utc_timestamp_partitioned_values"
+ withTable(t1) {
+ withTimeZone("GMT-8") {
+ sql(s"CREATE TABLE $t1 (data int, ts timestamp) USING delta
PARTITIONED BY (ts)")
+ sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15T4:00:00UTC')")
+ sql(s"INSERT INTO $t1 VALUES (2, timestamp'2024-06-15T4:00:00UTC+8')")
+ }
+
+ withTimeZone("GMT-8") {
+ withSQLConf(DeltaSQLConf.UTC_TIMESTAMP_PARTITION_VALUES.key ->
"false") {
+ sql(s"INSERT INTO $t1 VALUES (3, timestamp'2024-06-15T5:00:00
UTC+01:00')")
+ sql(s"INSERT INTO $t1 VALUES (4, timestamp'1903-12-28T5:00:00')")
+ }
+
+ val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+ val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+ val partitionColName =
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+ checkAnswer(
+ allFiles.select("partitionValues").orderBy("modificationTime"),
+ Seq(
+ Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z")),
+ Row(Map(partitionColName -> "2024-06-14T20:00:00.000000Z")),
+ Row(Map(partitionColName -> "2024-06-14 20:00:00")),
+ Row(Map(partitionColName -> "1903-12-28 05:00:00"))
+ )
+ )
+ }
+
+ withTimeZone("GMT-8") {
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-15T4:00:00UTC+8'"),
+ Seq(Row(2)))
+
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-14T20:00:00'"),
+ Seq(Row(1), Row(3)))
+
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'1903-12-28T05:00:00'"),
+ Seq(Row(4)))
+
+ checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(3)))
+ }
+ }
+ }
+
+ test("insertInto: Timestamp No Timezone round trips across timezones") {
+ val t1 = "timestamp_ntz"
+ withTable(t1) {
+ withTimeZone("GMT-8") {
+ sql(s"CREATE TABLE $t1 (data int, ts timestamp_ntz) USING delta
PARTITIONED BY (ts)")
+ sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15T4:00:00')")
+ sql(s"INSERT INTO $t1 VALUES (2, timestamp'2024-06-16T5:00:00')")
+ sql(s"INSERT INTO $t1 VALUES (3, timestamp'1903-12-28T5:00:00')")
+
+ val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+ val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+ val partitionColName =
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+ checkAnswer(
+ allFiles.select("partitionValues").orderBy("modificationTime"),
+ Seq(
+ Row(Map(partitionColName -> "2024-06-15 04:00:00")),
+ Row(Map(partitionColName -> "2024-06-16 05:00:00")),
+ Row(Map(partitionColName -> "1903-12-28 05:00:00"))
+ )
+ )
+ }
+
+ withSQLConf("spark.sql.session.timeZone" -> "UTC-03:00") {
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-15T4:00:00'"),
+ Seq(Row(1)))
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-16T05:00:00'"),
+ Seq(Row(2)))
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'1903-12-28T05:00:00'"),
+ Seq(Row(3)))
+ checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(3)))
+ }
+ }
+ }
+
+ test("insertInto: Timestamp round trips across same session time zone: UTC
normalized") {
+ val t1 = "utc_timestamp_partitioned_values"
+
+ withTimeZone("GMT-8") {
+ sql(s"CREATE TABLE $t1 (data int, ts timestamp) USING delta PARTITIONED
BY (ts)")
+ sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15 04:00:00UTC')")
+ sql(s"INSERT INTO $t1 VALUES (2, timestamp'2024-06-15T4:00:00UTC+8')")
+ sql(s"INSERT INTO $t1 VALUES (3, timestamp'2024-06-15T5:00:00
UTC+01:00')")
+ val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+ val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+ val partitionColName =
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+ checkAnswer(
+ allFiles.select("partitionValues").orderBy("modificationTime"),
+ Seq(
+ Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z")),
+ Row(Map(partitionColName -> "2024-06-14T20:00:00.000000Z")),
+ Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z"))
+ )
+ )
+
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-15T04:00:00UTC'"),
+ Seq(Row(1), Row(3)))
+
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-14T20:00:00UTC'"),
+ Seq(Row(2)))
+
+ checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(2)))
+ }
+ }
+
+ test("insertInto: Timestamp round trips across same session time zone:
session time normalized") {
+ val t1 = "utc_timestamp_partitioned_values"
+
+ withTimeZone("UTC") {
+ withSQLConf(DeltaSQLConf.UTC_TIMESTAMP_PARTITION_VALUES.key -> "false") {
+ sql(s"CREATE TABLE $t1 (data int, ts timestamp) USING delta
PARTITIONED BY (ts)")
+ sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15
04:00:00UTC+08:00')")
+ sql(s"INSERT INTO $t1 VALUES (2,
timestamp'2024-06-15T4:00:00UTC-08:00')")
+ sql(s"INSERT INTO $t1 VALUES (3,
timestamp'2024-06-15T5:00:00UTC+09:00')")
+ val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+ val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+ val partitionColName =
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+
+ checkAnswer(
+ allFiles.select("partitionValues").orderBy("modificationTime"),
+ Seq(
+ Row(Map(partitionColName -> "2024-06-14 20:00:00")),
+ Row(Map(partitionColName -> "2024-06-15 12:00:00")),
+ Row(Map(partitionColName -> "2024-06-14 20:00:00"))
+ )
+ )
+
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-14T20:00:00'"),
+ Seq(Row(1), Row(3)))
+
+ checkAnswer(
+ sql(s"SELECT data FROM $t1 where ts =
timestamp'2024-06-15T12:00:00'"),
+ Seq(Row(2)))
+
+ checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(2)))
+ }
+ }
+ }
+
/** Runs `f` with the JVM default time zone set to `zone`, restoring the previous default after. */
private def withTimeZone(zone: String)(f: => Unit): Unit = {
  val saved = TimeZone.getDefault
  try {
    TimeZone.setDefault(TimeZone.getTimeZone(zone))
    f
  } finally {
    TimeZone.setDefault(saved)
  }
}
+
+ // This behavior is specific to Delta
+ testQuietly("insertInto: schema enforcement") {
+ val t1 = "tbl"
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+ val df = Seq(("a", 1L)).toDF("id", "data") // reverse order
+
+ def getDF(rows: Row*): DataFrame = {
+ spark.createDataFrame(spark.sparkContext.parallelize(rows),
spark.table(t1).schema)
+ }
+
+ withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> "strict") {
+ intercept[AnalysisException] {
+ doInsert(t1, df, SaveMode.Overwrite)
+ }
+
+ verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+
+ intercept[AnalysisException] {
+ doInsert(t1, df)
+ }
+
+ verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+ }
+
+ withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> "ansi") {
+ intercept[SparkException] {
+ doInsert(t1, df, SaveMode.Overwrite)
+ }
+
+ verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+
+ intercept[SparkException] {
+ doInsert(t1, df)
+ }
+
+ verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+ }
+
+ withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> "legacy") {
+ doInsert(t1, df, SaveMode.Overwrite)
+ verifyTable(t1, getDF(Row(null, "1")))
+
+ doInsert(t1, df)
+
+ verifyTable(t1, getDF(Row(null, "1"), Row(null, "1")))
+ }
+ }
+
+ testQuietly("insertInto: struct types and schema enforcement") {
+ val t1 = "tbl"
+ withTable(t1) {
+ sql(s"""CREATE TABLE $t1 (
+ | id bigint,
+ | point struct<x: double, y: double>
+ |)
+ |USING delta""".stripMargin)
+ val init = Seq((1L, (0.0, 1.0))).toDF("id", "point")
+ doInsert(t1, init)
+
+ doInsert(t1, Seq((2L, (1.0, 0.0))).toDF("col1", "col2")) // naming
doesn't matter
+
+ // can handle null types
+ doInsert(t1, Seq((3L, (1.0, null))).toDF("col1", "col2"))
+ doInsert(t1, Seq((4L, (null, 1.0))).toDF("col1", "col2"))
+
+ val expected = Seq(
+ Row(1L, Row(0.0, 1.0)),
+ Row(2L, Row(1.0, 0.0)),
+ Row(3L, Row(1.0, null)),
+ Row(4L, Row(null, 1.0)))
+ verifyTable(t1, spark.createDataFrame(expected.asJava,
spark.table(t1).schema))
+
+ // schema enforcement
+ val complexSchema = Seq((5L, (0.5, 0.5), (2.5, 2.5, 1.0), "a", (0.5,
"b")))
+ .toDF("long", "struct", "newstruct", "string", "badstruct")
+ .select(
+ $"long",
+ $"struct",
+ struct($"newstruct._1".as("x"), $"newstruct._2".as("y"),
$"newstruct._3".as("z"))
+ .as("newstruct"),
+ $"string",
+ $"badstruct")
+
+ // new column in root
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("long", "struct", "string"))
+ }
+
+ // new column in struct not accepted
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("long", "newstruct"))
+ }
+
+ withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> "strict") {
+ // bad data type not accepted
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("string", "struct"))
+ }
+
+ // nested bad data type in struct not accepted
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("long", "badstruct"))
+ }
+ }
+
+ // missing column in struct
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select($"long", struct(lit(0.1))))
+ }
+
+ // wrong ordering
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("struct", "long"))
+ }
+
+ // schema evolution
+ withSQLConf(
+ DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true",
+ SQLConf.STORE_ASSIGNMENT_POLICY.key -> "strict") {
+ // ordering should still match
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("struct", "long"))
+ }
+
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("struct", "long", "string"))
+ }
+
+ // new column to the end works
+ doInsert(t1, complexSchema.select($"long", $"struct",
$"string".as("letter")))
+
+ // still cannot insert missing column
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("long", "struct"))
+ }
+
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select($"long", struct(lit(0.1)),
$"string"))
+ }
+
+ // still perform nested data type checks
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("long", "badstruct", "string"))
+ }
+
+ // bad column within struct
+ intercept[AnalysisException] {
+ doInsert(
+ t1,
+ complexSchema.select($"long", struct(lit(0.1), lit("a"),
lit(0.2)), $"string"))
+ }
+
+ // Add column to nested field
+ doInsert(t1, complexSchema.select($"long", $"newstruct", lit(null)))
+
+ // cannot insert missing field into struct now
+ intercept[AnalysisException] {
+ doInsert(t1, complexSchema.select("long", "struct", "string"))
+ }
+ }
+
+ val expected2 = Seq(
+ Row(1L, Row(0.0, 1.0, null), null),
+ Row(2L, Row(1.0, 0.0, null), null),
+ Row(3L, Row(1.0, null, null), null),
+ Row(4L, Row(null, 1.0, null), null),
+ Row(5L, Row(0.5, 0.5, null), "a"),
+ Row(5L, Row(2.5, 2.5, 1.0), null)
+ )
+ verifyTable(t1, spark.createDataFrame(expected2.asJava,
spark.table(t1).schema))
+
+ val expectedSchema = new StructType()
+ .add("id", LongType)
+ .add(
+ "point",
+ new StructType()
+ .add("x", DoubleType)
+ .add("y", DoubleType)
+ .add("z", DoubleType))
+ .add("letter", StringType)
+ val diff = SchemaUtils.reportDifferences(spark.table(t1).schema,
expectedSchema)
+ if (diff.nonEmpty) {
+ fail(diff.mkString("\n"))
+ }
+ }
+ }
+
+ dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic
mode") {
+ val t1 = "tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+ doInsert(t1, init)
+
+ val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+ doInsert(t1, df, SaveMode.Overwrite)
+
+ verifyTable(t1, df.union(sql("SELECT 4L, 'keep'")))
+ }
+ }
+
+ dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic
mode by position") {
+ val t1 = "tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+ doInsert(t1, init)
+
+ val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+ doInsert(t1, dfr, SaveMode.Overwrite)
+
+ val df = Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "keep")).toDF("id",
"data")
+ verifyTable(t1, df)
+ }
+ }
+
dynamicOverwriteTest(
  "insertInto: overwrite partitioned table in dynamic mode automatic casting") {
  val t1 = "tbl"
  withTable(t1) {
    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
    val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
    doInsert(t1, init)

    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
    val dfc = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
    // Fix: insert the INT-typed frame (dfc) so the test actually exercises the
    // automatic INT -> BIGINT cast its name promises; previously it inserted
    // the already-BIGINT df and left dfc unused.
    doInsert(t1, dfc, SaveMode.Overwrite)

    // Partition id=4 is untouched by the dynamic overwrite, so 'keep' survives.
    verifyTable(t1, df.union(sql("SELECT 4L, 'keep'")))
  }
}
+
+ dynamicOverwriteTest("insertInto: overwrite fails when missing a column in
dynamic mode") {
+ val t1 = "tbl"
+ sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING
$v2Format")
+ val df1 = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+ // mismatched datatype
+ val df2 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
+ for (df <- Seq(df1, df2)) {
+ val exc = intercept[AnalysisException] {
+ doInsert(t1, df, SaveMode.Overwrite)
+ }
+ verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data",
"missing"))
+ assert(exc.getMessage.contains("not enough data columns"))
+ }
+ }
+
+ test("insert nested struct from view into delta") {
+ withTable("testNestedStruct") {
+ sql(
+ s"CREATE TABLE testNestedStruct " +
+ s" (num INT, text STRING, s STRUCT<a:STRING, s2:
STRUCT<c:STRING,d:STRING>, b:STRING>)" +
+ s" USING DELTA")
+ val data = sql(s"SELECT 1, 'a', struct('a', struct('c', 'd'), 'b')")
+ doInsert("testNestedStruct", data)
+ verifyTable(
+ "testNestedStruct",
+ sql(s"SELECT 1 AS num, 'a' AS text, struct('a', struct('c', 'd') AS
s2, 'b') AS s"))
+ }
+ }
+}
+
+trait InsertIntoSQLOnlyTests extends QueryTest with SharedSparkSession with
BeforeAndAfter {
+
+ import testImplicits._
+
/** Check that the contents of `tableName` match the `expected` DataFrame (order-insensitive). */
protected def verifyTable(tableName: String, expected: DataFrame): Unit = {
  val actual = spark.table(tableName)
  checkAnswer(actual, expected)
}
+
+ protected val v2Format: String = "delta"
+
+ /**
+ * Whether dynamic partition overwrites are supported by the `Table`
definitions used in the test
+ * suites. Tables that leverage the V1 Write interface do not support
dynamic partition
+ * overwrites.
+ */
+ protected val supportsDynamicOverwrite: Boolean
+
+ /** Whether to include the SQL specific tests in this trait within the
extending test suite. */
+ protected val includeSQLOnlyTests: Boolean
+
/**
 * Registers a three-row temp view ("tmp_view") with columns (id: bigint, data: string),
 * passes the view name to `testFn`, and drops both `tableName` and the view afterwards.
 * Note: the table itself is created by `testFn`, not here.
 */
private def withTableAndData(tableName: String)(testFn: String => Unit): Unit = {
  withTable(tableName) {
    val viewName = "tmp_view"
    val rows = Seq((1L, "a"), (2L, "b"), (3L, "c"))
    spark.createDataFrame(rows).toDF("id", "data").createOrReplaceTempView(viewName)
    withTempView(viewName) {
      testFn(viewName)
    }
  }
}
+
/**
 * Defines a test that runs `f` with dynamic partition-overwrite mode enabled.
 *
 * When the table implementation does NOT support dynamic overwrites, the body
 * is expected to throw an AnalysisException mentioning that fact; in that
 * configuration a clean run is itself a test failure.
 */
protected def dynamicOverwriteTest(testName: String)(f: => Unit): Unit = {
  test(testName) {
    try {
      withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
        f
      }
      // Reaching this point without an exception is only valid for tables that
      // actually support dynamic overwrites.
      if (!supportsDynamicOverwrite) {
        fail("Expected failure from test, because the table doesn't support dynamic overwrites")
      }
    } catch {
      // Only swallow the expected AnalysisException; any other error (including
      // the fail() above) propagates and fails the test.
      case a: AnalysisException if !supportsDynamicOverwrite =>
        assert(a.getMessage.contains("does not support dynamic overwrite"))
    }
  }
}
+
+ if (includeSQLOnlyTests) {
+ test("InsertInto: when the table doesn't exist") {
+ val t1 = "tbl"
+ val t2 = "tbl2"
+ withTableAndData(t1) {
+ _ =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+ val e = intercept[AnalysisException] {
+ sql(s"INSERT INTO $t2 VALUES (2L, 'dummy')")
+ }
+ assert(e.getMessage.contains(t2))
+ assert(
+ e.getMessage.contains("Table not found") ||
+ e.getMessage.contains(s"table or view `$t2` cannot be found")
+ )
+ }
+ }
+
+ test("InsertInto: append to partitioned table - static clause") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ sql(s"INSERT INTO $t1 PARTITION (id = 23) SELECT data FROM $view")
+ verifyTable(t1, sql(s"SELECT 23, data FROM $view"))
+ }
+ }
+
+ test("InsertInto: static PARTITION clause fails with non-partition
column") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (data)")
+
+ val exc = intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $t1 PARTITION (id=1) SELECT data FROM
$view")
+ }
+
+ verifyTable(t1, spark.emptyDataFrame)
+ assert(
+ exc.getMessage.contains("PARTITION clause cannot contain a
non-partition column") ||
+ exc.getMessage.contains("PARTITION clause cannot contain the
non-partition column") ||
+ exc.getMessage.contains(
+ "[NON_PARTITION_COLUMN] PARTITION clause cannot contain the
non-partition column"))
+ assert(exc.getMessage.contains("id"))
+ }
+ }
+
+ test("InsertInto: dynamic PARTITION clause fails with non-partition
column") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+
+ val exc = intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $t1 PARTITION (data) SELECT * FROM $view")
+ }
+
+ verifyTable(t1, spark.emptyDataFrame)
+ assert(
+ exc.getMessage.contains("PARTITION clause cannot contain a
non-partition column") ||
+ exc.getMessage.contains("PARTITION clause cannot contain the
non-partition column") ||
+ exc.getMessage.contains(
+ "[NON_PARTITION_COLUMN] PARTITION clause cannot contain the
non-partition column"))
+ assert(exc.getMessage.contains("data"))
+ }
+ }
+
+ test("InsertInto: overwrite - dynamic clause - static mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'also-deleted')")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id) SELECT * FROM
$view")
+ verifyTable(t1, Seq((1, "a"), (2, "b"), (3, "c")).toDF())
+ }
+ }
+ }
+
+ dynamicOverwriteTest("InsertInto: overwrite - dynamic clause - dynamic
mode") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'keep')")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id) SELECT * FROM $view")
+ verifyTable(t1, Seq((1, "a"), (2, "b"), (3, "c"), (4,
"keep")).toDF("id", "data"))
+ }
+ }
+
+ test("InsertInto: overwrite - missing clause - static mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'also-deleted')")
+ sql(s"INSERT OVERWRITE TABLE $t1 SELECT * FROM $view")
+ verifyTable(t1, Seq((1, "a"), (2, "b"), (3, "c")).toDF("id",
"data"))
+ }
+ }
+ }
+
+ dynamicOverwriteTest("InsertInto: overwrite - missing clause - dynamic
mode") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'keep')")
+ sql(s"INSERT OVERWRITE TABLE $t1 SELECT * FROM $view")
+ verifyTable(t1, Seq((1, "a"), (2, "b"), (3, "c"), (4,
"keep")).toDF("id", "data"))
+ }
+ }
+
+ test("InsertInto: overwrite - static clause") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(
+ s"CREATE TABLE $t1 (id bigint, data string, p1 int) " +
+ s"USING $v2Format PARTITIONED BY (p1)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 23), (4L, 'keep', 2)")
+ verifyTable(t1, Seq((2L, "dummy", 23), (4L, "keep", 2)).toDF("id",
"data", "p1"))
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p1 = 23) SELECT * FROM
$view")
+ verifyTable(
+ t1,
+ Seq((1, "a", 23), (2, "b", 23), (3, "c", 23), (4, "keep",
2)).toDF("id", "data", "p1"))
+ }
+ }
+
+ test("InsertInto: overwrite - mixed clause - static mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(
+ s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+ s"USING $v2Format PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L,
'also-deleted', 2)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id, p = 2) SELECT *
FROM $view")
+ verifyTable(t1, Seq((1, "a", 2), (2, "b", 2), (3, "c",
2)).toDF("id", "data", "p"))
+ }
+ }
+ }
+
+ test("InsertInto: overwrite - mixed clause reordered - static mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(
+ s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+ s"USING $v2Format PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L,
'also-deleted', 2)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2, id) SELECT *
FROM $view")
+ verifyTable(t1, Seq((1, "a", 2), (2, "b", 2), (3, "c",
2)).toDF("id", "data", "p"))
+ }
+ }
+ }
+
+ test("InsertInto: overwrite - implicit dynamic partition - static mode") {
+ withSQLConf(PARTITION_OVERWRITE_MODE.key ->
PartitionOverwriteMode.STATIC.toString) {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(
+ s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+ s"USING $v2Format PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L,
'also-deleted', 2)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2) SELECT * FROM
$view")
+ verifyTable(t1, Seq((1, "a", 2), (2, "b", 2), (3, "c",
2)).toDF("id", "data", "p"))
+ }
+ }
+ }
+
+ dynamicOverwriteTest("InsertInto: overwrite - mixed clause - dynamic
mode") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(
+ s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+ s"USING $v2Format PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2, id) SELECT * FROM
$view")
+ verifyTable(
+ t1,
+ Seq((1, "a", 2), (2, "b", 2), (3, "c", 2), (4, "keep",
2)).toDF("id", "data", "p"))
+ }
+ }
+
+ dynamicOverwriteTest("InsertInto: overwrite - mixed clause reordered -
dynamic mode") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(
+ s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+ s"USING $v2Format PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id, p = 2) SELECT * FROM
$view")
+ verifyTable(
+ t1,
+ Seq((1, "a", 2), (2, "b", 2), (3, "c", 2), (4, "keep",
2)).toDF("id", "data", "p"))
+ }
+ }
+
+ dynamicOverwriteTest("InsertInto: overwrite - implicit dynamic partition -
dynamic mode") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(
+ s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+ s"USING $v2Format PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2) SELECT * FROM
$view")
+ verifyTable(
+ t1,
+ Seq((1, "a", 2), (2, "b", 2), (3, "c", 2), (4, "keep",
2)).toDF("id", "data", "p"))
+ }
+ }
+
+ test("insert nested struct literal into delta") {
+ withTable("insertNestedTest") {
+ sql(s"CREATE TABLE insertNestedTest " +
+ s" (num INT, text STRING, s STRUCT<a:STRING, s2:
STRUCT<c:STRING,d:STRING>, b:STRING>)" +
+ s" USING DELTA")
+ sql(s"INSERT INTO insertNestedTest VALUES (1, 'a', struct('a',
struct('c', 'd'), 'b'))")
+ }
+ }
+
+ dynamicOverwriteTest("InsertInto: overwrite - multiple static partitions -
dynamic mode") {
+ val t1 = "tbl"
+ withTableAndData(t1) {
+ view =>
+ sql(
+ s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+ s"USING $v2Format PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 2, p = 2) SELECT
data FROM $view")
+ verifyTable(
+ t1,
+ Seq((2, "a", 2), (2, "b", 2), (2, "c", 2), (4, "keep",
2)).toDF("id", "data", "p"))
+ }
+ }
+
test("InsertInto: overwrite - dot in column names - static mode") {
  import testImplicits._
  val tableName = "tbl"
  withTable(tableName) {
    // Backquoted column names containing dots must work both as the partition
    // column and inside a static PARTITION clause.
    sql(
      s"CREATE TABLE $tableName (`a.b` string, `c.d` string) " +
        s"USING $v2Format PARTITIONED BY (`a.b`)")
    sql(s"INSERT OVERWRITE $tableName PARTITION (`a.b` = 'a') VALUES('b')")
    verifyTable(tableName, Seq("a" -> "b").toDF("id", "data"))
  }
}
+ }
+
+ // END Apache Spark tests
+}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
new file mode 100644
index 0000000000..9d90d0c885
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+object DeltaInsertIntoTableSuiteShims {
+ val INSERT_INTO_TMP_VIEW_ERROR_MSG = "Inserting into a view is not allowed"
+
+ val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION"
+}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index c01e3f3f7c..e41df6180f 100644
---
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -1516,8 +1516,7 @@ class DeltaSuite
}
}
- // Ignore in Gluten.
- ignore("SC-8727 - default snapshot num partitions") {
+ test("SC-8727 - default snapshot num partitions") {
withTempDir {
tempDir =>
spark.range(10).write.format("delta").save(tempDir.toString)
@@ -1560,7 +1559,7 @@ class DeltaSuite
}
}
- // Ignore in Gluten.
+ // Ignore in Gluten: Gluten has fewer partitions.
ignore("SC-8810: skip deleted file") {
withSQLConf(("spark.sql.files.ignoreMissingFiles", "true")) {
withTempDir {
@@ -1595,7 +1594,7 @@ class DeltaSuite
}
}
- // Ignore in Gluten.
+ // Ignore in Gluten: Error message mismatch.
ignore("SC-8810: skipping deleted file still throws on corrupted file") {
withSQLConf(("spark.sql.files.ignoreMissingFiles", "true")) {
withTempDir {
@@ -1661,7 +1660,7 @@ class DeltaSuite
}
}
- // Ignore in Gluten.
+ // Ignore in Gluten: Error message mismatch.
ignore("deleted files cause failure by default") {
withTempDir {
tempDir =>
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/FakeFileSystem.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/FakeFileSystem.scala
new file mode 100644
index 0000000000..1b7d4441e3
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/FakeFileSystem.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.hadoop.fs.RawLocalFileSystem
+
+import java.net.URI
+
+/** A fake file system to test whether session Hadoop configuration will be picked up. */
+class FakeFileSystem extends RawLocalFileSystem {
+ override def getScheme: String = FakeFileSystem.scheme
+ override def getUri: URI = FakeFileSystem.uri
+}
+
+object FakeFileSystem {
+ val scheme = "fake"
+ val uri = URI.create(s"$scheme:///")
+}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
index f047278b67..327ef2dad8 100644
---
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
@@ -1035,7 +1035,8 @@ abstract class UpdateSuiteBase
expectedErrorClassForSQLTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
expectedErrorClassForDataSetTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION"
)
-// Ignore in Gluten.
+
+// Ignore in Gluten: Error message mismatch.
// testInvalidTempViews("superset cols")(
// text = "SELECT key, value, 1 FROM tab",
+// // The analyzer can't tell whether the table originally had the extra column or not.
@@ -1059,7 +1060,7 @@ abstract class UpdateSuiteBase
}
}
-// Ignore in Gluten - result mismatch.
+// Ignore in Gluten - result mismatch, but Gluten's answer is correct.
// testComplexTempViews("nontrivial projection")(
// text = "SELECT value as key, key as value FROM tab",
// expectedResult = Seq(Row(3, 0), Row(3, 3))
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
index bf64858399..53adfbf4a4 100644
---
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
@@ -46,10 +46,11 @@ trait DeltaSQLCommandTest extends SharedSparkSession {
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.default.parallelism", "1")
.set("spark.memory.offHeap.enabled", "true")
- .set("spark.sql.shuffle.partitions", "1")
+ .set("spark.sql.shuffle.partitions", "5")
.set("spark.memory.offHeap.size", "2g")
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key, "true")
+ .set("spark.databricks.delta.snapshotPartitions", "2")
}
}
// spotless:on
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]