This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 56523dc85e [VL] Delta: Add Delta Lake write unit test for Spark 3.5 +
Delta 3.3 (#10802)
56523dc85e is described below
commit 56523dc85e30e2eaca5d599b9eb543cd95ca2315
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Sep 26 20:14:16 2025 +0200
[VL] Delta: Add Delta Lake write unit test for Spark 3.5 + Delta 3.3
(#10802)
---
.../apache/spark/sql/delta/DeleteSQLSuite.scala | 154 +++++
.../apache/spark/sql/delta/DeleteSuiteBase.scala | 565 ++++++++++++++++++
.../spark/sql/delta/DeletionVectorsTestUtils.scala | 367 ++++++++++++
.../sql/delta/DeltaColumnMappingTestUtils.scala | 487 ++++++++++++++++
.../apache/spark/sql/delta/DeltaTestUtils.scala | 635 +++++++++++++++++++++
.../test/DeltaColumnMappingSelectedTestMixin.scala | 76 +++
.../sql/delta/test/DeltaExcludedTestMixin.scala | 40 ++
.../spark/sql/delta/test/DeltaSQLCommandTest.scala | 52 ++
.../spark/sql/delta/test/DeltaSQLTestUtils.scala | 79 +++
.../spark/sql/delta/test/DeltaTestImplicits.scala | 204 +++++++
...DeltaExcludedBySparkVersionTestMixinShims.scala | 45 ++
11 files changed, 2704 insertions(+)
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
new file mode 100644
index 0000000000..950c2269d7
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.{DeltaExcludedTestMixin,
DeltaSQLCommandTest}
+
+import org.scalatest.Ignore
+
+// spotless:off
+class DeleteSQLSuite extends DeleteSuiteBase
+ with DeltaExcludedTestMixin
+ with DeltaSQLCommandTest {
+
+ import testImplicits._
+
+ override protected def executeDelete(target: String, where: String = null):
Unit = {
+ val whereClause = Option(where).map(c => s"WHERE $c").getOrElse("")
+ sql(s"DELETE FROM $target $whereClause")
+ }
+
+ override def excluded: Seq[String] = super.excluded ++
+ Seq(
+ // FIXME: Excluded by Gluten as results are mismatch.
+ "test delete on temp view - nontrivial projection - SQL TempView",
+ "test delete on temp view - nontrivial projection - Dataset TempView"
+ )
+
+ // For EXPLAIN, which is not supported in OSS
+ test("explain") {
+ append(Seq((2, 2)).toDF("key", "value"))
+ val df = sql(s"EXPLAIN DELETE FROM delta.`$tempPath` WHERE key = 2")
+ val outputs = df.collect().map(_.mkString).mkString
+ assert(outputs.contains("Delta"))
+ assert(!outputs.contains("index") && !outputs.contains("ActionLog"))
+ // no change should be made by explain
+ checkAnswer(readDeltaTable(tempPath), Row(2, 2))
+ }
+
+ test("delete from a temp view") {
+ withTable("tab") {
+ withTempView("v") {
+ Seq((1, 1), (0, 3), (1, 5)).toDF("key",
"value").write.format("delta").saveAsTable("tab")
+ spark.table("tab").as("name").createTempView("v")
+ sql("DELETE FROM v WHERE key = 1")
+ checkAnswer(spark.table("tab"), Row(0, 3))
+ }
+ }
+ }
+
+ test("delete from a SQL temp view") {
+ withTable("tab") {
+ withTempView("v") {
+ Seq((1, 1), (0, 3), (1, 5)).toDF("key",
"value").write.format("delta").saveAsTable("tab")
+ sql("CREATE TEMP VIEW v AS SELECT * FROM tab")
+ sql("DELETE FROM v WHERE key = 1 AND VALUE = 5")
+ checkAnswer(spark.table("tab"), Seq(Row(1, 1), Row(0, 3)))
+ }
+ }
+ }
+
+ Seq(true, false).foreach { partitioned =>
+ test(s"User defined _change_type column doesn't get dropped -
partitioned=$partitioned") {
+ withTable("tab") {
+ sql(
+ s"""CREATE TABLE tab USING DELTA
+ |${if (partitioned) "PARTITIONED BY (part) " else ""}
+ |TBLPROPERTIES (delta.enableChangeDataFeed = false)
+ |AS SELECT id, int(id / 10) AS part, 'foo' as _change_type
+ |FROM RANGE(1000)
+ |""".stripMargin)
+ val rowsToDelete = (1 to 1000 by 42).mkString("(", ", ", ")")
+ executeDelete("tab", s"id in $rowsToDelete")
+ sql("SELECT id, _change_type FROM tab").collect().foreach { row =>
+ val _change_type = row.getString(1)
+ assert(_change_type === "foo", s"Invalid _change_type for
id=${row.get(0)}")
+ }
+ }
+ }
+ }
+}
+
+// FIXME: Enable the test.
+// Skipping as function input_file_name doesn't get correctly resolved.
+@Ignore
+class DeleteSQLNameColumnMappingSuite extends DeleteSQLSuite
+ with DeltaColumnMappingEnableNameMode {
+
+ protected override def runOnlyTests: Seq[String] = Seq(true, false).map {
isPartitioned =>
+ s"basic case - delete from a Delta table by name -
Partition=$isPartitioned"
+ } ++ Seq(true, false).flatMap { isPartitioned =>
+ Seq(
+ s"where key columns - Partition=$isPartitioned",
+ s"where data columns - Partition=$isPartitioned")
+ }
+
+}
+
+class DeleteSQLWithDeletionVectorsSuite extends DeleteSQLSuite
+ with DeltaExcludedTestMixin
+ with DeletionVectorsTestUtils {
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ enableDeletionVectors(spark, delete = true)
+ spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key,
"false")
+ }
+
+ override def excluded: Seq[String] = super.excluded ++
+ Seq(
+ // The following two tests must fail when DV is used. Covered by another
test case:
+ // "throw error when non-pinned TahoeFileIndex snapshot is used".
+ "data and partition columns - Partition=true Skipping=false",
+ "data and partition columns - Partition=false Skipping=false",
+ // The scan schema contains additional row index filter columns.
+ "nested schema pruning on data condition",
+ // The number of records is not recomputed when using DVs
+ "delete throws error if number of records increases",
+ "delete logs error if number of records are missing in stats",
+ // FIXME: Excluded by Gluten as results are mismatch.
+ "test delete on temp view - nontrivial projection - SQL TempView",
+ "test delete on temp view - nontrivial projection - Dataset TempView"
+ )
+
+ // This works correctly with DVs, but fails in classic DELETE.
+ override def testSuperSetColsTempView(): Unit = {
+ testComplexTempViews("superset cols")(
+ text = "SELECT key, value, 1 FROM tab",
+ expectResult = Row(0, 3, 1) :: Nil)
+ }
+}
+
+class DeleteSQLWithDeletionVectorsAndPredicatePushdownSuite
+ extends DeleteSQLWithDeletionVectorsSuite {
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key,
"true")
+ }
+}
+// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
new file mode 100644
index 0000000000..8ab9510ff3
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeleteSuiteBase.scala
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.functions.{lit, struct}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+
+import shims.DeltaExcludedBySparkVersionTestMixinShims
+
+// spotless:off
+abstract class DeleteSuiteBase extends QueryTest
+ with SharedSparkSession
+ with DeltaDMLTestUtils
+ with DeltaTestUtilsForTempViews
+ with DeltaExcludedBySparkVersionTestMixinShims {
+
+ import testImplicits._
+
+ protected def executeDelete(target: String, where: String = null): Unit
+
+ protected def checkDelete(
+ condition: Option[String],
+ expectedResults: Seq[Row],
+ tableName: Option[String] = None): Unit = {
+ executeDelete(target = tableName.getOrElse(s"delta.`$tempPath`"), where =
condition.orNull)
+ checkAnswer(readDeltaTable(tempPath), expectedResults)
+ }
+
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"basic case - Partition=$isPartitioned") {
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"),
partitions)
+
+ checkDelete(condition = None, Nil)
+ }
+ }
+
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"basic case - delete from a Delta table by path -
Partition=$isPartitioned") {
+ withTable("deltaTable") {
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+ append(input, partitions)
+
+ checkDelete(Some("value = 4 and key = 3"),
+ Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+ checkDelete(Some("value = 4 and key = 1"),
+ Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil)
+ checkDelete(Some("value = 2 or key = 1"),
+ Row(0, 3) :: Nil)
+ checkDelete(Some("key = 0 or value = 99"), Nil)
+ }
+ }
+ }
+
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"basic case - delete from a Delta table by name -
Partition=$isPartitioned") {
+ withTable("delta_table") {
+ val partitionByClause = if (isPartitioned) "PARTITIONED BY (key)" else
""
+ sql(
+ s"""
+ |CREATE TABLE delta_table(key INT, value INT)
+ |USING delta
+ |OPTIONS('path'='$tempPath')
+ |$partitionByClause
+ """.stripMargin)
+
+ val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+ append(input)
+
+ checkDelete(Some("value = 4 and key = 3"),
+ Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil,
+ Some("delta_table"))
+ checkDelete(Some("value = 4 and key = 1"),
+ Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil,
+ Some("delta_table"))
+ checkDelete(Some("value = 2 or key = 1"),
+ Row(0, 3) :: Nil,
+ Some("delta_table"))
+ checkDelete(Some("key = 0 or value = 99"),
+ Nil,
+ Some("delta_table"))
+ }
+ }
+ }
+
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"basic key columns - Partition=$isPartitioned") {
+ val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ append(input, partitions)
+
+ checkDelete(Some("key > 2"), Row(2, 2) :: Row(1, 4) :: Row(1, 1) ::
Row(0, 3) :: Nil)
+ checkDelete(Some("key < 2"), Row(2, 2) :: Nil)
+ checkDelete(Some("key = 2"), Nil)
+ }
+ }
+
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"where key columns - Partition=$isPartitioned") {
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"),
partitions)
+
+ checkDelete(Some("key = 1"), Row(2, 2) :: Row(0, 3) :: Nil)
+ checkDelete(Some("key = 2"), Row(0, 3) :: Nil)
+ checkDelete(Some("key = 0"), Nil)
+ }
+ }
+
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"where data columns - Partition=$isPartitioned") {
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"),
partitions)
+
+ checkDelete(Some("value <= 2"), Row(1, 4) :: Row(0, 3) :: Nil)
+ checkDelete(Some("value = 3"), Row(1, 4) :: Nil)
+ checkDelete(Some("value != 0"), Nil)
+ }
+ }
+
+ test("where data columns and partition columns") {
+ val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+ append(input, Seq("key"))
+
+ checkDelete(Some("value = 4 and key = 3"),
+ Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+ checkDelete(Some("value = 4 and key = 1"),
+ Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil)
+ checkDelete(Some("value = 2 or key = 1"),
+ Row(0, 3) :: Nil)
+ checkDelete(Some("key = 0 or value = 99"),
+ Nil)
+ }
+
+ Seq(true, false).foreach { skippingEnabled =>
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"data and partition columns - Partition=$isPartitioned
Skipping=$skippingEnabled") {
+ withSQLConf(DeltaSQLConf.DELTA_STATS_SKIPPING.key ->
skippingEnabled.toString) {
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+ append(input, partitions)
+
+ checkDelete(Some("value = 4 and key = 3"),
+ Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+ checkDelete(Some("value = 4 and key = 1"),
+ Row(2, 2) :: Row(1, 1) :: Row(0, 3) :: Nil)
+ checkDelete(Some("value = 2 or key = 1"),
+ Row(0, 3) :: Nil)
+ checkDelete(Some("key = 0 or value = 99"),
+ Nil)
+ }
+ }
+ }
+ }
+
+ test("Negative case - non-Delta target") {
+ Seq((1, 1), (0, 3), (1, 5)).toDF("key1", "value")
+ .write.format("parquet").mode("append").save(tempPath)
+ val e = intercept[DeltaAnalysisException] {
+ executeDelete(target = s"delta.`$tempPath`")
+ }.getMessage
+ assert(e.contains("DELETE destination only supports Delta sources") ||
+ e.contains("is not a Delta table") || e.contains("doesn't exist") ||
+ e.contains("Incompatible format"))
+ }
+
+ test("Negative case - non-deterministic condition") {
+ append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"))
+ val e = intercept[AnalysisException] {
+ executeDelete(target = s"delta.`$tempPath`", where = "rand() > 0.5")
+ }.getMessage
+ assert(e.contains("nondeterministic expressions are only allowed in") ||
+ e.contains("The operator expects a deterministic expression"))
+ }
+
+ test("Negative case - DELETE the child directory") {
+ append(Seq((2, 2), (3, 2)).toDF("key", "value"), partitionBy = "key" ::
Nil)
+ val e = intercept[AnalysisException] {
+ executeDelete(target = s"delta.`$tempPath/key=2`", where = "value = 2")
+ }.getMessage
+ assert(e.contains("Expect a full scan of Delta sources, but found a
partial scan"))
+ }
+
+ test("delete cached table by name") {
+ withTable("cached_delta_table") {
+ Seq((2, 2), (1, 4)).toDF("key", "value")
+ .write.format("delta").saveAsTable("cached_delta_table")
+
+ spark.table("cached_delta_table").cache()
+ spark.table("cached_delta_table").collect()
+ executeDelete(target = "cached_delta_table", where = "key = 2")
+ checkAnswer(spark.table("cached_delta_table"), Row(1, 4) :: Nil)
+ }
+ }
+
+ test("delete cached table by path") {
+ Seq((2, 2), (1, 4)).toDF("key", "value")
+ .write.mode("overwrite").format("delta").save(tempPath)
+ spark.read.format("delta").load(tempPath).cache()
+ spark.read.format("delta").load(tempPath).collect()
+ executeDelete(s"delta.`$tempPath`", where = "key = 2")
+ checkAnswer(spark.read.format("delta").load(tempPath), Row(1, 4) :: Nil)
+ }
+
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"condition having current_date - Partition=$isPartitioned") {
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ append(
+ Seq((java.sql.Date.valueOf("1969-12-31"), 2),
+ (java.sql.Date.valueOf("2099-12-31"), 4))
+ .toDF("key", "value"), partitions)
+
+ checkDelete(Some("CURRENT_DATE > key"),
+ Row(java.sql.Date.valueOf("2099-12-31"), 4) :: Nil)
+ checkDelete(Some("CURRENT_DATE <= key"), Nil)
+ }
+ }
+
+ test("condition having current_timestamp - Partition by Timestamp") {
+ append(
+ Seq((java.sql.Timestamp.valueOf("2012-12-31 16:00:10.011"), 2),
+ (java.sql.Timestamp.valueOf("2099-12-31 16:00:10.011"), 4))
+ .toDF("key", "value"), Seq("key"))
+
+ checkDelete(Some("CURRENT_TIMESTAMP > key"),
+ Row(java.sql.Timestamp.valueOf("2099-12-31 16:00:10.011"), 4) :: Nil)
+ checkDelete(Some("CURRENT_TIMESTAMP <= key"), Nil)
+ }
+
+ Seq(true, false).foreach { isPartitioned =>
+ test(s"foldable condition - Partition=$isPartitioned") {
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"),
partitions)
+
+ val allRows = Row(2, 2) :: Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil
+
+ checkDelete(Some("false"), allRows)
+ checkDelete(Some("1 <> 1"), allRows)
+ checkDelete(Some("1 > null"), allRows)
+ checkDelete(Some("true"), Nil)
+ checkDelete(Some("1 = 1"), Nil)
+ }
+ }
+
+ test("SC-12232: should not delete the rows where condition evaluates to
null") {
+ append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key",
"value").coalesce(1))
+
+ // "null = null" evaluates to null
+ checkDelete(Some("value = null"),
+ Row("a", null) :: Row("b", null) :: Row("c", "v") :: Row("d", "vv") ::
Nil)
+
+ // these expressions evaluate to null when value is null
+ checkDelete(Some("value = 'v'"),
+ Row("a", null) :: Row("b", null) :: Row("d", "vv") :: Nil)
+ checkDelete(Some("value <> 'v'"),
+ Row("a", null) :: Row("b", null) :: Nil)
+ }
+
+ test("SC-12232: delete rows with null values using isNull") {
+ append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key",
"value").coalesce(1))
+
+ // when value is null, this expression evaluates to true
+ checkDelete(Some("value is null"),
+ Row("c", "v") :: Row("d", "vv") :: Nil)
+ }
+
+ test("SC-12232: delete rows with null values using EqualNullSafe") {
+ append(Seq(("a", null), ("b", null), ("c", "v"), ("d", "vv")).toDF("key",
"value").coalesce(1))
+
+ // when value is null, this expression evaluates to true
+ checkDelete(Some("value <=> null"),
+ Row("c", "v") :: Row("d", "vv") :: Nil)
+ }
+
+ test("do not support subquery test") {
+ append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"))
+ Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("c",
"d").createOrReplaceTempView("source")
+
+ // basic subquery
+ val e0 = intercept[AnalysisException] {
+ executeDelete(target = s"delta.`$tempPath`", "key < (SELECT max(c) FROM
source)")
+ }.getMessage
+ assert(e0.contains("Subqueries are not supported"))
+
+ // subquery with EXISTS
+ val e1 = intercept[AnalysisException] {
+ executeDelete(target = s"delta.`$tempPath`", "EXISTS (SELECT max(c) FROM
source)")
+ }.getMessage
+ assert(e1.contains("Subqueries are not supported"))
+
+ // subquery with NOT EXISTS
+ val e2 = intercept[AnalysisException] {
+ executeDelete(target = s"delta.`$tempPath`", "NOT EXISTS (SELECT max(c)
FROM source)")
+ }.getMessage
+ assert(e2.contains("Subqueries are not supported"))
+
+ // subquery with IN
+ val e3 = intercept[AnalysisException] {
+ executeDelete(target = s"delta.`$tempPath`", "key IN (SELECT max(c) FROM
source)")
+ }.getMessage
+ assert(e3.contains("Subqueries are not supported"))
+
+ // subquery with NOT IN
+ val e4 = intercept[AnalysisException] {
+ executeDelete(target = s"delta.`$tempPath`", "key NOT IN (SELECT max(c)
FROM source)")
+ }.getMessage
+ assert(e4.contains("Subqueries are not supported"))
+ }
+
+ test("schema pruning on data condition") {
+ val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+ append(input, Nil)
+ // Start from a cached snapshot state
+ deltaLog.update().stateDF
+
+ val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
+ checkDelete(Some("key = 2"),
+ Row(1, 4) :: Row(1, 1) :: Row(0, 3) :: Nil)
+ }
+
+ val scans = executedPlans.flatMap(_.collect {
+ case f: FileSourceScanExec => f
+ })
+
+ // The first scan is for finding files to delete. We only are matching
against the key
+ // so that should be the only field in the schema
+ assert(scans.head.schema.findNestedField(Seq("key")).nonEmpty)
+ assert(scans.head.schema.findNestedField(Seq("value")).isEmpty)
+ }
+
+
+ test("nested schema pruning on data condition") {
+ val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
+ .select(struct("key", "value").alias("nested"))
+ append(input, Nil)
+ // Start from a cached snapshot state
+ deltaLog.update().stateDF
+
+ val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
+ checkDelete(Some("nested.key = 2"),
+ Row(Row(1, 4)) :: Row(Row(1, 1)) :: Row(Row(0, 3)) :: Nil)
+ }
+
+ val scans = executedPlans.flatMap(_.collect {
+ case f: FileSourceScanExec => f
+ })
+
+ assert(scans.head.schema == StructType.fromDDL("nested STRUCT<key: int>"))
+ }
+
+ /**
+ * @param function the unsupported function.
+ * @param functionType The type of the unsupported expression to be tested.
+ * @param data the data in the table.
+ * @param where the where clause containing the unsupported expression.
+ * @param expectException whether an exception is expected to be thrown
+ * @param customErrorRegex customized error regex.
+ */
+ def testUnsupportedExpression(
+ function: String,
+ functionType: String,
+ data: => DataFrame,
+ where: String,
+ expectException: Boolean,
+ customErrorRegex: Option[String] = None): Unit = {
+ test(s"$functionType functions in delete - expect exception:
$expectException") {
+ withTable("deltaTable") {
+ data.write.format("delta").saveAsTable("deltaTable")
+
+ val expectedErrorRegex = "(?s).*(?i)unsupported.*(?i).*Invalid
expressions.*"
+
+ var catchException = true
+
+ var errorRegex = if (functionType.equals("Generate")) {
+ ".*Subqueries are not supported in the DELETE.*"
+ } else customErrorRegex.getOrElse(expectedErrorRegex)
+
+
+ if (catchException) {
+ val dataBeforeException =
spark.read.format("delta").table("deltaTable").collect()
+ val e = intercept[Exception] {
+ executeDelete(target = "deltaTable", where = where)
+ }
+ val message = if (e.getCause != null) {
+ e.getCause.getMessage
+ } else e.getMessage
+ assert(message.matches(errorRegex))
+ checkAnswer(spark.read.format("delta").table("deltaTable"),
dataBeforeException)
+ } else {
+ executeDelete(target = "deltaTable", where = where)
+ }
+ }
+ }
+ }
+
+ testUnsupportedExpression(
+ function = "row_number",
+ functionType = "Window",
+ data = Seq((2, 2), (1, 4)).toDF("key", "value"),
+ where = "row_number() over (order by value) > 1",
+ expectException = true
+ )
+
+ testUnsupportedExpression(
+ function = "max",
+ functionType = "Aggregate",
+ data = Seq((2, 2), (1, 4)).toDF("key", "value"),
+ where = "key > max(value)",
+ expectException = true
+ )
+
+ // Explode functions are supported in where if only one row generated.
+ testUnsupportedExpression(
+ function = "explode",
+ functionType = "Generate",
+ data = Seq((2, List(2))).toDF("key", "value"),
+ where = "key = (select explode(value) from deltaTable)",
+ expectException = false // generate only one row, no exception.
+ )
+
+ // Explode functions are supported in where but if there's more than one row
generated,
+ // it will throw an exception.
+ testUnsupportedExpression(
+ function = "explode",
+ functionType = "Generate",
+ data = Seq((2, List(2)), (1, List(4, 5))).toDF("key", "value"),
+ where = "key = (select explode(value) from deltaTable)",
+ expectException = true, // generate more than one row. Exception expected.
+ customErrorRegex =
+ Some(".*More than one row returned by a subquery used as an
expression(?s).*")
+ )
+
+ Seq(true, false).foreach { isPartitioned =>
+ val name = s"test delete on temp view - basic - Partition=$isPartitioned"
+ testWithTempView(name) { isSQLTempView =>
+ val partitions = if (isPartitioned) "key" :: Nil else Nil
+ append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"),
partitions)
+ createTempViewFromTable(s"delta.`$tempPath`", isSQLTempView)
+ checkDelete(
+ condition = Some("key <= 1"),
+ expectedResults = Row(2, 2) :: Nil,
+ tableName = Some("v"))
+ }
+ }
+
+ protected def testInvalidTempViews(name: String)(
+ text: String,
+ expectedErrorMsgForSQLTempView: String = null,
+ expectedErrorMsgForDataSetTempView: String = null,
+ expectedErrorClassForSQLTempView: String = null,
+ expectedErrorClassForDataSetTempView: String = null): Unit = {
+ testWithTempView(s"test delete on temp view - $name") { isSQLTempView =>
+ withTable("tab") {
+ Seq((0, 3), (1, 2)).toDF("key",
"value").write.format("delta").saveAsTable("tab")
+ if (isSQLTempView) {
+ sql(s"CREATE TEMP VIEW v AS $text")
+ } else {
+ sql(text).createOrReplaceTempView("v")
+ }
+ val ex = intercept[AnalysisException] {
+ executeDelete(
+ "v",
+ "key >= 1 and value < 3"
+ )
+ }
+ testErrorMessageAndClass(
+ isSQLTempView,
+ ex,
+ expectedErrorMsgForSQLTempView,
+ expectedErrorMsgForDataSetTempView,
+ expectedErrorClassForSQLTempView,
+ expectedErrorClassForDataSetTempView)
+ }
+ }
+ }
+ testInvalidTempViews("subset cols")(
+ text = "SELECT key FROM tab",
+ expectedErrorClassForSQLTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+ expectedErrorClassForDataSetTempView = "UNRESOLVED_COLUMN.WITH_SUGGESTION"
+ )
+
+ // Need to be able to override this, because it works in some configurations.
+ protected def testSuperSetColsTempView(): Unit = {
+ testInvalidTempViews("superset cols")(
+ text = "SELECT key, value, 1 FROM tab",
+ // The analyzer can't tell whether the table originally had the extra
column or not.
+ expectedErrorMsgForSQLTempView = "Can't resolve column 1 in root",
+ expectedErrorMsgForDataSetTempView = "Can't resolve column 1 in root"
+ )
+ }
+
+ testSuperSetColsTempView()
+
+ protected def testComplexTempViews(name: String)(
+ text: String,
+ expectResult: Seq[Row]): Unit = {
+ testWithTempView(s"test delete on temp view - $name") { isSQLTempView =>
+ withTable("tab") {
+ Seq((0, 3), (1, 2)).toDF("key",
"value").write.format("delta").saveAsTable("tab")
+ createTempViewFromSelect(text, isSQLTempView)
+ executeDelete(
+ "v",
+ "key >= 1 and value < 3"
+ )
+ checkAnswer(spark.read.format("delta").table("v"), expectResult)
+ }
+ }
+ }
+
+ testComplexTempViews("nontrivial projection")(
+ text = "SELECT value as key, key as value FROM tab",
+ expectResult = Row(3, 0) :: Nil
+ )
+
+ testComplexTempViews("view with too many internal aliases")(
+ text = "SELECT * FROM (SELECT * FROM tab AS t1) AS t2",
+ expectResult = Row(0, 3) :: Nil
+ )
+
+ testSparkMasterOnly("Variant type") {
+ val dstDf = sql(
+ """SELECT parse_json(cast(id as string)) v, id i
+ FROM range(3)""")
+ append(dstDf)
+
+ executeDelete(target = s"delta.`$tempPath`", where = "to_json(v) = '1'")
+
+ checkAnswer(readDeltaTable(tempPath).selectExpr("i", "to_json(v)"),
+ Seq(Row(0, "0"), Row(2, "2")))
+ }
+
+ test("delete on partitioned table with special chars") {
+ val partValue = "part%one"
+ spark.range(0, 3, 1, 1).toDF("key").withColumn("value", lit(partValue))
+ .write.format("delta").partitionBy("value").save(tempPath)
+ checkDelete(
+ condition = Some(s"value = '$partValue' and key = 1"),
+ expectedResults = Row(0, partValue) :: Row(2, partValue) :: Nil)
+ checkDelete(
+ condition = Some(s"value = '$partValue' and key = 2"),
+ expectedResults = Row(0, partValue) :: Nil)
+ checkDelete(
+ condition = Some(s"value = '$partValue'"),
+ expectedResults = Nil)
+ }
+}
+// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
new file mode 100644
index 0000000000..5bb022c12d
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.sql.{DataFrame, QueryTest, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.delta.DeltaOperations.Truncate
+import org.apache.spark.sql.delta.actions.{Action, AddFile,
DeletionVectorDescriptor, RemoveFile}
+import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray,
RoaringBitmapArrayFormat}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.delta.util.PathWithFileSystem
+import org.apache.spark.sql.functions.{col, lit}
+import org.apache.spark.sql.test.SharedSparkSession
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
+
+import java.io.File
+import java.util.UUID
+
+// spotless:off
+/** Collection of test utilities related with persistent Deletion Vectors. */
+trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession with
DeltaSQLTestUtils {
+
+ def enableDeletionVectors(
+ spark: SparkSession,
+ delete: Boolean = false,
+ update: Boolean = false,
+ merge: Boolean = false): Unit = {
+ val global = delete || update || merge
+ spark.conf
+
.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey,
global.toString)
+ spark.conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key,
delete.toString)
+ spark.conf.set(DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key,
update.toString)
+ spark.conf.set(DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS.key,
merge.toString)
+ }
+
+ def enableDeletionVectorsForAllSupportedOperations(spark: SparkSession):
Unit =
+ enableDeletionVectors(spark, delete = true, update = true)
+
+ def testWithDVs(testName: String, testTags: org.scalatest.Tag*)(thunk: =>
Unit): Unit = {
+ test(testName, testTags : _*) {
+ withDeletionVectorsEnabled() {
+ thunk
+ }
+ }
+ }
+
+ /** Run a thunk with Deletion Vectors enabled/disabled. */
+ def withDeletionVectorsEnabled(enabled: Boolean = true)(thunk: => Unit):
Unit = {
+ val enabledStr = enabled.toString
+ withSQLConf(
+ DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey ->
enabledStr,
+ DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr,
+ DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr,
+ DeltaSQLConf.MERGE_USE_PERSISTENT_DELETION_VECTORS.key -> enabledStr) {
+ thunk
+ }
+ }
+
+ /** Helper to run 'fn' with a temporary Delta table. */
+ def withTempDeltaTable(
+ dataDF: DataFrame,
+ partitionBy: Seq[String] = Seq.empty,
+ enableDVs: Boolean = true,
+ conf: Seq[(String, String)] = Nil)
+ (fn: (() => io.delta.tables.DeltaTable, DeltaLog) => Unit): Unit = {
+ withTempPath { path =>
+ val tablePath = new Path(path.getAbsolutePath)
+ withSQLConf(conf: _*) {
+ dataDF.write
+ .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key,
enableDVs.toString)
+ .partitionBy(partitionBy: _*)
+ .format("delta")
+ .save(tablePath.toString)
+ }
+ // DeltaTable hangs on to the DataFrame it is created with for the
entire object lifetime.
+ // That means subsequent `targetTable.toDF` calls will return the same
snapshot.
+ // The DV tests are generally written assuming `targetTable.toDF` would
return a new snapshot.
+ // So create a function here instead of an instance, so
`targetTable().toDF`
+ // will actually provide a new snapshot.
+ val targetTable =
+ () => io.delta.tables.DeltaTable.forPath(tablePath.toString)
+ val targetLog = DeltaLog.forTable(spark, tablePath)
+ fn(targetTable, targetLog)
+ }
+ }
+
+ /** Create a temp path which contains special characters. */
+ override def withTempPath(f: File => Unit): Unit = {
+ super.withTempPath(prefix = "s p a r k %2a")(f)
+ }
+
+ /** Create a temp path which contains special characters. */
+ override protected def withTempDir(f: File => Unit): Unit = {
+ super.withTempDir(prefix = "s p a r k %2a")(f)
+ }
+
+ /** Helper that verifies whether a defined number of DVs exist */
+ def verifyDVsExist(targetLog: DeltaLog, filesWithDVsSize: Int): Unit = {
+ val filesWithDVs = getFilesWithDeletionVectors(targetLog)
+ assert(filesWithDVs.size === filesWithDVsSize)
+ assertDeletionVectorsExist(targetLog, filesWithDVs)
+ }
+
+ /** Returns all [[AddFile]] actions of a Delta table that contain Deletion
Vectors. */
+ def getFilesWithDeletionVectors(log: DeltaLog): Seq[AddFile] =
+ log.update().allFiles.collect().filter(_.deletionVector != null).toSeq
+
+ /** Lists the Deletion Vector files of a table. */
+ def listDeletionVectors(log: DeltaLog): Seq[File] = {
+ val dir = new File(log.dataPath.toUri.getPath)
+ dir.listFiles().filter(_.getName.startsWith(
+ DeletionVectorDescriptor.DELETION_VECTOR_FILE_NAME_CORE))
+ }
+
+ /** Helper to check that the Deletion Vectors of the provided file actions
exist on disk. */
+ def assertDeletionVectorsExist(log: DeltaLog, filesWithDVs: Seq[AddFile]):
Unit = {
+ val tablePath = new Path(log.dataPath.toUri.getPath)
+ for (file <- filesWithDVs) {
+ val dv = file.deletionVector
+ assert(dv != null)
+ assert(dv.isOnDisk && !dv.isInline)
+ assert(dv.offset.isDefined)
+
+ // Check that DV exists.
+ val dvPath = dv.absolutePath(tablePath)
+ assert(new File(dvPath.toString).exists(), s"DV not found $dvPath")
+
+ // Check that cardinality is correct.
+ val bitmap = newDVStore.read(dvPath, dv.offset.get, dv.sizeInBytes)
+ assert(dv.cardinality === bitmap.cardinality)
+ }
+ }
+
+ /** Enable persistent deletion vectors in new Delta tables. */
+ def enableDeletionVectorsInNewTables(conf: RuntimeConfig): Unit =
+
conf.set(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey,
"true")
+
+ /** Enable persistent Deletion Vectors in a Delta table. */
+ def enableDeletionVectorsInTable(tablePath: Path, enable: Boolean): Unit =
+ spark.sql(
+ s"""ALTER TABLE delta.`$tablePath`
+ |SET TBLPROPERTIES
('${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = '$enable')
+ |""".stripMargin)
+
+ /** Enable persistent Deletion Vectors in a Delta table. */
+ def enableDeletionVectorsInTable(deltaLog: DeltaLog, enable: Boolean =
true): Unit =
+ enableDeletionVectorsInTable(deltaLog.dataPath, enable)
+
+ /** Enable persistent deletion vectors in new tables and DELETE DML
commands. */
+ def enableDeletionVectors(conf: RuntimeConfig): Unit = {
+ enableDeletionVectorsInNewTables(conf)
+ conf.set(DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key, "true")
+ }
+
+ // ======== HELPER METHODS TO WRITE DVs ==========
+ /** Helper method to remove the specified rows in the given file using DVs */
+ protected def removeRowsFromFileUsingDV(
+ log: DeltaLog,
+ addFile: AddFile,
+ rowIds: Seq[Long]): Seq[Action] = {
+ val dv = RoaringBitmapArray(rowIds: _*)
+ writeFileWithDV(log, addFile, dv)
+ }
+
+ /** Utility method to remove a ratio of rows from the given file */
+ protected def deleteRows(
+ log: DeltaLog, file: AddFile, approxPhyRows: Long, ratioOfRowsToDelete:
Double): Unit = {
+ val numRowsToDelete =
+ Math.ceil(ratioOfRowsToDelete *
file.numPhysicalRecords.getOrElse(approxPhyRows)).toInt
+ removeRowsFromFile(log, file, Seq.range(0, numRowsToDelete))
+ }
+
+ /** Utility method to remove the given rows from the given file using DVs */
+ protected def removeRowsFromFile(
+ log: DeltaLog, addFile: AddFile, rowIndexesToRemove: Seq[Long]): Unit = {
+ val txn = log.startTransaction()
+ val actions = removeRowsFromFileUsingDV(log, addFile, rowIndexesToRemove)
+ txn.commit(actions, Truncate())
+ }
+
+ protected def getFileActionsInLastVersion(log: DeltaLog): (Seq[AddFile],
Seq[RemoveFile]) = {
+ val version = log.update().version
+ val allFiles = log.getChanges(version).toSeq.head._2
+ val add = allFiles.collect { case a: AddFile => a }
+ val remove = allFiles.collect { case r: RemoveFile => r }
+ (add, remove)
+ }
+
+ protected def serializeRoaringBitmapArrayWithDefaultFormat(
+ dv: RoaringBitmapArray): Array[Byte] = {
+ val serializationFormat = RoaringBitmapArrayFormat.Portable
+ dv.serializeAsByteArray(serializationFormat)
+ }
+
+ /**
+ * Produce a new [[AddFile]] that will store `dv` in the log using default
settings for choosing
+ * inline or on-disk storage.
+ *
+ * Also returns the corresponding [[RemoveFile]] action for `currentFile`.
+ *
+ * TODO: Always on-disk for now. Inline support comes later.
+ */
+ protected def writeFileWithDV(
+ log: DeltaLog,
+ currentFile: AddFile,
+ dv: RoaringBitmapArray): Seq[Action] = {
+ writeFileWithDVOnDisk(log, currentFile, dv)
+ }
+
+ /** Name of the partition column used by [[createTestDF()]]. */
+ val PARTITION_COL = "partitionColumn"
+
+ def createTestDF(
+ start: Long,
+ end: Long,
+ numFiles: Int,
+ partitionColumn: Option[Int] = None): DataFrame = {
+ val df = spark.range(start, end, 1, numFiles).withColumn("v", col("id"))
+ if (partitionColumn.isEmpty) {
+ df
+ } else {
+ df.withColumn(PARTITION_COL, lit(partitionColumn.get))
+ }
+ }
+
+ /**
+ * Produce a new [[AddFile]] that will reference the `dv` in the log while
storing it on-disk.
+ *
+ * Also returns the corresponding [[RemoveFile]] action for `currentFile`.
+ */
+ protected def writeFileWithDVOnDisk(
+ log: DeltaLog,
+ currentFile: AddFile,
+ dv: RoaringBitmapArray): Seq[Action] = writeFilesWithDVsOnDisk(log,
Seq((currentFile, dv)))
+
+ protected def withDVWriter[T](
+ log: DeltaLog,
+ dvFileID: UUID)(fn: DeletionVectorStore.Writer => T): T = {
+ val dvStore = newDVStore
+ // scalastyle:off deltahadoopconfiguration
+ val conf = spark.sessionState.newHadoopConf()
+ // scalastyle:on deltahadoopconfiguration
+ val tableWithFS = PathWithFileSystem.withConf(log.dataPath, conf)
+ val dvPath =
+
DeletionVectorStore.assembleDeletionVectorPathWithFileSystem(tableWithFS,
dvFileID)
+ val writer = dvStore.createWriter(dvPath)
+ try {
+ fn(writer)
+ } finally {
+ writer.close()
+ }
+ }
+
+ /**
+ * Produce new [[AddFile]] actions that will reference associated DVs in the
log while storing
+ * all DVs in the same file on-disk.
+ *
+ * Also returns the corresponding [[RemoveFile]] actions for the original
file entries.
+ */
+ protected def writeFilesWithDVsOnDisk(
+ log: DeltaLog,
+ filesWithDVs: Seq[(AddFile, RoaringBitmapArray)]): Seq[Action] = {
+ val dvFileId = UUID.randomUUID()
+ withDVWriter(log, dvFileId) { writer =>
+ filesWithDVs.flatMap { case (currentFile, dv) =>
+ val range =
writer.write(serializeRoaringBitmapArrayWithDefaultFormat(dv))
+ val dvData = DeletionVectorDescriptor.onDiskWithRelativePath(
+ id = dvFileId,
+ sizeInBytes = range.length,
+ cardinality = dv.cardinality,
+ offset = Some(range.offset))
+ val (add, remove) = currentFile.removeRows(
+ dvData,
+ updateStats = true
+ )
+ Seq(add, remove)
+ }
+ }
+ }
+
+ /**
+ * Removes the `numRowsToRemovePerFile` from each file via DV.
+ * Returns the total number of rows removed.
+ */
+ protected def removeRowsFromAllFilesInLog(
+ log: DeltaLog,
+ numRowsToRemovePerFile: Long): Long = {
+ var numFiles: Option[Int] = None
+ // This is needed to make the manual commit work correctly, since we are
not actually
+ // running a command that produces metrics.
+ withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "false") {
+ val txn = log.startTransaction()
+ val allAddFiles = txn.snapshot.allFiles.collect()
+ numFiles = Some(allAddFiles.length)
+ val bitmap = RoaringBitmapArray(0L until numRowsToRemovePerFile: _*)
+ val actions = allAddFiles.flatMap { file =>
+ if (file.numPhysicalRecords.isDefined) {
+ // Only when stats are enabled. Can't check when stats are disabled
+ assert(file.numPhysicalRecords.get > numRowsToRemovePerFile)
+ }
+ writeFileWithDV(log, file, bitmap)
+ }
+ txn.commit(actions, DeltaOperations.Delete(predicate = Seq.empty))
+ }
+ numFiles.get * numRowsToRemovePerFile
+ }
+
+ def newDVStore(): DeletionVectorStore = {
+ // scalastyle:off deltahadoopconfiguration
+ DeletionVectorStore.createInstance(spark.sessionState.newHadoopConf())
+ // scalastyle:on deltahadoopconfiguration
+ }
+
+ /**
+ * Updates an [[AddFile]] with a [[DeletionVectorDescriptor]].
+ */
+ protected def updateFileDV(
+ addFile: AddFile,
+ dvDescriptor: DeletionVectorDescriptor): (AddFile, RemoveFile) = {
+ addFile.removeRows(
+ dvDescriptor,
+ updateStats = true
+ )
+ }
+
+ /** Delete the DV file in the given [[AddFile]]. Assumes the [[AddFile]] has
a valid DV. */
+ protected def deleteDVFile(tablePath: String, addFile: AddFile): Unit = {
+ assert(addFile.deletionVector != null)
+ val dvPath = addFile.deletionVector.absolutePath(new Path(tablePath))
+ FileUtils.delete(new File(dvPath.toString))
+ }
+
+ /**
+ * Creates a [[DeletionVectorDescriptor]] from a [[RoaringBitmapArray]]
+ */
+ protected def writeDV(
+ log: DeltaLog,
+ bitmapArray: RoaringBitmapArray): DeletionVectorDescriptor = {
+ val dvFileId = UUID.randomUUID()
+ withDVWriter(log, dvFileId) { writer =>
+ val range =
writer.write(serializeRoaringBitmapArrayWithDefaultFormat(bitmapArray))
+ DeletionVectorDescriptor.onDiskWithRelativePath(
+ id = dvFileId,
+ sizeInBytes = range.length,
+ cardinality = bitmapArray.cardinality,
+ offset = Some(range.offset))
+ }
+ }
+}
+// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
new file mode 100644
index 0000000000..68c47b42bb
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{Column, Dataset}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
+import org.apache.spark.sql.delta.schema.SchemaUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{AtomicType, StructField, StructType}
+
+import org.apache.hadoop.fs.Path
+
+import java.io.File
+
+import scala.collection.mutable
+
+// spotless:off
+trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
+
+ import testImplicits._
+
+ protected def columnMappingMode: String = NoMapping.name
+
+ private val PHYSICAL_NAME_REGEX =
+
"col-[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r
+
+ implicit class PhysicalNameString(s: String) {
+ def phy(deltaLog: DeltaLog): String = {
+ PHYSICAL_NAME_REGEX
+ .findFirstIn(s)
+ .getOrElse(getPhysicalName(s, deltaLog))
+ }
+ }
+
+ protected def columnMappingEnabled: Boolean = {
+ columnMappingModeString != "none"
+ }
+
+ protected def columnMappingModeString: String = {
+
spark.conf.getOption(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey)
+ .getOrElse("none")
+ }
+
+ /**
+ * Check if two schemas are equal ignoring column mapping metadata
+ * @param schema1 Schema
+ * @param schema2 Schema
+ */
+ protected def assertEqual(schema1: StructType, schema2: StructType): Unit = {
+ if (columnMappingEnabled) {
+ assert(
+ DeltaColumnMapping.dropColumnMappingMetadata(schema1) ==
+ DeltaColumnMapping.dropColumnMappingMetadata(schema2)
+ )
+ } else {
+ assert(schema1 == schema2)
+ }
+ }
+
+ /**
+ * Check if two table configurations are equal ignoring column mapping
metadata
+ * @param config1 Table config
+ * @param config2 Table config
+ */
+ protected def assertEqual(
+ config1: Map[String, String],
+ config2: Map[String, String]): Unit = {
+ if (columnMappingEnabled) {
+ assert(dropColumnMappingConfigurations(config1) ==
dropColumnMappingConfigurations(config2))
+ } else {
+ assert(config1 == config2)
+ }
+ }
+
+ /**
+ * Check if a partition with specific values exists.
+ * Handles both column mapped and non-mapped cases
+ * @param partCol Partition column name
+ * @param partValue Partition value
+ * @param deltaLog DeltaLog
+ */
+ protected def assertPartitionWithValueExists(
+ partCol: String,
+ partValue: String,
+ deltaLog: DeltaLog): Unit = {
+ assert(getPartitionFilePathsWithValue(partCol, partValue,
deltaLog).nonEmpty)
+ }
+
+ /**
+ * Assert partition exists in an array of set of partition names/paths
+ * @param partCol Partition column name
+ * @param deltaLog Delta log
+ * @param inputFiles Input files to scan for DF
+ */
+ protected def assertPartitionExists(
+ partCol: String,
+ deltaLog: DeltaLog,
+ inputFiles: Array[String]): Unit = {
+ val physicalName = partCol.phy(deltaLog)
+ val allFiles = deltaLog.update().allFiles.collect()
+ // NOTE: inputFiles are *not* URL-encoded.
+ val filesWithPartitions = inputFiles.map { f =>
+ allFiles.filter { af =>
+ f.contains(af.toPath.toString)
+ }.flatMap(_.partitionValues.keys).toSet
+ }
+ assert(filesWithPartitions.forall(p => p.count(_ == physicalName) > 0))
+ // for non-column mapped mode, we can check the file paths as well
+ if (!columnMappingEnabled) {
+ assert(inputFiles.forall(path => path.contains(s"$physicalName=")),
+ s"${inputFiles.toSeq.mkString("\n")}\ndidn't contain partition
columns $physicalName")
+ }
+ }
+
+ /**
+ * Load DeltaLog from path
+ * @param pathOrIdentifier Location
+ * @param isIdentifier Whether the previous argument is a metastore
identifier
+ * @return The DeltaLog for the given path or metastore identifier
+ */
+ protected def loadDeltaLog(pathOrIdentifier: String, isIdentifier: Boolean =
false): DeltaLog = {
+ if (isIdentifier) {
+ DeltaLog.forTable(spark, TableIdentifier(pathOrIdentifier))
+ } else {
+ DeltaLog.forTable(spark, pathOrIdentifier)
+ }
+ }
+
+ /**
+ * Convert a (nested) column string to sequence of name parts
+ * @param col Column string
+ * @return Sequence of parts
+ */
+ protected def columnNameToParts(col: String): Seq[String] = {
+ UnresolvedAttribute.parseAttributeName(col)
+ }
+
+ /**
+ * Get partition file paths for a specific partition value
+ * @param partCol Logical or physical partition name
+ * @param partValue Partition value
+ * @param deltaLog DeltaLog
+ * @return List of paths
+ */
+ protected def getPartitionFilePathsWithValue(
+ partCol: String,
+ partValue: String,
+ deltaLog: DeltaLog): Array[String] = {
+ getPartitionFilePaths(partCol, deltaLog).getOrElse(partValue, Array.empty)
+ }
+
+ /**
+ * Get the partition value for null
+ */
+ protected def nullPartitionValue: String = {
+ if (columnMappingEnabled) {
+ null
+ } else {
+ ExternalCatalogUtils.DEFAULT_PARTITION_NAME
+ }
+ }
+
+ /**
+ * Get partition file paths grouped by partition value
+ * @param partCol Logical or physical partition name
+ * @param deltaLog DeltaLog
+ * @return Partition value to paths
+ */
+ protected def getPartitionFilePaths(
+ partCol: String,
+ deltaLog: DeltaLog): Map[String, Array[String]] = {
+ if (columnMappingEnabled) {
+ val colName = partCol.phy(deltaLog)
+ deltaLog.update().allFiles.collect()
+ .groupBy(_.partitionValues(colName))
+ .mapValues(_.map(deltaLog.dataPath.toUri.getPath + "/" + _.path)).toMap
+ } else {
+ val partColEscaped = s"${ExternalCatalogUtils.escapePathName(partCol)}"
+ val dataPath = new File(deltaLog.dataPath.toUri.getPath)
+ dataPath.listFiles().filter(_.getName.startsWith(s"$partColEscaped="))
+ .groupBy(_.getName.split("=").last).mapValues(_.map(_.getPath)).toMap
+ }
+ }
+
+ /**
+ * Group a list of input file paths by partition key-value pair w.r.t. delta
log
+ * @param inputFiles Input file paths
+ * @param deltaLog Delta log
+ * @return A mapped array each with the corresponding partition keys
+ */
+ protected def groupInputFilesByPartition(
+ inputFiles: Array[String],
+ deltaLog: DeltaLog): Map[(String, String), Array[String]] = {
+ if (columnMappingEnabled) {
+ val allFiles = deltaLog.update().allFiles.collect()
+ val grouped = inputFiles.flatMap { f =>
+ allFiles.find {
+ af => f.contains(af.toPath.toString)
+ }.head.partitionValues.map(entry => (f, entry))
+ }.groupBy(_._2)
+ grouped.mapValues(_.map(_._1)).toMap
+ } else {
+ inputFiles.groupBy(p => {
+ val nameParts = new Path(p).getParent.getName.split("=")
+ (nameParts(0), nameParts(1))
+ })
+ }
+ }
+
+ /**
+ * Drop column mapping configurations from Map
+ * @param configuration Table configuration
+ * @return Configuration
+ */
+ protected def dropColumnMappingConfigurations(
+ configuration: Map[String, String]): Map[String, String] = {
+ configuration - DeltaConfigs.COLUMN_MAPPING_MODE.key -
DeltaConfigs.COLUMN_MAPPING_MAX_ID.key
+ }
+
+ /**
+ * Drop column mapping configurations from Dataset (e.g. sql("SHOW
TBLPROPERTIES t1"))
+ * @param configs Table configuration
+ * @return Configuration Dataset
+ */
+ protected def dropColumnMappingConfigurations(
+ configs: Dataset[(String, String)]): Dataset[(String, String)] = {
+ spark.createDataset(configs.collect().filter(p =>
+ !Seq(
+ DeltaConfigs.COLUMN_MAPPING_MAX_ID.key,
+ DeltaConfigs.COLUMN_MAPPING_MODE.key
+ ).contains(p._1)
+ ))
+ }
+
+ /** Return KV pairs of Protocol-related stuff for checking the result of
DESCRIBE TABLE. */
+ protected def buildProtocolProps(snapshot: Snapshot): Seq[(String, String)]
= {
+ val mergedConf =
+ DeltaConfigs.mergeGlobalConfigs(spark.sessionState.conf,
snapshot.metadata.configuration)
+ val metadata = snapshot.metadata.copy(configuration = mergedConf)
+ var props = Seq(
+ (Protocol.MIN_READER_VERSION_PROP,
+ Protocol.forNewTable(spark, Some(metadata)).minReaderVersion.toString),
+ (Protocol.MIN_WRITER_VERSION_PROP,
+ Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString))
+ if (snapshot.protocol.supportsReaderFeatures ||
snapshot.protocol.supportsWriterFeatures) {
+ props ++=
+ Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures(
+ spark, metadata, snapshot.protocol)
+ ._3
+ .map(f => (
+ s"${TableFeatureProtocolUtils.FEATURE_PROP_PREFIX}${f.name}",
+ TableFeatureProtocolUtils.FEATURE_PROP_SUPPORTED))
+ }
+ props
+ }
+
+ /**
+ * Convert (nested) column name string into physical name with reference
from DeltaLog
+ * If target field does not have physical name, display name is returned
+ * @param col Logical column name
+ * @param deltaLog Reference DeltaLog
+ * @return Physical column name
+ */
+ protected def getPhysicalName(col: String, deltaLog: DeltaLog): String = {
+ val nameParts = UnresolvedAttribute.parseAttributeName(col)
+ val realSchema = deltaLog.update().schema
+ getPhysicalName(nameParts, realSchema)
+ }
+
+ protected def getPhysicalName(col: String, schema: StructType): String = {
+ val nameParts = UnresolvedAttribute.parseAttributeName(col)
+ getPhysicalName(nameParts, schema)
+ }
+
+ protected def getPhysicalName(nameParts: Seq[String], schema: StructType):
String = {
+ SchemaUtils.findNestedFieldIgnoreCase(schema, nameParts,
includeCollections = true)
+ .map(DeltaColumnMapping.getPhysicalName)
+ .get
+ }
+
+ protected def withColumnMappingConf(mode: String)(f: => Any): Any = {
+ withSQLConf(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey ->
mode) {
+ f
+ }
+ }
+
+ protected def withMaxColumnIdConf(maxId: String)(f: => Any): Any = {
+ withSQLConf(DeltaConfigs.COLUMN_MAPPING_MAX_ID.defaultTablePropertyKey ->
maxId) {
+ f
+ }
+ }
+
+ /**
+ * Gets the physical names of a path. This is used for converting column
paths in stats schema,
+ * so it's ok to not support MapType and ArrayType.
+ */
+ def getPhysicalPathForStats(path: Seq[String], schema: StructType):
Option[Seq[String]] = {
+ if (path.isEmpty) return Some(Seq.empty)
+ val field = schema.fields.find(_.name.equalsIgnoreCase(path.head))
+ field match {
+ case Some(f @ StructField(_, _: AtomicType, _, _ )) =>
+ if (path.size == 1) Some(Seq(DeltaColumnMapping.getPhysicalName(f)))
else None
+ case Some(f @ StructField(_, st: StructType, _, _)) =>
+ val tail = getPhysicalPathForStats(path.tail, st)
+ tail.map(DeltaColumnMapping.getPhysicalName(f) +: _)
+ case _ =>
+ None
+ }
+ }
+
+ /**
+ * Convert (nested) column name string into physical name.
+ * Ignore parts of special paths starting with:
+ * 1. stats columns: minValues, maxValues, numRecords
+ * 2. stats df: stats_parsed
+ * 3. partition values: partitionValues_parsed, partitionValues
+ * @param col Logical column name (e.g. a.b.c)
+ * @param schema Reference schema with metadata
+ * @return Unresolved attribute with physical name paths
+ */
+ protected def convertColumnNameToAttributeWithPhysicalName(
+ col: String,
+ schema: StructType): UnresolvedAttribute = {
+ val parts = UnresolvedAttribute.parseAttributeName(col)
+ val shouldIgnoreFirstPart = Set(
+ "minValues",
+ "maxValues",
+ "numRecords",
+ Checkpoints.STRUCT_PARTITIONS_COL_NAME,
+ "partitionValues")
+ val shouldIgnoreSecondPart = Set(Checkpoints.STRUCT_STATS_COL_NAME,
"stats")
+ val physical = if (shouldIgnoreFirstPart.contains(parts.head)) {
+ parts.head +: getPhysicalPathForStats(parts.tail,
schema).getOrElse(parts.tail)
+ } else if (shouldIgnoreSecondPart.contains(parts.head)) {
+ parts.take(2) ++ getPhysicalPathForStats(parts.slice(2, parts.length),
schema)
+ .getOrElse(parts.slice(2, parts.length))
+ } else {
+ getPhysicalPathForStats(parts, schema).getOrElse(parts)
+ }
+ UnresolvedAttribute(physical)
+ }
+
+ /**
+ * Convert a list of (nested) stats columns into physical name with
reference from DeltaLog
+ * @param columns Logical columns
+ * @param deltaLog Reference DeltaLog
+ * @return Physical columns
+ */
+ protected def convertToPhysicalColumns(
+ columns: Seq[Column],
+ deltaLog: DeltaLog): Seq[Column] = {
+ val schema = deltaLog.update().schema
+ columns.map { col =>
+ val newExpr = col.expr.transform {
+ case a: Attribute =>
+ convertColumnNameToAttributeWithPhysicalName(a.name, schema)
+ }
+ Column(newExpr)
+ }
+ }
+
+ /**
+ * Standard CONVERT TO DELTA
+ * @param tableOrPath String
+ */
+ protected def convertToDelta(tableOrPath: String): Unit = {
+ sql(s"CONVERT TO DELTA $tableOrPath")
+ }
+
+ /**
+ * Force enable streaming read (with possible data loss) on column mapping
enabled table with
+ * drop / rename schema changes.
+ */
+ protected def withStreamingReadOnColumnMappingTableEnabled(f: => Unit): Unit
= {
+ if (columnMappingEnabled) {
+ withSQLConf(DeltaSQLConf
+
.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES.key
-> "true") {
+ f
+ }
+ } else {
+ f
+ }
+ }
+
+}
+
+trait DeltaColumnMappingTestUtils extends DeltaColumnMappingTestUtilsBase
+
+/**
+ * Include this trait to enable Id column mapping mode for a suite
+ */
+trait DeltaColumnMappingEnableIdMode extends SharedSparkSession
+ with DeltaColumnMappingTestUtils
+ with DeltaColumnMappingSelectedTestMixin {
+
+ protected override def columnMappingMode: String = IdMapping.name
+
+ protected override def sparkConf: SparkConf =
+
super.sparkConf.set(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey,
"id")
+
+ /**
+ * CONVERT TO DELTA blocked in id mode
+ */
+ protected override def convertToDelta(tableOrPath: String): Unit =
+ throw DeltaErrors.convertToDeltaWithColumnMappingNotSupported(
+ DeltaColumnMappingMode(columnMappingModeString)
+ )
+}
+
+/**
+ * Include this trait to enable Name column mapping mode for a suite
+ */
+trait DeltaColumnMappingEnableNameMode extends SharedSparkSession
+ with DeltaColumnMappingTestUtils
+ with DeltaColumnMappingSelectedTestMixin {
+
+ protected override def columnMappingMode: String = NameMapping.name
+
+ protected override def sparkConf: SparkConf =
+
super.sparkConf.set(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey,
columnMappingMode)
+
+ /**
+ * CONVERT TO DELTA can be possible under name mode in tests
+ */
+ protected override def convertToDelta(tableOrPath: String): Unit = {
+ withColumnMappingConf("none") {
+ super.convertToDelta(tableOrPath)
+ }
+
+ val (deltaPath, deltaLog) =
+ if (tableOrPath.contains("parquet") && tableOrPath.contains("`")) {
+ // parquet.`PATH`
+ val plainPath = tableOrPath.split('.').last.drop(1).dropRight(1)
+ (s"delta.`$plainPath`", DeltaLog.forTable(spark, plainPath))
+ } else {
+ (tableOrPath, DeltaLog.forTable(spark, TableIdentifier(tableOrPath)))
+ }
+
+ val tableReaderVersion =
deltaLog.unsafeVolatileSnapshot.protocol.minReaderVersion
+ val tableWriterVersion =
deltaLog.unsafeVolatileSnapshot.protocol.minWriterVersion
+ val requiredReaderVersion = if (tableWriterVersion >=
+ TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION) {
+ // If the writer version of the table supports table features, we need to
+ // bump the reader version to table features to enable column mapping.
+ TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION
+ } else {
+ ColumnMappingTableFeature.minReaderVersion
+ }
+ val readerVersion =
spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION).max(
+ requiredReaderVersion)
+ val writerVersion =
spark.conf.get(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION).max(
+ ColumnMappingTableFeature.minWriterVersion)
+
+ val properties = mutable.ListBuffer(DeltaConfigs.COLUMN_MAPPING_MODE.key
-> "name")
+ if (tableReaderVersion < readerVersion) {
+ properties += DeltaConfigs.MIN_READER_VERSION.key ->
readerVersion.toString
+ }
+ if (tableWriterVersion < writerVersion) {
+ properties += DeltaConfigs.MIN_WRITER_VERSION.key ->
writerVersion.toString
+ }
+ val propertiesStr = properties.map(kv => s"'${kv._1}' =
'${kv._2}'").mkString(", ")
+ sql(s"ALTER TABLE $deltaPath SET TBLPROPERTIES ($propertiesStr)")
+ }
+
+}
+// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
new file mode 100644
index 0000000000..4e7326c8c1
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
@@ -0,0 +1,635 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.spark.{SparkContext, SparkFunSuite, SparkThrowable}
+import org.apache.spark.scheduler.{JobFailed, SparkListener,
SparkListenerJobEnd, SparkListenerJobStart}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode}
+import org.apache.spark.sql.delta.DeltaTestUtils.Plans
+import org.apache.spark.sql.delta.actions._
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
+import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.QueryExecutionListener
+import org.apache.spark.util.Utils
+
+import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord}
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import io.delta.tables.{DeltaTable => IODeltaTable}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.scalatest.BeforeAndAfterEach
+
+import java.io.{BufferedReader, File, InputStreamReader}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent
+import scala.reflect.ClassTag
+import scala.util.matching.Regex
+
+// spotless:off
trait DeltaTestUtilsBase {
  import DeltaTestUtils.TableIdentifierOrPath

  /** Both boolean values, handy for parameterizing a test over a boolean dimension. */
  final val BOOLEAN_DOMAIN: Seq[Boolean] = Seq(true, false)

  /**
   * [[QueryExecutionListener]] that records the plans of every query that completes
   * successfully while it is registered. Failed queries are not recorded.
   */
  class PlanCapturingListener() extends QueryExecutionListener {

    // Plans are prepended as queries finish; `plans` reverses to restore completion order.
    private[this] var capturedPlans = List.empty[Plans]

    /** Captured plans, in the order the queries completed. */
    def plans: Seq[Plans] = capturedPlans.reverse

    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
      capturedPlans ::= Plans(
        qe.analyzed,
        qe.optimizedPlan,
        qe.sparkPlan,
        qe.executedPlan)
    }

    override def onFailure(
      funcName: String, qe: QueryExecution, error: Exception): Unit = {}
  }

  /**
   * Run a thunk with logical plans for all queries captured and returned.
   *
   * @param optimizedPlan if true return the optimized logical plans, otherwise the analyzed ones.
   */
  def withLogicalPlansCaptured[T](
      spark: SparkSession,
      optimizedPlan: Boolean)(
      thunk: => Unit): Seq[LogicalPlan] = {
    val planCapturingListener = new PlanCapturingListener

    // Drain any pending listener events so queries issued before the thunk are not captured.
    spark.sparkContext.listenerBus.waitUntilEmpty(15000)
    spark.listenerManager.register(planCapturingListener)
    try {
      thunk
      // Wait for the thunk's own query events to be delivered before reading the plans.
      spark.sparkContext.listenerBus.waitUntilEmpty(15000)
      planCapturingListener.plans.map { plans =>
        if (optimizedPlan) plans.optimized else plans.analyzed
      }
    } finally {
      spark.listenerManager.unregister(planCapturingListener)
    }
  }

  /**
   * Run a thunk with physical plans for all queries captured and returned.
   */
  def withPhysicalPlansCaptured[T](
      spark: SparkSession)(
      thunk: => Unit): Seq[SparkPlan] = {
    val planCapturingListener = new PlanCapturingListener

    // Drain any pending listener events so queries issued before the thunk are not captured.
    spark.sparkContext.listenerBus.waitUntilEmpty(15000)
    spark.listenerManager.register(planCapturingListener)
    try {
      thunk
      spark.sparkContext.listenerBus.waitUntilEmpty(15000)
      planCapturingListener.plans.map(_.sparkPlan)
    } finally {
      spark.listenerManager.unregister(planCapturingListener)
    }
  }

  /**
   * Run a thunk with logical and physical plans for all queries captured and returned.
   */
  def withAllPlansCaptured[T](
      spark: SparkSession)(
      thunk: => Unit): Seq[Plans] = {
    val planCapturingListener = new PlanCapturingListener

    // Drain any pending listener events so queries issued before the thunk are not captured.
    spark.sparkContext.listenerBus.waitUntilEmpty(15000)
    spark.listenerManager.register(planCapturingListener)
    try {
      thunk
      spark.sparkContext.listenerBus.waitUntilEmpty(15000)
      planCapturingListener.plans
    } finally {
      spark.listenerManager.unregister(planCapturingListener)
    }
  }

  /**
   * Counts the Spark jobs started by `f` that launched at least one task.
   * Failed jobs are excluded from the count.
   */
  def countSparkJobs(sc: SparkContext, f: => Unit): Int = {
    // jobId -> total task count across the job's stages.
    val jobs: concurrent.Map[Int, Long] = new ConcurrentHashMap[Int, Long]().asScala
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        jobs.put(jobStart.jobId, jobStart.stageInfos.map(_.numTasks).sum)
      }
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
        case JobFailed(_) => jobs.remove(jobEnd.jobId)
        case _ => // On success, do nothing.
      }
    }
    sc.addSparkListener(listener)
    try {
      // Drain queued events before and after `f` so only its jobs are counted.
      sc.listenerBus.waitUntilEmpty(15000)
      f
      sc.listenerBus.waitUntilEmpty(15000)
    } finally {
      sc.removeSparkListener(listener)
    }
    // Spark will always log a job start/end event even when the job does not launch any task.
    jobs.values.count(_ > 0)
  }

  /** Filter `usageRecords` by the `opType` tag or field. */
  def filterUsageRecords(usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
    usageRecords.filter { r =>
      r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
    }
  }

  /** Runs `f` and collects the "tahoeEvent" usage records tagged with `opType`. */
  def collectUsageLogs(opType: String)(f: => Unit): collection.Seq[UsageRecord] = {
    Log4jUsageLogger.track(f).filter { r =>
      r.metric == "tahoeEvent" &&
        r.tags.get("opType").contains(opType)
    }
  }

  /**
   * Remove protocol and metadata fields from checksum file of json format
   */
  def removeProtocolAndMetadataFromChecksumFile(checksumFilePath : Path): Unit = {
    // scalastyle:off deltahadoopconfiguration
    val fs = checksumFilePath.getFileSystem(
      SparkSession.getActiveSession.map(_.sessionState.newHadoopConf()).get
    )
    // scalastyle:on deltahadoopconfiguration
    // Nothing to rewrite if the checksum file doesn't exist.
    if (!fs.exists(checksumFilePath)) return
    val stream = fs.open(checksumFilePath)
    val reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
    // Checksum files are single-line JSON documents.
    val content = reader.readLine()
    stream.close()
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val map = mapper.readValue(content, classOf[Map[String, String]])
    val partialContent = mapper.writeValueAsString(map.-("protocol").-("metadata")) + "\n"
    // Overwrite the checksum file with the stripped-down JSON.
    val output = fs.create(checksumFilePath, true)
    output.write(partialContent.getBytes(UTF_8))
    output.close()
  }

  /** Returns the executed plan of the findTouchedFiles job from the captured `plans`. */
  protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
    // The expected plan for touched file computation is of the format below.
    // The data column should be pruned from both leaves.
    // HashAggregate(output=[count#3463L])
    // +- HashAggregate(output=[count#3466L])
    //   +- Project
    //     +- Filter (isnotnull(count#3454L) AND (count#3454L > 1))
    //       +- HashAggregate(output=[count#3454L])
    //         +- HashAggregate(output=[_row_id_#3418L, sum#3468L])
    //           +- Project [_row_id_#3418L, UDF(_file_name_#3422) AS one#3448]
    //             +- BroadcastHashJoin [id#3342L], [id#3412L], Inner, BuildLeft
    //               :- Project [id#3342L]
    //               :  +- Filter isnotnull(id#3342L)
    //               :    +- FileScan parquet [id#3342L,part#3343L]
    //               +- Filter isnotnull(id#3412L)
    //                 +- Project [...]
    //                   +- Project [...]
    //                     +- FileScan parquet [id#3412L,part#3413L]
    // Note: It can be RDDScanExec instead of FileScan if the source was materialized.
    // We pick the first plan starting from FileScan and ending in HashAggregate as a
    // stable heuristic for the one we want.
    plans.map(_.executedPlan)
      .filter {
        case WholeStageCodegenExec(hash: HashAggregateExec) =>
          // Exactly two scan leaves (source and target), each a file or RDD scan.
          hash.collectLeaves().size == 2 &&
            hash.collectLeaves()
              .forall { s =>
                s.isInstanceOf[FileSourceScanExec] ||
                  s.isInstanceOf[RDDScanExec]
              }
        case _ => false
      }.head
  }

  /**
   * Separate name- from path-based SQL table identifiers.
   */
  def getTableIdentifierOrPath(sqlIdentifier: String): TableIdentifierOrPath = {
    // Match: delta.`path`[[ as] alias] or tahoe.`path`[[ as] alias]
    val pathMatcher: Regex = raw"(?:delta|tahoe)\.`([^`]+)`(?:(?: as)? (.+))?".r
    // Match: db.table[[ as] alias]
    val qualifiedDbMatcher: Regex = raw"`?([^\.` ]+)`?\.`?([^\.` ]+)`?(?:(?: as)? (.+))?".r
    // Match: table[[ as] alias]
    val unqualifiedNameMatcher: Regex = raw"([^ ]+)(?:(?: as)? (.+))?".r
    // Patterns are tried from most to least specific; the alias group may be null,
    // which Option(...) turns into None.
    sqlIdentifier match {
      case pathMatcher(path, alias) =>
        TableIdentifierOrPath.Path(path, Option(alias))
      case qualifiedDbMatcher(dbName, tableName, alias) =>
        TableIdentifierOrPath.Identifier(TableIdentifier(tableName, Some(dbName)), Option(alias))
      case unqualifiedNameMatcher(tableName, alias) =>
        TableIdentifierOrPath.Identifier(TableIdentifier(tableName), Option(alias))
    }
  }

  /**
   * Produce a DeltaTable instance given a `TableIdentifierOrPath` instance.
   */
  def getDeltaTableForIdentifierOrPath(
      spark: SparkSession,
      identifierOrPath: TableIdentifierOrPath): IODeltaTable = {
    identifierOrPath match {
      case TableIdentifierOrPath.Identifier(id, optionalAlias) =>
        val table = IODeltaTable.forName(spark, id.unquotedString)
        optionalAlias.map(table.as(_)).getOrElse(table)
      case TableIdentifierOrPath.Path(path, optionalAlias) =>
        val table = IODeltaTable.forPath(spark, path)
        optionalAlias.map(table.as(_)).getOrElse(table)
    }
  }

  /** Asserts that `errMsg` contains `str`, ignoring case. */
  @deprecated("Use checkError() instead")
  protected def errorContains(errMsg: String, str: String): Unit = {
    assert(errMsg.toLowerCase(Locale.ROOT).contains(str.toLowerCase(Locale.ROOT)))
  }

  /**
   * Helper types to define the expected result of a test case.
   * Either:
   * - Success: include an expected value to check, e.g. expected schema or result as a DF or rows.
   * - Failure: an exception is thrown and the caller passes a function to check that it matches an
   *   expected error, typ. `checkError()` or `checkErrorMatchPVals()`.
   */
  sealed trait ExpectedResult[-T]
  object ExpectedResult {
    case class Success[T](expected: T) extends ExpectedResult[T]
    case class Failure[T](checkError: SparkThrowable => Unit) extends ExpectedResult[T]
  }

  /** Utility method to check exception `e` is of type `E` or a cause of it is of type `E` */
  def findIfResponsible[E <: Throwable: ClassTag](e: Throwable): Option[E] = e match {
    case culprit: E => Some(culprit)
    case _ =>
      // Search both the cause chain and suppressed exceptions, depth-first.
      val children = Option(e.getCause).iterator ++ e.getSuppressed.iterator
      children
        .map(findIfResponsible[E](_))
        .collectFirst { case Some(culprit) => culprit }
  }

  /** Asserts that the commit file was backfilled (its name is not a UUID delta file name). */
  def verifyBackfilled(file: FileStatus): Unit = {
    val unbackfilled = file.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString)
    assert(!unbackfilled, s"File $file was not backfilled")
  }

  /** Asserts that the commit file was NOT backfilled (its name is a UUID delta file name). */
  def verifyUnbackfilled(file: FileStatus): Unit = {
    val unbackfilled = file.getPath.getName.matches(FileNames.uuidDeltaFileRegex.toString)
    assert(unbackfilled, s"File $file was backfilled")
  }
}
+
trait DeltaCheckpointTestUtils
  extends DeltaTestUtilsBase { self: SparkFunSuite with SharedSparkSession =>

  /**
   * Registers one test per checkpoint flavor: classic V1 plus one test for every
   * V2 top-level file format. The body receives the checkpoint policy and, for V2,
   * the top-level file format used.
   *
   * @param quiet if true, run each test with log output suppressed via `quietly`.
   */
  def testDifferentCheckpoints(testName: String, quiet: Boolean = false)
      (f: (CheckpointPolicy.Policy, Option[V2Checkpoint.Format]) => Unit): Unit = {
    test(s"$testName [Checkpoint V1]") {
      def testFunc(): Unit = {
        withSQLConf(DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey ->
            CheckpointPolicy.Classic.name) {
          f(CheckpointPolicy.Classic, None)
        }
      }
      if (quiet) quietly { testFunc() } else testFunc()
    }
    for (checkpointFormat <- V2Checkpoint.Format.ALL)
      test(s"$testName [Checkpoint V2, format: ${checkpointFormat.name}]") {
        def testFunc(): Unit = {
          withSQLConf(
            DeltaConfigs.CHECKPOINT_POLICY.defaultTablePropertyKey -> CheckpointPolicy.V2.name,
            DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> checkpointFormat.name
          ) {
            f(CheckpointPolicy.V2, Some(checkpointFormat))
          }
        }
        if (quiet) quietly { testFunc() } else testFunc()
      }
  }

  /**
   * Helper method to get the dataframe corresponding to the files which has the file actions for a
   * given checkpoint.
   */
  def getCheckpointDfForFilesContainingFileActions(
      log: DeltaLog,
      checkpointFile: Path): DataFrame = {
    val ci = CheckpointInstance.apply(checkpointFile)
    // All log files belonging to this exact checkpoint instance (e.g. all parts).
    val allCheckpointFiles = log
      .listFrom(ci.version)
      .filter(FileNames.isCheckpointFile)
      .filter(f => CheckpointInstance(f.getPath) == ci)
      .toSeq
    val fileActionsFileIndex = ci.format match {
      case CheckpointInstance.Format.V2 =>
        // V2 checkpoints keep file actions in sidecar files referenced from the
        // top-level checkpoint file, so resolve the sidecars first.
        val incompleteCheckpointProvider = ci.getCheckpointProvider(log, allCheckpointFiles)
        val df = log.loadIndex(incompleteCheckpointProvider.topLevelFileIndex.get,
          Action.logSchema)
        val sidecarFileStatuses = df.as[SingleAction].collect().map(_.unwrap).collect {
          case sf: SidecarFile => sf
        }.map(sf => sf.toFileStatus(log.logPath))
        DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, sidecarFileStatuses)
      case CheckpointInstance.Format.SINGLE | CheckpointInstance.Format.WITH_PARTS =>
        // Classic checkpoints store the file actions directly in the checkpoint files.
        DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET,
          allCheckpointFiles.toArray)
      case _ =>
        throw new Exception(s"Unexpected checkpoint format for file $checkpointFile")
    }
    // Union the contents of all files that carry file actions.
    fileActionsFileIndex.files
      .map(fileStatus => spark.read.parquet(fileStatus.getPath.toString))
      .reduce(_.union(_))
  }
}
+
object DeltaTestUtils extends DeltaTestUtilsBase {

  /** Either a catalog table identifier or a filesystem path, each with an optional alias. */
  sealed trait TableIdentifierOrPath
  object TableIdentifierOrPath {
    case class Identifier(id: TableIdentifier, alias: Option[String])
      extends TableIdentifierOrPath
    case class Path(path: String, alias: Option[String]) extends TableIdentifierOrPath
  }

  /** The plans of a single query at each stage of query execution. */
  case class Plans(
      analyzed: LogicalPlan,
      optimized: LogicalPlan,
      sparkPlan: SparkPlan,
      executedPlan: SparkPlan)

  /**
   * Creates an AddFile that can be used for tests where the exact parameters do not matter.
   */
  def createTestAddFile(
      encodedPath: String = "foo",
      partitionValues: Map[String, String] = Map.empty,
      size: Long = 1L,
      modificationTime: Long = 1L,
      dataChange: Boolean = true,
      stats: String = "{\"numRecords\": 1}"): AddFile = {
    AddFile(encodedPath, partitionValues, size, modificationTime, dataChange, stats)
  }

  /**
   * Extracts the table name and alias (if any) from the given string. Correctly handles whitespaces
   * in table name but doesn't support whitespaces in alias.
   */
  def parseTableAndAlias(table: String): (String, Option[String]) = {
    // Matches 'delta.`path` AS alias' (case insensitive).
    val deltaPathWithAsAlias = raw"(?i)(delta\.`.+`)(?: AS) (\S+)".r
    // Matches 'delta.`path` alias'.
    val deltaPathWithAlias = raw"(delta\.`.+`) (\S+)".r
    // Matches 'delta.`path`'.
    val deltaPath = raw"(delta\.`.+`)".r
    // Matches 'tableName AS alias' (case insensitive).
    val tableNameWithAsAlias = raw"(?i)(.+)(?: AS) (\S+)".r
    // Matches 'tableName alias'.
    val tableNameWithAlias = raw"(.+) (.+)".r

    // Patterns are ordered from most to least specific; the last case is a bare table name.
    table match {
      case deltaPathWithAsAlias(tableName, alias) => tableName -> Some(alias)
      case deltaPathWithAlias(tableName, alias) => tableName -> Some(alias)
      case deltaPath(tableName) => tableName -> None
      case tableNameWithAsAlias(tableName, alias) => tableName -> Some(alias)
      case tableNameWithAlias(tableName, alias) => tableName -> Some(alias)
      case tableName => tableName -> None
    }
  }

  /**
   * Implements an ordering where `x < y` iff both reader and writer versions of
   * `x` are strictly less than those of `y`.
   *
   * Can be used to conveniently check that this relationship holds in tests/assertions
   * without having to write out the conjunction of the two subconditions every time.
   */
  case object StrictProtocolOrdering extends PartialOrdering[Protocol] {
    // Some(0)/Some(-1)/Some(1) when both version components agree on the relation;
    // None when they disagree, i.e. the protocols are incomparable.
    override def tryCompare(x: Protocol, y: Protocol): Option[Int] = {
      if (x.minReaderVersion == y.minReaderVersion &&
          x.minWriterVersion == y.minWriterVersion) {
        Some(0)
      } else if (x.minReaderVersion < y.minReaderVersion &&
          x.minWriterVersion < y.minWriterVersion) {
        Some(-1)
      } else if (x.minReaderVersion > y.minReaderVersion &&
          x.minWriterVersion > y.minWriterVersion) {
        Some(1)
      } else {
        None
      }
    }

    override def lteq(x: Protocol, y: Protocol): Boolean =
      x.minReaderVersion <= y.minReaderVersion && x.minWriterVersion <= y.minWriterVersion

    // Just a more readable version of `lteq`.
    def fulfillsVersionRequirements(actual: Protocol, requirement: Protocol): Boolean =
      lteq(requirement, actual)
  }
}
+
trait DeltaTestUtilsForTempViews
  extends SharedSparkSession
  with DeltaTestUtilsBase {

  /**
   * Registers `testFun` twice against temp view "v": once where the view is created
   * via SQL and once via the Dataset API. The flag passed to `testFun` tells it which.
   */
  def testWithTempView(testName: String)(testFun: Boolean => Any): Unit = {
    Seq(true, false).foreach { isSQLTempView =>
      val tempViewUsed = if (isSQLTempView) "SQL TempView" else "Dataset TempView"
      test(s"$testName - $tempViewUsed") {
        withTempView("v") {
          testFun(isSQLTempView)
        }
      }
    }
  }

  /** Same as [[testWithTempView]] but registers the tests with `testQuietly`. */
  def testQuietlyWithTempView(testName: String)(testFun: Boolean => Any): Unit = {
    Seq(true, false).foreach { isSQLTempView =>
      val tempViewUsed = if (isSQLTempView) "SQL TempView" else "Dataset TempView"
      testQuietly(s"$testName - $tempViewUsed") {
        withTempView("v") {
          testFun(isSQLTempView)
        }
      }
    }
  }

  /**
   * Creates temp view "v" selecting everything from `tableName`, either via a SQL
   * statement or via the DataFrame reader (using `format`, defaulting to "delta").
   */
  def createTempViewFromTable(
      tableName: String,
      isSQLTempView: Boolean,
      format: Option[String] = None): Unit = {
    if (isSQLTempView) {
      sql(s"CREATE OR REPLACE TEMP VIEW v AS SELECT * from $tableName")
    } else {
      spark.read.format(format.getOrElse("delta")).table(tableName).createOrReplaceTempView("v")
    }
  }

  /** Creates temp view "v" from the given SELECT text, via SQL or the Dataset API. */
  def createTempViewFromSelect(text: String, isSQLTempView: Boolean): Unit = {
    if (isSQLTempView) {
      sql(s"CREATE OR REPLACE TEMP VIEW v AS $text")
    } else {
      sql(text).createOrReplaceTempView("v")
    }
  }

  /**
   * Checks the message and/or error class of `ex` against the expectations for the
   * view kind under test. Expectations left as `null` are not checked.
   */
  def testErrorMessageAndClass(
      isSQLTempView: Boolean,
      ex: AnalysisException,
      expectedErrorMsgForSQLTempView: String = null,
      expectedErrorMsgForDataSetTempView: String = null,
      expectedErrorClassForSQLTempView: String = null,
      expectedErrorClassForDataSetTempView: String = null): Unit = {
    if (isSQLTempView) {
      if (expectedErrorMsgForSQLTempView != null) {
        errorContains(ex.getMessage, expectedErrorMsgForSQLTempView)
      }
      if (expectedErrorClassForSQLTempView != null) {
        assert(ex.getErrorClass == expectedErrorClassForSQLTempView)
      }
    } else {
      if (expectedErrorMsgForDataSetTempView != null) {
        errorContains(ex.getMessage, expectedErrorMsgForDataSetTempView)
      }
      if (expectedErrorClassForDataSetTempView != null) {
        assert(ex.getErrorClass == expectedErrorClassForDataSetTempView, ex.getMessage)
      }
    }
  }
}
+
/**
 * Trait collecting helper methods for DML tests, e.g. creating a test table for each test and
 * cleaning it up after each test.
 */
trait DeltaDMLTestUtils
  extends DeltaSQLTestUtils
  with DeltaTestUtilsBase
  with BeforeAndAfterEach {
  self: SharedSparkSession =>

  import testImplicits._

  // Per-test table location; created in beforeEach and deleted in afterEach.
  protected var tempDir: File = _

  // DeltaLog of the per-test table at `tempPath`.
  protected var deltaLog: DeltaLog = _

  protected def tempPath: String = tempDir.getCanonicalPath

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    // Using a space in path to provide coverage for special characters.
    tempDir = Utils.createTempDir(namePrefix = "spark test")
    deltaLog = DeltaLog.forTable(spark, new Path(tempPath))
  }

  override protected def afterEach(): Unit = {
    try {
      Utils.deleteRecursively(tempDir)
      // Drop cached DeltaLog instances so the next test starts clean.
      DeltaLog.clearCache()
    } finally {
      super.afterEach()
    }
  }

  /** Appends `df` to the test table, optionally partitioned by the given columns. */
  protected def append(df: DataFrame, partitionBy: Seq[String] = Nil): Unit = {
    val dfw = df.write.format("delta").mode("append")
    if (partitionBy.nonEmpty) {
      dfw.partitionBy(partitionBy: _*)
    }
    dfw.save(tempPath)
  }

  /**
   * Writes `target` into the test table (optionally partitioned by its key column) and
   * exposes `source` as temp view "source", then invokes `thunk` with the source view
   * name and the target table identifier (delta.`path`).
   */
  protected def withKeyValueData(
      source: Seq[(Int, Int)],
      target: Seq[(Int, Int)],
      isKeyPartitioned: Boolean = false,
      sourceKeyValueNames: (String, String) = ("key", "value"),
      targetKeyValueNames: (String, String) = ("key", "value"))(
      thunk: (String, String) => Unit = null): Unit = {

    import testImplicits._

    append(target.toDF(targetKeyValueNames._1, targetKeyValueNames._2).coalesce(2),
      if (isKeyPartitioned) Seq(targetKeyValueNames._1) else Nil)
    withTempView("source") {
      source.toDF(sourceKeyValueNames._1,
        sourceKeyValueNames._2).createOrReplaceTempView("source")
      thunk("source", s"delta.`$tempPath`")
    }
  }

  /**
   * Parse the input JSON data into a dataframe, one row per input element.
   * Throws an exception on malformed inputs or records that don't comply with the provided schema.
   */
  protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = {
    if (schema != null) {
      spark.read
        .schema(schema)
        .option("mode", FailFastMode.name)
        .json(data.toDS)
    } else {
      spark.read
        .option("mode", FailFastMode.name)
        .json(data.toDS)
    }
  }

  /** Reads the Delta table at `path` as a DataFrame. */
  protected def readDeltaTable(path: String): DataFrame = {
    spark.read.format("delta").load(path)
  }

  /** SQL statement selecting everything from the Delta table at `path`. */
  protected def getDeltaFileStmt(path: String): String = s"SELECT * FROM delta.`$path`"

  /**
   * Finds the latest operation of the given type that ran on the test table and returns the
   * dataframe with the changes of the corresponding table version.
   *
   * @param operation Delta operation name, see [[DeltaOperations]].
   */
  protected def getCDCForLatestOperation(deltaLog: DeltaLog, operation: String): DataFrame = {
    val latestOperation = deltaLog.history
      .getHistory(None)
      .find(_.operation == operation)
    assert(latestOperation.nonEmpty, s"Couldn't find a ${operation} operation to check CDF")

    val latestOperationVersion = latestOperation.get.version
    assert(latestOperationVersion.nonEmpty,
      s"Latest ${operation} operation doesn't have a version associated with it")

    // Read the CDF of exactly that version, dropping the bookkeeping columns.
    CDCReader
      .changesToBatchDF(
        deltaLog,
        latestOperationVersion.get,
        latestOperationVersion.get,
        spark)
      .drop(CDCReader.CDC_COMMIT_TIMESTAMP)
      .drop(CDCReader.CDC_COMMIT_VERSION)
  }
}
+// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala
new file mode 100644
index 0000000000..135dd97bfa
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaColumnMappingSelectedTestMixin.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.delta.DeltaColumnMappingTestUtils
+import org.apache.spark.sql.delta.DeltaConfigs
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+import org.scalatest.exceptions.TestFailedException
+
+import scala.collection.mutable
+
+// spotless:off
/**
 * A trait for selectively enabling certain tests to run for column mapping modes
 */
trait DeltaColumnMappingSelectedTestMixin extends SparkFunSuite
  with DeltaSQLTestUtils with DeltaColumnMappingTestUtils {

  /** Names of the tests that should run under the current column mapping mode. */
  protected def runOnlyTests: Seq[String] = Seq()

  /**
   * If true, will run all tests.
   * Requires that `runOnlyTests` is empty.
   */
  protected def runAllTests: Boolean = false

  // Names of selected tests that actually executed; checked against the
  // selection in afterAll to catch stale/mistyped test names.
  private val testsRun: mutable.Set[String] = mutable.Set.empty

  override protected def test(
      testName: String,
      testTags: Tag*)(testFun: => Any)(implicit pos: Position): Unit = {
    require(!runAllTests || runOnlyTests.isEmpty,
      "If `runAllTests` is true then `runOnlyTests` must be empty")

    if (runAllTests || runOnlyTests.contains(testName)) {
      super.test(s"$testName - column mapping $columnMappingMode mode", testTags: _*) {
        // Recorded at run time so afterAll can detect selected tests that never ran.
        testsRun.add(testName)
        withSQLConf(
          DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey -> columnMappingMode) {
          testFun
        }
      }
    } else {
      super.ignore(s"$testName - ignored by DeltaColumnMappingSelectedTestMixin")(testFun)
    }
  }

  override def afterAll(): Unit = {
    super.afterAll()
    // Fail the suite if any selected test name did not match a registered test.
    val missingTests = runOnlyTests.toSet diff testsRun
    if (missingTests.nonEmpty) {
      throw new TestFailedException(
        Some("Not all selected column mapping tests were run. Missing: " +
          missingTests.mkString(", ")), None, 0)
    }
  }

}
+// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala
new file mode 100644
index 0000000000..b166697284
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaExcludedTestMixin.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.sql.QueryTest
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+// spotless:off
trait DeltaExcludedTestMixin extends QueryTest {

  /** Names of tests the runner should skip; subclasses override this to exclude tests. */
  override def excluded: Seq[String] = Seq.empty

  /**
   * Registers the test normally unless its name is excluded, in which case it is
   * registered as ignored so it still shows up in the report.
   */
  protected override def test(testName: String, testTags: Tag*)
      (testFun: => Any)
      (implicit pos: Position): Unit = {
    val isExcluded = excluded.contains(testName)
    if (isExcluded) {
      super.ignore(testName, testTags: _*)(testFun)
    } else {
      super.test(testName, testTags: _*)(testFun)
    }
  }
}
+// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
new file mode 100644
index 0000000000..3d94d2bde3
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLCommandTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.delta.catalog.DeltaCatalog
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+import org.apache.spark.sql.test.SharedSparkSession
+
+import io.delta.sql.DeltaSparkSessionExtension
+
+// spotless:off
/**
 * A trait for tests that are testing a fully set up SparkSession with all of Delta's requirements,
 * such as the configuration of the DeltaCatalog and the addition of all Delta extensions.
 */
trait DeltaSQLCommandTest extends SharedSparkSession {

  override protected def sparkConf: SparkConf = {
    // Delta: install the SQL session extension and the Delta-aware session catalog.
    // Gluten: enable the plugin with the columnar shuffle manager and the off-heap
    // memory settings it requires.
    val extraSettings = Seq(
      StaticSQLConf.SPARK_SESSION_EXTENSIONS.key ->
        classOf[DeltaSparkSessionExtension].getName,
      SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION.key ->
        classOf[DeltaCatalog].getName,
      "spark.plugins" -> "org.apache.gluten.GlutenPlugin",
      "spark.shuffle.manager" -> "org.apache.spark.shuffle.sort.ColumnarShuffleManager",
      "spark.default.parallelism" -> "1",
      "spark.memory.offHeap.enabled" -> "true",
      "spark.sql.shuffle.partitions" -> "1",
      "spark.memory.offHeap.size" -> "2g",
      "spark.unsafe.exceptionOnMemoryLeak" -> "true")
    extraSettings.foldLeft(super.sparkConf) {
      case (conf, (key, value)) => conf.set(key, value)
    }
  }
}
+// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala
new file mode 100644
index 0000000000..22f4e9fa11
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaSQLTestUtils.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+import java.io.File
+
// spotless:off
trait DeltaSQLTestUtils extends SQLTestUtils {
  /**
   * Overrides of the temp dir/path creation methods from [[SQLTestUtils]] that:
   * 1. Drop the call to `waitForTasksToFinish`, a source of timeout flakiness with no clear
   *    benefit.
   * 2. Allow creating paths with special characters for better test coverage.
   */

  // Prefix containing '%' characters so tests exercise special-character handling in paths.
  protected val defaultTempDirPrefix: String = "spark%dir%prefix"

  override protected def withTempDir(f: File => Unit): Unit =
    withTempDir(prefix = defaultTempDirPrefix)(f)

  override protected def withTempPaths(numPaths: Int)(f: Seq[File] => Unit): Unit =
    withTempPaths(numPaths, prefix = defaultTempDirPrefix)(f)

  override def withTempPath(f: File => Unit): Unit =
    withTempPath(prefix = defaultTempDirPrefix)(f)

  /**
   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
   * returns.
   */
  def withTempDir(prefix: String)(f: File => Unit): Unit = {
    val dir = Utils.createTempDir(namePrefix = prefix)
    try {
      f(dir)
    } finally {
      Utils.deleteRecursively(dir)
    }
  }

  /**
   * Generates a temporary directory path without creating the actual directory, which is then
   * passed to `f` and will be deleted after `f` returns.
   */
  def withTempPath(prefix: String)(f: File => Unit): Unit = {
    val dir = Utils.createTempDir(namePrefix = prefix)
    // Remove the directory itself so `f` receives a path that does not exist yet.
    dir.delete()
    try {
      f(dir)
    } finally {
      Utils.deleteRecursively(dir)
    }
  }

  /**
   * Generates the specified number of temporary directory paths without creating the actual
   * directories, which are then passed to `f` and will be deleted after `f` returns.
   */
  protected def withTempPaths(numPaths: Int, prefix: String)(f: Seq[File] => Unit): Unit = {
    val dirs = Seq.fill[File](numPaths)(Utils.createTempDir(namePrefix = prefix).getCanonicalFile)
    // Delete the freshly created directories so only the (unique) paths are handed to `f`.
    dirs.foreach(_.delete())
    try {
      f(dirs)
    } finally {
      dirs.foreach(Utils.deleteRecursively)
    }
  }
}
// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
new file mode 100644
index 0000000000..f2e7acc695
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.test
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.delta.{DeltaLog, OptimisticTransaction, Snapshot}
+import org.apache.spark.sql.delta.DeltaOperations.{ManualUpdate, Operation,
Write}
+import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, Protocol}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
+import
org.apache.spark.sql.delta.coordinatedcommits.TableCommitCoordinatorClient
+import org.apache.spark.sql.delta.hooks.AutoCompact
+import org.apache.spark.sql.delta.stats.StatisticsCollection
+import org.apache.spark.util.Clock
+
+import io.delta.storage.commit.{CommitResponse, GetCommitsResponse,
UpdatedActions}
+import org.apache.hadoop.fs.Path
+
+import java.io.File
+
// spotless:off
/**
 * Additional method definitions for Delta classes that are intended for use only in testing.
 */
object DeltaTestImplicits {
  implicit class OptimisticTxnTestHelper(txn: OptimisticTransaction) {

    /** Ensure that the initial commit of a Delta table always contains a Metadata action */
    def commitActions(op: Operation, actions: Action*): Long = {
      if (txn.readVersion == -1) {
        // First commit on this table: pull any explicitly passed Metadata/Protocol actions
        // out of the action list so the transaction can be seeded with them (or with
        // defaults) before committing the remaining actions.
        val explicitMetadata = actions.collectFirst { case m: Metadata => m }
        val explicitProtocol = actions.collectFirst { case p: Protocol => p }
        val remainingActions =
          actions.filterNot(a => a.isInstanceOf[Metadata] || a.isInstanceOf[Protocol])
        (explicitMetadata, explicitProtocol) match {
          case (Some(metadata), Some(protocol)) =>
            // Both were explicitly passed: set the protocol first, then the metadata.
            // Updating the metadata auto-upgrades any required table features in the
            // passed protocol as per the given metadata.
            txn.updateProtocol(protocol)
            txn.updateMetadataForNewTable(metadata)
          case (Some(metadata), None) =>
            // Only metadata was passed: use it; the protocol is auto-generated from it.
            txn.updateMetadataForNewTable(metadata)
          case (None, Some(protocol)) =>
            // Only a protocol was passed: pair it with a default Metadata.
            txn.updateProtocol(protocol)
            txn.updateMetadataForNewTable(Metadata())
          case (None, None) =>
            // Neither was passed: default Metadata together with the maximum
            // supported protocol.
            txn.updateMetadataForNewTable(Metadata())
            txn.updateProtocol(Action.supportedProtocolVersion())
        }
        txn.commit(remainingActions, op)
      } else {
        txn.commit(actions, op)
      }
    }

    /** Commits `actions` as a [[ManualUpdate]] operation. */
    def commitManually(actions: Action*): Long =
      commitActions(ManualUpdate, actions: _*)

    /** Commits `actions` as an append [[Write]] operation. */
    def commitWriteAppend(actions: Action*): Long =
      commitActions(Write(SaveMode.Append), actions: _*)
  }

  /** Add test-only File overloads for DeltaTable.forPath */
  implicit class DeltaLogObjectTestHelper(deltaLog: DeltaLog.type) {
    def forTable(spark: SparkSession, dataPath: File): DeltaLog =
      DeltaLog.forTable(spark, new Path(dataPath.getCanonicalPath))

    def forTable(spark: SparkSession, dataPath: File, clock: Clock): DeltaLog =
      DeltaLog.forTable(spark, new Path(dataPath.getCanonicalPath), clock)
  }

  /** Helper class for working with [[TableCommitCoordinatorClient]] */
  implicit class TableCommitCoordinatorClientTestHelper(
      tableCommitCoordinatorClient: TableCommitCoordinatorClient) {

    // Each overload below forwards to the real API with `tableIdentifierOpt = None`,
    // which is not interesting for these tests.
    def commit(
        commitVersion: Long,
        actions: Iterator[String],
        updatedActions: UpdatedActions): CommitResponse =
      tableCommitCoordinatorClient.commit(
        commitVersion, actions, updatedActions, tableIdentifierOpt = None)

    def getCommits(
        startVersion: Option[Long] = None,
        endVersion: Option[Long] = None): GetCommitsResponse =
      tableCommitCoordinatorClient.getCommits(tableIdentifierOpt = None, startVersion, endVersion)

    def backfillToVersion(
        version: Long,
        lastKnownBackfilledVersion: Option[Long] = None): Unit =
      tableCommitCoordinatorClient.backfillToVersion(
        tableIdentifierOpt = None, version, lastKnownBackfilledVersion)
  }


  /** Helper class for working with [[Snapshot]] */
  implicit class SnapshotTestHelper(snapshot: Snapshot) {
    /** Convenience overload that omits the catalog table (not needed in tests). */
    def ensureCommitFilesBackfilled(): Unit =
      snapshot.ensureCommitFilesBackfilled(catalogTableOpt = None)
  }

  /**
   * Helper class for working with the most recent snapshot in the deltaLog
   */
  implicit class DeltaLogTestHelper(deltaLog: DeltaLog) {
    // NOTE(review): backed by `unsafeVolatileSnapshot`, so tests read whatever
    // snapshot happens to be cached at call time.
    def snapshot: Snapshot =
      deltaLog.unsafeVolatileSnapshot

    def checkpoint(): Unit =
      deltaLog.checkpoint(snapshot)

    def checkpointInterval(): Int =
      deltaLog.checkpointInterval(snapshot.metadata)

    def deltaRetentionMillis(): Long =
      deltaLog.deltaRetentionMillis(snapshot.metadata)

    def enableExpiredLogCleanup(): Boolean =
      deltaLog.enableExpiredLogCleanup(snapshot.metadata)

    def upgradeProtocol(newVersion: Protocol): Unit =
      upgradeProtocol(deltaLog.unsafeVolatileSnapshot, newVersion)

    def upgradeProtocol(snapshot: Snapshot, newVersion: Protocol): Unit =
      deltaLog.upgradeProtocol(None, snapshot, newVersion)
  }

  implicit class DeltaTableV2ObjectTestHelper(dt: DeltaTableV2.type) {
    /** Convenience overload that omits the cmd arg (which is not helpful in tests). */
    def apply(spark: SparkSession, id: TableIdentifier): DeltaTableV2 =
      dt.apply(spark, id, "test")
  }

  implicit class DeltaTableV2TestHelper(deltaTable: DeltaTableV2) {
    /** For backward compatibility with existing unit tests */
    def snapshot: Snapshot = deltaTable.initialSnapshot
  }

  implicit class AutoCompactObjectTestHelper(ac: AutoCompact.type) {
    /** Convenience overload that calls [[AutoCompact.compact]] without a catalog table. */
    private[delta] def compact(
        spark: SparkSession,
        deltaLog: DeltaLog,
        partitionPredicates: Seq[Expression] = Nil,
        opType: String = AutoCompact.OP_TYPE): Seq[OptimizeMetrics] =
      AutoCompact.compact(
        spark, deltaLog, catalogTable = None,
        partitionPredicates, opType)
  }

  implicit class StatisticsCollectionObjectTestHelper(sc: StatisticsCollection.type) {

    /**
     * This is an implicit helper required for backward compatibility with existing
     * unit tests. It allows to call [[StatisticsCollection.recompute]] without a
     * catalog table and in the actual call, sets it to [[None]].
     */
    def recompute(
        spark: SparkSession,
        deltaLog: DeltaLog,
        predicates: Seq[Expression] = Seq(Literal(true)),
        fileFilter: AddFile => Boolean = af => true): Unit =
      StatisticsCollection.recompute(
        spark, deltaLog, catalogTable = None, predicates, fileFilter)
  }
}
// spotless:on
diff --git
a/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala
b/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala
new file mode 100644
index 0000000000..26c1a69481
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/shims/DeltaExcludedBySparkVersionTestMixinShims.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package shims
+
+import org.apache.spark.sql.QueryTest
+
// spotless:off
trait DeltaExcludedBySparkVersionTestMixinShims extends QueryTest {
  /**
   * Tests that are meant for Delta compiled against Spark Latest Release only. Executed since
   * this is the Spark Latest Release shim.
   */
  protected def testSparkLatestOnly(
      testName: String, testTags: org.scalatest.Tag*)
      (testFun: => Any)
      (implicit pos: org.scalactic.source.Position): Unit = {
    // This shim targets the latest Spark release, so register the test normally.
    test(testName, testTags: _*)(testFun)(pos)
  }

  /**
   * Tests that are meant for Delta compiled against Spark Master Release only. Ignored since
   * this is the Spark Latest Release shim.
   */
  protected def testSparkMasterOnly(
      testName: String, testTags: org.scalatest.Tag*)
      (testFun: => Any)
      (implicit pos: org.scalactic.source.Position): Unit = {
    // Spark-master-only behavior is unavailable here; register the test as ignored so it
    // still shows up in the report.
    ignore(testName, testTags: _*)(testFun)(pos)
  }
}
// spotless:on
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]