This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b10a0eceb4 [GLUTEN-11106][VL] Spark 3.5 / Delta 3.3: Add 
DeltaInsertIntoTableSuite, DeltaDDLSuite (#11107)
b10a0eceb4 is described below

commit b10a0eceb471fd186181fa8c6e94f058b5e27296
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Nov 21 10:30:28 2025 +0000

    [GLUTEN-11106][VL] Spark 3.5 / Delta 3.3: Add DeltaInsertIntoTableSuite, 
DeltaDDLSuite (#11107)
---
 .../org/apache/spark/sql/delta/DeltaDDLSuite.scala |  686 +++++++
 .../sql/delta/DeltaInsertIntoTableSuite.scala      | 2107 ++++++++++++++++++++
 .../sql/delta/DeltaInsertIntoTableSuiteShims.scala |   23 +
 .../org/apache/spark/sql/delta/DeltaSuite.scala    |    9 +-
 .../apache/spark/sql/delta/FakeFileSystem.scala    |   32 +
 .../apache/spark/sql/delta/UpdateSuiteBase.scala   |    5 +-
 .../spark/sql/delta/test/DeltaSQLCommandTest.scala |    3 +-
 7 files changed, 2857 insertions(+), 8 deletions(-)

diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
new file mode 100644
index 0000000000..dcb7d7317e
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.schema.InvariantViolationException
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, 
StructType}
+
+// scalastyle:off import.ordering.noEmptyLine
+import org.apache.hadoop.fs.UnsupportedFileSystemException
+
+import scala.collection.JavaConverters._
+
+class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession with 
DeltaSQLCommandTest {
+
+  override protected def verifyNullabilityFailure(exception: 
AnalysisException): Unit = {
+    exception.getMessage.contains("Cannot change nullable column to 
non-nullable")
+  }
+
+  test("protocol-related properties are not considered during duplicate table 
creation") {
+    def createTable(tableName: String, location: String): Unit = {
+      sql(
+        s"""
+           |CREATE TABLE $tableName (id INT, val STRING)
+           |USING DELTA
+           |LOCATION '$location'
+           |TBLPROPERTIES (
+           |  'delta.columnMapping.mode' = 'name',
+           |  'delta.minReaderVersion' = '2',
+           |  'delta.minWriterVersion' = '5'
+           |)""".stripMargin
+      )
+    }
+    withTempDir {
+      dir =>
+        val table1 = "t1"
+        val table2 = "t2"
+        withTable(table1, table2) {
+          withSQLConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") 
{
+            createTable(table1, dir.getCanonicalPath)
+            createTable(table2, dir.getCanonicalPath)
+            val catalogTable1 = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(table1))
+            val catalogTable2 = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier(table2))
+            assert(catalogTable1.properties("delta.columnMapping.mode") == 
"name")
+            assert(catalogTable2.properties("delta.columnMapping.mode") == 
"name")
+          }
+        }
+    }
+  }
+
+  test("table creation with ambiguous paths only allowed with legacy flag") {
+    // ambiguous paths not allowed
+    withTempDir {
+      foo =>
+        withTempDir {
+          bar =>
+            val fooPath = foo.getCanonicalPath()
+            val barPath = bar.getCanonicalPath()
+            val e = intercept[AnalysisException] {
+              sql(s"CREATE TABLE delta.`$fooPath`(id LONG) USING delta 
LOCATION '$barPath'")
+            }
+            
assert(e.message.contains("legacy.allowAmbiguousPathsInCreateTable"))
+        }
+    }
+
+    // allowed with legacy flag
+    withTempDir {
+      foo =>
+        withTempDir {
+          bar =>
+            val fooPath = foo.getCanonicalPath()
+            val barPath = bar.getCanonicalPath()
+            withSQLConf(DeltaSQLConf.DELTA_LEGACY_ALLOW_AMBIGUOUS_PATHS.key -> 
"true") {
+              sql(s"CREATE TABLE delta.`$fooPath`(id LONG) USING delta 
LOCATION '$barPath'")
+              assert(io.delta.tables.DeltaTable.isDeltaTable(fooPath))
+              assert(!io.delta.tables.DeltaTable.isDeltaTable(barPath))
+            }
+        }
+    }
+
+    // allowed if paths are the same
+    withTempDir {
+      foo =>
+        val fooPath = foo.getCanonicalPath()
+        sql(s"CREATE TABLE delta.`$fooPath`(id LONG) USING delta LOCATION 
'$fooPath'")
+        assert(io.delta.tables.DeltaTable.isDeltaTable(fooPath))
+    }
+  }
+
+  test("append table when column name with special chars") {
+    withTable("t") {
+      val schema = new StructType().add("a`b", "int")
+      val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
+      df.write.format("delta").saveAsTable("t")
+      df.write.format("delta").mode("append").saveAsTable("t")
+      assert(spark.table("t").collect().isEmpty)
+    }
+  }
+
+  test("CREATE TABLE with OPTIONS") {
+    withTempPath {
+      path =>
+        spark.range(10).write.format("delta").save(path.getCanonicalPath)
+        withTable("t") {
+          def createTableWithOptions(simulateUC: Boolean): Unit = {
+            sql(s"""
+                   |CREATE TABLE t USING delta LOCATION 
'fake://${path.getCanonicalPath}'
+                   |${if (simulateUC) "TBLPROPERTIES (test.simulateUC=true)" 
else ""}
+                   |OPTIONS (
+                   |  fs.fake.impl='${classOf[FakeFileSystem].getName}',
+                   |  fs.fake.impl.disable.cache=true)
+                   |""".stripMargin)
+          }
+          
intercept[UnsupportedFileSystemException](createTableWithOptions(false))
+          createTableWithOptions(true)
+        }
+    }
+  }
+}
+
+class DeltaDDLNameColumnMappingSuite extends DeltaDDLSuite with 
DeltaColumnMappingEnableNameMode {
+
+  override protected def runOnlyTests = Seq(
+    "create table with NOT NULL - check violation through file writing",
+    "ALTER TABLE CHANGE COLUMN with nullability change in struct type - 
relaxed"
+  )
+}
+
+abstract class DeltaDDLTestBase extends QueryTest with DeltaSQLTestUtils {
+  import testImplicits._
+
+  protected def verifyDescribeTable(tblName: String): Unit = {
+    val res = sql(s"DESCRIBE TABLE $tblName").collect()
+    assert(res.takeRight(2).map(_.getString(0)) === Seq("name", "dept"))
+  }
+
+  protected def verifyNullabilityFailure(exception: AnalysisException): Unit
+
+  protected def getDeltaLog(tableLocation: String): DeltaLog = {
+    DeltaLog.forTable(spark, tableLocation)
+  }
+
+  testQuietly("create table with NOT NULL - check violation through file 
writing") {
+    withTempDir {
+      dir =>
+        withTable("delta_test") {
+          sql(s"""
+                 |CREATE TABLE delta_test(a LONG, b String NOT NULL)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+          val expectedSchema = new StructType()
+            .add("a", LongType, nullable = true)
+            .add("b", StringType, nullable = false)
+          assert(spark.table("delta_test").schema === expectedSchema)
+
+          val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("delta_test"))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+          Seq((1L, "a"))
+            .toDF("a", "b")
+            .write
+            .format("delta")
+            .mode("append")
+            .save(table.location.toString)
+          val read = spark.read.format("delta").load(table.location.toString)
+          checkAnswer(read, Seq(Row(1L, "a")))
+
+          intercept[InvariantViolationException] {
+            Seq((2L, null))
+              .toDF("a", "b")
+              .write
+              .format("delta")
+              .mode("append")
+              .save(table.location.getPath)
+          }
+        }
+    }
+  }
+
+  test("ALTER TABLE ADD COLUMNS with NOT NULL - not supported") {
+    withTempDir {
+      dir =>
+        val tableName = "delta_test_add_not_null"
+        withTable(tableName) {
+          sql(s"""
+                 |CREATE TABLE $tableName(a LONG)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+
+          val expectedSchema = new StructType().add("a", LongType, nullable = 
true)
+          assert(spark.table(tableName).schema === expectedSchema)
+
+          val e = intercept[AnalysisException] {
+            sql(s"""
+                   |ALTER TABLE $tableName
+                   |ADD COLUMNS (b String NOT NULL, c Int)""".stripMargin)
+          }
+          val msg = "`NOT NULL in ALTER TABLE ADD COLUMNS` is not supported 
for Delta tables"
+          assert(e.getMessage.contains(msg))
+        }
+    }
+  }
+
+  test("ALTER TABLE CHANGE COLUMN from nullable to NOT NULL - not supported") {
+    withTempDir {
+      dir =>
+        val tableName = "delta_test_from_nullable_to_not_null"
+        withTable(tableName) {
+          sql(s"""
+                 |CREATE TABLE $tableName(a LONG, b String)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+
+          val expectedSchema = new StructType()
+            .add("a", LongType, nullable = true)
+            .add("b", StringType, nullable = true)
+          assert(spark.table(tableName).schema === expectedSchema)
+
+          val e = intercept[AnalysisException] {
+            sql(s"""
+                   |ALTER TABLE $tableName
+                   |CHANGE COLUMN b b String NOT NULL""".stripMargin)
+          }
+          verifyNullabilityFailure(e)
+        }
+    }
+  }
+
+  test("ALTER TABLE CHANGE COLUMN from NOT NULL to nullable") {
+    withTempDir {
+      dir =>
+        val tableName = "delta_test_not_null_to_nullable"
+        withTable(tableName) {
+          sql(s"""
+                 |CREATE TABLE $tableName(a LONG NOT NULL, b String)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+
+          val expectedSchema = new StructType()
+            .add("a", LongType, nullable = false)
+            .add("b", StringType, nullable = true)
+          assert(spark.table(tableName).schema === expectedSchema)
+
+          sql(s"INSERT INTO $tableName SELECT 1, 'a'")
+          checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1L, "a")))
+
+          sql(s"""
+                 |ALTER TABLE $tableName
+                 |ALTER COLUMN a DROP NOT NULL""".stripMargin)
+          val expectedSchema2 = new StructType()
+            .add("a", LongType, nullable = true)
+            .add("b", StringType, nullable = true)
+          assert(spark.table(tableName).schema === expectedSchema2)
+
+          sql(s"INSERT INTO $tableName SELECT NULL, 'b'")
+          checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1L, "a"), 
Row(null, "b")))
+        }
+    }
+  }
+
+  testQuietly("create table with NOT NULL - check violation through SQL") {
+    withTempDir {
+      dir =>
+        withTable("delta_test") {
+          sql(s"""
+                 |CREATE TABLE delta_test(a LONG, b String NOT NULL)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+          val expectedSchema = new StructType()
+            .add("a", LongType, nullable = true)
+            .add("b", StringType, nullable = false)
+          assert(spark.table("delta_test").schema === expectedSchema)
+
+          sql("INSERT INTO delta_test SELECT 1, 'a'")
+          checkAnswer(sql("SELECT * FROM delta_test"), Seq(Row(1L, "a")))
+
+          val e = intercept[InvariantViolationException] {
+            sql("INSERT INTO delta_test VALUES (2, null)")
+          }
+          if (!e.getMessage.contains("nullable values to non-null column")) {
+            verifyInvariantViolationException(e)
+          }
+        }
+    }
+  }
+
+  testQuietly("create table with NOT NULL in struct type - check violation") {
+    withTempDir {
+      dir =>
+        withTable("delta_test") {
+          sql(s"""
+                 |CREATE TABLE delta_test
+                 |(x struct<a: LONG, b: String NOT NULL>, y LONG)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+          val expectedSchema = new StructType()
+            .add(
+              "x",
+              new StructType()
+                .add("a", LongType, nullable = true)
+                .add("b", StringType, nullable = false))
+            .add("y", LongType, nullable = true)
+          assert(spark.table("delta_test").schema === expectedSchema)
+
+          sql("INSERT INTO delta_test SELECT (1, 'a'), 1")
+          checkAnswer(sql("SELECT * FROM delta_test"), Seq(Row(Row(1L, "a"), 
1)))
+
+          val table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("delta_test"))
+          assert(table.location == makeQualifiedPath(dir.getAbsolutePath))
+
+          val schema = new StructType()
+            .add(
+              "x",
+              new StructType()
+                .add("a", "bigint")
+                .add("b", "string"))
+            .add("y", "bigint")
+          val e = intercept[InvariantViolationException] {
+            spark
+              .createDataFrame(
+                Seq(Row(Row(2L, null), 2L)).asJava,
+                schema
+              )
+              .write
+              .format("delta")
+              .mode("append")
+              .save(table.location.getPath)
+          }
+          verifyInvariantViolationException(e)
+        }
+    }
+  }
+
+  test("ALTER TABLE ADD COLUMNS with NOT NULL in struct type - not supported") 
{
+    withTempDir {
+      dir =>
+        val tableName = "delta_test_not_null_struct"
+        withTable(tableName) {
+          sql(s"""
+                 |CREATE TABLE $tableName
+                 |(y LONG)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+          val expectedSchema = new StructType()
+            .add("y", LongType, nullable = true)
+          assert(spark.table(tableName).schema === expectedSchema)
+
+          val e = intercept[AnalysisException] {
+            sql(s"""
+                   |ALTER TABLE $tableName
+                   |ADD COLUMNS (x struct<a: LONG, b: String NOT NULL>, z 
INT)""".stripMargin)
+          }
+          val msg = "Operation not allowed: " +
+            "`NOT NULL in ALTER TABLE ADD COLUMNS` is not supported for Delta 
tables"
+          assert(e.getMessage.contains(msg))
+        }
+    }
+  }
+
+  test("ALTER TABLE ADD COLUMNS to table with existing NOT NULL fields") {
+    withTempDir {
+      dir =>
+        val tableName = "delta_test_existing_not_null"
+        withTable(tableName) {
+          sql(s"""
+                 |CREATE TABLE $tableName
+                 |(y LONG NOT NULL)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+          val expectedSchema = new StructType()
+            .add("y", LongType, nullable = false)
+          assert(spark.table(tableName).schema === expectedSchema)
+
+          sql(s"""
+                 |ALTER TABLE $tableName
+                 |ADD COLUMNS (x struct<a: LONG, b: String>, z 
INT)""".stripMargin)
+          val expectedSchema2 = new StructType()
+            .add("y", LongType, nullable = false)
+            .add(
+              "x",
+              new StructType()
+                .add("a", LongType)
+                .add("b", StringType))
+            .add("z", IntegerType)
+          assert(spark.table(tableName).schema === expectedSchema2)
+        }
+    }
+  }
+
+  /**
+   * Covers adding and changing a nested field using the ALTER TABLE command.
+   * @param initialColumnType
+   *   Type of the single column used to create the initial test table.
+   * @param fieldToAdd
+   *   Tuple (name, type) of the nested field to add and change.
+   * @param updatedColumnType
+   *   Expected type of the single column after adding the nested field.
+   */
+  def testAlterTableNestedFields(testName: String)(
+      initialColumnType: String,
+      fieldToAdd: (String, String),
+      updatedColumnType: String): Unit = {
+    // Remove spaces in test name so we can re-use it as a unique table name.
+    val tableName = testName.replaceAll(" ", "")
+    test(s"ALTER TABLE ADD/CHANGE COLUMNS - nested $testName") {
+      withTempDir {
+        dir =>
+          withTable(tableName) {
+            sql(s"""
+                   |CREATE TABLE $tableName (data $initialColumnType)
+                   |USING delta
+                   |TBLPROPERTIES (${DeltaConfigs.COLUMN_MAPPING_MODE.key} = 
'name')
+                   |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+
+            val expectedInitialType = 
initialColumnType.filterNot(_.isWhitespace)
+            val expectedUpdatedType = 
updatedColumnType.filterNot(_.isWhitespace)
+            val fieldName = s"data.${fieldToAdd._1}"
+            val fieldType = fieldToAdd._2
+
+            def columnType: DataFrame =
+              sql(s"DESCRIBE TABLE $tableName")
+                .where("col_name = 'data'")
+                .select("data_type")
+            checkAnswer(columnType, Row(expectedInitialType))
+
+            sql(s"ALTER TABLE $tableName ADD COLUMNS ($fieldName $fieldType)")
+            checkAnswer(columnType, Row(expectedUpdatedType))
+
+            sql(s"ALTER TABLE $tableName CHANGE COLUMN $fieldName TYPE 
$fieldType")
+            checkAnswer(columnType, Row(expectedUpdatedType))
+          }
+      }
+    }
+  }
+
+  testAlterTableNestedFields("struct in map key")(
+    initialColumnType = "map<struct<a: int>, int>",
+    fieldToAdd = "key.b" -> "string",
+    updatedColumnType = "map<struct<a: int, b: string>, int>")
+
+  testAlterTableNestedFields("struct in map value")(
+    initialColumnType = "map<int, struct<a: int>>",
+    fieldToAdd = "value.b" -> "string",
+    updatedColumnType = "map<int, struct<a: int, b: string>>")
+
+  testAlterTableNestedFields("struct in array")(
+    initialColumnType = "array<struct<a: int>>",
+    fieldToAdd = "element.b" -> "string",
+    updatedColumnType = "array<struct<a: int, b: string>>")
+
+  testAlterTableNestedFields("struct in nested map keys")(
+    initialColumnType = "map<map<struct<a: int>, int>, int>",
+    fieldToAdd = "key.key.b" -> "string",
+    updatedColumnType = "map<map<struct<a: int, b: string>, int>, int>"
+  )
+
+  testAlterTableNestedFields("struct in nested map values")(
+    initialColumnType = "map<int, map<int, struct<a: int>>>",
+    fieldToAdd = "value.value.b" -> "string",
+    updatedColumnType = "map<int, map<int, struct<a: int, b: string>>>"
+  )
+
+  testAlterTableNestedFields("struct in nested arrays")(
+    initialColumnType = "array<array<struct<a: int>>>",
+    fieldToAdd = "element.element.b" -> "string",
+    updatedColumnType = "array<array<struct<a: int, b: string>>>")
+
+  testAlterTableNestedFields("struct in nested array and map")(
+    initialColumnType = "array<map<int, struct<a: int>>>",
+    fieldToAdd = "element.value.b" -> "string",
+    updatedColumnType = "array<map<int, struct<a: int, b: string>>>"
+  )
+
+  testAlterTableNestedFields("struct in nested map key and array")(
+    initialColumnType = "map<array<struct<a: int>>, int>",
+    fieldToAdd = "key.element.b" -> "string",
+    updatedColumnType = "map<array<struct<a: int, b: string>>, int>"
+  )
+
+  testAlterTableNestedFields("struct in nested map value and array")(
+    initialColumnType = "map<int, array<struct<a: int>>>",
+    fieldToAdd = "value.element.b" -> "string",
+    updatedColumnType = "map<int, array<struct<a: int, b: string>>>"
+  )
+
+  test("ALTER TABLE CHANGE COLUMN with nullability change in struct type - not 
supported") {
+    withTempDir {
+      dir =>
+        val tableName = "not_supported_delta_test"
+        withTable(tableName) {
+          sql(s"""
+                 |CREATE TABLE $tableName
+                 |(x struct<a: LONG, b: String>, y LONG)
+                 |USING delta
+                 |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+          val expectedSchema = new StructType()
+            .add(
+              "x",
+              new StructType()
+                .add("a", LongType)
+                .add("b", StringType))
+            .add("y", LongType, nullable = true)
+          assert(spark.table(tableName).schema === expectedSchema)
+
+          val e1 = intercept[AnalysisException] {
+            sql(s"""
+                   |ALTER TABLE $tableName
+                   |CHANGE COLUMN x x struct<a: LONG, b: String NOT 
NULL>""".stripMargin)
+          }
+          assert(e1.getMessage.contains("Cannot update"))
+          val e2 = intercept[AnalysisException] {
+            sql(s"""
+                   |ALTER TABLE $tableName
+                   |CHANGE COLUMN x.b b String NOT NULL""".stripMargin) // 
this syntax may change
+          }
+          verifyNullabilityFailure(e2)
+        }
+    }
+  }
+
+  test("ALTER TABLE CHANGE COLUMN with nullability change in struct type - 
relaxed") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      withTempDir {
+        dir =>
+          val tblName = "delta_test2"
+          withTable(tblName) {
+            sql(s"""
+                   |CREATE TABLE $tblName
+                   |(x struct<a: LONG, b: String NOT NULL> NOT NULL, y LONG)
+                   |USING delta
+                   |OPTIONS('path'='${dir.getCanonicalPath}')""".stripMargin)
+            val expectedSchema = new StructType()
+              .add(
+                "x",
+                new StructType()
+                  .add("a", LongType)
+                  .add("b", StringType, nullable = false),
+                nullable = false)
+              .add("y", LongType)
+            assert(spark.table(tblName).schema === expectedSchema)
+            sql(s"INSERT INTO $tblName SELECT (1, 'a'), 1")
+            checkAnswer(sql(s"SELECT * FROM $tblName"), Seq(Row(Row(1L, "a"), 
1)))
+
+            sql(s"""
+                   |ALTER TABLE $tblName
+                   |ALTER COLUMN x.b DROP NOT NULL""".stripMargin) // relax 
nullability
+            sql(s"INSERT INTO $tblName SELECT (2, null), null")
+            checkAnswer(
+              sql(s"SELECT * FROM $tblName"),
+              Seq(Row(Row(1L, "a"), 1), Row(Row(2L, null), null)))
+
+            sql(s"""
+                   |ALTER TABLE $tblName
+                   |ALTER COLUMN x DROP NOT NULL""".stripMargin)
+            sql(s"INSERT INTO $tblName SELECT null, 3")
+            checkAnswer(
+              sql(s"SELECT * FROM $tblName"),
+              Seq(Row(Row(1L, "a"), 1), Row(Row(2L, null), null), Row(null, 
3)))
+          }
+      }
+    }
+  }
+
+  private def verifyInvariantViolationException(e: 
InvariantViolationException): Unit = {
+    if (e == null) {
+      fail("Didn't receive a InvariantViolationException.")
+    }
+    assert(e.getMessage.contains("NOT NULL constraint violated for column"))
+  }
+
+  test("ALTER TABLE RENAME TO") {
+    withTable("tbl", "newTbl") {
+      sql(s"""
+             |CREATE TABLE tbl
+             |USING delta
+             |AS SELECT 1 as a, 'a' as b
+           """.stripMargin)
+      sql(s"ALTER TABLE tbl RENAME TO newTbl")
+      checkDatasetUnorderly(sql("SELECT * FROM newTbl").as[(Long, String)], 1L 
-> "a")
+    }
+  }
+
+  /**
+   * Although Spark 3.2 adds the support for SHOW CREATE TABLE for v2 tables, 
it doesn't work
+   * properly for Delta. For example, table properties, constraints and 
generated columns are not
+   * showed properly.
+   *
+   * TODO Implement Delta's own ShowCreateTableCommand to show the Delta table 
definition correctly
+   */
+  test("SHOW CREATE TABLE is not supported") {
+    withTable("delta_test") {
+      sql(s"""
+             |CREATE TABLE delta_test(a LONG, b String)
+             |USING delta
+           """.stripMargin)
+
+      val e = intercept[AnalysisException] {
+        sql("SHOW CREATE TABLE delta_test").collect()(0).getString(0)
+      }
+      assert(e.message.contains("`SHOW CREATE TABLE` is not supported for 
Delta table"))
+    }
+
+    withTempDir {
+      dir =>
+        withTable("delta_test") {
+          val path = dir.getCanonicalPath()
+          sql(s"""
+                 |CREATE TABLE delta_test(a LONG, b String)
+                 |USING delta
+                 |LOCATION '$path'
+             """.stripMargin)
+
+          val e = intercept[AnalysisException] {
+            sql("SHOW CREATE TABLE delta_test").collect()(0).getString(0)
+          }
+          assert(e.message.contains("`SHOW CREATE TABLE` is not supported for 
Delta table"))
+        }
+    }
+  }
+
+  test("DESCRIBE TABLE for partitioned table") {
+    withTempDir {
+      dir =>
+        withTable("delta_test") {
+          val path = dir.getCanonicalPath()
+
+          val df =
+            Seq((1, "IT", "Alice"), (2, "CS", "Bob"), (3, "IT", 
"Carol")).toDF("id", "dept", "name")
+          df.write.format("delta").partitionBy("name", "dept").save(path)
+
+          sql(s"CREATE TABLE delta_test USING delta LOCATION '$path'")
+
+          verifyDescribeTable("delta_test")
+          verifyDescribeTable(s"delta.`$path`")
+
+          assert(sql("DESCRIBE EXTENDED delta_test").collect().length > 0)
+        }
+    }
+  }
+
+  test("snapshot returned after a dropped managed table should be empty") {
+    withTable("delta_test") {
+      sql("CREATE TABLE delta_test USING delta AS SELECT 'foo' as a")
+      val tableLocation = sql("DESC DETAIL 
delta_test").select("location").as[String].head()
+      val snapshotBefore = getDeltaLog(tableLocation).update()
+      sql("DROP TABLE delta_test")
+      val snapshotAfter = getDeltaLog(tableLocation).update()
+      assert(snapshotBefore ne snapshotAfter)
+      assert(snapshotAfter.version === -1)
+    }
+  }
+
+  test("snapshot returned after renaming a managed table should be empty") {
+    val oldTableName = "oldTableName"
+    val newTableName = "newTableName"
+    withTable(oldTableName, newTableName) {
+      sql(s"CREATE TABLE $oldTableName USING delta AS SELECT 'foo' as a")
+      val tableLocation = sql(s"DESC DETAIL 
$oldTableName").select("location").as[String].head()
+      val snapshotBefore = getDeltaLog(tableLocation).update()
+      sql(s"ALTER TABLE $oldTableName RENAME TO $newTableName")
+      val snapshotAfter = getDeltaLog(tableLocation).update()
+      assert(snapshotBefore ne snapshotAfter)
+      assert(snapshotAfter.version === -1)
+    }
+  }
+
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
new file mode 100644
index 0000000000..c18e98f7f3
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
@@ -0,0 +1,2107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+// scalastyle:off import.ordering.noEmptyLine
+import org.apache.spark.SparkException
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.parser.ParseException
+import 
org.apache.spark.sql.delta.DeltaInsertIntoTableSuiteShims.{INSERT_INTO_TMP_VIEW_ERROR_MSG,
 INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG}
+import org.apache.spark.sql.delta.schema.{InvariantViolationException, 
SchemaUtils}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaColumnMappingSelectedTestMixin, 
DeltaSQLCommandTest}
+import org.apache.spark.sql.functions.{lit, struct}
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, 
PartitionOverwriteMode}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+import org.scalatest.BeforeAndAfter
+
+import java.io.File
+import java.util.TimeZone
+
+import scala.collection.JavaConverters._
+
+class DeltaInsertIntoSQLSuite
+  extends DeltaInsertIntoTestsWithTempViews(
+    supportsDynamicOverwrite = true,
+    includeSQLOnlyTests = true)
+  with DeltaSQLCommandTest
+  with DeltaExcludedBySparkVersionTestMixinShims {
+
+  override protected def doInsert(tableName: String, insert: DataFrame, mode: 
SaveMode): Unit = {
+    val tmpView = "tmp_view"
+    withTempView(tmpView) {
+      insert.createOrReplaceTempView(tmpView)
+      val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+      sql(s"INSERT $overwrite TABLE $tableName SELECT * FROM $tmpView")
+    }
+  }
+
+  testSparkMasterOnly("Variant type") {
+    withTable("t") {
+      sql("CREATE TABLE t (id LONG, v VARIANT) USING delta")
+      sql("INSERT INTO t (id, v) VALUES (1, parse_json('{\"a\": 1}'))")
+      sql("INSERT INTO t (id, v) VALUES (2, parse_json('{\"b\": 2}'))")
+      sql("INSERT INTO t SELECT id, parse_json(cast(id as string)) v FROM 
range(2)")
+
+      checkAnswer(
+        sql("select * from t").selectExpr("id", "to_json(v)"),
+        Seq(Row(1, "{\"a\":1}"), Row(2, "{\"b\":2}"), Row(0, "0"), Row(1, 
"1")))
+    }
+  }
+
+  test("insert overwrite should work with selecting constants") {
+    withTable("t1") {
+      sql("CREATE TABLE t1 (a int, b int, c int) USING delta PARTITIONED BY 
(b, c)")
+      sql("INSERT OVERWRITE TABLE t1 PARTITION (c=3) SELECT 1, 2")
+      checkAnswer(
+        sql("SELECT * FROM t1"),
+        Row(1, 2, 3) :: Nil
+      )
+      sql("INSERT OVERWRITE TABLE t1 PARTITION (b=2, c=3) SELECT 1")
+      checkAnswer(
+        sql("SELECT * FROM t1"),
+        Row(1, 2, 3) :: Nil
+      )
+      sql("INSERT OVERWRITE TABLE t1 PARTITION (b=2, c) SELECT 1, 3")
+      checkAnswer(
+        sql("SELECT * FROM t1"),
+        Row(1, 2, 3) :: Nil
+      )
+    }
+  }
+
+  test("insertInto: append by name") {
+    import testImplicits._
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      sql(s"INSERT INTO $t1(id, data) VALUES(1L, 'a')")
+      // Can be in a different order
+      sql(s"INSERT INTO $t1(data, id) VALUES('b', 2L)")
+      // Can be casted automatically
+      sql(s"INSERT INTO $t1(data, id) VALUES('c', 3)")
+      verifyTable(t1, df)
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"false") {
+        // Missing columns
+        assert(intercept[AnalysisException] {
+          sql(s"INSERT INTO $t1(data) VALUES(4)")
+        }.getMessage.contains("Column id is not specified in INSERT"))
+        // Missing columns with matching dataType
+        assert(intercept[AnalysisException] {
+          sql(s"INSERT INTO $t1(data) VALUES('b')")
+        }.getMessage.contains("Column id is not specified in INSERT"))
+      }
+      // Duplicate columns
+      assert(
+        intercept[AnalysisException](
+          sql(s"INSERT INTO $t1(data, data) VALUES(5)")).getMessage.nonEmpty)
+    }
+  }
+
+  test("insertInto: overwrite by name") {
+    import testImplicits._
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+      sql(s"INSERT OVERWRITE $t1(id, data) VALUES(1L, 'a')")
+      verifyTable(t1, Seq((1L, "a")).toDF("id", "data"))
+      // Can be in a different order
+      sql(s"INSERT OVERWRITE $t1(data, id) VALUES('b', 2L)")
+      verifyTable(t1, Seq((2L, "b")).toDF("id", "data"))
+      // Can be casted automatically
+      sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 3)")
+      verifyTable(t1, Seq((3L, "c")).toDF("id", "data"))
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"false") {
+        // Missing columns
+        assert(intercept[AnalysisException] {
+          sql(s"INSERT OVERWRITE $t1(data) VALUES(4)")
+        }.getMessage.contains("Column id is not specified in INSERT"))
+        // Missing columns with matching datatype
+        assert(intercept[AnalysisException] {
+          sql(s"INSERT OVERWRITE $t1(data) VALUES(4L)")
+        }.getMessage.contains("Column id is not specified in INSERT"))
+      }
+      // Duplicate columns
+      assert(
+        intercept[AnalysisException](
+          sql(s"INSERT OVERWRITE $t1(data, data) 
VALUES(5)")).getMessage.nonEmpty)
+    }
+  }
+
+  test("insertInto should throw an AnalysisError on name mismatch") {
+    def testInsertByNameError(targetSchema: String, expectedErrorClass: 
String): Unit = {
+      val sourceTableName = "source"
+      val targetTableName = "target"
+      val format = "delta"
+      withTable(sourceTableName, targetTableName) {
+        sql(s"CREATE TABLE $sourceTableName (a int, b int) USING $format")
+        sql(s"CREATE TABLE $targetTableName $targetSchema USING $format")
+        val e = intercept[AnalysisException] {
+          sql(s"INSERT INTO $targetTableName BY NAME SELECT * FROM 
$sourceTableName")
+        }
+        assert(e.getErrorClass === expectedErrorClass)
+      }
+    }
+
+    // NOTE: We use upper case in the target schema so that 
needsSchemaAdjustmentByName returns
+    // true (due to case sensitivity) so that we call 
resolveQueryColumnsByName and hit the right
+    // code path.
+
+    // when the number of columns does not match, throw an arity mismatch 
error.
+    testInsertByNameError(
+      targetSchema = "(A int)",
+      expectedErrorClass = 
"INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS")
+
+    // when the number of columns matches, but the names do not, throw a 
missing column error.
+    testInsertByNameError(
+      targetSchema = "(A int, c int)",
+      expectedErrorClass = "DELTA_MISSING_COLUMN")
+  }
+
+  dynamicOverwriteTest("insertInto: dynamic overwrite by name") {
+    import testImplicits._
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(
+        s"CREATE TABLE $t1 (id bigint, data string, data2 string) " +
+          s"USING $v2Format PARTITIONED BY (id)")
+      sql(s"INSERT OVERWRITE $t1(id, data, data2) VALUES(1L, 'a', 'b')")
+      verifyTable(t1, Seq((1L, "a", "b")).toDF("id", "data", "data2"))
+      // Can be in a different order
+      sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('b', 'd', 2L)")
+      verifyTable(t1, Seq((1L, "a", "b"), (2L, "b", "d")).toDF("id", "data", 
"data2"))
+      // Can be casted automatically
+      sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('c', 'e', 1)")
+      verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", 
"data2"))
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"false") {
+        // Missing columns
+        assert(intercept[AnalysisException] {
+          sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 1)")
+        }.getMessage.contains("Column data2 is not specified in INSERT"))
+        // Missing columns with matching datatype
+        assert(intercept[AnalysisException] {
+          sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 1L)")
+        }.getMessage.contains("Column data2 is not specified in INSERT"))
+      }
+      // Duplicate columns
+      assert(
+        intercept[AnalysisException](
+          sql(s"INSERT OVERWRITE $t1(data, data) 
VALUES(5)")).getMessage.nonEmpty)
+    }
+  }
+
+  test("insertInto: static partition column name should not be used in the 
column list") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c string) USING $v2Format PARTITIONED BY 
(c)")
+      checkError(
+        intercept[AnalysisException] {
+          sql("INSERT OVERWRITE t PARTITION (c='1') (c) VALUES ('2')")
+        },
+        "STATIC_PARTITION_COLUMN_IN_INSERT_COLUMN_LIST",
+        parameters = Map("staticName" -> "c")
+      )
+    }
+  }
+
+  Seq(("ordinal", ""), ("name", "(id, col2, col)")).foreach {
+    case (testName, values) =>
+      test(s"INSERT OVERWRITE schema evolution works for array struct types - 
$testName") {
+        val sourceSchema =
+          "id INT, col2 STRING, col ARRAY<STRUCT<f1: STRING, f2: STRING, f3: 
DATE>>"
+        val sourceRecord = "1, '2022-11-01', array(struct('s1', 's2', 
DATE'2022-11-01'))"
+        val targetSchema = "id INT, col2 DATE, col ARRAY<STRUCT<f1: STRING, 
f2: STRING>>"
+        val targetRecord = "1, DATE'2022-11-02', array(struct('t1', 't2'))"
+
+        runInsertOverwrite(sourceSchema, sourceRecord, targetSchema, 
targetRecord) {
+          (sourceTable, targetTable) =>
+            sql(s"INSERT OVERWRITE $targetTable $values SELECT * FROM 
$sourceTable")
+
+            // make sure table is still writeable
+            sql(s"""INSERT INTO $targetTable VALUES (2, DATE'2022-11-02',
+                   | array(struct('s3', 's4', 
DATE'2022-11-02')))""".stripMargin)
+            sql(s"""INSERT INTO $targetTable VALUES (3, DATE'2022-11-03',
+                   |array(struct('s5', 's6', NULL)))""".stripMargin)
+            val df = spark.sql("""SELECT 1 as id, DATE'2022-11-01' as col2,
+                                 | array(struct('s1', 's2', DATE'2022-11-01')) 
as col UNION
+                                 | SELECT 2 as id, DATE'2022-11-02' as col2,
+                                 | array(struct('s3', 's4', DATE'2022-11-02')) 
as col UNION
+                                 | SELECT 3 as id, DATE'2022-11-03' as col2,
+                                 | array(struct('s5', 's6', NULL)) as 
col""".stripMargin)
+            verifyTable(targetTable, df)
+        }
+      }
+  }
+
+  Seq(("ordinal", ""), ("name", "(id, col2, col)")).foreach {
+    case (testName, values) =>
+      test(s"INSERT OVERWRITE schema evolution works for array nested types - 
$testName") {
+        val sourceSchema = "id INT, col2 STRING, " +
+          "col ARRAY<STRUCT<f1: INT, f2: STRUCT<f21: STRING, f22: DATE>, f3: 
STRUCT<f31: STRING>>>"
+        val sourceRecord = "1, '2022-11-01', " +
+          "array(struct(1, struct('s1', DATE'2022-11-01'), struct('s1')))"
+        val targetSchema = "id INT, col2 DATE, col ARRAY<STRUCT<f1: INT, f2: 
STRUCT<f21: STRING>>>"
+        val targetRecord = "2, DATE'2022-11-02', array(struct(2, 
struct('s2')))"
+
+        runInsertOverwrite(sourceSchema, sourceRecord, targetSchema, 
targetRecord) {
+          (sourceTable, targetTable) =>
+            sql(s"INSERT OVERWRITE $targetTable $values SELECT * FROM 
$sourceTable")
+
+            // make sure table is still writeable
+            sql(s"""INSERT INTO $targetTable VALUES (2, DATE'2022-11-02',
+                   | array(struct(2, struct('s2', DATE'2022-11-02'), 
struct('s2'))))""".stripMargin)
+            sql(s"""INSERT INTO $targetTable VALUES (3, DATE'2022-11-03',
+                   | array(struct(3, struct('s3', NULL), 
struct(NULL))))""".stripMargin)
+            val df = spark.sql(
+              """SELECT 1 as id, DATE'2022-11-01' as col2,
+                | array(struct(1, struct('s1', DATE'2022-11-01'), 
struct('s1'))) as col UNION
+                | SELECT 2 as id, DATE'2022-11-02' as col2,
+                | array(struct(2, struct('s2', DATE'2022-11-02'), 
struct('s2'))) as col UNION
+                | SELECT 3 as id, DATE'2022-11-03' as col2,
+                | array(struct(3, struct('s3', NULL), struct(NULL))) as col
+                |""".stripMargin)
+            verifyTable(targetTable, df)
+        }
+      }
+  }
+
+  // Schema evolution for complex map type
+  test("insertInto schema evolution with map type - append mode: field 
renaming + new field") {
+    withTable("map_schema_evolution") {
+      val tableName = "map_schema_evolution"
+      val initialSchema = StructType(
+        Seq(
+          StructField("key", IntegerType, nullable = false),
+          StructField(
+            "metrics",
+            MapType(
+              StringType,
+              StructType(
+                Seq(
+                  StructField("id", IntegerType, nullable = false),
+                  StructField("value", IntegerType, nullable = false)
+                ))))
+        ))
+
+      val initialData = Seq(
+        Row(1, Map("event" -> Row(1, 1)))
+      )
+
+      val initialRdd = spark.sparkContext.parallelize(initialData)
+      val initialDf = spark.createDataFrame(initialRdd, initialSchema)
+
+      // Write initial data
+      initialDf.write
+        .option("overwriteSchema", "true")
+        .mode("overwrite")
+        .format("delta")
+        .saveAsTable(tableName)
+
+      // Evolved schema with field renamed and additional field in map struct
+      val evolvedSchema = StructType(
+        Seq(
+          StructField("renamed_key", IntegerType, nullable = false),
+          StructField(
+            "metrics",
+            MapType(
+              StringType,
+              StructType(
+                Seq(
+                  StructField("id", IntegerType, nullable = false),
+                  StructField("value", IntegerType, nullable = false),
+                  StructField("comment", StringType, nullable = true)
+                ))
+            )
+          )
+        ))
+
+      val evolvedData = Seq(
+        Row(1, Map("event" -> Row(1, 1, "deprecated")))
+      )
+
+      val evolvedRdd = spark.sparkContext.parallelize(evolvedData)
+      val evolvedDf = spark.createDataFrame(evolvedRdd, evolvedSchema)
+
+      // insert data without schema evolution
+      val err = intercept[AnalysisException] {
+        evolvedDf.write
+          .mode("append")
+          .format("delta")
+          .insertInto(tableName)
+      }
+      checkErrorMatchPVals(
+        exception = err,
+        "_LEGACY_ERROR_TEMP_DELTA_0007",
+        parameters = Map(
+          "message" -> "A schema mismatch detected when writing to the Delta 
table(.|\\n)*"
+        )
+      )
+
+      // insert data with schema evolution
+      withSQLConf("spark.databricks.delta.schema.autoMerge.enabled" -> "true") 
{
+        evolvedDf.write
+          .mode("append")
+          .format("delta")
+          .insertInto(tableName)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $tableName"),
+          Seq(
+            Row(1, Map("event" -> Row(1, 1, null))),
+            Row(1, Map("event" -> Row(1, 1, "deprecated")))
+          ))
+      }
+    }
+  }
+
+  test("not enough column in source to insert in nested map types") {
+    withTable("source", "target") {
+      sql("""CREATE TABLE source (
+            |  id INT,
+            |  metrics MAP<STRING, STRUCT<id: INT, value: INT>>
+            |) USING delta""".stripMargin)
+
+      sql("""CREATE TABLE target (
+            |  id INT,
+            |  metrics MAP<STRING, STRUCT<id: INT, value: INT, comment: 
STRING>>
+            |) USING delta""".stripMargin)
+
+      sql("INSERT INTO source VALUES (1, map('event', struct(1, 1)))")
+
+      val e = intercept[AnalysisException] {
+        sql("INSERT INTO target SELECT * FROM source")
+      }
+      checkError(
+        exception = e,
+        "DELTA_INSERT_COLUMN_ARITY_MISMATCH",
+        parameters = Map(
+          "tableName" -> "spark_catalog.default.target",
+          "columnName" -> "not enough nested fields in value",
+          "numColumns" -> "3",
+          "insertColumns" -> "2"
+        )
+      )
+    }
+  }
+
+  // not enough nested fields in value
+  test("more columns in source to insert in nested map types") {
+    withTable("source", "target") {
+      sql("""CREATE TABLE source (
+            |  id INT,
+            |  metrics MAP<STRING, STRUCT<id: INT, value: INT, comment: 
STRING>>
+            |) USING delta""".stripMargin)
+
+      sql("""CREATE TABLE target (
+            |  id INT,
+            |  metrics MAP<STRING, STRUCT<id: INT, value: INT>>
+            |) USING delta""".stripMargin)
+
+      sql("INSERT INTO source VALUES (1, map('event', struct(1, 1, 
'deprecated')))")
+
+      val e = intercept[AnalysisException] {
+        sql("INSERT INTO target SELECT * FROM source")
+      }
+      checkErrorMatchPVals(
+        exception = e,
+        "_LEGACY_ERROR_TEMP_DELTA_0007",
+        parameters = Map(
+          "message" -> "A schema mismatch detected when writing to the Delta 
table(.|\\n)*"
+        )
+      )
+
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
+        sql("INSERT INTO target SELECT * FROM source")
+        checkAnswer(
+          spark.sql(s"SELECT * FROM source"),
+          Seq(
+            Row(1, Map("event" -> Row(1, 1, "deprecated")))
+          ))
+      }
+    }
+  }
+
+  test("more columns in source to insert in nested 2-level deep map types") {
+    withTable("source", "target") {
+      sql("""CREATE TABLE source (
+            |  id INT,
+            |  metrics MAP<STRING, MAP<STRING, STRUCT<id: INT, value: INT, 
comment: STRING>>>
+            |) USING delta""".stripMargin)
+
+      sql("""CREATE TABLE target (
+            |  id INT,
+            |  metrics MAP<STRING, MAP<STRING, STRUCT<id: INT, value: INT>>>
+            |) USING delta""".stripMargin)
+
+      sql("""INSERT INTO source VALUES
+            | (1, map('event', map('subEvent', struct(1, 1, 'deprecated'))))
+         """.stripMargin)
+
+      val e = intercept[AnalysisException] {
+        sql("INSERT INTO target SELECT * FROM source")
+      }
+      checkErrorMatchPVals(
+        exception = e,
+        "_LEGACY_ERROR_TEMP_DELTA_0007",
+        parameters = Map(
+          "message" -> "A schema mismatch detected when writing to the Delta 
table(.|\\n)*"
+        )
+      )
+
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
+        sql("INSERT INTO target SELECT * FROM source")
+        checkAnswer(
+          spark.sql(s"SELECT * FROM source"),
+          Seq(
+            Row(1, Map("event" -> Map("subEvent" -> Row(1, 1, "deprecated"))))
+          ))
+      }
+    }
+  }
+
+  test("insert map type with different data type in key") {
+    withTable("source", "target") {
+      sql("""CREATE TABLE source (
+            |  id INT,
+            |  metrics MAP<STRING, STRUCT<id: INT, value: INT>>
+            |) USING delta""".stripMargin)
+
+      sql("""CREATE TABLE target (
+            |  id INT,
+            |  metrics MAP<INT, STRUCT<id: INT, value: INT>>
+            |) USING delta""".stripMargin)
+
+      sql("INSERT INTO source VALUES (1, map('1', struct(2, 3)))")
+
+      sql("INSERT INTO target SELECT * FROM source")
+
+      checkAnswer(
+        spark.sql("SELECT * FROM target"),
+        Seq(
+          Row(1, Map(1 -> Row(2, 3)))
+        ))
+    }
+  }
+
+  test("insert map type with different data type in value") {
+    withTable("source", "target") {
+      sql("""CREATE TABLE source (
+            |  id INT,
+            |  metrics MAP<STRING, STRUCT<id: INT, value: LONG>>
+            |) USING delta""".stripMargin)
+
+      sql("""CREATE TABLE target (
+            |  id INT,
+            |  metrics MAP<STRING, STRUCT<id: INT, value: INT>>
+            |) USING delta""".stripMargin)
+
+      sql("INSERT INTO source VALUES (1, map('m1', struct(2, 3L)))")
+
+      sql("INSERT INTO target SELECT * FROM source")
+
+      checkAnswer(
+        spark.sql("SELECT * FROM target"),
+        Seq(
+          Row(1, Map("m1" -> Row(2, 3)))
+        ))
+    }
+  }
+
+  def runInsertOverwrite(
+      sourceSchema: String,
+      sourceRecord: String,
+      targetSchema: String,
+      targetRecord: String)(runAndVerify: (String, String) => Unit): Unit = {
+    val sourceTable = "source"
+    val targetTable = "target"
+    withTable(sourceTable) {
+      withTable(targetTable) {
+        withSQLConf("spark.databricks.delta.schema.autoMerge.enabled" -> 
"true") {
+          // prepare source table
+          sql(s"""CREATE TABLE $sourceTable ($sourceSchema)
+                 | USING DELTA""".stripMargin)
+          sql(s"INSERT INTO $sourceTable VALUES ($sourceRecord)")
+          // prepare target table
+          sql(s"""CREATE TABLE $targetTable ($targetSchema)
+                 | USING DELTA""".stripMargin)
+          sql(s"INSERT INTO $targetTable VALUES ($targetRecord)")
+          runAndVerify(sourceTable, targetTable)
+        }
+      }
+    }
+  }
+}
+
+class DeltaInsertIntoSQLByPathSuite
+  extends DeltaInsertIntoTests(supportsDynamicOverwrite = true, 
includeSQLOnlyTests = true)
+  with DeltaSQLCommandTest {
+  override protected def doInsert(tableName: String, insert: DataFrame, mode: 
SaveMode): Unit = {
+    val tmpView = "tmp_view"
+    withTempView(tmpView) {
+      insert.createOrReplaceTempView(tmpView)
+      val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else "INTO"
+      val ident = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+      val catalogTable = spark.sessionState.catalog.getTableMetadata(ident)
+      sql(s"INSERT $overwrite TABLE delta.`${catalogTable.location}` SELECT * 
FROM $tmpView")
+    }
+  }
+
+  testQuietly("insertInto: cannot insert into a table that doesn't exist") {
+    import testImplicits._
+    Seq(SaveMode.Append, SaveMode.Overwrite).foreach {
+      mode =>
+        withTempDir {
+          dir =>
+            val t1 = s"delta.`${dir.getCanonicalPath}`"
+            val tmpView = "tmp_view"
+            withTempView(tmpView) {
+              val overwrite = if (mode == SaveMode.Overwrite) "OVERWRITE" else 
"INTO"
+              val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+              df.createOrReplaceTempView(tmpView)
+
+              intercept[AnalysisException] {
+                sql(s"INSERT $overwrite TABLE $t1 SELECT * FROM $tmpView")
+              }
+
+              assert(
+                new File(dir, "_delta_log").mkdirs(),
+                "Failed to create a _delta_log directory")
+              intercept[AnalysisException] {
+                sql(s"INSERT $overwrite TABLE $t1 SELECT * FROM $tmpView")
+              }
+            }
+        }
+    }
+  }
+}
+
+class DeltaInsertIntoDataFrameSuite
+  extends DeltaInsertIntoTestsWithTempViews(
+    supportsDynamicOverwrite = true,
+    includeSQLOnlyTests = false)
+  with DeltaSQLCommandTest {
+  override protected def doInsert(tableName: String, insert: DataFrame, mode: 
SaveMode): Unit = {
+    val dfw = insert.write.format(v2Format)
+    if (mode != null) {
+      dfw.mode(mode)
+    }
+    dfw.insertInto(tableName)
+  }
+}
+
+class DeltaInsertIntoDataFrameByPathSuite
+  extends DeltaInsertIntoTests(supportsDynamicOverwrite = true, 
includeSQLOnlyTests = false)
+  with DeltaSQLCommandTest {
+  override protected def doInsert(tableName: String, insert: DataFrame, mode: 
SaveMode): Unit = {
+    val dfw = insert.write.format(v2Format)
+    if (mode != null) {
+      dfw.mode(mode)
+    }
+    val ident = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val catalogTable = spark.sessionState.catalog.getTableMetadata(ident)
+    dfw.insertInto(s"delta.`${catalogTable.location}`")
+  }
+
+  testQuietly("insertInto: cannot insert into a table that doesn't exist") {
+    import testImplicits._
+    Seq(SaveMode.Append, SaveMode.Overwrite).foreach {
+      mode =>
+        withTempDir {
+          dir =>
+            val t1 = s"delta.`${dir.getCanonicalPath}`"
+            val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+
+            intercept[AnalysisException] {
+              df.write.mode(mode).insertInto(t1)
+            }
+
+            assert(new File(dir, "_delta_log").mkdirs(), "Failed to create a 
_delta_log directory")
+            intercept[AnalysisException] {
+              df.write.mode(mode).insertInto(t1)
+            }
+
+            // Test DataFrameWriterV2 as well
+            val dfW2 = df.writeTo(t1)
+            if (mode == SaveMode.Append) {
+              intercept[AnalysisException] {
+                dfW2.append()
+              }
+            } else {
+              intercept[AnalysisException] {
+                dfW2.overwrite(lit(true))
+              }
+            }
+        }
+    }
+  }
+}
+
+trait DeltaInsertIntoColumnMappingSelectedTests extends 
DeltaColumnMappingSelectedTestMixin {
+  override protected def runOnlyTests = Seq(
+    "InsertInto: overwrite - mixed clause reordered - static mode",
+    "InsertInto: overwrite - multiple static partitions - dynamic mode"
+  )
+}
+
+class DeltaInsertIntoSQLNameColumnMappingSuite
+  extends DeltaInsertIntoSQLSuite
+  with DeltaColumnMappingEnableNameMode
+  with DeltaInsertIntoColumnMappingSelectedTests {
+  override protected def runOnlyTests: Seq[String] = super.runOnlyTests :+
+    "insert overwrite should work with selecting constants"
+}
+
+class DeltaInsertIntoSQLByPathNameColumnMappingSuite
+  extends DeltaInsertIntoSQLByPathSuite
+  with DeltaColumnMappingEnableNameMode
+  with DeltaInsertIntoColumnMappingSelectedTests
+
+class DeltaInsertIntoDataFrameNameColumnMappingSuite
+  extends DeltaInsertIntoDataFrameSuite
+  with DeltaColumnMappingEnableNameMode
+  with DeltaInsertIntoColumnMappingSelectedTests
+
+class DeltaInsertIntoDataFrameByPathNameColumnMappingSuite
+  extends DeltaInsertIntoDataFrameByPathSuite
+  with DeltaColumnMappingEnableNameMode
+  with DeltaInsertIntoColumnMappingSelectedTests
+
+abstract class DeltaInsertIntoTestsWithTempViews(
+    supportsDynamicOverwrite: Boolean,
+    includeSQLOnlyTests: Boolean)
+  extends DeltaInsertIntoTests(supportsDynamicOverwrite, includeSQLOnlyTests)
+  with DeltaTestUtilsForTempViews {
+  protected def testComplexTempViews(name: String)(text: String, 
expectedResult: Seq[Row]): Unit = {
+    testWithTempView(s"insertInto a temp view created on top of a table - 
$name") {
+      isSQLTempView =>
+        import testImplicits._
+        val t1 = "tbl"
+        sql(s"CREATE TABLE $t1 (key int, value int) USING $v2Format")
+        Seq(SaveMode.Append, SaveMode.Overwrite).foreach {
+          mode =>
+            createTempViewFromSelect(text, isSQLTempView)
+            val df = Seq((0, 3), (1, 2)).toDF("key", "value")
+            try {
+              doInsert("v", df, mode)
+              checkAnswer(spark.table("v"), expectedResult)
+            } catch {
+              case e: AnalysisException =>
+                assert(
+                  e.getMessage.contains(INSERT_INTO_TMP_VIEW_ERROR_MSG) ||
+                    e.getMessage.contains("Inserting into an RDD-based table 
is not allowed") ||
+                    e.getMessage.contains("Table default.v not found") ||
+                    e.getMessage.contains("Table or view 'v' not found in 
database 'default'") ||
+                    e.getMessage.contains("The table or view `default`.`v` 
cannot be found") ||
+                    e.getMessage.contains(
+                      "[UNSUPPORTED_INSERT.RDD_BASED] Can't insert into the 
target."))
+            }
+        }
+    }
+  }
+
+  testComplexTempViews("basic")(
+    "SELECT * FROM tbl",
+    Seq(Row(0, 3), Row(1, 2))
+  )
+
+  testComplexTempViews("subset cols")(
+    "SELECT key FROM tbl",
+    Seq(Row(0), Row(1))
+  )
+
+  testComplexTempViews("superset cols")(
+    "SELECT key, value, 1 FROM tbl",
+    Seq(Row(0, 3, 1), Row(1, 2, 1))
+  )
+
+  testComplexTempViews("nontrivial projection")(
+    "SELECT value as key, key as value FROM tbl",
+    Seq(Row(3, 0), Row(2, 1))
+  )
+
+  testComplexTempViews("view with too many internal aliases")(
+    "SELECT * FROM (SELECT * FROM tbl AS t1) AS t2",
+    Seq(Row(0, 3), Row(1, 2))
+  )
+
+}
+
+class DeltaColumnDefaultsInsertSuite extends InsertIntoSQLOnlyTests with 
DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  override val supportsDynamicOverwrite = true
+  override val includeSQLOnlyTests = true
+
+  val tblPropertiesAllowDefaults =
+    """tblproperties (
+      |  'delta.feature.allowColumnDefaults' = 'enabled',
+      |  'delta.columnMapping.mode' = 'name'
+      |)""".stripMargin
+
+  // Ignored in Gluten: Results mismatch.
+  ignore("Column DEFAULT value support with Delta Lake, positive tests") {
+    Seq(
+      PartitionOverwriteMode.STATIC.toString,
+      PartitionOverwriteMode.DYNAMIC.toString
+    ).foreach {
+      partitionOverwriteMode =>
+        withSQLConf(
+          SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+          SQLConf.PARTITION_OVERWRITE_MODE.key -> partitionOverwriteMode,
+          // Set these configs to allow writing test values like timestamps of 
Jan. 1, year 1, etc.
+          SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> 
LegacyBehaviorPolicy.LEGACY.toString,
+          SQLConf.PARQUET_INT96_REBASE_MODE_IN_WRITE.key -> 
LegacyBehaviorPolicy.LEGACY.toString
+        ) {
+          withTable("t1", "t2", "t3", "t4") {
+            // Positive tests:
+            // Create some columns with default values and then insert into 
them.
+            sql("create table t1(" +
+              s"a int default 42, b boolean default true, c string default 
'abc') using $v2Format " +
+              s"partitioned by (a) $tblPropertiesAllowDefaults")
+            sql("insert into t1 values (1, false, default)")
+            sql("insert into t1 values (1, default, default)")
+            sql("alter table t1 alter column c set default 'def'")
+            sql("insert into t1 values (default, default, default)")
+            sql("alter table t1 alter column c drop default")
+            // Exercise INSERT INTO commands with VALUES lists mapping columns 
positionally.
+            sql("insert into t1 values (default, default, default)")
+            // Write the data in the table 't1' to new table 't4' and then 
perform an INSERT OVERWRITE
+            // back to 't1' here, to exercise static and dynamic partition 
overwrites.
+            sql(
+              f"create table t4(a int, b boolean, c string) using $v2Format " +
+                s"partitioned by (a) $tblPropertiesAllowDefaults")
+            // Exercise INSERT INTO commands with SELECT queries mapping 
columns by name.
+            sql("insert into t4(a, b, c) select a, b, c from t1")
+            sql("insert overwrite table t1 select * from t4")
+            checkAnswer(
+              spark.table("t1"),
+              Seq(
+                Row(1, false, "abc"),
+                Row(1, true, "abc"),
+                Row(42, true, "def"),
+                Row(42, true, null)
+              ))
+            // Insert default values with all supported types.
+            sql(
+              "create table t2(" +
+                "s boolean default true, " +
+                "t byte default cast(null as byte), " +
+                "u short default cast(42 as short), " +
+                "v float default 0, " +
+                "w double default 0, " +
+                "x date default date'0000', " +
+                "y timestamp default timestamp'0000', " +
+                "z decimal(5, 2) default 123.45," +
+                "a1 bigint default 43," +
+                "a2 smallint default cast(5 as smallint)," +
+                s"a3 tinyint default cast(6 as tinyint)) using $v2Format " +
+                tblPropertiesAllowDefaults)
+            sql(
+              "insert into t2 values (default, default, default, default, 
default, default, " +
+                "default, default, default, default, default)")
+            val result: Array[Row] = spark.table("t2").collect()
+            assert(result.length == 1)
+            val row: Row = result(0)
+            assert(row.length == 11)
+            assert(row(0) == true)
+            assert(row(1) == null)
+            assert(row(2) == 42)
+            assert(row(3) == 0.0f)
+            assert(row(4) == 0.0d)
+            assert(row(5).toString == "0001-01-01")
+            assert(row(6).toString == "0001-01-01 00:00:00.0")
+            assert(row(7).toString == "123.45")
+            assert(row(8) == 43L)
+            assert(row(9) == 5)
+            assert(row(10) == 6)
+          }
+          withTable("t3") {
+            // Set a default value for a partitioning column.
+            sql(
+              s"create table t3(i boolean, s bigint, q int default 42) using 
$v2Format " +
+                s"partitioned by (i) $tblPropertiesAllowDefaults")
+            sql("alter table t3 alter column i set default true")
+            sql("insert into t3(i, s, q) values (default, default, default)")
+            checkAnswer(spark.table("t3"), Seq(Row(true, null, 42)))
+            // Drop the column and add it again without the default. Querying 
the column now returns
+            // NULL.
+            sql("alter table t3 drop column q")
+            sql("alter table t3 add column q int")
+            checkAnswer(spark.table("t3"), Seq(Row(true, null, null)))
+          }
+        }
+    }
+  }
+
+  test("Column DEFAULT value support with Delta Lake, negative tests") {
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+      // The table feature is not enabled via TBLPROPERTIES.
+      withTable("createTableWithDefaultFeatureNotEnabled") {
+        checkError(
+          intercept[DeltaAnalysisException] {
+            sql(
+              s"create table createTableWithDefaultFeatureNotEnabled(" +
+                s"i boolean, s bigint, q int default 42) using $v2Format " +
+                "partitioned by (i)")
+          },
+          "WRONG_COLUMN_DEFAULTS_FOR_DELTA_FEATURE_NOT_ENABLED",
+          parameters = Map("commandType" -> "CREATE TABLE")
+        )
+      }
+      withTable("alterTableSetDefaultFeatureNotEnabled") {
+        sql(s"create table alterTableSetDefaultFeatureNotEnabled(a int) using 
$v2Format")
+        checkError(
+          intercept[DeltaAnalysisException] {
+            sql("alter table alterTableSetDefaultFeatureNotEnabled alter 
column a set default 42")
+          },
+          "WRONG_COLUMN_DEFAULTS_FOR_DELTA_FEATURE_NOT_ENABLED",
+          parameters = Map("commandType" -> "ALTER TABLE")
+        )
+      }
+      // Adding a new column with a default value to an existing table is not 
allowed.
+      withTable("alterTableTest") {
+        sql(
+          s"create table alterTableTest(i boolean, s bigint, q int default 42) 
using $v2Format " +
+            s"partitioned by (i) $tblPropertiesAllowDefaults")
+        checkError(
+          intercept[DeltaAnalysisException] {
+            sql("alter table alterTableTest add column z int default 42")
+          },
+          
"WRONG_COLUMN_DEFAULTS_FOR_DELTA_ALTER_TABLE_ADD_COLUMN_NOT_SUPPORTED"
+        )
+      }
+      // The default value fails to analyze.
+      checkError(
+        intercept[AnalysisException] {
+          sql(
+            s"create table t4 (s int default badvalue) using $v2Format " +
+              s"$tblPropertiesAllowDefaults")
+        },
+        INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG,
+        parameters =
+          Map("statement" -> "CREATE TABLE", "colName" -> "`s`", 
"defaultValue" -> "badvalue")
+      )
+
+      // The default value analyzes to a table not in the catalog.
+      // The error message reports that we failed to execute the command 
because subquery
+      // expressions are not allowed in DEFAULT values.
+      checkError(
+        intercept[AnalysisException] {
+          sql(
+            s"create table t4 (s int default (select min(x) from badtable)) 
using $v2Format " +
+              tblPropertiesAllowDefaults)
+        },
+        "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "defaultValue" -> "(select min(x) from badtable)")
+      )
+      // The default value has an explicit alias. It fails to evaluate when 
inlined into the
+      // VALUES list at the INSERT INTO time.
+      // The error message reports that we failed to execute the command 
because subquery
+      // expressions are not allowed in DEFAULT values.
+      checkError(
+        intercept[AnalysisException] {
+          sql(
+            s"create table t4 (s int default (select 42 as alias)) using 
$v2Format " +
+              tblPropertiesAllowDefaults)
+        },
+        "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "defaultValue" -> "(select 42 as alias)")
+      )
+      // The default value parses but the type is not coercible.
+      checkError(
+        intercept[AnalysisException] {
+          sql(
+            s"create table t4 (s bigint default false) " +
+              s"using $v2Format $tblPropertiesAllowDefaults")
+        },
+        "INVALID_DEFAULT_VALUE.DATA_TYPE",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "expectedType" -> "\"BIGINT\"",
+          "actualType" -> "\"BOOLEAN\"",
+          "defaultValue" -> "false")
+      )
+      // It is possible to create a table with NOT NULL constraint and a 
DEFAULT value of NULL.
+      // However, future inserts into that table will fail.
+      withTable("t4") {
+        sql(
+          s"create table t4(i boolean, s bigint, q int default null not null) 
using $v2Format " +
+            s"partitioned by (i) $tblPropertiesAllowDefaults")
+        // The InvariantViolationException is not a SparkThrowable, so just 
check we receive one.
+        assert(intercept[InvariantViolationException] {
+          sql("insert into t4 values (default, default, default)")
+        }.getMessage.nonEmpty)
+      }
+      // It is possible to create a table with a check constraint and a 
DEFAULT value that does not
+      // conform. However, future inserts into that table will fail.
+      withTable("t4") {
+        sql(
+          s"create table t4(i boolean, s bigint, q int default 42) using 
$v2Format " +
+            s"partitioned by (i) $tblPropertiesAllowDefaults")
+        sql("alter table t4 add constraint smallq check (q < 10)")
+        assert(intercept[InvariantViolationException] {
+          sql("insert into t4 values (default, default, default)")
+        }.getMessage.nonEmpty)
+      }
+    }
+    // Column default values are disabled per configuration in general.
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      checkError(
+        intercept[ParseException] {
+          sql(
+            s"create table t4 (s int default 41 + 1) using $v2Format " +
+              tblPropertiesAllowDefaults)
+        },
+        "UNSUPPORTED_DEFAULT_VALUE.WITH_SUGGESTION",
+        parameters = Map.empty,
+        context = ExpectedContext(fragment = "s int default 41 + 1", start = 
17, stop = 36)
+      )
+    }
+  }
+
+  test("Exercise column defaults with dataframe writes") {
+    // There are three column types exercising various combinations of 
implicit and explicit
+    // default column value references in the 'insert into' statements. Note 
these tests depend on
+    // enabling the configuration to use NULLs for missing DEFAULT column 
values.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"true") {
+      for (useDataFrames <- Seq(false, true)) {
+        withTable("t1", "t2") {
+          sql(
+            s"create table t1(j int, s bigint default 42, x bigint default 43) 
using $v2Format " +
+              tblPropertiesAllowDefaults)
+          if (useDataFrames) {
+            // Use 'saveAsTable' to exercise mapping columns by name. Note 
that we have to specify
+            // values for all columns of the target table here whether we use 
'saveAsTable' or
+            // 'insertInto', since the DataFrame generates a LogicalPlan 
equivalent to a SQL INSERT
+            // INTO command without any explicit user-specified column list. 
For example, if we
+            // used Seq((1)).toDF("j", "s", "x").write.mode("append") here 
instead, it would
+            // generate an unresolved LogicalPlan equivalent to the SQL query
+            // "INSERT INTO t1 VALUES (1)". This would fail with an error 
reporting the VALUES
+            // list is not long enough, since the analyzer would consider this 
equivalent to
+            // "INSERT INTO t1 (j, s, x) VALUES (1)".
+            Seq((1, 42L, 43L))
+              .toDF("j", "s", "x")
+              .write
+              .mode("append")
+              .format("delta")
+              .saveAsTable("t1")
+            Seq((2, 42L, 43L))
+              .toDF("j", "s", "x")
+              .write
+              .mode("append")
+              .format("delta")
+              .saveAsTable("t1")
+            Seq((3, 42L, 43L))
+              .toDF("j", "s", "x")
+              .write
+              .mode("append")
+              .format("delta")
+              .saveAsTable("t1")
+            Seq((4, 44L, 43L))
+              .toDF("j", "s", "x")
+              .write
+              .mode("append")
+              .format("delta")
+              .saveAsTable("t1")
+            Seq((5, 44L, 45L))
+              .toDF("j", "s", "x")
+              .write
+              .mode("append")
+              .format("delta")
+              .saveAsTable("t1")
+          } else {
+            sql("insert into t1(j) values(1)")
+            sql("insert into t1(j, s) values(2, default)")
+            sql("insert into t1(j, s, x) values(3, default, default)")
+            sql("insert into t1(j, s) values(4, 44)")
+            sql("insert into t1(j, s, x) values(5, 44, 45)")
+          }
+          sql(
+            s"create table t2(j int, s bigint default 42, x bigint default 43) 
using $v2Format " +
+              tblPropertiesAllowDefaults)
+          if (useDataFrames) {
+            // Use 'insertInto' to exercise mapping columns positionally.
+            spark.table("t1").where("j = 1").write.insertInto("t2")
+            spark.table("t1").where("j = 2").write.insertInto("t2")
+            spark.table("t1").where("j = 3").write.insertInto("t2")
+            spark.table("t1").where("j = 4").write.insertInto("t2")
+            spark.table("t1").where("j = 5").write.insertInto("t2")
+          } else {
+            sql("insert into t2(j) select j from t1 where j = 1")
+            sql("insert into t2(j, s) select j, default from t1 where j = 2")
+            sql("insert into t2(j, s, x) select j, default, default from t1 
where j = 3")
+            sql("insert into t2(j, s) select j, s from t1 where j = 4")
+            sql("insert into t2(j, s, x) select j, s, 45L from t1 where j = 5")
+          }
+          checkAnswer(
+            spark.table("t2"),
+            Row(1, 42L, 43L) ::
+              Row(2, 42L, 43L) ::
+              Row(3, 42L, 43L) ::
+              Row(4, 44L, 43L) ::
+              Row(5, 44L, 45L) :: Nil)
+          // Also exercise schema evolution with DataFrames.
+          if (useDataFrames) {
+            Seq((5, 44L, 45L, 46L))
+              .toDF("j", "s", "x", "y")
+              .write
+              .mode("append")
+              .format("delta")
+              .option("mergeSchema", "true")
+              .saveAsTable("t2")
+            checkAnswer(
+              spark.table("t2"),
+              Row(1, 42L, 43L, null) ::
+                Row(2, 42L, 43L, null) ::
+                Row(3, 42L, 43L, null) ::
+                Row(4, 44L, 43L, null) ::
+                Row(5, 44L, 45L, null) ::
+                Row(5, 44L, 45L, 46L) :: Nil)
+          }
+        }
+      }
+    }
+  }
+
+  test("ReplaceWhere with column defaults with dataframe writes") {
+    withTable("t1", "t2", "t3") {
+      sql(
+        s"create table t1(j int, s bigint default 42, x bigint default 43) 
using $v2Format " +
+          tblPropertiesAllowDefaults)
+      Seq((1, 42L, 43L)).toDF.write.insertInto("t1")
+      Seq((2, 42L, 43L)).toDF.write.insertInto("t1")
+      Seq((3, 42L, 43L)).toDF.write.insertInto("t1")
+      Seq((4, 44L, 43L)).toDF.write.insertInto("t1")
+      Seq((5, 44L, 45L)).toDF.write.insertInto("t1")
+      spark
+        .table("t1")
+        .write
+        .format("delta")
+        .mode("overwrite")
+        .option("replaceWhere", "j = default and s = default and x = default")
+        .saveAsTable("t2")
+      Seq("t1", "t2").foreach {
+        t =>
+          checkAnswer(
+            spark.table(t),
+            Row(1, 42L, 43L) ::
+              Row(2, 42L, 43L) ::
+              Row(3, 42L, 43L) ::
+              Row(4, 44L, 43L) ::
+              Row(5, 44L, 45L) :: Nil)
+      }
+    }
+  }
+
+  test("DESCRIBE and SHOW CREATE TABLE with column defaults") {
+    withTable("t") {
+      spark.sql(
+        s"CREATE TABLE t (id bigint default 42) " +
+          s"using $v2Format $tblPropertiesAllowDefaults")
+      val descriptionDf = spark.sql(s"DESCRIBE TABLE EXTENDED t")
+      assert(
+        descriptionDf.schema.map(field => (field.name, field.dataType)) === 
Seq(
+          ("col_name", StringType),
+          ("data_type", StringType),
+          ("comment", StringType)))
+      QueryTest.checkAnswer(
+        descriptionDf.filter(
+          "!(col_name in ('Catalog', 'Created Time', 'Created By', 'Database', 
" +
+            "'index', 'Is_managed_location', 'Location', 'Name', 'Owner', 
'Partition Provider'," +
+            "'Provider', 'Table', 'Table Properties',  'Type', '_partition', 
'Last Access', " +
+            "'Statistics', ''))"),
+        Seq(
+          Row("# Column Default Values", "", ""),
+          Row("# Detailed Table Information", "", ""),
+          Row("id", "bigint", "42"),
+          Row("id", "bigint", null)
+        )
+      )
+    }
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE t (
+             |  a bigint NOT NULL,
+             |  b bigint DEFAULT 42,
+             |  c string DEFAULT 'abc, "def"' COMMENT 'comment'
+             |)
+             |USING parquet
+             |COMMENT 'This is a comment'
+             |$tblPropertiesAllowDefaults
+        """.stripMargin)
+      val currentCatalog = 
spark.sessionState.catalogManager.currentCatalog.name()
+      QueryTest.checkAnswer(
+        sql("SHOW CREATE TABLE T"),
+        Seq(Row(s"""CREATE TABLE $currentCatalog.default.T (
+                   |  a BIGINT,
+                   |  b BIGINT DEFAULT 42,
+                   |  c STRING DEFAULT 'abc, "def"' COMMENT 'comment')
+                   |USING parquet
+                   |COMMENT 'This is a comment'
+                   |TBLPROPERTIES (
+                   |  'delta.columnMapping.mode' = 'name',
+                   |  'delta.feature.allowColumnDefaults' = 'enabled')
+                   |""".stripMargin))
+      )
+    }
+  }
+}
+
+/** These tests come from Apache Spark with some modifications to match Delta 
behavior. */
+abstract class DeltaInsertIntoTests(
+    override protected val supportsDynamicOverwrite: Boolean,
+    override protected val includeSQLOnlyTests: Boolean)
+  extends InsertIntoSQLOnlyTests {
+
+  import testImplicits._
+
+  override def afterEach(): Unit = {
+    spark.catalog.listTables().collect().foreach(t => sql(s"drop table 
${t.name}"))
+    super.afterEach()
+  }
+
+  // START Apache Spark tests
+
+  /**
+   * Insert data into a table using the insertInto statement. Implementations 
can be in SQL
+   * ("INSERT") or using the DataFrameWriter (`df.write.insertInto`). 
Insertions will be by column
+   * ordinal and not by column name.
+   */
+  protected def doInsert(tableName: String, insert: DataFrame, mode: SaveMode 
= null): Unit
+
+  test("insertInto: append") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    doInsert(t1, df)
+    verifyTable(t1, df)
+  }
+
+  test("insertInto: append by position") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+
+    doInsert(t1, dfr)
+    verifyTable(t1, df)
+  }
+
+  test("insertInto: append cast automatically") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
+    doInsert(t1, df)
+    verifyTable(t1, df)
+  }
+
+  test("insertInto: append partitioned table") {
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      doInsert(t1, df)
+      verifyTable(t1, df)
+    }
+  }
+
+  test("insertInto: overwrite non-partitioned table") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val df2 = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data")
+    doInsert(t1, df)
+    doInsert(t1, df2, SaveMode.Overwrite)
+    verifyTable(t1, df2)
+  }
+
+  test("insertInto: overwrite partitioned table in static mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+      val t1 = "tbl"
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+      val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+      doInsert(t1, init)
+
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      doInsert(t1, df, SaveMode.Overwrite)
+      verifyTable(t1, df)
+    }
+  }
+
+  test("insertInto: overwrite by position") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+      val t1 = "tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+        val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+        doInsert(t1, init)
+
+        val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+        doInsert(t1, dfr, SaveMode.Overwrite)
+
+        val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+        verifyTable(t1, df)
+      }
+    }
+  }
+
+  test("insertInto: overwrite cast automatically") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    val df2 = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data")
+    val df2c = Seq((4, "d"), (5, "e"), (6, "f")).toDF("id", "data")
+    doInsert(t1, df)
+    doInsert(t1, df2c, SaveMode.Overwrite)
+    verifyTable(t1, df2)
+  }
+
+  test("insertInto: fails when missing a column") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING 
$v2Format")
+    val df1 = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    // mismatched datatype
+    val df2 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
+    for (df <- Seq(df1, df2)) {
+      val exc = intercept[AnalysisException] {
+        doInsert(t1, df)
+      }
+      verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", 
"missing"))
+      assert(exc.getMessage.contains("not enough data columns"))
+    }
+  }
+
+  test("insertInto: overwrite fails when missing a column") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING 
$v2Format")
+    val df1 = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    // mismatched datatype
+    val df2 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
+    for (df <- Seq(df1, df2)) {
+      val exc = intercept[AnalysisException] {
+        doInsert(t1, df, SaveMode.Overwrite)
+      }
+      verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", 
"missing"))
+      assert(exc.getMessage.contains("not enough data columns"))
+    }
+  }
+
+  // This behavior is specific to Delta
+  test("insertInto: fails when an extra column is present but can evolve 
schema") {
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+      val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit")
+      val exc = intercept[AnalysisException] {
+        doInsert(t1, df)
+      }
+
+      verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+      assert(exc.getMessage.contains(s"mergeSchema"))
+
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") {
+        doInsert(t1, df)
+      }
+      verifyTable(t1, Seq((1L, "a", "mango")).toDF("id", "data", "fruit"))
+    }
+  }
+
+  test("insertInto: UTC timestamp partition values round trip across different 
session TZ") {
+    val t1 = "utc_timestamp_partitioned_values"
+    withTable(t1) {
+      withTimeZone("UTC") {
+        sql(s"CREATE TABLE $t1 (data int, ts timestamp) USING delta 
PARTITIONED BY (ts)")
+        sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15T04:00:00UTC')")
+        sql(s"INSERT INTO $t1 VALUES (2, timestamp'2024-06-15T4:00:00UTC+8')")
+        sql(s"INSERT INTO $t1 VALUES (3, timestamp'2024-06-15T5:00:00 
UTC+01:00')")
+        sql(s"INSERT INTO $t1 VALUES (4, 
timestamp'2024-06-16T5:00:00.123456UTC')")
+        sql(s"INSERT INTO $t1 VALUES (5, timestamp'1903-12-28T5:00:00')")
+      }
+
+      withTimeZone("GMT-8") {
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+        val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+        val partitionColName = 
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+        checkAnswer(
+          allFiles.select("partitionValues").orderBy("modificationTime"),
+          Seq(
+            Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z")),
+            Row(Map(partitionColName -> "2024-06-14T20:00:00.000000Z")),
+            Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z")),
+            Row(Map(partitionColName -> "2024-06-16T05:00:00.123456Z")),
+            Row(Map(partitionColName -> "1903-12-28T05:00:00.000000Z"))
+          )
+        )
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-15T4:00:00UTC+8'"),
+          Seq(Row(2)))
+
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-14T20:00:00UTC-08'"),
+          Seq(Row(1), Row(3)))
+
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'1903-12-27T21:00:00UTC-08'"),
+          Seq(Row(5)))
+
+        checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(4)))
+      }
+    }
+  }
+
+  test("insertInto: Non-UTC and UTC partition values round trip same session 
TZ") {
+    val t1 = "utc_timestamp_partitioned_values"
+    withTable(t1) {
+      withTimeZone("GMT-8") {
+        sql(s"CREATE TABLE $t1 (data int, ts timestamp) USING delta 
PARTITIONED BY (ts)")
+        sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15T4:00:00UTC')")
+        sql(s"INSERT INTO $t1 VALUES (2, timestamp'2024-06-15T4:00:00UTC+8')")
+      }
+
+      withTimeZone("GMT-8") {
+        withSQLConf(DeltaSQLConf.UTC_TIMESTAMP_PARTITION_VALUES.key -> 
"false") {
+          sql(s"INSERT INTO $t1 VALUES (3, timestamp'2024-06-15T5:00:00 
UTC+01:00')")
+          sql(s"INSERT INTO $t1 VALUES (4, timestamp'1903-12-28T5:00:00')")
+        }
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+        val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+        val partitionColName = 
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+        checkAnswer(
+          allFiles.select("partitionValues").orderBy("modificationTime"),
+          Seq(
+            Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z")),
+            Row(Map(partitionColName -> "2024-06-14T20:00:00.000000Z")),
+            Row(Map(partitionColName -> "2024-06-14 20:00:00")),
+            Row(Map(partitionColName -> "1903-12-28 05:00:00"))
+          )
+        )
+      }
+
+      withTimeZone("GMT-8") {
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-15T4:00:00UTC+8'"),
+          Seq(Row(2)))
+
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-14T20:00:00'"),
+          Seq(Row(1), Row(3)))
+
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'1903-12-28T05:00:00'"),
+          Seq(Row(4)))
+
+        checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(3)))
+      }
+    }
+  }
+
+  test("insertInto: Timestamp No Timezone round trips across timezones") {
+    val t1 = "timestamp_ntz"
+    withTable(t1) {
+      withTimeZone("GMT-8") {
+        sql(s"CREATE TABLE $t1 (data int, ts timestamp_ntz) USING delta 
PARTITIONED BY (ts)")
+        sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15T4:00:00')")
+        sql(s"INSERT INTO $t1 VALUES (2, timestamp'2024-06-16T5:00:00')")
+        sql(s"INSERT INTO $t1 VALUES (3, timestamp'1903-12-28T5:00:00')")
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+        val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+        val partitionColName = 
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+        checkAnswer(
+          allFiles.select("partitionValues").orderBy("modificationTime"),
+          Seq(
+            Row(Map(partitionColName -> "2024-06-15 04:00:00")),
+            Row(Map(partitionColName -> "2024-06-16 05:00:00")),
+            Row(Map(partitionColName -> "1903-12-28 05:00:00"))
+          )
+        )
+      }
+
+      withSQLConf("spark.sql.session.timeZone" -> "UTC-03:00") {
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-15T4:00:00'"),
+          Seq(Row(1)))
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-16T05:00:00'"),
+          Seq(Row(2)))
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'1903-12-28T05:00:00'"),
+          Seq(Row(3)))
+        checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(3)))
+      }
+    }
+  }
+
+  test("insertInto: Timestamp round trips across same session time zone: UTC 
normalized") {
+    val t1 = "utc_timestamp_partitioned_values"
+
+    withTimeZone("GMT-8") {
+      sql(s"CREATE TABLE $t1 (data int, ts timestamp) USING delta PARTITIONED 
BY (ts)")
+      sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15 04:00:00UTC')")
+      sql(s"INSERT INTO $t1 VALUES (2, timestamp'2024-06-15T4:00:00UTC+8')")
+      sql(s"INSERT INTO $t1 VALUES (3, timestamp'2024-06-15T5:00:00 
UTC+01:00')")
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+      val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+      val partitionColName = 
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+      checkAnswer(
+        allFiles.select("partitionValues").orderBy("modificationTime"),
+        Seq(
+          Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z")),
+          Row(Map(partitionColName -> "2024-06-14T20:00:00.000000Z")),
+          Row(Map(partitionColName -> "2024-06-15T04:00:00.000000Z"))
+        )
+      )
+
+      checkAnswer(
+        sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-15T04:00:00UTC'"),
+        Seq(Row(1), Row(3)))
+
+      checkAnswer(
+        sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-14T20:00:00UTC'"),
+        Seq(Row(2)))
+
+      checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(2)))
+    }
+  }
+
+  test("insertInto: Timestamp round trips across same session time zone: 
session time normalized") {
+    val t1 = "utc_timestamp_partitioned_values"
+
+    withTimeZone("UTC") {
+      withSQLConf(DeltaSQLConf.UTC_TIMESTAMP_PARTITION_VALUES.key -> "false") {
+        sql(s"CREATE TABLE $t1 (data int, ts timestamp) USING delta 
PARTITIONED BY (ts)")
+        sql(s"INSERT INTO $t1 VALUES (1, timestamp'2024-06-15 
04:00:00UTC+08:00')")
+        sql(s"INSERT INTO $t1 VALUES (2, 
timestamp'2024-06-15T4:00:00UTC-08:00')")
+        sql(s"INSERT INTO $t1 VALUES (3, 
timestamp'2024-06-15T5:00:00UTC+09:00')")
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier(t1))
+        val allFiles = deltaLog.unsafeVolatileSnapshot.allFiles
+        val partitionColName = 
deltaLog.unsafeVolatileMetadata.physicalPartitionColumns.head
+
+        checkAnswer(
+          allFiles.select("partitionValues").orderBy("modificationTime"),
+          Seq(
+            Row(Map(partitionColName -> "2024-06-14 20:00:00")),
+            Row(Map(partitionColName -> "2024-06-15 12:00:00")),
+            Row(Map(partitionColName -> "2024-06-14 20:00:00"))
+          )
+        )
+
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-14T20:00:00'"),
+          Seq(Row(1), Row(3)))
+
+        checkAnswer(
+          sql(s"SELECT data FROM $t1 where ts = 
timestamp'2024-06-15T12:00:00'"),
+          Seq(Row(2)))
+
+        checkAnswer(sql(s"SELECT count(distinct(ts)) from $t1"), Seq(Row(2)))
+      }
+    }
+  }
+
+  private def withTimeZone(zone: String)(f: => Unit): Unit = {
+    val currentDefault = TimeZone.getDefault
+    try {
+      TimeZone.setDefault(TimeZone.getTimeZone(zone))
+      f
+    } finally {
+      TimeZone.setDefault(currentDefault)
+    }
+  }
+
+  // This behavior is specific to Delta
+  testQuietly("insertInto: schema enforcement") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+    val df = Seq(("a", 1L)).toDF("id", "data") // reverse order
+
+    def getDF(rows: Row*): DataFrame = {
+      spark.createDataFrame(spark.sparkContext.parallelize(rows), 
spark.table(t1).schema)
+    }
+
+    withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> "strict") {
+      intercept[AnalysisException] {
+        doInsert(t1, df, SaveMode.Overwrite)
+      }
+
+      verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+
+      intercept[AnalysisException] {
+        doInsert(t1, df)
+      }
+
+      verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+    }
+
+    withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> "ansi") {
+      intercept[SparkException] {
+        doInsert(t1, df, SaveMode.Overwrite)
+      }
+
+      verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+
+      intercept[SparkException] {
+        doInsert(t1, df)
+      }
+
+      verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
+    }
+
+    withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> "legacy") {
+      doInsert(t1, df, SaveMode.Overwrite)
+      verifyTable(t1, getDF(Row(null, "1")))
+
+      doInsert(t1, df)
+
+      verifyTable(t1, getDF(Row(null, "1"), Row(null, "1")))
+    }
+  }
+
+  testQuietly("insertInto: struct types and schema enforcement") {
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(s"""CREATE TABLE $t1 (
+             |  id bigint,
+             |  point struct<x: double, y: double>
+             |)
+             |USING delta""".stripMargin)
+      val init = Seq((1L, (0.0, 1.0))).toDF("id", "point")
+      doInsert(t1, init)
+
+      doInsert(t1, Seq((2L, (1.0, 0.0))).toDF("col1", "col2")) // naming 
doesn't matter
+
+      // can handle null types
+      doInsert(t1, Seq((3L, (1.0, null))).toDF("col1", "col2"))
+      doInsert(t1, Seq((4L, (null, 1.0))).toDF("col1", "col2"))
+
+      val expected = Seq(
+        Row(1L, Row(0.0, 1.0)),
+        Row(2L, Row(1.0, 0.0)),
+        Row(3L, Row(1.0, null)),
+        Row(4L, Row(null, 1.0)))
+      verifyTable(t1, spark.createDataFrame(expected.asJava, 
spark.table(t1).schema))
+
+      // schema enforcement
+      val complexSchema = Seq((5L, (0.5, 0.5), (2.5, 2.5, 1.0), "a", (0.5, 
"b")))
+        .toDF("long", "struct", "newstruct", "string", "badstruct")
+        .select(
+          $"long",
+          $"struct",
+          struct($"newstruct._1".as("x"), $"newstruct._2".as("y"), 
$"newstruct._3".as("z"))
+            .as("newstruct"),
+          $"string",
+          $"badstruct")
+
+      // new column in root
+      intercept[AnalysisException] {
+        doInsert(t1, complexSchema.select("long", "struct", "string"))
+      }
+
+      // new column in struct not accepted
+      intercept[AnalysisException] {
+        doInsert(t1, complexSchema.select("long", "newstruct"))
+      }
+
+      withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> "strict") {
+        // bad data type not accepted
+        intercept[AnalysisException] {
+          doInsert(t1, complexSchema.select("string", "struct"))
+        }
+
+        // nested bad data type in struct not accepted
+        intercept[AnalysisException] {
+          doInsert(t1, complexSchema.select("long", "badstruct"))
+        }
+      }
+
+      // missing column in struct
+      intercept[AnalysisException] {
+        doInsert(t1, complexSchema.select($"long", struct(lit(0.1))))
+      }
+
+      // wrong ordering
+      intercept[AnalysisException] {
+        doInsert(t1, complexSchema.select("struct", "long"))
+      }
+
+      // schema evolution
+      withSQLConf(
+        DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true",
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> "strict") {
+        // ordering should still match
+        intercept[AnalysisException] {
+          doInsert(t1, complexSchema.select("struct", "long"))
+        }
+
+        intercept[AnalysisException] {
+          doInsert(t1, complexSchema.select("struct", "long", "string"))
+        }
+
+        // new column to the end works
+        doInsert(t1, complexSchema.select($"long", $"struct", 
$"string".as("letter")))
+
+        // still cannot insert missing column
+        intercept[AnalysisException] {
+          doInsert(t1, complexSchema.select("long", "struct"))
+        }
+
+        intercept[AnalysisException] {
+          doInsert(t1, complexSchema.select($"long", struct(lit(0.1)), 
$"string"))
+        }
+
+        // still perform nested data type checks
+        intercept[AnalysisException] {
+          doInsert(t1, complexSchema.select("long", "badstruct", "string"))
+        }
+
+        // bad column within struct
+        intercept[AnalysisException] {
+          doInsert(
+            t1,
+            complexSchema.select($"long", struct(lit(0.1), lit("a"), 
lit(0.2)), $"string"))
+        }
+
+        // Add column to nested field
+        doInsert(t1, complexSchema.select($"long", $"newstruct", lit(null)))
+
+        // cannot insert missing field into struct now
+        intercept[AnalysisException] {
+          doInsert(t1, complexSchema.select("long", "struct", "string"))
+        }
+      }
+
+      val expected2 = Seq(
+        Row(1L, Row(0.0, 1.0, null), null),
+        Row(2L, Row(1.0, 0.0, null), null),
+        Row(3L, Row(1.0, null, null), null),
+        Row(4L, Row(null, 1.0, null), null),
+        Row(5L, Row(0.5, 0.5, null), "a"),
+        Row(5L, Row(2.5, 2.5, 1.0), null)
+      )
+      verifyTable(t1, spark.createDataFrame(expected2.asJava, 
spark.table(t1).schema))
+
+      val expectedSchema = new StructType()
+        .add("id", LongType)
+        .add(
+          "point",
+          new StructType()
+            .add("x", DoubleType)
+            .add("y", DoubleType)
+            .add("z", DoubleType))
+        .add("letter", StringType)
+      val diff = SchemaUtils.reportDifferences(spark.table(t1).schema, 
expectedSchema)
+      if (diff.nonEmpty) {
+        fail(diff.mkString("\n"))
+      }
+    }
+  }
+
+  dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic 
mode") {
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+      val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+      doInsert(t1, init)
+
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      doInsert(t1, df, SaveMode.Overwrite)
+
+      verifyTable(t1, df.union(sql("SELECT 4L, 'keep'")))
+    }
+  }
+
+  dynamicOverwriteTest("insertInto: overwrite partitioned table in dynamic 
mode by position") {
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+      val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+      doInsert(t1, init)
+
+      val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
+      doInsert(t1, dfr, SaveMode.Overwrite)
+
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c"), (4L, "keep")).toDF("id", 
"data")
+      verifyTable(t1, df)
+    }
+  }
+
+  dynamicOverwriteTest(
+    "insertInto: overwrite partitioned table in dynamic mode automatic 
casting") {
+    val t1 = "tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+      val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
+      doInsert(t1, init)
+
+      val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+      val dfc = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
+      doInsert(t1, df, SaveMode.Overwrite)
+
+      verifyTable(t1, df.union(sql("SELECT 4L, 'keep'")))
+    }
+  }
+
+  dynamicOverwriteTest("insertInto: overwrite fails when missing a column in 
dynamic mode") {
+    val t1 = "tbl"
+    sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING 
$v2Format")
+    val df1 = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
+    // mismatched datatype
+    val df2 = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "data")
+    for (df <- Seq(df1, df2)) {
+      val exc = intercept[AnalysisException] {
+        doInsert(t1, df, SaveMode.Overwrite)
+      }
+      verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", 
"missing"))
+      assert(exc.getMessage.contains("not enough data columns"))
+    }
+  }
+
+  test("insert nested struct from view into delta") {
+    withTable("testNestedStruct") {
+      sql(
+        s"CREATE TABLE testNestedStruct " +
+          s" (num INT, text STRING, s STRUCT<a:STRING, s2: 
STRUCT<c:STRING,d:STRING>, b:STRING>)" +
+          s" USING DELTA")
+      val data = sql(s"SELECT 1, 'a', struct('a', struct('c', 'd'), 'b')")
+      doInsert("testNestedStruct", data)
+      verifyTable(
+        "testNestedStruct",
+        sql(s"SELECT 1 AS num, 'a' AS text, struct('a', struct('c', 'd') AS 
s2, 'b') AS s"))
+    }
+  }
+}
+
+trait InsertIntoSQLOnlyTests extends QueryTest with SharedSparkSession with 
BeforeAndAfter {
+
+  import testImplicits._
+
+  /** Check that the results in `tableName` match the `expected` DataFrame. */
+  protected def verifyTable(tableName: String, expected: DataFrame): Unit = {
+    checkAnswer(spark.table(tableName), expected)
+  }
+
+  protected val v2Format: String = "delta"
+
+  /**
+   * Whether dynamic partition overwrites are supported by the `Table` 
definitions used in the test
+   * suites. Tables that leverage the V1 Write interface do not support 
dynamic partition
+   * overwrites.
+   */
+  protected val supportsDynamicOverwrite: Boolean
+
+  /** Whether to include the SQL specific tests in this trait within the 
extending test suite. */
+  protected val includeSQLOnlyTests: Boolean
+
+  private def withTableAndData(tableName: String)(testFn: String => Unit): 
Unit = {
+    withTable(tableName) {
+      val viewName = "tmp_view"
+      val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, 
"c"))).toDF("id", "data")
+      df.createOrReplaceTempView(viewName)
+      withTempView(viewName) {
+        testFn(viewName)
+      }
+    }
+  }
+
+  protected def dynamicOverwriteTest(testName: String)(f: => Unit): Unit = {
+    test(testName) {
+      try {
+        withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString) {
+          f
+        }
+        if (!supportsDynamicOverwrite) {
+          fail("Expected failure from test, because the table doesn't support 
dynamic overwrites")
+        }
+      } catch {
+        case a: AnalysisException if !supportsDynamicOverwrite =>
+          assert(a.getMessage.contains("does not support dynamic overwrite"))
+      }
+    }
+  }
+
+  if (includeSQLOnlyTests) {
+    test("InsertInto: when the table doesn't exist") {
+      val t1 = "tbl"
+      val t2 = "tbl2"
+      withTableAndData(t1) {
+        _ =>
+          sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+          val e = intercept[AnalysisException] {
+            sql(s"INSERT INTO $t2 VALUES (2L, 'dummy')")
+          }
+          assert(e.getMessage.contains(t2))
+          assert(
+            e.getMessage.contains("Table not found") ||
+              e.getMessage.contains(s"table or view `$t2` cannot be found")
+          )
+      }
+    }
+
+    test("InsertInto: append to partitioned table - static clause") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+          sql(s"INSERT INTO $t1 PARTITION (id = 23) SELECT data FROM $view")
+          verifyTable(t1, sql(s"SELECT 23, data FROM $view"))
+      }
+    }
+
+    test("InsertInto: static PARTITION clause fails with non-partition 
column") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (data)")
+
+          val exc = intercept[AnalysisException] {
+            sql(s"INSERT INTO TABLE $t1 PARTITION (id=1) SELECT data FROM 
$view")
+          }
+
+          verifyTable(t1, spark.emptyDataFrame)
+          assert(
+            exc.getMessage.contains("PARTITION clause cannot contain a 
non-partition column") ||
+              exc.getMessage.contains("PARTITION clause cannot contain the 
non-partition column") ||
+              exc.getMessage.contains(
+                "[NON_PARTITION_COLUMN] PARTITION clause cannot contain the 
non-partition column"))
+          assert(exc.getMessage.contains("id"))
+      }
+    }
+
+    test("InsertInto: dynamic PARTITION clause fails with non-partition 
column") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+
+          val exc = intercept[AnalysisException] {
+            sql(s"INSERT INTO TABLE $t1 PARTITION (data) SELECT * FROM $view")
+          }
+
+          verifyTable(t1, spark.emptyDataFrame)
+          assert(
+            exc.getMessage.contains("PARTITION clause cannot contain a 
non-partition column") ||
+              exc.getMessage.contains("PARTITION clause cannot contain the 
non-partition column") ||
+              exc.getMessage.contains(
+                "[NON_PARTITION_COLUMN] PARTITION clause cannot contain the 
non-partition column"))
+          assert(exc.getMessage.contains("data"))
+      }
+    }
+
+    test("InsertInto: overwrite - dynamic clause - static mode") {
+      withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+        val t1 = "tbl"
+        withTableAndData(t1) {
+          view =>
+            sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+            sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'also-deleted')")
+            sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id) SELECT * FROM 
$view")
+            verifyTable(t1, Seq((1, "a"), (2, "b"), (3, "c")).toDF())
+        }
+      }
+    }
+
+    dynamicOverwriteTest("InsertInto: overwrite - dynamic clause - dynamic 
mode") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+          sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'keep')")
+          sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id) SELECT * FROM $view")
+          verifyTable(t1, Seq((1, "a"), (2, "b"), (3, "c"), (4, 
"keep")).toDF("id", "data"))
+      }
+    }
+
+    test("InsertInto: overwrite - missing clause - static mode") {
+      withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+        val t1 = "tbl"
+        withTableAndData(t1) {
+          view =>
+            sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+            sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'also-deleted')")
+            sql(s"INSERT OVERWRITE TABLE $t1 SELECT * FROM $view")
+            verifyTable(t1, Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", 
"data"))
+        }
+      }
+    }
+
+    dynamicOverwriteTest("InsertInto: overwrite - missing clause - dynamic 
mode") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
+          sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'keep')")
+          sql(s"INSERT OVERWRITE TABLE $t1 SELECT * FROM $view")
+          verifyTable(t1, Seq((1, "a"), (2, "b"), (3, "c"), (4, 
"keep")).toDF("id", "data"))
+      }
+    }
+
+    test("InsertInto: overwrite - static clause") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(
+            s"CREATE TABLE $t1 (id bigint, data string, p1 int) " +
+              s"USING $v2Format PARTITIONED BY (p1)")
+          sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 23), (4L, 'keep', 2)")
+          verifyTable(t1, Seq((2L, "dummy", 23), (4L, "keep", 2)).toDF("id", 
"data", "p1"))
+          sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p1 = 23) SELECT * FROM 
$view")
+          verifyTable(
+            t1,
+            Seq((1, "a", 23), (2, "b", 23), (3, "c", 23), (4, "keep", 
2)).toDF("id", "data", "p1"))
+      }
+    }
+
+    test("InsertInto: overwrite - mixed clause - static mode") {
+      withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+        val t1 = "tbl"
+        withTableAndData(t1) {
+          view =>
+            sql(
+              s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+                s"USING $v2Format PARTITIONED BY (id, p)")
+            sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 
'also-deleted', 2)")
+            sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id, p = 2) SELECT * 
FROM $view")
+            verifyTable(t1, Seq((1, "a", 2), (2, "b", 2), (3, "c", 
2)).toDF("id", "data", "p"))
+        }
+      }
+    }
+
+    test("InsertInto: overwrite - mixed clause reordered - static mode") {
+      withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+        val t1 = "tbl"
+        withTableAndData(t1) {
+          view =>
+            sql(
+              s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+                s"USING $v2Format PARTITIONED BY (id, p)")
+            sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 
'also-deleted', 2)")
+            sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2, id) SELECT * 
FROM $view")
+            verifyTable(t1, Seq((1, "a", 2), (2, "b", 2), (3, "c", 
2)).toDF("id", "data", "p"))
+        }
+      }
+    }
+
+    test("InsertInto: overwrite - implicit dynamic partition - static mode") {
+      withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+        val t1 = "tbl"
+        withTableAndData(t1) {
+          view =>
+            sql(
+              s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+                s"USING $v2Format PARTITIONED BY (id, p)")
+            sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 
'also-deleted', 2)")
+            sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2) SELECT * FROM 
$view")
+            verifyTable(t1, Seq((1, "a", 2), (2, "b", 2), (3, "c", 
2)).toDF("id", "data", "p"))
+        }
+      }
+    }
+
+    dynamicOverwriteTest("InsertInto: overwrite - mixed clause - dynamic 
mode") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(
+            s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+              s"USING $v2Format PARTITIONED BY (id, p)")
+          sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+          sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2, id) SELECT * FROM 
$view")
+          verifyTable(
+            t1,
+            Seq((1, "a", 2), (2, "b", 2), (3, "c", 2), (4, "keep", 
2)).toDF("id", "data", "p"))
+      }
+    }
+
+    dynamicOverwriteTest("InsertInto: overwrite - mixed clause reordered - 
dynamic mode") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(
+            s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+              s"USING $v2Format PARTITIONED BY (id, p)")
+          sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+          sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id, p = 2) SELECT * FROM 
$view")
+          verifyTable(
+            t1,
+            Seq((1, "a", 2), (2, "b", 2), (3, "c", 2), (4, "keep", 
2)).toDF("id", "data", "p"))
+      }
+    }
+
+    dynamicOverwriteTest("InsertInto: overwrite - implicit dynamic partition - 
dynamic mode") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(
+            s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+              s"USING $v2Format PARTITIONED BY (id, p)")
+          sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+          sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2) SELECT * FROM 
$view")
+          verifyTable(
+            t1,
+            Seq((1, "a", 2), (2, "b", 2), (3, "c", 2), (4, "keep", 
2)).toDF("id", "data", "p"))
+      }
+    }
+
+    test("insert nested struct literal into delta") {
+      withTable("insertNestedTest") {
+        sql(s"CREATE TABLE insertNestedTest " +
+          s" (num INT, text STRING, s STRUCT<a:STRING, s2: 
STRUCT<c:STRING,d:STRING>, b:STRING>)" +
+          s" USING DELTA")
+        sql(s"INSERT INTO insertNestedTest VALUES (1, 'a', struct('a', 
struct('c', 'd'), 'b'))")
+      }
+    }
+
+    dynamicOverwriteTest("InsertInto: overwrite - multiple static partitions - 
dynamic mode") {
+      val t1 = "tbl"
+      withTableAndData(t1) {
+        view =>
+          sql(
+            s"CREATE TABLE $t1 (id bigint, data string, p int) " +
+              s"USING $v2Format PARTITIONED BY (id, p)")
+          sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+          sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 2, p = 2) SELECT 
data FROM $view")
+          verifyTable(
+            t1,
+            Seq((2, "a", 2), (2, "b", 2), (2, "c", 2), (4, "keep", 
2)).toDF("id", "data", "p"))
+      }
+    }
+
+    test("InsertInto: overwrite - dot in column names - static mode") {
+      import testImplicits._
+      val t1 = "tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (`a.b` string, `c.d` string) USING $v2Format 
PARTITIONED BY (`a.b`)")
+        sql(s"INSERT OVERWRITE $t1 PARTITION (`a.b` = 'a') VALUES('b')")
+        verifyTable(t1, Seq("a" -> "b").toDF("id", "data"))
+      }
+    }
+  }
+
+  // END Apache Spark tests
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
new file mode 100644
index 0000000000..9d90d0c885
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+object DeltaInsertIntoTableSuiteShims {
+  val INSERT_INTO_TMP_VIEW_ERROR_MSG = "Inserting into a view is not allowed"
+
+  val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = 
"INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION"
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index c01e3f3f7c..e41df6180f 100644
--- 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -1516,8 +1516,7 @@ class DeltaSuite
     }
   }
 
-  // Ignore in Gluten.
-  ignore("SC-8727 - default snapshot num partitions") {
+  test("SC-8727 - default snapshot num partitions") {
     withTempDir {
       tempDir =>
         spark.range(10).write.format("delta").save(tempDir.toString)
@@ -1560,7 +1559,7 @@ class DeltaSuite
     }
   }
 
-  // Ignore in Gluten.
+  // Ignore in Gluten: Gluten has less partitions.
   ignore("SC-8810: skip deleted file") {
     withSQLConf(("spark.sql.files.ignoreMissingFiles", "true")) {
       withTempDir {
@@ -1595,7 +1594,7 @@ class DeltaSuite
     }
   }
 
-  // Ignore in Gluten.
+  // Ignore in Gluten: Error message mismatch.
   ignore("SC-8810: skipping deleted file still throws on corrupted file") {
     withSQLConf(("spark.sql.files.ignoreMissingFiles", "true")) {
       withTempDir {
@@ -1661,7 +1660,7 @@ class DeltaSuite
     }
   }
 
-  // Ignore in Gluten.
+  // Ignore in Gluten: Error message mismatch.
   ignore("deleted files cause failure by default") {
     withTempDir {
       tempDir =>
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/FakeFileSystem.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/FakeFileSystem.scala
new file mode 100644
index 0000000000..1b7d4441e3
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/FakeFileSystem.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.hadoop.fs.RawLocalFileSystem
+
+import java.net.URI
+
+/** A fake file system to test whether session Hadoop configuration will be 
picked up. */
+class FakeFileSystem extends RawLocalFileSystem {
+  override def getScheme: String = FakeFileSystem.scheme
+  override def getUri: URI = FakeFileSystem.uri
+}
+
+object FakeFileSystem {
+  val scheme = "fake"
+  val uri = URI.create(s"$scheme:///")
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
index f047278b67..327ef2dad8 100644
--- 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/UpdateSuiteBase.scala
@@ -1035,7 +1035,8 @@ abstract class UpdateSuiteBase
     expectedErrorClassForSQLTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
     expectedErrorClassForDataSetTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION"
   )
-// Ignore in Gluten.
+
+// Ignore in Gluten: Error message mismatch.
 //  testInvalidTempViews("superset cols")(
 //    text = "SELECT key, value, 1 FROM tab",
 //    // The analyzer can't tell whether the table originally had the extra 
column or not.
@@ -1059,7 +1060,7 @@ abstract class UpdateSuiteBase
     }
   }
 
-// Ignore in Gluten - result mismatch.
+// Ignore in Gluten - result mismatch, but Gluten's answer is correct.
 //  testComplexTempViews("nontrivial projection")(
 //    text = "SELECT value as key, key as value FROM tab",
 //    expectedResult = Seq(Row(3, 0), Row(3, 3))
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
index bf64858399..53adfbf4a4 100644
--- 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
@@ -46,10 +46,11 @@ trait DeltaSQLCommandTest extends SharedSparkSession {
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.default.parallelism", "1")
       .set("spark.memory.offHeap.enabled", "true")
-      .set("spark.sql.shuffle.partitions", "1")
+      .set("spark.sql.shuffle.partitions", "5")
       .set("spark.memory.offHeap.size", "2g")
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key, "true")
+      .set("spark.databricks.delta.snapshotPartitions", "2")
   }
 }
 // spotless:on


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to