This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 56523dc85e [VL] Delta: Add Delta Lake write unit test for Spark 3.5 + Delta 3.3 (#10802)
56523dc85e is described below

commit 56523dc85e30e2eaca5d599b9eb543cd95ca2315
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Sep 26 20:14:16 2025 +0200

    [VL] Delta: Add Delta Lake write unit test for Spark 3.5 + Delta 3.3 (#10802)
---
 .../apache/spark/sql/delta/DeleteSQLSuite.scala    | 154 +++++
 .../apache/spark/sql/delta/DeleteSuiteBase.scala   | 565 ++++++++++++++++++
 .../spark/sql/delta/DeletionVectorsTestUtils.scala | 367 ++++++++++++
 .../sql/delta/DeltaColumnMappingTestUtils.scala    | 487 ++++++++++++++++
 .../apache/spark/sql/delta/DeltaTestUtils.scala    | 635 +++++++++++++++++++++
 .../test/DeltaColumnMappingSelectedTestMixin.scala |  76 +++
 .../sql/delta/test/DeltaExcludedTestMixin.scala    |  40 ++
 .../spark/sql/delta/test/DeltaSQLCommandTest.scala |  52 ++
 .../spark/sql/delta/test/DeltaSQLTestUtils.scala   |  79 +++
 .../spark/sql/delta/test/DeltaTestImplicits.scala  | 204 +++++++
 ...DeltaExcludedBySparkVersionTestMixinShims.scala |  45 ++
 11 files changed, 2704 insertions(+)

diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
new file mode 100644
index 0000000000..950c2269d7
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaExcludedTestMixin, DeltaSQLCommandTest}
+
+import org.scalatest.Ignore
+
+// spotless:off
+class DeleteSQLSuite extends DeleteSuiteBase
+  with DeltaExcludedTestMixin
+  with DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  override protected def executeDelete(target: String, where: String = null): Unit = {
+    val whereClause = Option(where).map(c => s"WHERE $c").getOrElse("")
+    sql(s"DELETE FROM $target $whereClause")
+  }
+
+  override def excluded: Seq[String] = super.excluded ++
+    Seq(
+      // FIXME: Excluded by Gluten as results are mismatched.
+      "test delete on temp view - nontrivial projection - SQL TempView",
+      "test delete on temp view - nontrivial projection - Dataset TempView"
+    )
+
+  // For EXPLAIN, which is not supported in OSS
+  test("explain") {
+    append(Seq((2, 2)).toDF("key", "value"))
+    val df = sql(s"EXPLAIN DELETE FROM delta.`$tempPath` WHERE key = 2")
+    val outputs = df.collect().map(_.mkString).mkString
+    assert(outputs.contains("Delta"))
+    assert(!outputs.contains("index") && !outputs.contains("ActionLog"))
+    // no change should be made by explain
+    checkAnswer(readDeltaTable(tempPath), Row(2, 2))
+  }
+
+  test("delete from a temp view") {
+    withTable("tab") {
+      withTempView("v") {
+        Seq((1, 1), (0, 3), (1, 5)).toDF("key", "value").write.format("delta").saveAsTable("tab")
+        spark.table("tab").as("name").createTempView("v")
+        sql("DELETE FROM v WHERE key = 1")
+        checkAnswer(spark.table("tab"), Row(0, 3))
+      }
+    }
+  }
+
+  test("delete from a SQL temp view") {
+    withTable("tab") {
+      withTempView("v") {
+        Seq((1, 1), (0, 3), (1, 5)).toDF("key", "value").write.format("delta").saveAsTable("tab")
+        sql("CREATE TEMP VIEW v AS SELECT * FROM tab")
+        sql("DELETE FROM v WHERE key = 1 AND VALUE = 5")
+        checkAnswer(spark.table("tab"), Seq(Row(1, 1), Row(0, 3)))
+      }
+    }
+  }
+
+  Seq(true, false).foreach { partitioned =>
+    test(s"User defined _change_type column doesn't get dropped - 
partitioned=$partitioned") {
+      withTable("tab") {
+        sql(
+          s"""CREATE TABLE tab USING DELTA
+             |${if (partitioned) "PARTITIONED BY (part) " else ""}
+             |TBLPROPERTIES (delta.enableChangeDataFeed = false)
+             |AS SELECT id, int(id / 10) AS part, 'foo' as _change_type
+             |FROM RANGE(1000)
+             |""".stripMargin)
+        val rowsToDelete = (1 to 1000 by 42).mkString("(", ", ", ")")
+        executeDelete("tab", s"id in $rowsToDelete")
+        sql("SELECT id, _change_type FROM tab").collect().foreach { row =>
+          val _change_type = row.getString(1)
+          assert(_change_type === "foo", s"Invalid _change_type for id=${row.get(0)}")
+        }
+      }
+    }
+  }
+}
+
+// FIXME: Enable the test.
+//  Skipping as function input_file_name doesn't get correctly resolved.
+@Ignore
+class DeleteSQLNameColumnMappingSuite extends DeleteSQLSuite
+  with DeltaColumnMappingEnableNameMode {
+
+  protected override def runOnlyTests: Seq[String] = Seq(true, false).map { isPartitioned =>
+    s"basic case - delete from a Delta table by name - Partition=$isPartitioned"
+  } ++ Seq(true, false).flatMap { isPartitioned =>
+    Seq(
+      s"where key columns - Partition=$isPartitioned",
+      s"where data columns - Partition=$isPartitioned")
+  }
+
+}
+
+class DeleteSQLWithDeletionVectorsSuite extends DeleteSQLSuite
+  with DeltaExcludedTestMixin
+  with DeletionVectorsTestUtils {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    enableDeletionVectors(spark, delete = true)
+    spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key, "false")
+  }
+
+  override def excluded: Seq[String] = super.excluded ++
+    Seq(
+      // The following two tests must fail when DV is used. Covered by another test case:
+      // "throw error when non-pinned TahoeFileIndex snapshot is used".
+      "data and partition columns - Partition=true Skipping=false",
+      "data and partition columns - Partition=false Skipping=false",
+      // The scan schema contains additional row index filter columns.
+      "nested schema pruning on data condition",
+      // The number of records is not recomputed when using DVs
+      "delete throws error if number of records increases",
+      "delete logs error if number of records are missing in stats",
+      // FIXME: Excluded by Gluten as results are mismatched.
+      "test delete on temp view - nontrivial projection - SQL TempView",
+      "test delete on temp view - nontrivial projection - Dataset TempView"
+  )
+
+  // This works correctly with DVs, but fails in classic DELETE.
+  override def testSuperSetColsTempView(): Unit = {
+    testComplexTempViews("superset cols")(
+      text = "SELECT key, value, 1 FROM tab",
+      expectResult = Row(0, 3, 1) :: Nil)
+  }
+}
+
+class DeleteSQLWithDeletionVectorsAndPredicatePushdownSuite
+    extends DeleteSQLWithDeletionVectorsSuite {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key, "true")
+  }
+}
+// spotless:on
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
new file mode 100644
index 0000000000..8ab9510ff3
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.functions.{lit, struct}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+
+import shims.DeltaExcludedBySparkVersionTestMixinShims
+
+// spotless:off
+abstract class DeleteSuiteBase extends QueryTest
+  with SharedSparkSession
+  with DeltaDMLTestUtils
+  with DeltaTestUtilsForTempViews
+  with DeltaExcludedBySparkVersionTestMixinShims {
+
+  import testImplicits._
+
+  protected def executeDelete(target: String, where: String = null): Unit
+
+  protected def checkDelete(
+      condition: Option[String],
+      expectedResults: Seq[Row],
+      tableName: Option[String] = None): Unit = {
+    executeDelete(target = tableName.getOrElse(s"delta.`$tempPath`"), where = condition.orNull)
+    checkAnswer(readDeltaTable(tempPath), expectedResults)
+  }
+
+  Seq(true, false).foreach { isPartitioned =>
+    test(s"basic case - Partition=$isPartitioned") {
+      val partitions = if (isPartitioned) "key" :: Nil else Nil
+      append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions)
+
+      checkDelete(condition = None, Nil)
+    }
+  }
+
+  Seq(true, false).foreach { isPartitioned =>
+    test(s"basic case - delete from a Delta table by path - 
Partition=$isPartitioned") {
+      withTable("deltaTable") {
+        val partitions = if (isPartitioned) "key" :: Nil else Nil
+        val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+        append(input, partitions)
+
+        checkDelete(Some("value = 4 and key = 3"),
+          Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+        checkDelete(Some("value = 4 and key = 1"),
+          Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil)
+        checkDelete(Some("value = 2 or key = 1"),
+          Row(0, 3) :: Nil)
+        checkDelete(Some("key = 0 or value = 99"), Nil)
+      }
+    }
+  }
+
+  Seq(true, false).foreach { isPartitioned =>
+    test(s"basic case - delete from a Delta table by name - 
Partition=$isPartitioned") {
+      withTable("delta_table") {
+        val partitionByClause = if (isPartitioned) "PARTITIONED BY (key)" else ""
+        sql(
+          s"""
+             |CREATE TABLE delta_table(key INT, value INT)
+             |USING delta
+             |OPTIONS('path'='$tempPath')
+             |$partitionByClause
+           """.stripMargin)
+
+        val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+        append(input)
+
+        checkDelete(Some("value = 4 and key = 3"),
+          Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil,
+          Some("delta_table"))
+        checkDelete(Some("value = 4 and key = 1"),
+          Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil,
+          Some("delta_table"))
+        checkDelete(Some("value = 2 or key = 1"),
+          Row(0, 3) :: Nil,
+          Some("delta_table"))
+        checkDelete(Some("key = 0 or value = 99"),
+          Nil,
+          Some("delta_table"))
+      }
+    }
+  }
+
+  Seq(true, false).foreach { isPartitioned =>
+    test(s"basic key columns - Partition=$isPartitioned") {
+      val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+      val partitions = if (isPartitioned) "key" :: Nil else Nil
+      append(input, partitions)
+
+      checkDelete(Some("key > 2"), Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: 
Row(0, 3) :: Nil)
+      checkDelete(Some("key < 2"), Row(2, 2) :: Nil)
+      checkDelete(Some("key = 2"), Nil)
+    }
+  }
+
+  Seq(true, false).foreach { isPartitioned =>
+    test(s"where key columns - Partition=$isPartitioned") {
+      val partitions = if (isPartitioned) "key" :: Nil else Nil
+      append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions)
+
+      checkDelete(Some("key = 1"), Row(2, 2) :: Row(0, 3) :: Nil)
+      checkDelete(Some("key = 2"), Row(0, 3) :: Nil)
+      checkDelete(Some("key = 0"), Nil)
+    }
+  }
+
+  Seq(true, false).foreach { isPartitioned =>
+    test(s"where data columns - Partition=$isPartitioned") {
+      val partitions = if (isPartitioned) "key" :: Nil else Nil
+      append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions)
+
+      checkDelete(Some("value <= 2"), Row(1, 4) :: Row(0, 3) :: Nil)
+      checkDelete(Some("value = 3"), Row(1, 4) :: Nil)
+      checkDelete(Some("value != 0"), Nil)
+    }
+  }
+
+  test("where data columns and partition columns") {
+    val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+    append(input, Seq("key"))
+
+    checkDelete(Some("value = 4 and key = 3"),
+      Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+    checkDelete(Some("value = 4 and key = 1"),
+      Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil)
+    checkDelete(Some("value = 2 or key = 1"),
+      Row(0, 3) :: Nil)
+    checkDelete(Some("key = 0 or value = 99"),
+      Nil)
+  }
+
+  Seq(true, false).foreach { skippingEnabled =>
+    Seq(true, false).foreach { isPartitioned =>
+      test(s"data and partition columns - Partition=$isPartitioned 
Skipping=$skippingEnabled") {
+        withSQLConf(DeltaSQLConf.DELTA_STATS_SKIPPING.key -> 
skippingEnabled.toString) {
+          val partitions = if (isPartitioned) "key" :: Nil else Nil
+          val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+          append(input, partitions)
+
+          checkDelete(Some("value = 4 and key = 3"),
+            Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+          checkDelete(Some("value = 4 and key = 1"),
+            Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil)
+          checkDelete(Some("value = 2 or key = 1"),
+            Row(0, 3) :: Nil)
+          checkDelete(Some("key = 0 or value = 99"),
+            Nil)
+        }
+      }
+    }
+  }
+
+  test("Negative case - non-Delta target") {
+    Seq((1, 1), (0, 3), (1, 5)).toDF("key1", "value")
+      .write.format("parquet").mode("append").save(tempPath)
+    val e = intercept[DeltaAnalysisException] {
+      executeDelete(target = s"delta.`$tempPath`")
+    }.getMessage
+    assert(e.contains("DELETE destination only supports Delta sources") ||
+      e.contains("is not a Delta table") || e.contains("doesn't exist") ||
+      e.contains("Incompatible format"))
+  }
+
+  test("Negative case - non-deterministic condition") {
+    append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"))
+    val e = intercept[AnalysisException] {
+      executeDelete(target = s"delta.`$tempPath`", where = "rand() > 0.5")
+    }.getMessage
+    assert(e.contains("nondeterministic expressions are only allowed in") ||
+      e.contains("The operator expects a deterministic expression"))
+  }
+
+  test("Negative case - DELETE the child directory") {
+    append(Seq((2, 2), (3, 2)).toDF("key", "value"), partitionBy = "key" :: Nil)
+    val e = intercept[AnalysisException] {
+      executeDelete(target = s"delta.`$tempPath/key=2`", where = "value = 2")
+    }.getMessage
+    assert(e.contains("Expect a full scan of Delta sources, but found a 
partial scan"))
+  }
+
+  test("delete cached table by name") {
+    withTable("cached_delta_table") {
+      Seq((2, 2), (1, 4)).toDF("key", "value")
+        .write.format("delta").saveAsTable("cached_delta_table")
+
+      spark.table("cached_delta_table").cache()
+      spark.table("cached_delta_table").collect()
+      executeDelete(target = "cached_delta_table", where = "key = 2")
+      checkAnswer(spark.table("cached_delta_table"), Row(1, 4) :: Nil)
+    }
+  }
+
+  test("delete cached table by path") {
+    Seq((2, 2), (1, 4)).toDF("key", "value")
+      .write.mode("overwrite").format("delta").save(tempPath)
+    spark.read.format("delta").load(tempPath).cache()
+    spark.read.format("delta").load(tempPath).collect()
+    executeDelete(s"delta.`$tempPath`", where = "key = 2")
+    checkAnswer(spark.read.format("delta").load(tempPath), Row(1, 4) :: Nil)
+  }
+
+  Seq(true, false).foreach { isPartitioned =>
+    test(s"condition having current_date - Partition=$isPartitioned") {
+      val partitions = if (isPartitioned) "key" :: Nil else Nil
+      append(
+        Seq((java.sql.Date.valueOf("1969-12-31"), 2),
+          (java.sql.Date.valueOf("2099-12-31"), 4))
+          .toDF("key", "value"), partitions)
+
+      checkDelete(Some("CURRENT_DATE > key"),
+        Row(java.sql.Date.valueOf("2099-12-31"), 4) :: Nil)
+      checkDelete(Some("CURRENT_DATE <= key"), Nil)
+    }
+  }
+
+  test("condition having current_timestamp - Partition by Timestamp") {
+    append(
+      Seq((java.sql.Timestamp.valueOf("2012-12-31 16:00:10.011"), 2),
+        (java.sql.Timestamp.valueOf("2099-12-31 16:00:10.011"), 4))
+        .toDF("key", "value"), Seq("key"))
+
+    checkDelete(Some("CURRENT_TIMESTAMP > key"),
+      Row(java.sql.Timestamp.valueOf("2099-12-31 16:00:10.011"), 4) :: Nil)
+    checkDelete(Some("CURRENT_TIMESTAMP <= key"), Nil)
+  }
+
+  Seq(true, false).foreach { isPartitioned =>
+    test(s"foldable condition - Partition=$isPartitioned") {
+      val partitions = if (isPartitioned) "key" :: Nil else Nil
+      append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions)
+
+      val allRows = Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil
+
+      checkDelete(Some("false"), allRows)
+      checkDelete(Some("1 <> 1"), allRows)
+      checkDelete(Some("1 > null"), allRows)
+      checkDelete(Some("true"), Nil)
+      checkDelete(Some("1 = 1"), Nil)
+    }
+  }
+
+  test("SC-12232: should not delete the rows where condition evaluates to 
null") {
+    append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key", 
"value").coalesce(1))
+
+    // "null = null" evaluates to null
+    checkDelete(Some("value = null"),
+      Row("a", null) :: Row("b", null) :: Row("c", "v") :: Row("d", "vv") :: 
Nil)
+
+    // these expressions evaluate to null when value is null
+    checkDelete(Some("value = 'v'"),
+      Row("a", null) :: Row("b", null) :: Row("d", "vv") :: Nil)
+    checkDelete(Some("value <> 'v'"),
+      Row("a", null) :: Row("b", null) :: Nil)
+  }
+
+  test("SC-12232: delete rows with null values using isNull") {
+    append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key", 
"value").coalesce(1))
+
+    // when value is null, this expression evaluates to true
+    checkDelete(Some("value is null"),
+      Row("c", "v") :: Row("d", "vv") :: Nil)
+  }
+
+  test("SC-12232: delete rows with null values using EqualNullSafe") {
+    append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key", 
"value").coalesce(1))
+
+    // when value is null, this expression evaluates to true
+    checkDelete(Some("value <=> null"),
+      Row("c", "v") :: Row("d", "vv") :: Nil)
+  }
+
+  test("do not support subquery test") {
+    append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"))
+    Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("c", "d").createOrReplaceTempView("source")
+
+    // basic subquery
+    val e0 = intercept[AnalysisException] {
+      executeDelete(target = s"delta.`$tempPath`", "key < (SELECT max(c) FROM 
source)")
+    }.getMessage
+    assert(e0.contains("Subqueries are not supported"))
+
+    // subquery with EXISTS
+    val e1 = intercept[AnalysisException] {
+      executeDelete(target = s"delta.`$tempPath`", "EXISTS (SELECT max(c) FROM 
source)")
+    }.getMessage
+    assert(e1.contains("Subqueries are not supported"))
+
+    // subquery with NOT EXISTS
+    val e2 = intercept[AnalysisException] {
+      executeDelete(target = s"delta.`$tempPath`", "NOT EXISTS (SELECT max(c) 
FROM source)")
+    }.getMessage
+    assert(e2.contains("Subqueries are not supported"))
+
+    // subquery with IN
+    val e3 = intercept[AnalysisException] {
+      executeDelete(target = s"delta.`$tempPath`", "key IN (SELECT max(c) FROM 
source)")
+    }.getMessage
+    assert(e3.contains("Subqueries are not supported"))
+
+    // subquery with NOT IN
+    val e4 = intercept[AnalysisException] {
+      executeDelete(target = s"delta.`$tempPath`", "key NOT IN (SELECT max(c) 
FROM source)")
+    }.getMessage
+    assert(e4.contains("Subqueries are not supported"))
+  }
+
+  test("schema pruning on data condition") {
+    val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+    append(input, Nil)
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF
+
+    val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
+      checkDelete(Some("key = 2"),
+        Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+    }
+
+    val scans = executedPlans.flatMap(_.collect {
+      case f: FileSourceScanExec => f
+    })
+
+    // The first scan is for finding files to delete. We are only matching against the key,
+    // so that should be the only field in the schema.
+    assert(scans.head.schema.findNestedField(Seq("key")).nonEmpty)
+    assert(scans.head.schema.findNestedField(Seq("value")).isEmpty)
+  }
+
+
+  test("nested schema pruning on data condition") {
+    val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+      .select(struct("key", "value").alias("nested"))
+    append(input, Nil)
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF
+
+    val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
+      checkDelete(Some("nested.key = 2"),
+        Row(Row(1, 4)) :: Row(Row(1, 1)) :: Row(Row(0, 3)) :: Nil)
+    }
+
+    val scans = executedPlans.flatMap(_.collect {
+      case f: FileSourceScanExec => f
+    })
+
+    assert(scans.head.schema == StructType.fromDDL("nested STRUCT<key: int>"))
+  }
+
+  /**
+   * @param function the unsupported function.
+   * @param functionType The type of the unsupported expression to be tested.
+   * @param data the data in the table.
+   * @param where the where clause containing the unsupported expression.
+   * @param expectException whether an exception is expected to be thrown
+   * @param customErrorRegex customized error regex.
+   */
+  def testUnsupportedExpression(
+      function: String,
+      functionType: String,
+      data: => DataFrame,
+      where: String,
+      expectException: Boolean,
+      customErrorRegex: Option[String] = None): Unit = {
+    test(s"$functionType functions in delete - expect exception: 
$expectException") {
+      withTable("deltaTable") {
+        data.write.format("delta").saveAsTable("deltaTable")
+
+        val expectedErrorRegex = "(?s).*(?i)unsupported.*(?i).*Invalid expressions.*"
+
+        var catchException = true
+
+        var errorRegex = if (functionType.equals("Generate")) {
+          ".*Subqueries are not supported in the DELETE.*"
+        } else customErrorRegex.getOrElse(expectedErrorRegex)
+
+
+        if (catchException) {
+          val dataBeforeException = spark.read.format("delta").table("deltaTable").collect()
+          val e = intercept[Exception] {
+            executeDelete(target = "deltaTable", where = where)
+          }
+          val message = if (e.getCause != null) {
+            e.getCause.getMessage
+          } else e.getMessage
+          assert(message.matches(errorRegex))
+          checkAnswer(spark.read.format("delta").table("deltaTable"), dataBeforeException)
+        } else {
+          executeDelete(target = "deltaTable", where = where)
+        }
+      }
+    }
+  }
+
+  testUnsupportedExpression(
+    function = "row_number",
+    functionType = "Window",
+    data = Seq((2, 2), (1, 4)).toDF("key", "value"),
+    where = "row_number() over (order by value) > 1",
+    expectException = true
+  )
+
+  testUnsupportedExpression(
+    function = "max",
+    functionType = "Aggregate",
+    data = Seq((2, 2), (1, 4)).toDF("key", "value"),
+    where = "key > max(value)",
+    expectException = true
+  )
+
+  // Explode functions are supported in where if only one row is generated.
+  testUnsupportedExpression(
+    function = "explode",
+    functionType = "Generate",
+    data = Seq((2, List(2))).toDF("key", "value"),
+    where = "key = (select explode(value) from deltaTable)",
+    expectException = false // generate only one row, no exception.
+  )
+
+  // Explode functions are supported in where but if there's more than one row generated,
+  // it will throw an exception.
+  testUnsupportedExpression(
+    function = "explode",
+    functionType = "Generate",
+    data = Seq((2, List(2)), (1, List(4, 5))).toDF("key", "value"),
+    where = "key = (select explode(value) from deltaTable)",
+    expectException = true, // generate more than one row. Exception expected.
+    customErrorRegex =
+      Some(".*More than one row returned by a subquery used as an 
expression(?s).*")
+  )
+
+  Seq(true, false).foreach { isPartitioned =>
+    val name = s"test delete on temp view - basic - Partition=$isPartitioned"
+    testWithTempView(name) { isSQLTempView =>
+      val partitions = if (isPartitioned) "key" :: Nil else Nil
+      append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"), partitions)
+      createTempViewFromTable(s"delta.`$tempPath`", isSQLTempView)
+        checkDelete(
+          condition = Some("key <= 1"),
+          expectedResults = Row(2, 2) :: Nil,
+          tableName = Some("v"))
+    }
+  }
+
+  protected def testInvalidTempViews(name: String)(
+      text: String,
+      expectedErrorMsgForSQLTempView: String = null,
+      expectedErrorMsgForDataSetTempView: String = null,
+      expectedErrorClassForSQLTempView: String = null,
+      expectedErrorClassForDataSetTempView: String = null): Unit = {
+    testWithTempView(s"test delete on temp view - $name") { isSQLTempView =>
+      withTable("tab") {
+        Seq((0, 3), (1, 2)).toDF("key", "value").write.format("delta").saveAsTable("tab")
+        if (isSQLTempView) {
+          sql(s"CREATE TEMP VIEW v AS $text")
+        } else {
+          sql(text).createOrReplaceTempView("v")
+        }
+        val ex = intercept[AnalysisException] {
+          executeDelete(
+            "v",
+            "key >= 1 and value < 3"
+          )
+        }
+        testErrorMessageAndClass(
+          isSQLTempView,
+          ex,
+          expectedErrorMsgForSQLTempView,
+          expectedErrorMsgForDataSetTempView,
+          expectedErrorClassForSQLTempView,
+          expectedErrorClassForDataSetTempView)
+      }
+    }
+  }
+  testInvalidTempViews("subset cols")(
+    text = "SELECT key FROM tab",
+    expectedErrorClassForSQLTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+    expectedErrorClassForDataSetTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION"
+  )
+
+  // Need to be able to override this, because it works in some configurations.
+  protected def testSuperSetColsTempView(): Unit = {
+    testInvalidTempViews("superset cols")(
+      text = "SELECT key, value, 1 FROM tab",
+      // The analyzer can't tell whether the table originally had the extra column or not.
+      expectedErrorMsgForSQLTempView = "Can't resolve column 1 in root",
+      expectedErrorMsgForDataSetTempView = "Can't resolve column 1 in root"
+    )
+  }
+
+  testSuperSetColsTempView()
+
+  protected def testComplexTempViews(name: String)(
+      text: String,
+      expectResult: Seq[Row]): Unit = {
+    testWithTempView(s"test delete on temp view - $name") { isSQLTempView =>
+        withTable("tab") {
+          Seq((0, 3), (1, 2)).toDF("key", "value").write.format("delta").saveAsTable("tab")
+          createTempViewFromSelect(text, isSQLTempView)
+          executeDelete(
+            "v",
+            "key >= 1 and value < 3"
+          )
+          checkAnswer(spark.read.format("delta").table("v"), expectResult)
+        }
+    }
+  }
+
+  testComplexTempViews("nontrivial projection")(
+    text = "SELECT value as key, key as value FROM tab",
+    expectResult = Row(3, 0) :: Nil
+  )
+
+  testComplexTempViews("view with too many internal aliases")(
+    text = "SELECT * FROM (SELECT * FROM tab AS t1) AS t2",
+    expectResult = Row(0, 3) :: Nil
+  )
+
+  testSparkMasterOnly("Variant type") {
+    val dstDf = sql(
+      """SELECT parse_json(cast(id as string)) v, id i
+      FROM range(3)""")
+    append(dstDf)
+
+    executeDelete(target = s"delta.`$tempPath`", where = "to_json(v) = '1'")
+
+    checkAnswer(readDeltaTable(tempPath).selectExpr("i", "to_json(v)"),
+      Seq(Row(0, "0"), Row(2, "2")))
+  }
+
+  test("delete on partitioned table with special chars") {
+    val partValue = "part%one"
+    spark.range(0, 3, 1, 1).toDF("key").withColumn("value", lit(partValue))
+      .write.format("delta").partitionBy("value").save(tempPath)
+    checkDelete(
+      condition = Some(s"value = '$partValue' and key = 1"),
+      expectedResults = Row(0, partValue) :: Row(2, partValue) :: Nil)
+    checkDelete(
+      condition = Some(s"value = '$partValue' and key = 2"),
+      expectedResults = Row(0, partValue) :: Nil)
+    checkDelete(
+      condition = Some(s"value = '$partValue'"),
+      expectedResults = Nil)
+  }
+}
+// spotless:on
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
new file mode 100644
index 0000000000..5bb022c12d
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.{DataFrame, QueryTest, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.delta.DeltaOperations.Truncate
+import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescriptor, RemoveFile}
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.delta.util.PathWithFileSystem
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.test.SharedSparkSession
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
+
+import java.io.File
+import java.util.UUID
+
+// spotless:off
+/** Collection of test utilities related to persistent Deletion Vectors. */
+trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession with DeltaSQLTestUtils {
+
+  def enableDeletionVectors(
+      spark: SparkSession,
+      delete: Boolean = false,
+      update: Boolean = false,
+      merge: Boolean = false): Unit = {
+    val global = delete || update || merge
+    spark.conf
+      .set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, global.toString)
+    spark.conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, delete.toString)
+    spark.conf.set(DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key, update.toString)
+    spark.conf.set(DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS.key, merge.toString)
+  }
+
+  def enableDeletionVectorsForAllSupportedOperations(spark: SparkSession): Unit =
+    enableDeletionVectors(spark, delete = true, update = true)
+
+  def testWithDVs(testName: String, testTags: org.scalatest.Tag*)(thunk: => Unit): Unit = {
+    test(testName, testTags : _*) {
+      withDeletionVectorsEnabled() {
+        thunk
+      }
+    }
+  }
+
+  /** Run a thunk with Deletion Vectors enabled/disabled. */
+  def withDeletionVectorsEnabled(enabled: Boolean = true)(thunk: => Unit): Unit = {
+    val enabledStr = enabled.toString
+    withSQLConf(
+      DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> enabledStr,
+      DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr,
+      DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr,
+      DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr) {
+      thunk
+    }
+  }
+
+  /** Helper to run 'fn' with a temporary Delta table. */
+  def withTempDeltaTable(
+      dataDF: DataFrame,
+      partitionBy: Seq[String] = Seq.empty,
+      enableDVs: Boolean = true,
+      conf: Seq[(String, String)] = Nil)
+      (fn: (() => io.delta.tables.DeltaTable, DeltaLog) => Unit): Unit = {
+    withTempPath { path =>
+      val tablePath = new Path(path.getAbsolutePath)
+      withSQLConf(conf: _*) {
+        dataDF.write
+          .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, enableDVs.toString)
+          .partitionBy(partitionBy: _*)
+          .format("delta")
+          .save(tablePath.toString)
+      }
+      // DeltaTable hangs on to the DataFrame it is created with for the entire object lifetime.
+      // That means subsequent `targetTable.toDF` calls will return the same snapshot.
+      // The DV tests are generally written assuming `targetTable.toDF` would return a new snapshot.
+      // So create a function here instead of an instance, so `targetTable().toDF`
+      // will actually provide a new snapshot.
+      val targetTable =
+        () => io.delta.tables.DeltaTable.forPath(tablePath.toString)
+      val targetLog = DeltaLog.forTable(spark, tablePath)
+      fn(targetTable, targetLog)
+    }
+  }
+
+  /** Create a temp path which contains special characters. */
+  override def withTempPath(f: File => Unit): Unit = {
+    super.withTempPath(prefix = "s p a r k %2a")(f)
+  }
+
+  /** Create a temp path which contains special characters. */
+  override protected def withTempDir(f: File => Unit): Unit = {
+    super.withTempDir(prefix = "s p a r k %2a")(f)
+  }
+
+  /** Helper that verifies whether a defined number of DVs exist */
+  def verifyDVsExist(targetLog: DeltaLog, filesWithDVsSize: Int): Unit = {
+    val filesWithDVs = getFilesWithDeletionVectors(targetLog)
+    assert(filesWithDVs.size === filesWithDVsSize)
+    assertDeletionVectorsExist(targetLog, filesWithDVs)
+  }
+
+  /** Returns all [[AddFile]] actions of a Delta table that contain Deletion Vectors. */
+  def getFilesWithDeletionVectors(log: DeltaLog): Seq[AddFile] =
+    log.update().allFiles.collect().filter(_.deletionVector != null).toSeq
+
+  /** Lists the Deletion Vectors files of a table. */
+  def listDeletionVectors(log: DeltaLog): Seq[File] = {
+    val dir = new File(log.dataPath.toUri.getPath)
+    dir.listFiles().filter(_.getName.startsWith(
+      DeletionVectorDescriptor.DELETION_VECTOR_FILE_NAME_CORE))
+  }
+
+  /** Helper to check that the Deletion Vectors of the provided file actions exist on disk. */
+  def assertDeletionVectorsExist(log: DeltaLog, filesWithDVs: Seq[AddFile]): Unit = {
+    val tablePath = new Path(log.dataPath.toUri.getPath)
+    for (file <- filesWithDVs) {
+      val dv = file.deletionVector
+      assert(dv != null)
+      assert(dv.isOnDisk && !dv.isInline)
+      assert(dv.offset.isDefined)
+
+      // Check that DV exists.
+      val dvPath = dv.absolutePath(tablePath)
+      assert(new File(dvPath.toString).exists(), s"DV not found $dvPath")
+
+      // Check that cardinality is correct.
+      val bitmap = newDVStore.read(dvPath, dv.offset.get, dv.sizeInBytes)
+      assert(dv.cardinality === bitmap.cardinality)
+    }
+  }
+
+  /** Enable persistent deletion vectors in new Delta tables. */
+  def enableDeletionVectorsInNewTables(conf: RuntimeConfig): Unit =
+    conf.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey, "true")
+
+  /** Enable persistent Deletion Vectors in a Delta table. */
+  def enableDeletionVectorsInTable(tablePath: Path, enable: Boolean): Unit =
+    spark.sql(
+      s"""ALTER TABLE delta.`$tablePath`
+         |SET TBLPROPERTIES ('${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = '$enable')
+         |""".stripMargin)
+
+  /** Enable persistent Deletion Vectors in a Delta table. */
+  def enableDeletionVectorsInTable(deltaLog: DeltaLog, enable: Boolean = true): Unit =
+    enableDeletionVectorsInTable(deltaLog.dataPath, enable)
+
+  /** Enable persistent deletion vectors in new tables and DELETE DML commands. */
+  def enableDeletionVectors(conf: RuntimeConfig): Unit = {
+    enableDeletionVectorsInNewTables(conf)
+    conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, "true")
+  }
+
+  // ======== HELPER METHODS TO WRITE DVs ==========
+  /** Helper method to remove the specified rows in the given file using DVs */
+  protected def removeRowsFromFileUsingDV(
+      log: DeltaLog,
+      addFile: AddFile,
+      rowIds: Seq[Long]): Seq[Action] = {
+    val dv = RoaringBitmapArray(rowIds: _*)
+    writeFileWithDV(log, addFile, dv)
+  }
+
+  /** Utility method to remove a ratio of rows from the given file */
+  protected def deleteRows(
+      log: DeltaLog, file: AddFile, approxPhyRows: Long, ratioOfRowsToDelete: Double): Unit = {
+    val numRowsToDelete =
+      Math.ceil(ratioOfRowsToDelete * file.numPhysicalRecords.getOrElse(approxPhyRows)).toInt
+    removeRowsFromFile(log, file, Seq.range(0, numRowsToDelete))
+  }
+
+  /** Utility method to remove the given rows from the given file using DVs */
+  protected def removeRowsFromFile(
+      log: DeltaLog, addFile: AddFile, rowIndexesToRemove: Seq[Long]): Unit = {
+    val txn = log.startTransaction()
+    val actions = removeRowsFromFileUsingDV(log, addFile, rowIndexesToRemove)
+    txn.commit(actions, Truncate())
+  }
+
+  protected def getFileActionsInLastVersion(log: DeltaLog): (Seq[AddFile], Seq[RemoveFile]) = {
+    val version = log.update().version
+    val allFiles = log.getChanges(version).toSeq.head._2
+    val add = allFiles.collect { case a: AddFile => a }
+    val remove = allFiles.collect { case r: RemoveFile => r }
+    (add, remove)
+  }
+
+  protected def serializeRoaringBitmapArrayWithDefaultFormat(
+      dv: RoaringBitmapArray): Array[Byte] = {
+    val serializationFormat = RoaringBitmapArrayFormat.Portable
+    dv.serializeAsByteArray(serializationFormat)
+  }
+
+  /**
+   * Produce a new [[AddFile]] that will store `dv` in the log using default settings for choosing
+   * inline or on-disk storage.
+   *
+   * Also returns the corresponding [[RemoveFile]] action for `currentFile`.
+   *
+   * TODO: Always on-disk for now. Inline support comes later.
+   */
+  protected def writeFileWithDV(
+      log: DeltaLog,
+      currentFile: AddFile,
+      dv: RoaringBitmapArray): Seq[Action] = {
+    writeFileWithDVOnDisk(log, currentFile, dv)
+  }
+
+  /** Name of the partition column used by [[createTestDF()]]. */
+  val PARTITION_COL = "partitionColumn"
+
+  def createTestDF(
+    start: Long,
+    end: Long,
+    numFiles: Int,
+    partitionColumn: Option[Int] = None): DataFrame = {
+    val df = spark.range(start, end, 1, numFiles).withColumn("v", col("id"))
+    if (partitionColumn.isEmpty) {
+      df
+    } else {
+      df.withColumn(PARTITION_COL, lit(partitionColumn.get))
+    }
+  }
+
+  /**
+   * Produce a new [[AddFile]] that will reference the `dv` in the log while storing it on-disk.
+   *
+   * Also returns the corresponding [[RemoveFile]] action for `currentFile`.
+   */
+  protected def writeFileWithDVOnDisk(
+      log: DeltaLog,
+      currentFile: AddFile,
+      dv: RoaringBitmapArray): Seq[Action] = writeFilesWithDVsOnDisk(log, Seq((currentFile, dv)))
+
+  protected def withDVWriter[T](
+      log: DeltaLog,
+      dvFileID: UUID)(fn: DeletionVectorStore.Writer => T): T = {
+    val dvStore = newDVStore
+    // scalastyle:off deltahadoopconfiguration
+    val conf = spark.sessionState.newHadoopConf()
+    // scalastyle:on deltahadoopconfiguration
+    val tableWithFS = PathWithFileSystem.withConf(log.dataPath, conf)
+    val dvPath =
+      DeletionVectorStore.assembleDeletionVectorPathWithFileSystem(tableWithFS, dvFileID)
+    val writer = dvStore.createWriter(dvPath)
+    try {
+      fn(writer)
+    } finally {
+      writer.close()
+    }
+  }
+
+  /**
+   * Produce new [[AddFile]] actions that will reference associated DVs in the log while storing
+   * all DVs in the same file on-disk.
+   *
+   * Also returns the corresponding [[RemoveFile]] actions for the original file entries.
+   */
+  protected def writeFilesWithDVsOnDisk(
+      log: DeltaLog,
+      filesWithDVs: Seq[(AddFile, RoaringBitmapArray)]): Seq[Action] = {
+    val dvFileId = UUID.randomUUID()
+    withDVWriter(log, dvFileId) { writer =>
+      filesWithDVs.flatMap { case (currentFile, dv) =>
+        val range = writer.write(serializeRoaringBitmapArrayWithDefaultFormat(dv))
+        val dvData = DeletionVectorDescriptor.onDiskWithRelativePath(
+          id = dvFileId,
+          sizeInBytes = range.length,
+          cardinality = dv.cardinality,
+          offset = Some(range.offset))
+        val (add, remove) = currentFile.removeRows(
+          dvData,
+          updateStats = true
+        )
+        Seq(add, remove)
+      }
+    }
+  }
+
+  /**
+   * Removes the `numRowsToRemovePerFile` from each file via DV.
+   * Returns the total number of rows removed.
+   */
+  protected def removeRowsFromAllFilesInLog(
+      log: DeltaLog,
+      numRowsToRemovePerFile: Long): Long = {
+    var numFiles: Option[Int] = None
+    // This is needed to make the manual commit work correctly, since we are not actually
+    // running a command that produces metrics.
+    withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "false") {
+      val txn = log.startTransaction()
+      val allAddFiles = txn.snapshot.allFiles.collect()
+      numFiles = Some(allAddFiles.length)
+      val bitmap = RoaringBitmapArray(0L until numRowsToRemovePerFile: _*)
+      val actions = allAddFiles.flatMap { file =>
+        if (file.numPhysicalRecords.isDefined) {
+          // Only when stats are enabled. Can't check when stats are disabled
+          assert(file.numPhysicalRecords.get > numRowsToRemovePerFile)
+        }
+        writeFileWithDV(log, file, bitmap)
+      }
+      txn.commit(actions, DeltaOperations.Delete(predicate = Seq.empty))
+    }
+    numFiles.get * numRowsToRemovePerFile
+  }
+
+  def newDVStore(): DeletionVectorStore = {
+    // scalastyle:off deltahadoopconfiguration
+    DeletionVectorStore.createInstance(spark.sessionState.newHadoopConf())
+    // scalastyle:on deltahadoopconfiguration
+  }
+
+  /**
+   * Updates an [[AddFile]] with a [[DeletionVectorDescriptor]].
+   */
+  protected def updateFileDV(
+      addFile: AddFile,
+      dvDescriptor: DeletionVectorDescriptor): (AddFile, RemoveFile) = {
+    addFile.removeRows(
+      dvDescriptor,
+      updateStats = true
+    )
+  }
+
+  /** Delete the DV file in the given [[AddFile]]. Assumes the [[AddFile]] has a valid DV. */
+  protected def deleteDVFile(tablePath: String, addFile: AddFile): Unit = {
+    assert(addFile.deletionVector != null)
+    val dvPath = addFile.deletionVector.absolutePath(new Path(tablePath))
+    FileUtils.delete(new File(dvPath.toString))
+  }
+
+  /**
+   * Creates a [[DeletionVectorDescriptor]] from a [[RoaringBitmapArray]].
+   */
+  protected def writeDV(
+      log: DeltaLog,
+      bitmapArray: RoaringBitmapArray): DeletionVectorDescriptor = {
+    val dvFileId = UUID.randomUUID()
+    withDVWriter(log, dvFileId) { writer =>
+      val range = writer.write(serializeRoaringBitmapArrayWithDefaultFormat(bitmapArray))
+      DeletionVectorDescriptor.onDiskWithRelativePath(
+        id = dvFileId,
+        sizeInBytes = range.length,
+        cardinality = bitmapArray.cardinality,
+        offset = Some(range.offset))
+    }
+  }
+}
+// spotless:on
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
new file mode 100644
index 0000000000..68c47b42bb
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Column, Dataset}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
+import org.apache.spark.sql.delta.schema.SchemaUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
+
+import org.apache.hadoop.fs.Path
+
+import java.io.File
+
+import scala.collection.mutable
+
+// spotless:off
+trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
+
+  import testImplicits._
+
+  protected def columnMappingMode: String = NoMapping.name
+
+  private val PHYSICAL_NAME_REGEX =
+    "col-[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r
+
+  implicit class PhysicalNameString(s: String) {
+    def phy(deltaLog: DeltaLog): String = {
+      PHYSICAL_NAME_REGEX
+        .findFirstIn(s)
+        .getOrElse(getPhysicalName(s, deltaLog))
+    }
+  }
+
+  protected def columnMappingEnabled: Boolean = {
+    columnMappingModeString != "none"
+  }
+
+  protected def columnMappingModeString: String = {
+    spark.conf.getOption(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey)
+      .getOrElse("none")
+  }
+
+  /**
+   * Check if two schemas are equal ignoring column mapping metadata
+   * @param schema1 Schema
+   * @param schema2 Schema
+   */
+  protected def assertEqual(schema1: StructType, schema2: StructType): Unit = {
+    if (columnMappingEnabled) {
+      assert(
+        DeltaColumnMapping.dropColumnMappingMetadata(schema1) ==
+        DeltaColumnMapping.dropColumnMappingMetadata(schema2)
+      )
+    } else {
+      assert(schema1 == schema2)
+    }
+  }
+
+  /**
+   * Check if two table configurations are equal ignoring column mapping metadata
+   * @param config1 Table config
+   * @param config2 Table config
+   */
+  protected def assertEqual(
+      config1: Map[String, String],
+      config2: Map[String, String]): Unit = {
+    if (columnMappingEnabled) {
+      assert(dropColumnMappingConfigurations(config1) == dropColumnMappingConfigurations(config2))
+    } else {
+      assert(config1 == config2)
+    }
+  }
+
+  /**
+   * Check if a partition with specific values exists.
+   * Handles both column mapped and non-mapped cases
+   * @param partCol Partition column name
+   * @param partValue Partition value
+   * @param deltaLog DeltaLog
+   */
+  protected def assertPartitionWithValueExists(
+      partCol: String,
+      partValue: String,
+      deltaLog: DeltaLog): Unit = {
+    assert(getPartitionFilePathsWithValue(partCol, partValue, deltaLog).nonEmpty)
+  }
+
+  /**
+   * Assert partition exists in an array of set of partition names/paths
+   * @param partCol Partition column name
+   * @param deltaLog Delta log
+   * @param inputFiles Input files to scan for DF
+   */
+  protected def assertPartitionExists(
+      partCol: String,
+      deltaLog: DeltaLog,
+      inputFiles: Array[String]): Unit = {
+    val physicalName = partCol.phy(deltaLog)
+    val allFiles = deltaLog.update().allFiles.collect()
+    // NOTE: inputFiles are *not* URL-encoded.
+    val filesWithPartitions = inputFiles.map { f =>
+      allFiles.filter { af =>
+        f.contains(af.toPath.toString)
+      }.flatMap(_.partitionValues.keys).toSet
+    }
+    assert(filesWithPartitions.forall(p => p.count(_ == physicalName) > 0))
+    // for non-column mapped mode, we can check the file paths as well
+    if (!columnMappingEnabled) {
+      assert(inputFiles.forall(path => path.contains(s"$physicalName=")),
+          s"${inputFiles.toSeq.mkString("\n")}\ndidn't contain partition 
columns $physicalName")
+    }
+  }
+
+  /**
+   * Load DeltaLog from path
+   * @param pathOrIdentifier Location
+   * @param isIdentifier Whether the previous argument is a metastore identifier
+   * @return
+   */
+  protected def loadDeltaLog(pathOrIdentifier: String, isIdentifier: Boolean = false): DeltaLog = {
+    if (isIdentifier) {
+      DeltaLog.forTable(spark, TableIdentifier(pathOrIdentifier))
+    } else {
+      DeltaLog.forTable(spark, pathOrIdentifier)
+    }
+  }
+
+  /**
+   * Convert a (nested) column string to sequence of name parts
+   * @param col Column string
+   * @return Sequence of parts
+   */
+  protected def columnNameToParts(col: String): Seq[String] = {
+    UnresolvedAttribute.parseAttributeName(col)
+  }
+
+  /**
+   * Get partition file paths for a specific partition value
+   * @param partCol Logical or physical partition name
+   * @param partValue Partition value
+   * @param deltaLog DeltaLog
+   * @return List of paths
+   */
+  protected def getPartitionFilePathsWithValue(
+      partCol: String,
+      partValue: String,
+      deltaLog: DeltaLog): Array[String] = {
+    getPartitionFilePaths(partCol, deltaLog).getOrElse(partValue, Array.empty)
+  }
+
+  /**
+   * Get the partition value for null
+   */
+  protected def nullPartitionValue: String = {
+    if (columnMappingEnabled) {
+      null
+    } else {
+      ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+    }
+  }
+
+  /**
+   * Get partition file paths grouped by partition value
+   * @param partCol Logical or physical partition name
+   * @param deltaLog DeltaLog
+   * @return Partition value to paths
+   */
+  protected def getPartitionFilePaths(
+      partCol: String,
+      deltaLog: DeltaLog): Map[String, Array[String]] = {
+    if (columnMappingEnabled) {
+      val colName = partCol.phy(deltaLog)
+      deltaLog.update().allFiles.collect()
+        .groupBy(_.partitionValues(colName))
+        .mapValues(_.map(deltaLog.dataPath.toUri.getPath + "/" + _.path)).toMap
+    } else {
+      val partColEscaped = s"${ExternalCatalogUtils.escapePathName(partCol)}"
+      val dataPath = new File(deltaLog.dataPath.toUri.getPath)
+      dataPath.listFiles().filter(_.getName.startsWith(s"$partColEscaped="))
+        .groupBy(_.getName.split("=").last).mapValues(_.map(_.getPath)).toMap
+    }
+  }
+
+  /**
+   * Group a list of input file paths by partition key-value pair w.r.t. delta log
+   * @param inputFiles Input file paths
+   * @param deltaLog Delta log
+   * @return A mapped array each with the corresponding partition keys
+   */
+  protected def groupInputFilesByPartition(
+      inputFiles: Array[String],
+      deltaLog: DeltaLog): Map[(String, String), Array[String]] = {
+    if (columnMappingEnabled) {
+      val allFiles = deltaLog.update().allFiles.collect()
+      val grouped = inputFiles.flatMap { f =>
+        allFiles.find {
+          af => f.contains(af.toPath.toString)
+        }.head.partitionValues.map(entry => (f, entry))
+      }.groupBy(_._2)
+      grouped.mapValues(_.map(_._1)).toMap
+    } else {
+      inputFiles.groupBy(p => {
+        val nameParts = new Path(p).getParent.getName.split("=")
+        (nameParts(0), nameParts(1))
+      })
+    }
+  }
+
+  /**
+   * Drop column mapping configurations from Map
+   * @param configuration Table configuration
+   * @return Configuration
+   */
+  protected def dropColumnMappingConfigurations(
+      configuration: Map[String, String]): Map[String, String] = {
+    configuration - DeltaConfigs.COLUMN_MAPPING_MODE.key - DeltaConfigs.COLUMN_MAPPING_MAX_ID.key
+  }
+
+  /**
+   * Drop column mapping configurations from Dataset (e.g. sql("SHOW TBLPROPERTIES t1"))
+   * @param configs Table configuration
+   * @return Configuration Dataset
+   */
+  protected def dropColumnMappingConfigurations(
+      configs: Dataset[(String, String)]): Dataset[(String, String)] = {
+    spark.createDataset(configs.collect().filter(p =>
+      !Seq(
+        DeltaConfigs.COLUMN_MAPPING_MAX_ID.key,
+        DeltaConfigs.COLUMN_MAPPING_MODE.key
+      ).contains(p._1)
+    ))
+  }
+
+  /** Return KV pairs of Protocol-related stuff for checking the result of DESCRIBE TABLE. */
+  protected def buildProtocolProps(snapshot: Snapshot): Seq[(String, String)] = {
+    val mergedConf =
+      DeltaConfigs.mergeGlobalConfigs(spark.sessionState.conf, snapshot.metadata.configuration)
+    val metadata = snapshot.metadata.copy(configuration = mergedConf)
+    var props = Seq(
+      (Protocol.MIN_READER_VERSION_PROP,
+        Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString),
+      (Protocol.MIN_WRITER_VERSION_PROP,
+        Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString))
+    if (snapshot.protocol.supportsReaderFeatures || snapshot.protocol.supportsWriterFeatures) {
+      props ++=
+        Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures(
+          spark, metadata, snapshot.protocol)
+          ._3
+          .map(f => (
+            s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}${f.name}",
+            TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED))
+    }
+    props
+  }
+
+  /**
+   * Convert (nested) column name string into physical name with reference from DeltaLog
+   * If target field does not have physical name, display name is returned
+   * @param col Logical column name
+   * @param deltaLog Reference DeltaLog
+   * @return Physical column name
+   */
+  protected def getPhysicalName(col: String, deltaLog: DeltaLog): String = {
+    val nameParts = UnresolvedAttribute.parseAttributeName(col)
+    val realSchema = deltaLog.update().schema
+    getPhysicalName(nameParts, realSchema)
+  }
+
+  protected def getPhysicalName(col: String, schema: StructType): String = {
+    val nameParts = UnresolvedAttribute.parseAttributeName(col)
+    getPhysicalName(nameParts, schema)
+  }
+
+  protected def getPhysicalName(nameParts: Seq[String], schema: StructType): 
String = {
+    SchemaUtils.findNestedFieldIgnoreCase(schema, nameParts, 
includeCollections = true)
+      .map(DeltaColumnMapping.getPhysicalName)
+      .get
+  }
+
+  protected def withColumnMappingConf(mode: String)(f: => Any): Any = {
+    withSQLConf(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey -> 
mode) {
+      f
+    }
+  }
+
+  protected def withMaxColumnIdConf(maxId: String)(f: => Any): Any = {
+    withSQLConf(DeltaConfigs.COLUMN_MAPPING_MAX_ID.defaultTablePropertyKey -> 
maxId) {
+      f
+    }
+  }
+
+  /**
+   * Gets the physical names of a path. This is used for converting column 
paths in stats schema,
+   * so it's ok to not support MapType and ArrayType.
+   */
+  def getPhysicalPathForStats(path: Seq[String], schema: StructType): 
Option[Seq[String]] = {
+    if (path.isEmpty) return Some(Seq.empty)
+    val field = schema.fields.find(_.name.equalsIgnoreCase(path.head))
+    field match {
+      case Some(f @ StructField(_, _: AtomicType, _, _ )) =>
+        if (path.size == 1) Some(Seq(DeltaColumnMapping.getPhysicalName(f))) 
else None
+      case Some(f @ StructField(_, st: StructType, _, _)) =>
+        val tail = getPhysicalPathForStats(path.tail, st)
+        tail.map(DeltaColumnMapping.getPhysicalName(f) +: _)
+      case _ =>
+        None
+    }
+  }
+
+   /**
+   * Convert (nested) column name string into physical name.
+   * Ignore parts of special paths starting with:
+   *  1. stats columns: minValues, maxValues, numRecords
+   *  2. stats df: stats_parsed
+   *  3. partition values: partitionValues_parsed, partitionValues
+   * @param col Logical column name (e.g. a.b.c)
+   * @param schema Reference schema with metadata
+   * @return Unresolved attribute with physical name paths
+   */
+  protected def convertColumnNameToAttributeWithPhysicalName(
+      col: String,
+      schema: StructType): UnresolvedAttribute = {
+    val parts = UnresolvedAttribute.parseAttributeName(col)
+    val shouldIgnoreFirstPart = Set(
+      "minValues",
+      "maxValues",
+      "numRecords",
+      Checkpoints.STRUCT_PARTITIONS_COL_NAME,
+      "partitionValues")
+    val shouldIgnoreSecondPart = Set(Checkpoints.STRUCT_STATS_COL_NAME, 
"stats")
+    val physical = if (shouldIgnoreFirstPart.contains(parts.head)) {
+      parts.head +: getPhysicalPathForStats(parts.tail, 
schema).getOrElse(parts.tail)
+    } else if (shouldIgnoreSecondPart.contains(parts.head)) {
+      parts.take(2) ++ getPhysicalPathForStats(parts.slice(2, parts.length), 
schema)
+          .getOrElse(parts.slice(2, parts.length))
+    } else {
+      getPhysicalPathForStats(parts, schema).getOrElse(parts)
+    }
+    UnresolvedAttribute(physical)
+  }
+
+  /**
+   * Convert a list of (nested) stats columns into physical name with 
reference from DeltaLog
+   * @param columns Logical columns
+   * @param deltaLog Reference DeltaLog
+   * @return Physical columns
+   */
+  protected def convertToPhysicalColumns(
+      columns: Seq[Column],
+      deltaLog: DeltaLog): Seq[Column] = {
+    val schema = deltaLog.update().schema
+    columns.map { col =>
+      val newExpr = col.expr.transform {
+        case a: Attribute =>
+          convertColumnNameToAttributeWithPhysicalName(a.name, schema)
+      }
+      Column(newExpr)
+    }
+  }
+
+  /**
+   * Standard CONVERT TO DELTA
+   * @param tableOrPath Table name or path
+   */
+  protected def convertToDelta(tableOrPath: String): Unit = {
+    sql(s"CONVERT TO DELTA $tableOrPath")
+  }
+
+  /**
+   * Force enable streaming read (with possible data loss) on a column-mapping-enabled table
+   * with drop / rename schema changes.
+   */
+  protected def withStreamingReadOnColumnMappingTableEnabled(f: => Unit): Unit 
= {
+    if (columnMappingEnabled) {
+      withSQLConf(DeltaSQLConf
+        .DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES.key -> "true") {
+        f
+      }
+    } else {
+      f
+    }
+  }
+
+}
+
+trait DeltaColumnMappingTestUtils extends DeltaColumnMappingTestUtilsBase
+
+/**
+ * Include this trait to enable Id column mapping mode for a suite
+ */
+trait DeltaColumnMappingEnableIdMode extends SharedSparkSession
+  with DeltaColumnMappingTestUtils
+  with DeltaColumnMappingSelectedTestMixin {
+
+  protected override def columnMappingMode: String = IdMapping.name
+
+  protected override def sparkConf: SparkConf =
+    super.sparkConf.set(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, "id")
+
+  /**
+   * CONVERT TO DELTA is blocked in id mode
+   */
+  protected override def convertToDelta(tableOrPath: String): Unit =
+    throw DeltaErrors.convertToDeltaWithColumnMappingNotSupported(
+      DeltaColumnMappingMode(columnMappingModeString)
+    )
+}
+
+/**
+ * Include this trait to enable Name column mapping mode for a suite
+ */
+trait DeltaColumnMappingEnableNameMode extends SharedSparkSession
+  with DeltaColumnMappingTestUtils
+  with DeltaColumnMappingSelectedTestMixin {
+
+  protected override def columnMappingMode: String = NameMapping.name
+
+  protected override def sparkConf: SparkConf =
+    super.sparkConf.set(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, columnMappingMode)
+
+  /**
+   * CONVERT TO DELTA is possible under name mode in tests
+   */
+  protected override def convertToDelta(tableOrPath: String): Unit = {
+    withColumnMappingConf("none") {
+      super.convertToDelta(tableOrPath)
+    }
+
+    val (deltaPath, deltaLog) =
+      if (tableOrPath.contains("parquet") && tableOrPath.contains("`")) {
+        // parquet.`PATH`
+        val plainPath = tableOrPath.split('.').last.drop(1).dropRight(1)
+        (s"delta.`$plainPath`", DeltaLog.forTable(spark, plainPath))
+      } else {
+        (tableOrPath, DeltaLog.forTable(spark, TableIdentifier(tableOrPath)))
+      }
+
+    val tableReaderVersion = 
deltaLog.unsafeVolatileSnapshot.protocol.minReaderVersion
+    val tableWriterVersion = 
deltaLog.unsafeVolatileSnapshot.protocol.minWriterVersion
+    val requiredReaderVersion = if (tableWriterVersion >=
+      TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) {
+      // If the writer version of the table supports table features, we need to
+      // bump the reader version to table features to enable column mapping.
+      TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION
+    } else {
+      ColumnMappingTableFeature.minReaderVersion
+    }
+    val readerVersion = 
spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION).max(
+      requiredReaderVersion)
+    val writerVersion = 
spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION).max(
+      ColumnMappingTableFeature.minWriterVersion)
+
+    val properties = mutable.ListBuffer(DeltaConfigs.COLUMN_MAPPING_MODE.key 
-> "name")
+    if (tableReaderVersion < readerVersion) {
+      properties += DeltaConfigs.MIN_READER_VERSION.key -> 
readerVersion.toString
+    }
+    if (tableWriterVersion < writerVersion) {
+      properties += DeltaConfigs.MIN_WRITER_VERSION.key -> 
writerVersion.toString
+    }
+    val propertiesStr = properties.map(kv => s"'${kv._1}' = 
'${kv._2}'").mkString(", ")
+    sql(s"ALTER TABLE $deltaPath SET TBLPROPERTIES ($propertiesStr)")
+  }
+
+}
+// spotless:on
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
new file mode 100644
index 0000000000..4e7326c8c1
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.{SparkContext, SparkFunSuite, SparkThrowable}
+import org.apache.spark.scheduler.{JobFailed, SparkListener, 
SparkListenerJobEnd, SparkListenerJobStart}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode}
+import org.apache.spark.sql.delta.DeltaTestUtils.Plans
+import org.apache.spark.sql.delta.actions._
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.QueryExecutionListener
+import org.apache.spark.util.Utils
+
+import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord}
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import io.delta.tables.{DeltaTable => IODeltaTable}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.scalatest.BeforeAndAfterEach
+
+import java.io.{BufferedReader, File, InputStreamReader}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+// spotless:off
+trait DeltaTestUtilsBase {
+  import DeltaTestUtils.TableIdentifierOrPath
+
+  final val BOOLEAN_DOMAIN: Seq[Boolean] = Seq(true, false)
+
+  class PlanCapturingListener() extends QueryExecutionListener {
+
+    private[this] var capturedPlans = List.empty[Plans]
+
+    def plans: Seq[Plans] = capturedPlans.reverse
+
+    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+      capturedPlans ::= Plans(
+          qe.analyzed,
+          qe.optimizedPlan,
+          qe.sparkPlan,
+          qe.executedPlan)
+    }
+
+    override def onFailure(
+      funcName: String, qe: QueryExecution, error: Exception): Unit = {}
+  }
+
+  /**
+   * Run a thunk and return the captured logical plans (analyzed or optimized) for all queries it runs.
+   */
+  def withLogicalPlansCaptured[T](
+      spark: SparkSession,
+      optimizedPlan: Boolean)(
+      thunk: => Unit): Seq[LogicalPlan] = {
+    val planCapturingListener = new PlanCapturingListener
+
+    spark.sparkContext.listenerBus.waitUntilEmpty(15000)
+    spark.listenerManager.register(planCapturingListener)
+    try {
+      thunk
+      spark.sparkContext.listenerBus.waitUntilEmpty(15000)
+      planCapturingListener.plans.map { plans =>
+        if (optimizedPlan) plans.optimized else plans.analyzed
+      }
+    } finally {
+      spark.listenerManager.unregister(planCapturingListener)
+    }
+  }
+
+  /**
+   * Run a thunk and return the captured physical plans for all queries it runs.
+   */
+  def withPhysicalPlansCaptured[T](
+      spark: SparkSession)(
+      thunk: => Unit): Seq[SparkPlan] = {
+    val planCapturingListener = new PlanCapturingListener
+
+    spark.sparkContext.listenerBus.waitUntilEmpty(15000)
+    spark.listenerManager.register(planCapturingListener)
+    try {
+      thunk
+      spark.sparkContext.listenerBus.waitUntilEmpty(15000)
+      planCapturingListener.plans.map(_.sparkPlan)
+    } finally {
+      spark.listenerManager.unregister(planCapturingListener)
+    }
+  }
+
+  /**
+   * Run a thunk and return the captured logical and physical plans for all queries it runs.
+   */
+  def withAllPlansCaptured[T](
+      spark: SparkSession)(
+      thunk: => Unit): Seq[Plans] = {
+    val planCapturingListener = new PlanCapturingListener
+
+    spark.sparkContext.listenerBus.waitUntilEmpty(15000)
+    spark.listenerManager.register(planCapturingListener)
+    try {
+      thunk
+      spark.sparkContext.listenerBus.waitUntilEmpty(15000)
+      planCapturingListener.plans
+    } finally {
+      spark.listenerManager.unregister(planCapturingListener)
+    }
+  }
+
+  def countSparkJobs(sc: SparkContext, f: => Unit): Int = {
+    val jobs: concurrent.Map[Int, Long] = new ConcurrentHashMap[Int, 
Long]().asScala
+    val listener = new SparkListener {
+      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+        jobs.put(jobStart.jobId, jobStart.stageInfos.map(_.numTasks).sum)
+      }
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = 
jobEnd.jobResult match {
+        case JobFailed(_) => jobs.remove(jobEnd.jobId)
+        case _ => // On success, do nothing.
+      }
+    }
+    sc.addSparkListener(listener)
+    try {
+      sc.listenerBus.waitUntilEmpty(15000)
+      f
+      sc.listenerBus.waitUntilEmpty(15000)
+    } finally {
+      sc.removeSparkListener(listener)
+    }
+    // Spark will always log a job start/end event even when the job does not 
launch any task.
+    jobs.values.count(_ > 0)
+  }
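
// A minimal sketch, assuming a suite with a SparkSession `spark` in scope:
// countSparkJobs only counts jobs that launched at least one task, so a
// metadata-only operation is expected to report zero.
protected def assertNoJobsWithTasks(f: => Unit): Unit = {
  val jobs = countSparkJobs(spark.sparkContext, f)
  assert(jobs == 0, s"expected no Spark jobs with tasks, but saw $jobs")
}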
+
+  /** Filter `usageRecords` by the `opType` tag or field. */
+  def filterUsageRecords(usageRecords: Seq[UsageRecord], opType: String): 
Seq[UsageRecord] = {
+    usageRecords.filter { r =>
+      r.tags.get("opType").contains(opType) || 
r.opType.map(_.typeName).contains(opType)
+    }
+  }
+
+  def collectUsageLogs(opType: String)(f: => Unit): 
collection.Seq[UsageRecord] = {
+    Log4jUsageLogger.track(f).filter { r =>
+      r.metric == "tahoeEvent" &&
+        r.tags.get("opType").contains(opType)
+    }
+  }
+
+  /**
+   * Remove the protocol and metadata fields from a JSON-format checksum file
+   */
+  def removeProtocolAndMetadataFromChecksumFile(checksumFilePath : Path): Unit 
= {
+    // scalastyle:off deltahadoopconfiguration
+    val fs = checksumFilePath.getFileSystem(
+      SparkSession.getActiveSession.map(_.sessionState.newHadoopConf()).get
+    )
+    // scalastyle:on deltahadoopconfiguration
+    if (!fs.exists(checksumFilePath)) return
+    val stream = fs.open(checksumFilePath)
+    val reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
+    val content = reader.readLine()
+    stream.close()
+    val mapper = new ObjectMapper()
+    mapper.registerModule(DefaultScalaModule)
+    val map = mapper.readValue(content, classOf[Map[String, String]])
+    val partialContent = 
mapper.writeValueAsString(map.-("protocol").-("metadata")) + "\n"
+    val output = fs.create(checksumFilePath, true)
+    output.write(partialContent.getBytes(UTF_8))
+    output.close()
+  }
+
+  protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
+    // The expected plan for touched file computation is of the format below.
+    // The data column should be pruned from both leaves.
+    // HashAggregate(output=[count#3463L])
+    // +- HashAggregate(output=[count#3466L])
+    //   +- Project
+    //      +- Filter (isnotnull(count#3454L) AND (count#3454L > 1))
+    //         +- HashAggregate(output=[count#3454L])
+    //            +- HashAggregate(output=[_row_id_#3418L, sum#3468L])
+    //               +- Project [_row_id_#3418L, UDF(_file_name_#3422) AS 
one#3448]
+    //                  +- BroadcastHashJoin [id#3342L], [id#3412L], Inner, 
BuildLeft
+    //                     :- Project [id#3342L]
+    //                     :  +- Filter isnotnull(id#3342L)
+    //                     :     +- FileScan parquet [id#3342L,part#3343L]
+    //                     +- Filter isnotnull(id#3412L)
+    //                        +- Project [...]
+    //                           +- Project [...]
+    //                             +- FileScan parquet [id#3412L,part#3413L]
+    // Note: It can be RDDScanExec instead of FileScan if the source was 
materialized.
+    // We pick the first plan starting from FileScan and ending in 
HashAggregate as a
+    // stable heuristic for the one we want.
+    plans.map(_.executedPlan)
+      .filter {
+        case WholeStageCodegenExec(hash: HashAggregateExec) =>
+          hash.collectLeaves().size == 2 &&
+            hash.collectLeaves()
+              .forall { s =>
+                s.isInstanceOf[FileSourceScanExec] ||
+                  s.isInstanceOf[RDDScanExec]
+              }
+        case _ => false
+      }.head
+  }
+
+  /**
+   * Separate name- from path-based SQL table identifiers.
+   */
+  def getTableIdentifierOrPath(sqlIdentifier: String): TableIdentifierOrPath = 
{
+    // Match: delta.`path`[[ as] alias] or tahoe.`path`[[ as] alias]
+    val pathMatcher: Regex = raw"(?:delta|tahoe)\.`([^`]+)`(?:(?: as)? 
(.+))?".r
+    // Match: db.table[[ as] alias]
+    val qualifiedDbMatcher: Regex = raw"`?([^\.` ]+)`?\.`?([^\.` ]+)`?(?:(?: 
as)? (.+))?".r
+    // Match: table[[ as] alias]
+    val unqualifiedNameMatcher: Regex = raw"([^ ]+)(?:(?: as)? (.+))?".r
+    sqlIdentifier match {
+      case pathMatcher(path, alias) =>
+        TableIdentifierOrPath.Path(path, Option(alias))
+      case qualifiedDbMatcher(dbName, tableName, alias) =>
+        TableIdentifierOrPath.Identifier(TableIdentifier(tableName, 
Some(dbName)), Option(alias))
+      case unqualifiedNameMatcher(tableName, alias) =>
+        TableIdentifierOrPath.Identifier(TableIdentifier(tableName), 
Option(alias))
+    }
+  }
+
+  /**
+   * Produce a DeltaTable instance given a `TableIdentifierOrPath` instance.
+   */
+  def getDeltaTableForIdentifierOrPath(
+      spark: SparkSession,
+      identifierOrPath: TableIdentifierOrPath): IODeltaTable = {
+    identifierOrPath match {
+      case TableIdentifierOrPath.Identifier(id, optionalAlias) =>
+        val table = IODeltaTable.forName(spark, id.unquotedString)
+        optionalAlias.map(table.as(_)).getOrElse(table)
+      case TableIdentifierOrPath.Path(path, optionalAlias) =>
+        val table = IODeltaTable.forPath(spark, path)
+        optionalAlias.map(table.as(_)).getOrElse(table)
+    }
+  }
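
// A minimal sketch of the two helpers above used together, with hypothetical
// inputs: "delta.`/tmp/t`" resolves by path, while "db.tbl as t" resolves by
// qualified name with an alias.
protected def deltaTableFromSqlIdentifier(spark: SparkSession, sqlIdentifier: String): IODeltaTable =
  getDeltaTableForIdentifierOrPath(spark, getTableIdentifierOrPath(sqlIdentifier))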
+
+  @deprecated("Use checkError() instead")
+  protected def errorContains(errMsg: String, str: String): Unit = {
+    assert(errMsg.toLowerCase(Locale.ROOT).contains(str.toLowerCase(Locale.ROOT)))
+  }
+
+  /**
+   * Helper types to define the expected result of a test case.
+   * Either:
+   * - Success: include an expected value to check, e.g. expected schema or 
result as a DF or rows.
+   * - Failure: an exception is thrown and the caller passes a function to 
check that it matches an
+   *     expected error, typ. `checkError()` or `checkErrorMatchPVals()`.
+   */
+  sealed trait ExpectedResult[-T]
+  object ExpectedResult {
+    case class Success[T](expected: T) extends ExpectedResult[T]
+    case class Failure[T](checkError: SparkThrowable => Unit) extends 
ExpectedResult[T]
+  }
+
+  /** Utility method to check exception `e` is of type `E` or a cause of it is 
of type `E` */
+  def findIfResponsible[E <: Throwable: ClassTag](e: Throwable): Option[E] = e 
match {
+    case culprit: E => Some(culprit)
+    case _ =>
+      val children = Option(e.getCause).iterator ++ e.getSuppressed.iterator
+      children
+        .map(findIfResponsible[E](_))
+        .collectFirst { case Some(culprit) => culprit }
+  }
+
+  def verifyBackfilled(file: FileStatus): Unit = {
+    val unbackfilled = 
file.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString)
+    assert(!unbackfilled, s"File $file was not backfilled")
+  }
+
+  def verifyUnbackfilled(file: FileStatus): Unit = {
+    val unbackfilled = 
file.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString)
+    assert(unbackfilled, s"File $file was backfilled")
+  }
+}
+
+trait DeltaCheckpointTestUtils
+  extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession =>
+
+  def testDifferentCheckpoints(testName: String, quiet: Boolean = false)
+      (f: (CheckpointPolicy.Policy, Option[V2Checkpoint.Format]) => Unit): 
Unit = {
+    test(s"$testName [Checkpoint V1]") {
+      def testFunc(): Unit = {
+        withSQLConf(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey ->
+          CheckpointPolicy.Classic.name) {
+          f(CheckpointPolicy.Classic, None)
+        }
+      }
+      if (quiet) quietly { testFunc() } else testFunc()
+    }
+    for (checkpointFormat <- V2Checkpoint.Format.ALL)
+    test(s"$testName [Checkpoint V2, format: ${checkpointFormat.name}]") {
+      def testFunc(): Unit = {
+        withSQLConf(
+          DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> 
CheckpointPolicy.V2.name,
+          DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> 
checkpointFormat.name
+        ) {
+          f(CheckpointPolicy.V2, Some(checkpointFormat))
+        }
+      }
+      if (quiet) quietly { testFunc() } else testFunc()
+    }
+  }
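
// A minimal sketch of how a suite mixing in this trait might use
// testDifferentCheckpoints: the body below runs once under the classic
// checkpoint policy and once per V2 checkpoint format.
testDifferentCheckpoints("insert a row under each checkpoint policy") { (_, _) =>
  withTempDir { dir =>
    sql(s"CREATE TABLE delta.`$dir` (id LONG) USING delta")
    sql(s"INSERT INTO delta.`$dir` VALUES (1)")
    assert(sql(s"SELECT id FROM delta.`$dir`").count() == 1)
  }
}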
+
+  /**
+   * Helper method to get the dataframe corresponding to the files that contain the file actions
+   * for a given checkpoint.
+   */
+  def getCheckpointDfForFilesContainingFileActions(
+      log: DeltaLog,
+      checkpointFile: Path): DataFrame = {
+    val ci = CheckpointInstance.apply(checkpointFile)
+    val allCheckpointFiles = log
+        .listFrom(ci.version)
+        .filter(FileNames.isCheckpointFile)
+        .filter(f => CheckpointInstance(f.getPath) == ci)
+        .toSeq
+    val fileActionsFileIndex = ci.format match {
+      case CheckpointInstance.Format.V2 =>
+        val incompleteCheckpointProvider = ci.getCheckpointProvider(log, 
allCheckpointFiles)
+        val df = 
log.loadIndex(incompleteCheckpointProvider.topLevelFileIndex.get, 
Action.logSchema)
+        val sidecarFileStatuses = 
df.as[SingleAction].collect().map(_.unwrap).collect {
+          case sf: SidecarFile => sf
+        }.map(sf => sf.toFileStatus(log.logPath))
+        DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, 
sidecarFileStatuses)
+      case CheckpointInstance.Format.SINGLE | 
CheckpointInstance.Format.WITH_PARTS =>
+        DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET,
+          allCheckpointFiles.toArray)
+      case _ =>
+        throw new Exception(s"Unexpected checkpoint format for file 
$checkpointFile")
+    }
+    fileActionsFileIndex.files
+      .map(fileStatus => spark.read.parquet(fileStatus.getPath.toString))
+      .reduce(_.union(_))
+  }
+}
+
+object DeltaTestUtils extends DeltaTestUtilsBase {
+
+  sealed trait TableIdentifierOrPath
+  object TableIdentifierOrPath {
+    case class Identifier(id: TableIdentifier, alias: Option[String])
+      extends TableIdentifierOrPath
+    case class Path(path: String, alias: Option[String]) extends 
TableIdentifierOrPath
+  }
+
+  case class Plans(
+      analyzed: LogicalPlan,
+      optimized: LogicalPlan,
+      sparkPlan: SparkPlan,
+      executedPlan: SparkPlan)
+
+  /**
+   * Creates an AddFile that can be used for tests where the exact parameters 
do not matter.
+   */
+  def createTestAddFile(
+      encodedPath: String = "foo",
+      partitionValues: Map[String, String] = Map.empty,
+      size: Long = 1L,
+      modificationTime: Long = 1L,
+      dataChange: Boolean = true,
+      stats: String = "{\"numRecords\": 1}"): AddFile = {
+    AddFile(encodedPath, partitionValues, size, modificationTime, dataChange, 
stats)
+  }
+
+  /**
+   * Extracts the table name and alias (if any) from the given string. Correctly handles
+   * whitespace in the table name but doesn't support whitespace in the alias.
+   */
+  def parseTableAndAlias(table: String): (String, Option[String]) = {
+    // Matches 'delta.`path` AS alias' (case insensitive).
+    val deltaPathWithAsAlias = raw"(?i)(delta\.`.+`)(?: AS) (\S+)".r
+    // Matches 'delta.`path` alias'.
+    val deltaPathWithAlias = raw"(delta\.`.+`) (\S+)".r
+    // Matches 'delta.`path`'.
+    val deltaPath = raw"(delta\.`.+`)".r
+    // Matches 'tableName AS alias' (case insensitive).
+    val tableNameWithAsAlias = raw"(?i)(.+)(?: AS) (\S+)".r
+    // Matches 'tableName alias'.
+    val tableNameWithAlias = raw"(.+) (.+)".r
+
+    table match {
+      case deltaPathWithAsAlias(tableName, alias) => tableName -> Some(alias)
+      case deltaPathWithAlias(tableName, alias) => tableName -> Some(alias)
+      case deltaPath(tableName) => tableName -> None
+      case tableNameWithAsAlias(tableName, alias) => tableName -> Some(alias)
+      case tableNameWithAlias(tableName, alias) => tableName -> Some(alias)
+      case tableName => tableName -> None
+    }
+  }
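
// A minimal sketch of the expected parses, with hypothetical inputs:
//   parseTableAndAlias("delta.`/tmp/my table` t") == ("delta.`/tmp/my table`", Some("t"))
//   parseTableAndAlias("source AS src")           == ("source", Some("src"))
//   parseTableAndAlias("plain_table")             == ("plain_table", None)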
+
+  /**
+   * Implements an ordering where `x < y` iff both reader and writer versions 
of
+   * `x` are strictly less than those of `y`.
+   *
+   * Can be used to conveniently check that this relationship holds in 
tests/assertions
+   * without having to write out the conjunction of the two subconditions 
every time.
+   */
+  case object StrictProtocolOrdering extends PartialOrdering[Protocol] {
+    override def tryCompare(x: Protocol, y: Protocol): Option[Int] = {
+      if (x.minReaderVersion == y.minReaderVersion &&
+        x.minWriterVersion == y.minWriterVersion) {
+        Some(0)
+      } else if (x.minReaderVersion < y.minReaderVersion &&
+        x.minWriterVersion < y.minWriterVersion) {
+        Some(-1)
+      } else if (x.minReaderVersion > y.minReaderVersion &&
+        x.minWriterVersion > y.minWriterVersion) {
+        Some(1)
+      } else {
+        None
+      }
+    }
+
+    override def lteq(x: Protocol, y: Protocol): Boolean =
+      x.minReaderVersion <= y.minReaderVersion && x.minWriterVersion <= 
y.minWriterVersion
+
+    // Just a more readable version of `lteq`.
+    def fulfillsVersionRequirements(actual: Protocol, requirement: Protocol): 
Boolean =
+      lteq(requirement, actual)
+  }
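
// A minimal sketch of the intended use in assertions, with hypothetical protocol versions:
//   StrictProtocolOrdering.tryCompare(Protocol(1, 2), Protocol(2, 5)) == Some(-1)
//   StrictProtocolOrdering.tryCompare(Protocol(1, 5), Protocol(2, 2)) == None
//   StrictProtocolOrdering.fulfillsVersionRequirements(
//     actual = Protocol(2, 5), requirement = Protocol(1, 2))   // true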
+}
+
+trait DeltaTestUtilsForTempViews
+  extends SharedSparkSession
+  with DeltaTestUtilsBase {
+
+  def testWithTempView(testName: String)(testFun: Boolean => Any): Unit = {
+    Seq(true, false).foreach { isSQLTempView =>
+      val tempViewUsed = if (isSQLTempView) "SQL TempView" else "Dataset 
TempView"
+      test(s"$testName - $tempViewUsed") {
+        withTempView("v") {
+          testFun(isSQLTempView)
+        }
+      }
+    }
+  }
+
+  def testQuietlyWithTempView(testName: String)(testFun: Boolean => Any): Unit 
= {
+    Seq(true, false).foreach { isSQLTempView =>
+      val tempViewUsed = if (isSQLTempView) "SQL TempView" else "Dataset 
TempView"
+      testQuietly(s"$testName - $tempViewUsed") {
+        withTempView("v") {
+          testFun(isSQLTempView)
+        }
+      }
+    }
+  }
+
+  def createTempViewFromTable(
+      tableName: String,
+      isSQLTempView: Boolean,
+      format: Option[String] = None): Unit = {
+    if (isSQLTempView) {
+      sql(s"CREATE OR REPLACE TEMP VIEW v AS SELECT * from $tableName")
+    } else {
+      spark.read.format(format.getOrElse("delta")).table(tableName).createOrReplaceTempView("v")
+    }
+  }
+
+  def createTempViewFromSelect(text: String, isSQLTempView: Boolean): Unit = {
+    if (isSQLTempView) {
+      sql(s"CREATE OR REPLACE TEMP VIEW v AS $text")
+    } else {
+      sql(text).createOrReplaceTempView("v")
+    }
+  }
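
// A minimal sketch of the helpers above used together, assuming a hypothetical
// Delta table `tbl` created inside the test body: the same assertion runs once
// against a SQL temp view and once against a Dataset temp view.
testWithTempView("reads through a temp view") { isSQLTempView =>
  withTable("tbl") {
    sql("CREATE TABLE tbl (id LONG) USING delta")
    sql("INSERT INTO tbl VALUES (1), (2)")
    createTempViewFromTable("tbl", isSQLTempView)
    assert(sql("SELECT * FROM v").count() == 2)
  }
}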
+
+  def testErrorMessageAndClass(
+      isSQLTempView: Boolean,
+      ex: AnalysisException,
+      expectedErrorMsgForSQLTempView: String = null,
+      expectedErrorMsgForDataSetTempView: String = null,
+      expectedErrorClassForSQLTempView: String = null,
+      expectedErrorClassForDataSetTempView: String = null): Unit = {
+    if (isSQLTempView) {
+      if (expectedErrorMsgForSQLTempView != null) {
+        errorContains(ex.getMessage, expectedErrorMsgForSQLTempView)
+      }
+      if (expectedErrorClassForSQLTempView != null) {
+        assert(ex.getErrorClass == expectedErrorClassForSQLTempView)
+      }
+    } else {
+      if (expectedErrorMsgForDataSetTempView != null) {
+        errorContains(ex.getMessage, expectedErrorMsgForDataSetTempView)
+      }
+      if (expectedErrorClassForDataSetTempView != null) {
+        assert(ex.getErrorClass == expectedErrorClassForDataSetTempView, 
ex.getMessage)
+      }
+    }
+  }
+}
+
+/**
+ * Trait collecting helper methods for DML tests, e.g. creating a test table for each test and
+ * cleaning it up after each test.
+ */
+trait DeltaDMLTestUtils
+  extends DeltaSQLTestUtils
+  with DeltaTestUtilsBase
+  with BeforeAndAfterEach {
+  self: SharedSparkSession =>
+
+  import testImplicits._
+
+  protected var tempDir: File = _
+
+  protected var deltaLog: DeltaLog = _
+
+  protected def tempPath: String = tempDir.getCanonicalPath
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    // Using a space in path to provide coverage for special characters.
+    tempDir = Utils.createTempDir(namePrefix = "spark test")
+    deltaLog = DeltaLog.forTable(spark, new Path(tempPath))
+  }
+
+  override protected def afterEach(): Unit = {
+    try {
+      Utils.deleteRecursively(tempDir)
+      DeltaLog.clearCache()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  protected def append(df: DataFrame, partitionBy: Seq[String] = Nil): Unit = {
+    val dfw = df.write.format("delta").mode("append")
+    if (partitionBy.nonEmpty) {
+      dfw.partitionBy(partitionBy: _*)
+    }
+    dfw.save(tempPath)
+  }
+
+  protected def withKeyValueData(
+      source: Seq[(Int, Int)],
+      target: Seq[(Int, Int)],
+      isKeyPartitioned: Boolean = false,
+      sourceKeyValueNames: (String, String) = ("key", "value"),
+      targetKeyValueNames: (String, String) = ("key", "value"))(
+      thunk: (String, String) => Unit = null): Unit = {
+
+    import testImplicits._
+
+    append(target.toDF(targetKeyValueNames._1, 
targetKeyValueNames._2).coalesce(2),
+      if (isKeyPartitioned) Seq(targetKeyValueNames._1) else Nil)
+    withTempView("source") {
+      source.toDF(sourceKeyValueNames._1, 
sourceKeyValueNames._2).createOrReplaceTempView("source")
+      thunk("source", s"delta.`$tempPath`")
+    }
+  }
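
// A minimal sketch of a DML test built on withKeyValueData, assuming a suite that
// mixes in this trait: the thunk receives the source view name and the target
// identifier (delta.`<tempPath>`).
test("merge-delete keys that are present in the source") {
  withKeyValueData(
    source = Seq(1 -> 10, 2 -> 20),
    target = Seq(2 -> 200, 3 -> 300)
  ) { (sourceName, targetName) =>
    sql(
      s"""MERGE INTO $targetName t USING $sourceName s
         |ON t.key = s.key
         |WHEN MATCHED THEN DELETE
         |""".stripMargin)
    assert(readDeltaTable(tempPath).count() == 1)
  }
}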
+
+  /**
+   * Parse the input JSON data into a dataframe, one row per input element.
+   * Throws an exception on malformed inputs or records that don't comply with 
the provided schema.
+   */
+  protected def readFromJSON(data: Seq[String], schema: StructType = null): 
DataFrame = {
+    if (schema != null) {
+      spark.read
+        .schema(schema)
+        .option("mode", FailFastMode.name)
+        .json(data.toDS)
+    } else {
+      spark.read
+        .option("mode", FailFastMode.name)
+        .json(data.toDS)
+    }
+  }
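
// A minimal sketch: parse two JSON records against an explicit schema; a malformed
// record would fail the read instead of being silently dropped.
test("readFromJSON parses records with an explicit schema") {
  val df = readFromJSON(
    Seq("""{"key": 1, "value": 10}""", """{"key": 2, "value": 20}"""),
    new StructType().add("key", "int").add("value", "int"))
  assert(df.count() == 2)
}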
+
+  protected def readDeltaTable(path: String): DataFrame = {
+    spark.read.format("delta").load(path)
+  }
+
+  protected def getDeltaFileStmt(path: String): String = s"SELECT * FROM 
delta.`$path`"
+
+  /**
+   * Finds the latest operation of the given type that ran on the test table 
and returns the
+   * dataframe with the changes of the corresponding table version.
+   *
+   * @param operation Delta operation name, see [[DeltaOperations]].
+   */
+  protected def getCDCForLatestOperation(deltaLog: DeltaLog, operation: 
String): DataFrame = {
+    val latestOperation = deltaLog.history
+      .getHistory(None)
+      .find(_.operation == operation)
+    assert(latestOperation.nonEmpty, s"Couldn't find a ${operation} operation 
to check CDF")
+
+    val latestOperationVersion = latestOperation.get.version
+    assert(latestOperationVersion.nonEmpty,
+      s"Latest ${operation} operation doesn't have a version associated with 
it")
+
+    CDCReader
+      .changesToBatchDF(
+        deltaLog,
+        latestOperationVersion.get,
+        latestOperationVersion.get,
+        spark)
+      .drop(CDCReader.CDC_COMMIT_TIMESTAMP)
+      .drop(CDCReader.CDC_COMMIT_VERSION)
+  }
+}
+// spotless:on
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala
new file mode 100644
index 0000000000..135dd97bfa
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.delta.DeltaColumnMappingTestUtils
+import org.apache.spark.sql.delta.DeltaConfigs
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+import org.scalatest.exceptions.TestFailedException
+
+import scala.collection.mutable
+
+// spotless:off
+/**
+ * A trait for selectively enabling certain tests to run under column mapping modes
+ */
+trait DeltaColumnMappingSelectedTestMixin extends SparkFunSuite
+  with DeltaSQLTestUtils with DeltaColumnMappingTestUtils {
+
+  protected def runOnlyTests: Seq[String] = Seq()
+
+  /**
+   * If true, will run all tests.
+   * Requires that `runOnlyTests` is empty.
+   */
+  protected def runAllTests: Boolean = false
+
+  private val testsRun: mutable.Set[String] = mutable.Set.empty
+
+  override protected def test(
+      testName: String,
+      testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = {
+    require(!runAllTests || runOnlyTests.isEmpty,
+      "If `runAllTests` is true then `runOnlyTests` must be empty")
+
+    if (runAllTests || runOnlyTests.contains(testName)) {
+      super.test(s"$testName - column mapping $columnMappingMode mode", 
testTags: _*) {
+        testsRun.add(testName)
+        withSQLConf(
+          DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey -> 
columnMappingMode) {
+          testFun
+        }
+      }
+    } else {
+      super.ignore(s"$testName - ignored by 
DeltaColumnMappingSelectedTestMixin")(testFun)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    val missingTests = runOnlyTests.toSet diff testsRun
+    if (missingTests.nonEmpty) {
+      throw new TestFailedException(
+        Some("Not all selected column mapping tests were run. Missing: " +
+          missingTests.mkString(", ")), None, 0)
+    }
+  }
+
+}
+// spotless:on
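
// A minimal sketch of a suite using this mixin, with hypothetical suite, base-suite
// and test names: only the listed tests run, each under the configured column
// mapping mode.
class MyColumnMappingSelectedSuite extends MyBaseSuite
  with DeltaColumnMappingEnableNameMode {
  override protected def runOnlyTests: Seq[String] = Seq("basic write and read back")
}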
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala
new file mode 100644
index 0000000000..b166697284
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.sql.QueryTest
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+// spotless:off
+trait DeltaExcludedTestMixin extends QueryTest {
+
+  /** Tests to be ignored by the runner. */
+  override def excluded: Seq[String] = Seq.empty
+
+  protected override def test(testName: String, testTags: Tag*)
+    (testFun: => Any)
+    (implicit pos: Position): Unit = {
+    if (excluded.contains(testName)) {
+      super.ignore(testName, testTags: _*)(testFun)
+    } else {
+      super.test(testName, testTags: _*)(testFun)
+    }
+  }
+}
+// spotless:on
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
new file mode 100644
index 0000000000..3d94d2bde3
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SharedSparkSession
+
+import io.delta.sql.DeltaSparkSessionExtension
+
+// spotless:off
+/**
+ * A trait for tests that are testing a fully set up SparkSession with all of 
Delta's requirements,
+ * such as the configuration of the DeltaCatalog and the addition of all Delta 
extensions.
+ */
+trait DeltaSQLCommandTest extends SharedSparkSession {
+
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+
+    // Delta.
+    conf.set(StaticSQLConf.SPARK_SESSION_EXTENSIONS.key,
+        classOf[DeltaSparkSessionExtension].getName)
+      .set(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key,
+        classOf[DeltaCatalog].getName)
+
+    // Gluten.
+    conf.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.default.parallelism", "1")
+      .set("spark.memory.offHeap.enabled", "true")
+      .set("spark.sql.shuffle.partitions", "1")
+      .set("spark.memory.offHeap.size", "2g")
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+  }
+}
+// spotless:on
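
// A minimal sketch with a hypothetical suite name: mixing in DeltaSQLCommandTest
// gives the test a SparkSession configured with both the Delta extension/catalog
// and the Gluten plugin settings above.
class MyGlutenDeltaSuite extends QueryTest with DeltaSQLCommandTest {
  test("create and query a Delta table") {
    withTable("t") {
      sql("CREATE TABLE t (id LONG) USING delta")
      sql("INSERT INTO t VALUES (1), (2)")
      assert(sql("SELECT * FROM t").count() == 2)
    }
  }
}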
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala
new file mode 100644
index 0000000000..22f4e9fa11
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+import java.io.File
+
+// spotless:off
+trait DeltaSQLTestUtils extends SQLTestUtils {
+  /**
+   * Override the temp dir/path creation methods from [[SQLTestUtils]] to:
+   * 1. Drop the call to `waitForTasksToFinish` which is a source of flakiness 
due to timeouts
+   *    without clear benefits.
+   * 2. Allow creating paths with special characters for better test coverage.
+   */
+
+  protected val defaultTempDirPrefix: String = "spark%dir%prefix"
+
+  override protected def withTempDir(f: File => Unit): Unit = {
+    withTempDir(prefix = defaultTempDirPrefix)(f)
+  }
+
+  override protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): 
Unit = {
+    withTempPaths(numPaths, prefix = defaultTempDirPrefix)(f)
+  }
+
+  override def withTempPath(f: File => Unit): Unit = {
+    withTempPath(prefix = defaultTempDirPrefix)(f)
+  }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be 
deleted after `f`
+   * returns.
+   */
+  def withTempDir(prefix: String)(f: File => Unit): Unit = {
+    val path = Utils.createTempDir(namePrefix = prefix)
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  /**
+   * Generates a temporary directory path without creating the actual 
directory, which is then
+   * passed to `f` and will be deleted after `f` returns.
+   */
+  def withTempPath(prefix: String)(f: File => Unit): Unit = {
+    val path = Utils.createTempDir(namePrefix = prefix)
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  /**
+   * Generates the specified number of temporary directory paths without 
creating the actual
+   * directories, which are then passed to `f` and will be deleted after `f` 
returns.
+   */
+  protected def withTempPaths(numPaths: Int, prefix: String)(f: Seq[File] => 
Unit): Unit = {
+    val files =
+      Seq.fill[File](numPaths)(Utils.createTempDir(namePrefix = 
prefix).getCanonicalFile)
+    files.foreach(_.delete())
+    try f(files) finally {
+      files.foreach(Utils.deleteRecursively)
+    }
+  }
+}
+// spotless:on
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
new file mode 100644
index 0000000000..f2e7acc695
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.delta.{DeltaLog, OptimisticTransaction, Snapshot}
+import org.apache.spark.sql.delta.DeltaOperations.{ManualUpdate, Operation, 
Write}
+import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, Protocol}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
+import 
org.apache.spark.sql.delta.coordinatedcommits.TableCommitCoordinatorClient
+import org.apache.spark.sql.delta.hooks.AutoCompact
+import org.apache.spark.sql.delta.stats.StatisticsCollection
+import org.apache.spark.util.Clock
+
+import io.delta.storage.commit.{CommitResponse, GetCommitsResponse, 
UpdatedActions}
+import org.apache.hadoop.fs.Path
+
+import java.io.File
+
+// spotless:off
+/**
+ * Additional method definitions for Delta classes that are intended for use 
only in testing.
+ */
+object DeltaTestImplicits {
+  implicit class OptimisticTxnTestHelper(txn: OptimisticTransaction) {
+
+    /** Ensure that the initial commit of a Delta table always contains a 
Metadata action */
+    def commitActions(op: Operation, actions: Action*): Long = {
+      if (txn.readVersion == -1) {
+        val metadataOpt = actions.collectFirst { case m: Metadata => m }
+        val protocolOpt = actions.collectFirst { case p: Protocol => p }
+        val otherActions =
+          actions.filterNot(a => a.isInstanceOf[Metadata] || 
a.isInstanceOf[Protocol])
+        (metadataOpt, protocolOpt) match {
+          case (Some(metadata), Some(protocol)) =>
+            // When both metadata and protocol are explicitly passed, use them.
+            txn.updateProtocol(protocol)
+            // This will auto upgrade any required table features in the 
passed protocol as per
+            // given metadata.
+            txn.updateMetadataForNewTable(metadata)
+          case (Some(metadata), None) =>
+            // When just metadata is passed, use it.
+            // This will auto generate protocol as per metadata.
+            txn.updateMetadataForNewTable(metadata)
+          case (None, Some(protocol)) =>
+            txn.updateProtocol(protocol)
+            txn.updateMetadataForNewTable(Metadata())
+          case (None, None) =>
+            // If neither metadata nor protocol is explicitly passed, then use the default
+            // Metadata and the maximum supported protocol.
+            txn.updateMetadataForNewTable(Metadata())
+            txn.updateProtocol(Action.supportedProtocolVersion())
+        }
+        txn.commit(otherActions, op)
+      } else {
+        txn.commit(actions, op)
+      }
+    }
+
+    def commitManually(actions: Action*): Long = {
+      commitActions(ManualUpdate, actions: _*)
+    }
+
+    def commitWriteAppend(actions: Action*): Long = {
+      commitActions(Write(SaveMode.Append), actions: _*)
+    }
+  }
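
// A minimal sketch, assuming a test with a fresh table directory `tempDir`:
// commitManually supplies the Metadata action the initial commit requires, so a
// bare AddFile can be committed as version 0.
//
//   import org.apache.spark.sql.delta.test.DeltaTestImplicits._
//   val log = DeltaLog.forTable(spark, tempDir)
//   log.startTransaction().commitManually(
//     Metadata(schemaString = new StructType().add("id", "long").json),
//     AddFile("file1", Map.empty, 1L, 1L, dataChange = true))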
+
+  /** Add test-only File overloads for DeltaLog.forTable */
+  implicit class DeltaLogObjectTestHelper(deltaLog: DeltaLog.type) {
+    def forTable(spark: SparkSession, dataPath: File): DeltaLog = {
+      DeltaLog.forTable(spark, new Path(dataPath.getCanonicalPath))
+    }
+
+    def forTable(spark: SparkSession, dataPath: File, clock: Clock): DeltaLog 
= {
+      DeltaLog.forTable(spark, new Path(dataPath.getCanonicalPath), clock)
+    }
+  }
+
+  /** Helper class for working with [[TableCommitCoordinatorClient]] */
+  implicit class TableCommitCoordinatorClientTestHelper(
+      tableCommitCoordinatorClient: TableCommitCoordinatorClient) {
+
+    def commit(
+        commitVersion: Long,
+        actions: Iterator[String],
+        updatedActions: UpdatedActions): CommitResponse = {
+      tableCommitCoordinatorClient.commit(
+        commitVersion, actions, updatedActions, tableIdentifierOpt = None)
+    }
+
+    def getCommits(
+        startVersion: Option[Long] = None,
+        endVersion: Option[Long] = None): GetCommitsResponse = {
+      tableCommitCoordinatorClient.getCommits(tableIdentifierOpt = None, 
startVersion, endVersion)
+    }
+
+    def backfillToVersion(
+        version: Long,
+        lastKnownBackfilledVersion: Option[Long] = None): Unit = {
+      tableCommitCoordinatorClient.backfillToVersion(
+        tableIdentifierOpt = None, version, lastKnownBackfilledVersion)
+    }
+  }
+
+
+  /** Helper class for working with [[Snapshot]] */
+  implicit class SnapshotTestHelper(snapshot: Snapshot) {
+    def ensureCommitFilesBackfilled(): Unit = {
+      snapshot.ensureCommitFilesBackfilled(catalogTableOpt = None)
+    }
+  }
+
+  /**
+   * Helper class for working with the most recent snapshot in the deltaLog
+   */
+  implicit class DeltaLogTestHelper(deltaLog: DeltaLog) {
+    def snapshot: Snapshot = {
+      deltaLog.unsafeVolatileSnapshot
+    }
+
+    def checkpoint(): Unit = {
+      deltaLog.checkpoint(snapshot)
+    }
+
+    def checkpointInterval(): Int = {
+      deltaLog.checkpointInterval(snapshot.metadata)
+    }
+
+    def deltaRetentionMillis(): Long = {
+      deltaLog.deltaRetentionMillis(snapshot.metadata)
+    }
+
+    def enableExpiredLogCleanup(): Boolean = {
+      deltaLog.enableExpiredLogCleanup(snapshot.metadata)
+    }
+
+    def upgradeProtocol(newVersion: Protocol): Unit = {
+      upgradeProtocol(deltaLog.unsafeVolatileSnapshot, newVersion)
+    }
+
+    def upgradeProtocol(snapshot: Snapshot, newVersion: Protocol): Unit = {
+      deltaLog.upgradeProtocol(None, snapshot, newVersion)
+    }
+  }
+
+  implicit class DeltaTableV2ObjectTestHelper(dt: DeltaTableV2.type) {
+    /** Convenience overload that omits the cmd arg (which is not helpful in 
tests). */
+    def apply(spark: SparkSession, id: TableIdentifier): DeltaTableV2 =
+      dt.apply(spark, id, "test")
+  }
+
+  implicit class DeltaTableV2TestHelper(deltaTable: DeltaTableV2) {
+    /** For backward compatibility with existing unit tests */
+    def snapshot: Snapshot = deltaTable.initialSnapshot
+  }
+
+  implicit class AutoCompactObjectTestHelper(ac: AutoCompact.type) {
+    private[delta] def compact(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      partitionPredicates: Seq[Expression] = Nil,
+      opType: String = AutoCompact.OP_TYPE): Seq[OptimizeMetrics] = {
+      AutoCompact.compact(
+        spark, deltaLog, catalogTable = None,
+        partitionPredicates, opType)
+    }
+  }
+
+  implicit class StatisticsCollectionObjectTestHelper(sc: 
StatisticsCollection.type) {
+
+    /**
+     * This is an implicit helper required for backward compatibility with 
existing
+     * unit tests. It allows to call [[StatisticsCollection.recompute]] 
without a
+     * catalog table and in the actual call, sets it to [[None]].
+     */
+    def recompute(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      predicates: Seq[Expression] = Seq(Literal(true)),
+      fileFilter: AddFile => Boolean = af => true): Unit = {
+      StatisticsCollection.recompute(
+        spark, deltaLog, catalogTable = None, predicates, fileFilter)
+    }
+  }
+}
+// spotless:on
diff --git 
a/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala
 
b/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala
new file mode 100644
index 0000000000..26c1a69481
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package shims
+
+import org.apache.spark.sql.QueryTest
+
+// spotless:off
+trait DeltaExcludedBySparkVersionTestMixinShims extends QueryTest {
+  /**
+   * Tests that are meant for Delta compiled against Spark Latest Release 
only. Executed since this
+   * is the Spark Latest Release shim.
+   */
+  protected def testSparkLatestOnly(
+      testName: String, testTags: org.scalatest.Tag*)
+      (testFun: => Any)
+      (implicit pos: org.scalactic.source.Position): Unit = {
+    test(testName, testTags: _*)(testFun)(pos)
+  }
+
+  /**
+   * Tests that are meant for Delta compiled against Spark Master Release 
only. Ignored since this
+   * is the Spark Latest Release shim.
+   */
+  protected def testSparkMasterOnly(
+      testName: String, testTags: org.scalatest.Tag*)
+      (testFun: => Any)
+      (implicit pos: org.scalactic.source.Position): Unit = {
+    ignore(testName, testTags: _*)(testFun)(pos)
+  }
+}
+// spotless:on
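
// A minimal sketch, inside a suite that mixes in this shim, with hypothetical test
// names: under this Spark-latest shim the first test runs and the second is
// registered as ignored.
//
//   testSparkLatestOnly("runs on the latest released Spark") { assert(true) }
//   testSparkMasterOnly("requires Spark master-only features") { assert(true) }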

