This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 5382556a87 [GLUTEN-11106][VL] Spark 3.5 / Delta 3.3: Add several suites for delta optimizations (#11430)
5382556a87 is described below
commit 5382556a87e97b90a0187b04921cd8cfc9be845f
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jan 20 10:27:44 2026 +0000
[GLUTEN-11106][VL] Spark 3.5 / Delta 3.3: Add several suites for delta optimizations (#11430)
---
.../org/apache/spark/sql/delta/DeltaSuite.scala | 42 +-
.../perf/OptimizeMetadataOnlyDeltaQuerySuite.scala | 1121 ++++++++++++++++++++
.../sql/delta/perf/OptimizedWritesSuite.scala | 369 +++++++
3 files changed, 1512 insertions(+), 20 deletions(-)
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index e41df6180f..7f9e3db230 100644
--- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -99,7 +99,7 @@ class DeltaSuite
// Generate two files in two partitions
spark
.range(2)
- .withColumn("part", $"id" % 2)
+ .withColumn("part", Symbol("id") % 2)
.write
.format("delta")
.partitionBy("part")
@@ -227,7 +227,7 @@ class DeltaSuite
val basePath = dir.getAbsolutePath
spark
.range(10)
- .withColumn("part", 'id % 3)
+ .withColumn("part", Symbol("id") % 3)
.write
.format("delta")
.partitionBy("part")
@@ -1287,7 +1287,7 @@ class DeltaSuite
spark
.range(100)
- .select('id, ('id % 4).as("by4"), ('id % 8).as("by8"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8"))
.write
.format("delta")
.partitionBy("by4", "by8")
@@ -1310,7 +1310,7 @@ class DeltaSuite
spark
.range(100)
- .select('id, ('id % 4).as("by4"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
.write
.format("delta")
.partitionBy("by4")
@@ -1341,7 +1341,7 @@ class DeltaSuite
val dfw = spark
.range(100)
- .select('id, ('id % 4).as("by,4"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by,4"))
.write
.format("delta")
.partitionBy("by,4")
@@ -1372,7 +1372,7 @@ class DeltaSuite
spark
.range(100)
- .select('id, ('id % 4).as("by4"), ('id % 8).as("by8"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8"))
.write
.format("delta")
.partitionBy("by4", "by8")
@@ -1381,7 +1381,7 @@ class DeltaSuite
val e = intercept[AnalysisException] {
spark
.range(100)
- .select('id, ('id % 4).as("by4"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
.write
.format("delta")
.partitionBy("by4")
@@ -1401,7 +1401,7 @@ class DeltaSuite
spark
.range(100)
- .select('id, ('id * 3).cast("string").as("value"))
+ .select(Symbol("id"), (Symbol("id") * 3).cast("string").as("value"))
.write
.format("delta")
.save(tempDir.toString)
@@ -1409,7 +1409,7 @@ class DeltaSuite
val e = intercept[AnalysisException] {
spark
.range(100)
- .select('id, ('id * 3).as("value"))
+ .select(Symbol("id"), (Symbol("id") * 3).as("value"))
.write
.format("delta")
.mode("append")
@@ -1431,7 +1431,7 @@ class DeltaSuite
spark
.range(100)
- .select('id, ('id % 4).as("by4"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
.write
.format("delta")
.partitionBy("by4")
@@ -1444,7 +1444,7 @@ class DeltaSuite
spark
.range(101, 200)
- .select('id, ('id % 4).as("by4"), ('id % 8).as("by8"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8"))
.write
.format("delta")
.option(DeltaOptions.MERGE_SCHEMA_OPTION, "true")
@@ -1453,7 +1453,9 @@ class DeltaSuite
checkAnswer(
spark.read.format("delta").load(tempDir.toString),
- spark.range(101, 200).select('id, ('id % 4).as("by4"), ('id % 8).as("by8")))
+ spark
+ .range(101, 200)
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8")))
}
}
@@ -1466,7 +1468,7 @@ class DeltaSuite
spark
.range(100)
- .select('id, ('id % 4).as("by4"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
.write
.format("delta")
.partitionBy("by4")
@@ -1480,7 +1482,7 @@ class DeltaSuite
val e = intercept[AnalysisException] {
spark
.range(101, 200)
- .select('id, ('id % 4).as("by4"), ('id % 8).as("by8"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8"))
.write
.format("delta")
.partitionBy("by4", "by8")
@@ -1504,7 +1506,7 @@ class DeltaSuite
val e = intercept[AnalysisException] {
spark
.range(100)
- .select('id, ('id % 4).as("by4"))
+ .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
.write
.format("delta")
.partitionBy("by4", "id")
@@ -1702,14 +1704,14 @@ class DeltaSuite
}
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- testDf('aBc)
+ testDf(Symbol("aBc"))
intercept[AnalysisException] {
- testDf('abc)
+ testDf(Symbol("abc"))
}
}
- testDf('aBc)
- testDf('abc)
+ testDf(Symbol("aBc"))
+ testDf(Symbol("abc"))
}
}
}
@@ -3303,7 +3305,7 @@ class DeltaNameColumnMappingSuite extends DeltaSuite with DeltaColumnMappingEnab
// create partitioned table
spark
.range(100)
- .withColumn("part", 'id % 10)
+ .withColumn("part", Symbol("id") % 10)
.write
.format("delta")
.partitionBy("part")
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuerySuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuerySuite.scala
new file mode 100644
index 0000000000..d2265c8a15
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuerySuite.scala
@@ -0,0 +1,1121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.perf
+
+import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.delta.{DeletionVectorsTestUtils, DeltaColumnMappingEnableIdMode, DeltaColumnMappingEnableNameMode, DeltaLog, DeltaTestUtils}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.PrepareDeltaScanBase
+import org.apache.spark.sql.delta.stats.StatisticsCollection
+import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.test.DeltaTestImplicits._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+
+import io.delta.tables.DeltaTable
+import org.apache.hadoop.fs.Path
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+import scala.collection.mutable
+
+class OptimizeMetadataOnlyDeltaQuerySuite
+ extends QueryTest
+ with SharedSparkSession
+ with BeforeAndAfterAll
+ with DeltaSQLCommandTest
+ with DeletionVectorsTestUtils {
+ val testTableName = "table_basic"
+ val noStatsTableName = " table_nostats"
+ val mixedStatsTableName = " table_mixstats"
+
+ var dfPart1: DataFrame = null
+ var dfPart2: DataFrame = null
+
+ var totalRows: Long = -1
+ var minId: Long = -1
+ var maxId: Long = -1
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ dfPart1 = generateRowsDataFrame(spark.range(1L, 6L))
+ dfPart2 = generateRowsDataFrame(spark.range(6L, 11L))
+
+ withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false") {
+ dfPart1.write.format("delta").mode(SaveMode.Overwrite).saveAsTable(noStatsTableName)
+ dfPart1.write.format("delta").mode(SaveMode.Overwrite).saveAsTable(mixedStatsTableName)
+
+ spark.sql(s"DELETE FROM $noStatsTableName WHERE id = 1")
+ spark.sql(s"DELETE FROM $mixedStatsTableName WHERE id = 1")
+
+ dfPart2.write.format("delta").mode("append").saveAsTable(noStatsTableName)
+ }
+
+ withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> "true") {
+ dfPart1.write.format("delta").mode(SaveMode.Overwrite).saveAsTable(testTableName)
+
+ spark.sql(s"DELETE FROM $testTableName WHERE id = 1")
+
+ dfPart2.write.format("delta").mode(SaveMode.Append).saveAsTable(testTableName)
+ dfPart2.write.format("delta").mode(SaveMode.Append).saveAsTable(mixedStatsTableName)
+
+ // Run updates to generate more Delta Log and trigger a checkpoint
+ // and make sure stats works after checkpoints
+ for (a <- 1 to 10) {
+ spark.sql(s"UPDATE $testTableName SET data='$a' WHERE id = 7")
+ }
+ spark.sql(s"UPDATE $testTableName SET data=NULL WHERE id = 7")
+
+ // Creates an empty (numRecords == 0) AddFile record
+ generateRowsDataFrame(spark.range(11L, 12L)).write
+ .format("delta")
+ .mode("append")
+ .saveAsTable(testTableName)
+ spark.sql(s"DELETE FROM $testTableName WHERE id = 11")
+ }
+
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false") {
+ val result = spark.sql(s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName").head
+ totalRows = result.getLong(0)
+ minId = result.getLong(1)
+ maxId = result.getLong(2)
+ }
+ }
+
+ /** Class to hold test parameters */
+ case class ScalaTestParams(name: String, queryScala: () => DataFrame, expectedPlan: String)
+
+ Seq(
+ new ScalaTestParams(
+ name = "count - simple query",
+ queryScala = () =>
+ spark.read
+ .format("delta")
+ .table(testTableName)
+ .agg(count(col("*"))),
+ expectedPlan = "LocalRelation [none#0L]"
+ ),
+ new ScalaTestParams(
+ name = "min-max - simple query",
+ queryScala = () =>
+ spark.read
+ .format("delta")
+ .table(testTableName)
+ .agg(
+ min(col("id")),
+ max(col("id")),
+ min(col("TinyIntColumn")),
+ max(col("TinyIntColumn")),
+ min(col("SmallIntColumn")),
+ max(col("SmallIntColumn")),
+ min(col("IntColumn")),
+ max(col("IntColumn")),
+ min(col("BigIntColumn")),
+ max(col("BigIntColumn")),
+ min(col("FloatColumn")),
+ max(col("FloatColumn")),
+ min(col("DoubleColumn")),
+ max(col("DoubleColumn")),
+ min(col("DateColumn")),
+ max(col("DateColumn"))
+ ),
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2, none#3, none#4, none#5, none#6" +
+ ", none#7, none#8L, none#9L, none#10, none#11, none#12, none#13, none#14, none#15]"
+ )
+ )
+ .foreach {
+ testParams =>
+ test(s"optimization supported - Scala - ${testParams.name}") {
+ checkResultsAndOptimizedPlan(testParams.queryScala, testParams.expectedPlan)
+ }
+ }
+
+ /** Class to hold test parameters */
+ case class SqlTestParams(
+ name: String,
+ querySql: String,
+ expectedPlan: String,
+ querySetup: Option[Seq[String]] = None)
+
+ Seq(
+ new SqlTestParams(
+ name = "count - simple query",
+ querySql = s"SELECT COUNT(*) FROM $testTableName",
+ expectedPlan = "LocalRelation [none#0L]"),
+ new SqlTestParams(
+ name = "min-max - simple query",
+ querySql = s"SELECT MIN(id), MAX(id)" +
+ s", MIN(TinyIntColumn), MAX(TinyIntColumn)" +
+ s", MIN(SmallIntColumn), MAX(SmallIntColumn)" +
+ s", MIN(IntColumn), MAX(IntColumn)" +
+ s", MIN(BigIntColumn), MAX(BigIntColumn)" +
+ s", MIN(FloatColumn), MAX(FloatColumn)" +
+ s", MIN(DoubleColumn), MAX(DoubleColumn)" +
+ s", MIN(DateColumn), MAX(DateColumn)" +
+ s"FROM $testTableName",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2, none#3, none#4, none#5, none#6" +
+ ", none#7, none#8L, none#9L, none#10, none#11, none#12, none#13, none#14, none#15]"
+ ),
+ new SqlTestParams(
+ name = "min-max - column name non-matching case",
+ querySql = s"SELECT MIN(ID), MAX(iD)" +
+ s", MIN(tINYINTCOLUMN), MAX(tinyintcolumN)" +
+ s", MIN(sMALLINTCOLUMN), MAX(smallintcolumN)" +
+ s", MIN(iNTCOLUMN), MAX(intcolumN)" +
+ s", MIN(bIGINTCOLUMN), MAX(bigintcolumN)" +
+ s", MIN(fLOATCOLUMN), MAX(floatcolumN)" +
+ s", MIN(dOUBLECOLUMN), MAX(doublecolumN)" +
+ s", MIN(dATECOLUMN), MAX(datecolumN)" +
+ s"FROM $testTableName",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2, none#3, none#4, none#5, none#6" +
+ ", none#7, none#8L, none#9L, none#10, none#11, none#12, none#13, none#14, none#15]"
+ ),
+ new SqlTestParams(
+ name = "count with column name alias",
+ querySql = s"SELECT COUNT(*) as MyCount FROM $testTableName",
+ expectedPlan = "LocalRelation [none#0L]"),
+ new SqlTestParams(
+ name = "count-min-max with column name alias",
+ querySql = s"SELECT COUNT(*) as MyCount, MIN(id) as MyMinId, MAX(id) as MyMaxId" +
+ s" FROM $testTableName",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+ ),
+ new SqlTestParams(
+ name = "count-min-max - table name with alias",
+ querySql = s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName MyTable",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+ ),
+ new SqlTestParams(
+ name = "count-min-max - query using time travel - version 0",
+ querySql = s"SELECT COUNT(*), MIN(id), MAX(id) " +
+ s"FROM $testTableName VERSION AS OF 0",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+ ),
+ new SqlTestParams(
+ name = "count-min-max - query using time travel - version 1",
+ querySql = s"SELECT COUNT(*), MIN(id), MAX(id) " +
+ s"FROM $testTableName VERSION AS OF 1",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+ ),
+ new SqlTestParams(
+ name = "count-min-max - query using time travel - version 2",
+ querySql = s"SELECT COUNT(*), MIN(id), MAX(id) " +
+ s"FROM $testTableName VERSION AS OF 2",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+ ),
+ new SqlTestParams(
+ name = "count - sub-query",
+ querySql = s"SELECT (SELECT COUNT(*) FROM $testTableName)",
+ expectedPlan = "Project [scalar-subquery#0 [] AS #0L]\n" +
+ ": +- LocalRelation [none#0L]\n+- OneRowRelation"
+ ),
+ new SqlTestParams(
+ name = "min - sub-query",
+ querySql = s"SELECT (SELECT MIN(id) FROM $testTableName)",
+ expectedPlan = "Project [scalar-subquery#0 [] AS #0L]\n" +
+ ": +- LocalRelation [none#0L]\n+- OneRowRelation"
+ ),
+ new SqlTestParams(
+ name = "max - sub-query",
+ querySql = s"SELECT (SELECT MAX(id) FROM $testTableName)",
+ expectedPlan = "Project [scalar-subquery#0 [] AS #0L]\n" +
+ ": +- LocalRelation [none#0L]\n+- OneRowRelation"
+ ),
+ new SqlTestParams(
+ name = "count - sub-query filter",
+ querySql = s"SELECT 'ABC' WHERE" +
+ s" (SELECT COUNT(*) FROM $testTableName) = $totalRows",
+ expectedPlan = "Project [ABC AS #0]\n+- Filter (scalar-subquery#0 [] = " +
+ totalRows + ")\n : +- LocalRelation [none#0L]\n +- OneRowRelation"
+ ),
+ new SqlTestParams(
+ name = "min - sub-query filter",
+ querySql = s"SELECT 'ABC' WHERE" +
+ s" (SELECT MIN(id) FROM $testTableName) = $minId",
+ expectedPlan = "Project [ABC AS #0]\n+- Filter (scalar-subquery#0 [] = " +
+ minId + ")\n : +- LocalRelation [none#0L]\n +- OneRowRelation"
+ ),
+ new SqlTestParams(
+ name = "max - sub-query filter",
+ querySql = s"SELECT 'ABC' WHERE" +
+ s" (SELECT MAX(id) FROM $testTableName) = $maxId",
+ expectedPlan = "Project [ABC AS #0]\n+- Filter (scalar-subquery#0 [] = " +
+ maxId + ")\n : +- LocalRelation [none#0L]\n +- OneRowRelation"
+ ),
+ // Limit doesn't affect aggregation results
+ new SqlTestParams(
+ name = "count-min-max - query with limit",
+ querySql = s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName LIMIT 3",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+ ),
+ new SqlTestParams(
+ name = "count-min-max - duplicated functions",
+ querySql = s"SELECT COUNT(*), COUNT(*), MIN(id), MIN(id), MAX(id), MAX(id)" +
+ s" FROM $testTableName",
+ expectedPlan = "LocalRelation [none#0L, none#1L, none#2L, none#3L, none#4L, none#5L]"
+ ),
+ new SqlTestParams(
+ name = "count - empty table",
+ querySetup = Some(Seq("CREATE TABLE TestEmpty (c1 int) USING DELTA")),
+ querySql = "SELECT COUNT(*) FROM TestEmpty",
+ expectedPlan = "LocalRelation [none#0L]"
+ ),
+ /**
+ * Dates are stored as Int in literals. This test make sure Date columns works and NULL are
+ * handled correctly
+ */
+ new SqlTestParams(
+ name = "min-max - date columns",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestDateValues" +
+ " (Column1 DATE, Column2 DATE, Column3 DATE) USING DELTA;",
+ "INSERT INTO TestDateValues" +
+ " (Column1, Column2, Column3) VALUES (NULL, current_date(), current_date());",
+ "INSERT INTO TestDateValues" +
+ " (Column1, Column2, Column3) VALUES (NULL, NULL, current_date());"
+ )),
+ querySql = "SELECT COUNT(*), MIN(Column1), MAX(Column1), MIN(Column2)" +
+ ", MAX(Column2), MIN(Column3), MAX(Column3) FROM TestDateValues",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, none#5, none#6]"
+ ),
+ new SqlTestParams(
+ name = "min-max - floating point infinity",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestFloatInfinity (FloatColumn Float, DoubleColumn Double) USING DELTA",
+ "INSERT INTO TestFloatInfinity (FloatColumn, DoubleColumn) VALUES (1, 1);",
+ "INSERT INTO TestFloatInfinity (FloatColumn, DoubleColumn) VALUES (NULL, NULL);",
+ "INSERT INTO TestFloatInfinity (FloatColumn, DoubleColumn) VALUES " +
+ "(float('inf'), double('inf'))" +
+ ", (float('+inf'), double('+inf'))" +
+ ", (float('infinity'), double('infinity'))" +
+ ", (float('+infinity'), double('+infinity'))" +
+ ", (float('-inf'), double('-inf'))" +
+ ", (float('-infinity'), double('-infinity'))"
+ )),
+ querySql = "SELECT COUNT(*), MIN(FloatColumn), MAX(FloatColumn), MIN(DoubleColumn)" +
+ ", MAX(DoubleColumn) FROM TestFloatInfinity",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+ ),
+ // NaN is larger than any other value, including Infinity
+ new SqlTestParams(
+ name = "min-max - floating point NaN values",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestFloatNaN (FloatColumn Float, DoubleColumn Double) USING DELTA",
+ "INSERT INTO TestFloatNaN (FloatColumn, DoubleColumn) VALUES (1, 1);",
+ "INSERT INTO TestFloatNaN (FloatColumn, DoubleColumn) VALUES (NULL, NULL);",
+ "INSERT INTO TestFloatNaN (FloatColumn, DoubleColumn) VALUES " +
+ "(float('inf'), double('inf'))" +
+ ", (float('+inf'), double('+inf'))" +
+ ", (float('infinity'), double('infinity'))" +
+ ", (float('+infinity'), double('+infinity'))" +
+ ", (float('-inf'), double('-inf'))" +
+ ", (float('-infinity'), double('-infinity'))",
+ "INSERT INTO TestFloatNaN (FloatColumn, DoubleColumn) VALUES " +
+ "(float('NaN'), double('NaN'));"
+ )),
+ querySql = "SELECT COUNT(*), MIN(FloatColumn), MAX(FloatColumn), MIN(DoubleColumn)" +
+ ", MAX(DoubleColumn) FROM TestFloatNaN",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+ ),
+ new SqlTestParams(
+ name = "min-max - floating point min positive value",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestFloatPrecision (FloatColumn Float, DoubleColumn Double) USING DELTA",
+ "INSERT INTO TestFloatPrecision (FloatColumn, DoubleColumn) VALUES " +
+ "(CAST('1.4E-45' as FLOAT), CAST('4.9E-324' as DOUBLE))" +
+ ", (CAST('-1.4E-45' as FLOAT), CAST('-4.9E-324' as DOUBLE))" +
+ ", (0, 0);"
+ )),
+ querySql = "SELECT COUNT(*), MIN(FloatColumn), MAX(FloatColumn), MIN(DoubleColumn)" +
+ ", MAX(DoubleColumn) FROM TestFloatPrecision",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+ ),
+ new SqlTestParams(
+ name = "min-max - NULL and non-NULL values",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestNullValues (Column1 INT, Column2 INT, Column3 INT) USING DELTA",
+ "INSERT INTO TestNullValues (Column1, Column2, Column3) VALUES (NULL, 1, 1);",
+ "INSERT INTO TestNullValues (Column1, Column2, Column3) VALUES (NULL, NULL, 1);"
+ )),
+ querySql = "SELECT COUNT(*), MIN(Column1), MAX(Column1)," +
+ "MIN(Column2), MAX(Column2) FROM TestNullValues",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+ ),
+ new SqlTestParams(
+ name = "min-max - only NULL values",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestOnlyNullValues (Column1 INT, Column2 INT, Column3 INT) USING DELTA",
+ "INSERT INTO TestOnlyNullValues (Column1, Column2, Column3) VALUES (NULL, NULL, 1);",
+ "INSERT INTO TestOnlyNullValues (Column1, Column2, Column3) VALUES (NULL, NULL, 2);"
+ )),
+ querySql = "SELECT COUNT(*), MIN(Column1), MAX(Column1), MIN(Column2), MAX(Column2), " +
+ "MIN(Column3), MAX(Column3) FROM TestOnlyNullValues",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, none#5, none#6]"
+ ),
+ new SqlTestParams(
+ name = "min-max - all supported data types",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestMinMaxValues (" +
+ "TINYINTColumn TINYINT, SMALLINTColumn SMALLINT, INTColumn INT, BIGINTColumn BIGINT, " +
+ "FLOATColumn FLOAT, DOUBLEColumn DOUBLE, DATEColumn DATE) USING DELTA",
+ "INSERT INTO TestMinMaxValues (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+ " FLOATColumn, DOUBLEColumn, DATEColumn)" +
+ " VALUES (-128, -32768, -2147483648, -9223372036854775808," +
+ " -3.4028235E38, -1.7976931348623157E308, CAST('1582-10-15' AS DATE));",
+ "INSERT INTO TestMinMaxValues (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+ " FLOATColumn, DOUBLEColumn, DATEColumn)" +
+ " VALUES (127, 32767, 2147483647, 9223372036854775807," +
+ " 3.4028235E38, 1.7976931348623157E308, CAST('9999-12-31' AS DATE));"
+ )),
+ querySql = "SELECT COUNT(*)," +
+ "MIN(TINYINTColumn), MAX(TINYINTColumn)" +
+ ", MIN(SMALLINTColumn), MAX(SMALLINTColumn)" +
+ ", MIN(INTColumn), MAX(INTColumn)" +
+ ", MIN(BIGINTColumn), MAX(BIGINTColumn)" +
+ ", MIN(FLOATColumn), MAX(FLOATColumn)" +
+ ", MIN(DOUBLEColumn), MAX(DOUBLEColumn)" +
+ ", MIN(DATEColumn), MAX(DATEColumn)" +
+ " FROM TestMinMaxValues",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, none#5, none#6" +
+ ", none#7L, none#8L, none#9, none#10, none#11, none#12, none#13, none#14]"
+ ),
+ new SqlTestParams(
+ name = "count-min-max - partitioned table - simple query",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestPartitionedTable (Column1 INT, Column2 INT, Column3 INT, Column4 INT)" +
+ " USING DELTA PARTITIONED BY (Column2, Column3)",
+ "INSERT INTO TestPartitionedTable" +
+ " (Column1, Column2, Column3, Column4) VALUES (1, 2, 3, 4);",
+ "INSERT INTO TestPartitionedTable" +
+ " (Column1, Column2, Column3, Column4) VALUES (2, 2, 3, 5);",
+ "INSERT INTO TestPartitionedTable" +
+ " (Column1, Column2, Column3, Column4) VALUES (3, 3, 2, 6);",
+ "INSERT INTO TestPartitionedTable" +
+ " (Column1, Column2, Column3, Column4) VALUES (4, 3, 2, 7);"
+ )),
+ querySql = "SELECT COUNT(*)" +
+ ", MIN(Column1), MAX(Column1)" +
+ ", MIN(Column2), MAX(Column2)" +
+ ", MIN(Column3), MAX(Column3)" +
+ ", MIN(Column4), MAX(Column4)" +
+ " FROM TestPartitionedTable",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3," +
+ " none#4, none#5, none#6, none#7, none#8]"
+ ),
+ /**
+ * Partitioned columns should be able to return MIN and MAX data even when there are no column
+ * stats
+ */
+ new SqlTestParams(
+ name = "count-min-max - partitioned table - no stats",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestPartitionedTableNoStats" +
+ " (Column1 INT, Column2 INT, Column3 INT, Column4 INT)" +
+ " USING DELTA PARTITIONED BY (Column2, Column3)" +
+ " TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 0)",
+ "INSERT INTO TestPartitionedTableNoStats" +
+ " (Column1, Column2, Column3, Column4) VALUES (1, 2, 3, 4);",
+ "INSERT INTO TestPartitionedTableNoStats" +
+ " (Column1, Column2, Column3, Column4) VALUES (2, 2, 3, 5);",
+ "INSERT INTO TestPartitionedTableNoStats" +
+ " (Column1, Column2, Column3, Column4) VALUES (3, 3, 2, 6);",
+ "INSERT INTO TestPartitionedTableNoStats" +
+ " (Column1, Column2, Column3, Column4) VALUES (4, 3, 2, 7);"
+ )),
+ querySql = "SELECT COUNT(*)" +
+ ", MIN(Column2), MAX(Column2)" +
+ ", MIN(Column3), MAX(Column3)" +
+ " FROM TestPartitionedTableNoStats",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+ ),
+ new SqlTestParams(
+ name = "min-max - partitioned table - all supported data types",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestAllTypesPartitionedTable (" +
+ "TINYINTColumn TINYINT, SMALLINTColumn SMALLINT, INTColumn INT, BIGINTColumn BIGINT, " +
+ "FLOATColumn FLOAT, DOUBLEColumn DOUBLE, DATEColumn DATE, Data INT) USING DELTA" +
+ " PARTITIONED BY (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+ " FLOATColumn, DOUBLEColumn, DATEColumn)",
+ "INSERT INTO TestAllTypesPartitionedTable" +
+ " (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+ " FLOATColumn, DOUBLEColumn, DATEColumn, Data)" +
+ " VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);",
+ "INSERT INTO TestAllTypesPartitionedTable" +
+ " (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+ " FLOATColumn, DOUBLEColumn, DATEColumn, Data)" +
+ " VALUES (-128, -32768, -2147483648, -9223372036854775808," +
+ " -3.4028235E38, -1.7976931348623157E308, CAST('1582-10-15' AS DATE), 1);"
+ )),
+ querySql = "SELECT COUNT(*)," +
+ "MIN(TINYINTColumn), MAX(TINYINTColumn)" +
+ ", MIN(SMALLINTColumn), MAX(SMALLINTColumn)" +
+ ", MIN(INTColumn), MAX(INTColumn)" +
+ ", MIN(BIGINTColumn), MAX(BIGINTColumn)" +
+ ", MIN(FLOATColumn), MAX(FLOATColumn)" +
+ ", MIN(DOUBLEColumn), MAX(DOUBLEColumn)" +
+ ", MIN(DATEColumn), MAX(DATEColumn)" +
+ ", MIN(Data), MAX(Data)" +
+ " FROM TestAllTypesPartitionedTable",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, none#5, none#6, " +
+ "none#7L, none#8L, none#9, none#10, none#11, none#12, none#13, none#14, none#15, none#16]"
+ ),
+ new SqlTestParams(
+ name = "min-max - partitioned table - only NULL values",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestOnlyNullValuesPartitioned (" +
+ "TINYINTColumn TINYINT, SMALLINTColumn SMALLINT, INTColumn INT, BIGINTColumn BIGINT, " +
+ "FLOATColumn FLOAT, DOUBLEColumn DOUBLE, DATEColumn DATE, Data INT) USING DELTA" +
+ " PARTITIONED BY (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+ " FLOATColumn, DOUBLEColumn, DATEColumn)",
+ "INSERT INTO TestOnlyNullValuesPartitioned" +
+ " (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+ " FLOATColumn, DOUBLEColumn, DATEColumn, Data)" +
+ " VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);"
+ )),
+ querySql = "SELECT COUNT(*)," +
+ "MIN(TINYINTColumn), MAX(TINYINTColumn)" +
+ ", MIN(SMALLINTColumn), MAX(SMALLINTColumn)" +
+ ", MIN(INTColumn), MAX(INTColumn)" +
+ ", MIN(BIGINTColumn), MAX(BIGINTColumn)" +
+ ", MIN(FLOATColumn), MAX(FLOATColumn)" +
+ ", MIN(DOUBLEColumn), MAX(DOUBLEColumn)" +
+ ", MIN(DATEColumn), MAX(DATEColumn)" +
+ ", MIN(Data), MAX(Data)" +
+ " FROM TestOnlyNullValuesPartitioned",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, none#5, none#6, " +
+ "none#7L, none#8L, none#9, none#10, none#11, none#12, none#13, none#14, none#15, none#16]"
+ ),
+ new SqlTestParams(
+ name = "min-max - partitioned table - NULL and NON-NULL values",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestNullPartitioned (Column1 INT, Column2 INT, Column3 INT)" +
+ " USING DELTA PARTITIONED BY (Column2, Column3)",
+ "INSERT INTO TestNullPartitioned (Column1, Column2, Column3) VALUES (NULL, NULL, 1);",
+ "INSERT INTO TestNullPartitioned (Column1, Column2, Column3) VALUES (NULL, NULL, NULL);",
+ "INSERT INTO TestNullPartitioned (Column1, Column2, Column3) VALUES (NULL, NULL, 2);"
+ )),
+ querySql = "SELECT COUNT(*), MIN(Column1), MAX(Column1), MIN(Column2), MAX(Column2), " +
+ "MIN(Column3), MAX(Column3) FROM TestNullPartitioned",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, none#5, none#6]"
+ ),
+ new SqlTestParams(
+ name = "min-max - column name containing punctuation",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestPunctuationColumnName (`My.!?Column` INT) USING DELTA",
+ "INSERT INTO TestPunctuationColumnName (`My.!?Column`) VALUES (1), (2), (3);"
+ )),
+ querySql = "SELECT COUNT(*), MIN(`My.!?Column`), MAX(`My.!?Column`)" +
+ " FROM TestPunctuationColumnName",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2]"
+ ),
+ new SqlTestParams(
+ name = "min-max - partitioned table - column name containing punctuation",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestPartitionedPunctuationColumnName (`My.!?Column` INT, Data INT)" +
+ " USING DELTA PARTITIONED BY (`My.!?Column`)",
+ "INSERT INTO TestPartitionedPunctuationColumnName" +
+ " (`My.!?Column`, Data) VALUES (1, 1), (2, 1), (3, 1);"
+ )),
+ querySql = "SELECT COUNT(*), MIN(`My.!?Column`), MAX(`My.!?Column`)" +
+ " FROM TestPartitionedPunctuationColumnName",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2]"
+ ),
+ new SqlTestParams(
+ name = "min-max - partitioned table - special characters in column name",
+ querySetup = Some(
+ Seq(
+ "CREATE TABLE TestColumnMappingPartitioned" +
+ " (Column1 INT, Column2 INT, `Column3 .,;{}()\n\t=` INT, Column4 INT)" +
+ " USING DELTA PARTITIONED BY (Column2, `Column3 .,;{}()\n\t=`)" +
+ " TBLPROPERTIES('delta.columnMapping.mode' = 'name')",
+ "INSERT INTO TestColumnMappingPartitioned" +
+ " (Column1, Column2, `Column3 .,;{}()\n\t=`, Column4)" +
+ " VALUES (1, 2, 3, 4);",
+ "INSERT INTO TestColumnMappingPartitioned" +
+ " (Column1, Column2, `Column3 .,;{}()\n\t=`, Column4)" +
+ " VALUES (2, 2, 3, 5);",
+ "INSERT INTO TestColumnMappingPartitioned" +
+ " (Column1, Column2, `Column3 .,;{}()\n\t=`, Column4)" +
+ " VALUES (3, 3, 2, 6);",
+ "INSERT INTO TestColumnMappingPartitioned" +
+ " (Column1, Column2, `Column3 .,;{}()\n\t=`, Column4)" +
+ " VALUES (4, 3, 2, 7);"
+ )),
+ querySql = "SELECT COUNT(*)" +
+ ", MIN(Column1), MAX(Column1)" +
+ ", MIN(Column2), MAX(Column2)" +
+ ", MIN(`Column3 .,;{}()\n\t=`), MAX(`Column3 .,;{}()\n\t=`)" +
+ ", MIN(Column4), MAX(Column4)" +
+ " FROM TestColumnMappingPartitioned",
+ expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3," +
+ " none#4, none#5, none#6, none#7, none#8]"
+ )
+ )
+ .foreach {
+ testParams =>
+ test(s"optimization supported - SQL - ${testParams.name}") {
+ if (testParams.querySetup.isDefined) {
+ testParams.querySetup.get.foreach(spark.sql)
+ }
+ checkResultsAndOptimizedPlan(testParams.querySql, testParams.expectedPlan)
+ }
+ }
+
+ test("count-min-max - external table") {
+ withTempDir {
+ dir =>
+ val testTablePath = dir.getCanonicalPath
+ dfPart1.write.format("delta").mode("overwrite").save(testTablePath)
+ DeltaTable.forPath(spark, testTablePath).delete("id = 1")
+ dfPart2.write.format("delta").mode(SaveMode.Append).save(testTablePath)
+
+ checkResultsAndOptimizedPlan(
+ s"SELECT COUNT(*), MIN(id), MAX(id) FROM delta.`$testTablePath`",
+ "LocalRelation [none#0L, none#1L, none#2L]")
+ }
+ }
+
+ test("min-max - partitioned column stats disabled") {
+ withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false") {
+ val tableName = "TestPartitionedNoStats"
+
+ spark.sql(
+ s"CREATE TABLE $tableName (Column1 INT, Column2 INT)" +
+ " USING DELTA PARTITIONED BY (Column2)")
+
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 3);")
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (2, 4);")
+
+ // Has no stats, including COUNT
+ checkOptimizationIsNotTriggered(
+ s"SELECT COUNT(*), MIN(Column2), MAX(Column2) FROM $tableName")
+
+ // Should work for partitioned columns even without stats
+ checkResultsAndOptimizedPlan(
+ s"SELECT MIN(Column2), MAX(Column2) FROM $tableName",
+ "LocalRelation [none#0, none#1]")
+ }
+ }
+
+ test("min-max - recompute column missing stats") {
+ val tableName = "TestRecomputeMissingStat"
+
+ spark.sql(
+ s"CREATE TABLE $tableName (Column1 INT, Column2 INT) USING DELTA" +
+ s" TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 0)")
+
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 4);")
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (2, 5);")
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (3, 6);")
+
+ checkOptimizationIsNotTriggered(s"SELECT COUNT(*), MIN(Column1), MAX(Column1) FROM $tableName")
+
+ spark.sql(s"ALTER TABLE $tableName SET TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 1);")
+
+ StatisticsCollection.recompute(
+ spark,
+ DeltaLog.forTable(spark, TableIdentifier(tableName)),
+ DeltaTableV2(spark, TableIdentifier(tableName)).catalogTable)
+
+ checkResultsAndOptimizedPlan(
+ s"SELECT COUNT(*), MIN(Column1), MAX(Column1) FROM $tableName",
+ "LocalRelation [none#0L, none#1, none#2]")
+
+ checkOptimizationIsNotTriggered(s"SELECT COUNT(*), MIN(Column2), MAX(Column2) FROM $tableName")
+
+ spark.sql(s"ALTER TABLE $tableName SET TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 2);")
+
+ StatisticsCollection.recompute(
+ spark,
+ DeltaLog.forTable(spark, TableIdentifier(tableName)),
+ DeltaTableV2(spark, TableIdentifier(tableName)).catalogTable)
+
+ checkResultsAndOptimizedPlan(
+ s"SELECT COUNT(*), MIN(Column2), MAX(Column2) FROM $tableName",
+ "LocalRelation [none#0L, none#1, none#2]")
+ }
+
+ test("min-max - recompute added column") {
+ val tableName = "TestRecomputeAddedColumn"
+
+ spark.sql(s"CREATE TABLE $tableName (Column1 INT) USING DELTA")
+ spark.sql(s"INSERT INTO $tableName (Column1) VALUES (1);")
+
+ spark.sql(s"ALTER TABLE $tableName ADD COLUMN (Column2 INT)")
+
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (2, 5);")
+
+ checkResultsAndOptimizedPlan(
+ s"SELECT COUNT(*), MIN(Column1), MAX(Column1) FROM $tableName",
+ "LocalRelation [none#0L, none#1, none#2]")
+
+ checkOptimizationIsNotTriggered(
+ s"SELECT COUNT(*), " +
+ s"MIN(Column1), MAX(Column1), MIN(Column2), MAX(Column2) FROM $tableName")
+
+ StatisticsCollection.recompute(
+ spark,
+ DeltaLog.forTable(spark, TableIdentifier(tableName)),
+ DeltaTableV2(spark, TableIdentifier(tableName)).catalogTable)
+
+ checkResultsAndOptimizedPlan(
+ s"SELECT COUNT(*), " +
+ s"MIN(Column1), MAX(Column1), MIN(Column2), MAX(Column2) FROM $tableName",
+ "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+ )
+ }
+
+ test("Select Count: snapshot isolation") {
+ sql(s"CREATE TABLE TestSnapshotIsolation (c1 int) USING DELTA")
+ spark.sql("INSERT INTO TestSnapshotIsolation VALUES (1)")
+
+ val scannedVersions = mutable.ArrayBuffer[Long]()
+ val query = "SELECT (SELECT COUNT(*) FROM TestSnapshotIsolation), " +
+ "(SELECT COUNT(*) FROM TestSnapshotIsolation)"
+
+ checkResultsAndOptimizedPlan(
+ query,
+ "Project [scalar-subquery#0 [] AS #0L, scalar-subquery#0 [] AS #1L]\n" +
+ ": :- LocalRelation [none#0L]\n" +
+ ": +- LocalRelation [none#0L]\n" +
+ "+- OneRowRelation"
+ )
+
+ PrepareDeltaScanBase.withCallbackOnGetDeltaScanGenerator(
+ scanGenerator => {
+ // Record the scanned version and make changes to the table. We will verify changes in the
+ // middle of the query are not visible to the query.
+ scannedVersions += scanGenerator.snapshotToScan.version
+ // Insert a row after each call to get scanGenerator
+ // to test if the count doesn't change in the same query
+ spark.sql("INSERT INTO TestSnapshotIsolation VALUES (1)")
+ }) {
+ val result = spark.sql(query).collect()(0)
+ val c1 = result.getLong(0)
+ val c2 = result.getLong(1)
+ assertResult(c1, "Snapshot isolation should guarantee the results are always the same")(c2)
+ assert(
+ scannedVersions.toSet.size == 1,
+ s"Scanned multiple versions of the same table in one query: ${scannedVersions.toSet}")
+ }
+ }
+
+ test(".collect() and .show() both use this optimization") {
+ var resultRow: Row = null
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false") {
+ resultRow = spark.sql(s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName").head
+ }
+
+ val totalRows = resultRow.getLong(0)
+ val minId = resultRow.getLong(1)
+ val maxId = resultRow.getLong(2)
+
+ val collectPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+ spark.sql(s"SELECT COUNT(*) FROM $testTableName").collect()
+ }
+ val collectResultData = collectPlans.collect { case x: LocalRelation => x.data }
+ assert(collectResultData.size === 1)
+ assert(collectResultData.head.head.getLong(0) === totalRows)
+
+ val showPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+ spark.sql(s"SELECT COUNT(*) FROM $testTableName").show()
+ }
+ val showResultData = showPlans.collect { case x: LocalRelation => x.data }
+ assert(showResultData.size === 1)
+ assert(showResultData.head.head.getString(0).toLong === totalRows)
+
+ val showMultAggPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+ spark.sql(s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName").show()
+ }
+
+ val showMultipleAggResultData = showMultAggPlans.collect { case x: LocalRelation => x.data }
+ assert(showMultipleAggResultData.size === 1)
+ val firstRow = showMultipleAggResultData.head.head
+ assert(firstRow.getString(0).toLong === totalRows)
+ assert(firstRow.getString(1).toLong === minId)
+ assert(firstRow.getString(2).toLong === maxId)
+ }
+
+ test("min-max .show() - only NULL values") {
+ val tableName = "TestOnlyNullValuesShow"
+
+ spark.sql(s"CREATE TABLE $tableName (Column1 INT) USING DELTA")
+ spark.sql(s"INSERT INTO $tableName (Column1) VALUES (NULL);")
+
+ val showMultAggPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+ spark.sql(s"SELECT MIN(Column1), MAX(Column1) FROM $tableName").show()
+ }
+
+ val showMultipleAggResultData = showMultAggPlans.collect { case x: LocalRelation => x.data }
+ assert(showMultipleAggResultData.size === 1)
+ val firstRow = showMultipleAggResultData.head.head
+ assert(firstRow.getString(0) === "NULL")
+ assert(firstRow.getString(1) === "NULL")
+ }
+
+ test("min-max .show() - Date Columns") {
+ val tableName = "TestDateColumnsShow"
+
+ spark.sql(s"CREATE TABLE $tableName (Column1 DATE, Column2 DATE) USING DELTA")
+ spark.sql(
+ s"INSERT INTO $tableName (Column1, Column2) VALUES " +
+ s"(CAST('1582-10-15' AS DATE), NULL);")
+
+ val showMultAggPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+ spark.sql(s"SELECT MIN(Column1), MIN(Column2) FROM $tableName").show()
+ }
+
+ val showMultipleAggResultData = showMultAggPlans.collect { case x: LocalRelation => x.data }
+ assert(showMultipleAggResultData.size === 1)
+ val firstRow = showMultipleAggResultData.head.head
+ assert(firstRow.getString(0) === "1582-10-15")
+ assert(firstRow.getString(1) === "NULL")
+ }
+
+ test("count - dv-enabled") {
+ withTempDir {
+ dir =>
+ val tempPath = dir.getCanonicalPath
+ spark.range(1, 10, 1, 1).write.format("delta").save(tempPath)
+
+ enableDeletionVectorsInTable(new Path(tempPath), true)
+ DeltaTable.forPath(spark, tempPath).delete("id = 1")
+ assert(!getFilesWithDeletionVectors(DeltaLog.forTable(spark, new Path(tempPath))).isEmpty)
+
+ checkResultsAndOptimizedPlan(
+ s"SELECT COUNT(*) FROM delta.`$tempPath`",
+ "LocalRelation [none#0L]")
+ }
+ }
+
+ test("count - zero rows AddFile") {
+ withTempDir {
+ dir =>
+ val tempPath = dir.getCanonicalPath
+ val df = spark.range(1, 10)
+ val expectedResult = df.count()
+ df.write.format("delta").save(tempPath)
+
+ // Creates AddFile entries with non-existing files
+ // The query should read only the delta log and not the parquet files
+ val log = DeltaLog.forTable(spark, tempPath)
+ val txn = log.startTransaction()
+ txn.commitManually(
+ DeltaTestUtils.createTestAddFile(
+ encodedPath = "1.parquet",
+ stats = "{\"numRecords\": 0}"),
+ DeltaTestUtils.createTestAddFile(
+ encodedPath = "2.parquet",
+ stats = "{\"numRecords\": 0}"),
+ DeltaTestUtils.createTestAddFile(encodedPath = "3.parquet", stats = "{\"numRecords\": 0}")
+ )
+
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "true") {
+ val queryDf = spark.sql(s"SELECT COUNT(*) FROM delta.`$tempPath`")
+ val optimizedPlan = queryDf.queryExecution.optimizedPlan.canonicalized.toString()
+
+ assert(queryDf.head().getLong(0) === expectedResult)
+
+ assertResult("LocalRelation [none#0L]") {
+ optimizedPlan.trim
+ }
+ }
+ }
+ }
+
+ // Tests to validate the optimizer won't incorrectly change queries it can't correctly handle
+
+ Seq(
+ (s"SELECT COUNT(*) FROM $mixedStatsTableName", "missing stats"),
+ (s"SELECT COUNT(*) FROM $noStatsTableName", "missing stats"),
+ (s"SELECT MIN(id), MAX(id) FROM $mixedStatsTableName", "missing stats"),
+ (s"SELECT MIN(id), MAX(id) FROM $noStatsTableName", "missing stats"),
+ (s"SELECT group, COUNT(*) FROM $testTableName GROUP BY group", "group by"),
+ (s"SELECT group, MIN(id), MAX(id) FROM $testTableName GROUP BY group", "group by"),
+ (s"SELECT COUNT(*) + 1 FROM $testTableName", "plus literal"),
+ (s"SELECT MAX(id) + 1 FROM $testTableName", "plus literal"),
+ (s"SELECT COUNT(DISTINCT data) FROM $testTableName", "distinct count"),
+ (s"SELECT COUNT(*) FROM $testTableName WHERE id > 0", "filter"),
+ (s"SELECT MAX(id) FROM $testTableName WHERE id > 0", "filter"),
+ (s"SELECT (SELECT COUNT(*) FROM $testTableName WHERE id > 0)", "sub-query with filter"),
+ (s"SELECT (SELECT MAX(id) FROM $testTableName WHERE id > 0)", "sub-query with filter"),
+ (s"SELECT COUNT(ALL data) FROM $testTableName", "count non-null"),
+ (s"SELECT COUNT(data) FROM $testTableName", "count non-null"),
+ (s"SELECT COUNT(*) FROM $testTableName A, $testTableName B", "join"),
+ (s"SELECT MAX(A.id) FROM $testTableName A, $testTableName B", "join"),
+ (s"SELECT COUNT(*) OVER() FROM $testTableName LIMIT 1", "over"),
+ (s"SELECT MAX(id) OVER() FROM $testTableName LIMIT 1", "over")
+ )
+ .foreach {
+ case (query, desc) =>
+ test(s"optimization not supported - $desc - $query") {
+ checkOptimizationIsNotTriggered(query)
+ }
+ }
+
+ test("optimization not supported - min-max unsupported data types") {
+ val tableName = "TestUnsupportedTypes"
+
+ spark.sql(
+ s"CREATE TABLE $tableName " +
+ s"(STRINGColumn STRING, DECIMALColumn DECIMAL(38,0)" +
+ s", TIMESTAMPColumn TIMESTAMP, BINARYColumn BINARY, " +
+ s"BOOLEANColumn BOOLEAN, ARRAYColumn ARRAY<INT>, MAPColumn MAP<INT, INT>, " +
+ s"STRUCTColumn STRUCT<Id: INT, Name: STRING>) USING DELTA")
+
+ spark.sql(s"INSERT INTO $tableName" +
+ s" (STRINGColumn, DECIMALColumn, TIMESTAMPColumn, BINARYColumn" +
+ s", BOOLEANColumn, ARRAYColumn, MAPColumn, STRUCTColumn) VALUES " +
+ s"('A', -99999999999999999999999999999999999999, CAST('1900-01-01 00:00:00.0' AS TIMESTAMP)" +
+ s", X'1ABF', TRUE, ARRAY(1, 2, 3), MAP(1, 10, 2, 20), STRUCT(1, 'Spark'));")
+
+ val columnNames = List(
+ "STRINGColumn",
+ "DECIMALColumn",
+ "TIMESTAMPColumn",
+ "BINARYColumn",
+ "BOOLEANColumn",
+ "ARRAYColumn",
+ "STRUCTColumn")
+
+ columnNames.foreach(
+ colName => checkOptimizationIsNotTriggered(s"SELECT MAX($colName) FROM $tableName"))
+ }
+
+ test("optimization not supported - min-max column without stats") {
+ val tableName = "TestColumnWithoutStats"
+
+ spark.sql(
+ s"CREATE TABLE $tableName (Column1 INT, Column2 INT) USING DELTA" +
+ s" TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 1)")
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 2);")
+
+ checkOptimizationIsNotTriggered(s"SELECT MAX(Column2) FROM $tableName")
+ }
+
+ // For empty tables the stats won't be found and the query should not be optimized
+ test("optimization not supported - min-max empty table") {
+ val tableName = "TestMinMaxEmptyTable"
+
+ spark.sql(s"CREATE TABLE $tableName (Column1 INT) USING DELTA")
+
+ checkOptimizationIsNotTriggered(s"SELECT MIN(Column1), MAX(Column1) FROM $tableName")
+ }
+
+ test("optimization not supported - min-max dv-enabled") {
+ withTempDir {
+ dir =>
+ val tempPath = dir.getCanonicalPath
+ spark.range(1, 10, 1, 1).write.format("delta").save(tempPath)
+ val querySql = s"SELECT MIN(id), MAX(id) FROM delta.`$tempPath`"
+ checkResultsAndOptimizedPlan(querySql, "LocalRelation [none#0L, none#1L]")
+
+ enableDeletionVectorsInTable(new Path(tempPath), true)
+ DeltaTable.forPath(spark, tempPath).delete("id = 1")
+ assert(!getFilesWithDeletionVectors(DeltaLog.forTable(spark, new Path(tempPath))).isEmpty)
+ checkOptimizationIsNotTriggered(querySql)
+ }
+ }
+
+ test("optimization not supported - filter on partitioned column") {
+ val tableName = "TestPartitionedFilter"
+
+ spark.sql(
+ s"CREATE TABLE $tableName (Column1 INT, Column2 INT)" +
+ " USING DELTA PARTITIONED BY (Column2)")
+
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 2);")
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (2, 2);")
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (3, 3);")
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (4, 3);")
+
+ // Filter by partition column
+ checkOptimizationIsNotTriggered(
+ "SELECT COUNT(*)" +
+ ", MIN(Column1), MAX(Column1)" +
+ ", MIN(Column2), MAX(Column2)" +
+ s" FROM $tableName WHERE Column2 = 2")
+
+ // Filter both partition and data columns
+ checkOptimizationIsNotTriggered(
+ "SELECT COUNT(*)" +
+ ", MIN(Column1), MAX(Column1)" +
+ ", MIN(Column2), MAX(Column2)" +
+ s" FROM $tableName WHERE Column1 = 2 AND Column2 = 2")
+ }
+
+ test("optimization not supported - sub-query with column alias") {
+ val tableName = "TestColumnAliasSubQuery"
+
+ spark.sql(s"CREATE TABLE $tableName (Column1 INT, Column2 INT, Column3 INT) USING DELTA")
+
+ spark.sql(s"INSERT INTO $tableName (Column1, Column2, Column3) VALUES (1, 2, 3);")
+
+ checkOptimizationIsNotTriggered(
+ s"SELECT MAX(Column2) FROM (SELECT Column1 AS Column2 FROM $tableName)")
+
+ checkOptimizationIsNotTriggered(
+ s"SELECT MAX(Column1), MAX(Column2), MAX(Column3) FROM " +
+ s"(SELECT Column1 AS Column2, Column2 AS Column3, Column3 AS Column1 FROM $tableName)")
+ }
+
+ test("optimization not supported - nested columns") {
+ val tableName = "TestNestedColumns"
+
+ spark.sql(
+ s"CREATE TABLE $tableName " +
+ s"(Column1 STRUCT<Id: INT>, " +
+ s"`Column1.Id` INT) USING DELTA")
+
+ spark.sql(
+ s"INSERT INTO $tableName" +
+ s" (Column1, `Column1.Id`) VALUES " +
+ s"(STRUCT(1), 2);")
+
+ // Nested Column
+ checkOptimizationIsNotTriggered(s"SELECT MAX(Column1.Id) FROM $tableName")
+
+ checkOptimizationIsNotTriggered(s"SELECT MAX(Column1.Id) AS XYZ FROM $tableName")
+
+ // Validate the scenario where all the columns are read
+ // since it creates a different query plan
+ checkOptimizationIsNotTriggered(
+ s"SELECT MAX(Column1.Id), " +
+ s"MAX(`Column1.Id`) FROM $tableName")
+
+ // The optimization for columns with dots should still work
+ checkResultsAndOptimizedPlan(
+ s"SELECT MAX(`Column1.Id`) FROM $tableName",
+ "LocalRelation [none#0]")
+ }
+
+ private def generateRowsDataFrame(source: Dataset[java.lang.Long]): DataFrame = {
+ import testImplicits._
+
+ source.select(
+ Symbol("id"),
+ Symbol("id").cast("tinyint").as(Symbol("TinyIntColumn")),
+ Symbol("id").cast("smallint").as(Symbol("SmallIntColumn")),
+ Symbol("id").cast("int").as(Symbol("IntColumn")),
+ Symbol("id").cast("bigint").as(Symbol("BigIntColumn")),
+ (Symbol("id") / 3.3).cast("float").as(Symbol("FloatColumn")),
+ (Symbol("id") / 3.3).cast("double").as(Symbol("DoubleColumn")),
+ date_add(lit("2022-08-31").cast("date"), col("id").cast("int")).as(Symbol("DateColumn")),
+ (Symbol("id") % 2).cast("integer").as(Symbol("group")),
+ Symbol("id").cast("string").as(Symbol("data"))
+ )
+ }
+
+ /**
+ * Validate the results of the query is the same with the flag
+ * DELTA_OPTIMIZE_METADATA_QUERY_ENABLED enabled and disabled. And the expected Optimized Query
+ * Plan with the flag enabled
+ */
+ private def checkResultsAndOptimizedPlan(query: String, expectedOptimizedPlan: String): Unit = {
+ checkResultsAndOptimizedPlan(() => spark.sql(query), expectedOptimizedPlan)
+ }
+
+ /**
+ * Validate the results of the query is the same with the flag
+ * DELTA_OPTIMIZE_METADATA_QUERY_ENABLED enabled and disabled. And the expected Optimized Query
+ * Plan with the flag enabled.
+ */
+ private def checkResultsAndOptimizedPlan(
+ generateQueryDf: () => DataFrame,
+ expectedOptimizedPlan: String): Unit = {
+ var expectedAnswer: scala.Seq[Row] = null
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false") {
+ expectedAnswer = generateQueryDf().collect()
+ }
+
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "true") {
+ val queryDf = generateQueryDf()
+ val optimizedPlan = queryDf.queryExecution.optimizedPlan.canonicalized.toString()
+
+ assert(queryDf.collect().sameElements(expectedAnswer))
+
+ assertResult(expectedOptimizedPlan.trim) {
+ optimizedPlan.trim
+ }
+ }
+ }
+
+ /**
+ * Verify the query plans and results are the same with/without metadata query optimization. This
+ * method can be used to verify cases that we shouldn't trigger optimization or cases that we can
+ * potentially improve.
+ * @param query
+ */
+ private def checkOptimizationIsNotTriggered(query: String): Unit = {
+ var expectedOptimizedPlan: String = null
+ var expectedAnswer: scala.Seq[Row] = null
+
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false") {
+
+ val generateQueryDf = spark.sql(query)
+ expectedOptimizedPlan = generateQueryDf.queryExecution.optimizedPlan.canonicalized.toString()
+ expectedAnswer = generateQueryDf.collect()
+ }
+
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "true") {
+
+ val generateQueryDf = spark.sql(query)
+ val optimizationEnabledQueryPlan =
+ generateQueryDf.queryExecution.optimizedPlan.canonicalized.toString()
+
+ assert(generateQueryDf.collect().sameElements(expectedAnswer))
+
+ assertResult(expectedOptimizedPlan) {
+ optimizationEnabledQueryPlan
+ }
+ }
+ }
+}
+
+trait OptimizeMetadataOnlyDeltaQueryColumnMappingSuiteBase
+ extends DeltaColumnMappingSelectedTestMixin {
+ override protected def runAllTests = true
+}
+
+@Ignore // FIXME: ID-based mapping.
+class OptimizeMetadataOnlyDeltaQueryIdColumnMappingSuite
+ extends OptimizeMetadataOnlyDeltaQuerySuite
+ with DeltaColumnMappingEnableIdMode
+ with OptimizeMetadataOnlyDeltaQueryColumnMappingSuiteBase
+
+class OptimizeMetadataOnlyDeltaQueryNameColumnMappingSuite
+ extends OptimizeMetadataOnlyDeltaQuerySuite
+ with DeltaColumnMappingEnableNameMode
+ with OptimizeMetadataOnlyDeltaQueryColumnMappingSuiteBase
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizedWritesSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizedWritesSuite.scala
new file mode 100644
index 0000000000..d0156e8d6f
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizedWritesSuite.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.perf
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaOptions, DeltaTestUtils}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, StructType}
+
+import com.databricks.spark.util.Log4jUsageLogger
+
+import java.io.File
+
+import scala.language.implicitConversions
+
+abstract class OptimizedWritesSuiteBase extends QueryTest with SharedSparkSession {
+
+ import testImplicits._
+
+ protected def writeTest(testName: String)(f: String => Unit): Unit = {
+ test(testName) {
+ withTempDir {
+ dir =>
+ withSQLConf(DeltaConfigs.OPTIMIZE_WRITE.defaultTablePropertyKey -> "true") {
+ f(dir.getCanonicalPath)
+ }
+ }
+ }
+ }
+
+ protected def checkResult(df: DataFrame, numFileCheck: Long => Boolean, dir: String): Unit = {
+ val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, dir)
+ val files = snapshot.numOfFiles
+ assert(numFileCheck(files), s"file check failed: received $files")
+
+ checkAnswer(
+ spark.read.format("delta").load(dir),
+ df
+ )
+ }
+
+ implicit protected def fileToPathString(dir: File): String = dir.getCanonicalPath
+
+ writeTest("non-partitioned write - table config") {
+ dir =>
+ val df = spark.range(0, 100, 1, 4).toDF()
+ df.write.format("delta").save(dir)
+ checkResult(df, numFileCheck = _ === 1, dir)
+ }
+
+ test("non-partitioned write - table config compatibility") {
+ withTempDir {
+ tempDir =>
+ val dir = tempDir.getCanonicalPath
+ // When table property is not set, we use session conf value.
+ // Writes 1 file instead of 4 when OW is enabled
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED.key -> "true") {
+ val df = spark.range(0, 100, 1, 4).toDF()
+ val commitStats = Log4jUsageLogger
+ .track {
+ df.write.format("delta").mode("append").save(dir)
+ }
+ .filter(_.tags.get("opType") === Some("delta.commit.stats"))
+ assert(commitStats.length >= 1)
+ checkResult(df, numFileCheck = _ === 1, dir)
+ }
+ }
+
+ // Test order of precedence between table property "delta.autoOptimize.optimizeWrite" and
+ // session conf.
+ for {
+ sqlConf <- DeltaTestUtils.BOOLEAN_DOMAIN
+ tableProperty <- DeltaTestUtils.BOOLEAN_DOMAIN
+ } {
+ withTempDir {
+ tempDir =>
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED.key -> sqlConf.toString) {
+ val dir = tempDir.getCanonicalPath
+ // Write one file to be able to set tblproperties
+ spark
+ .range(10)
+ .coalesce(1)
+ .write
+ .format("delta")
+ .mode("append")
+ .save(dir)
+
+ sql(
+ s"ALTER TABLE delta.`$dir` SET TBLPROPERTIES" +
+ s" (delta.autoOptimize.optimizeWrite = ${tableProperty.toString})")
+
+ val df = spark.range(0, 100, 1, 4).toDF()
+ // OW adds one file vs non-OW adds 4 files
+ val expectedNumberOfFiles = if (sqlConf) 2 else 5
+ df.write.format("delta").mode("append").save(dir)
+ checkResult(
+ df.union(spark.range(10).toDF()),
+ numFileCheck = _ === expectedNumberOfFiles,
+ dir)
+ }
+ }
+ }
+ }
+
+ test("non-partitioned write - data frame config") {
+ withTempDir {
+ dir =>
+ val df = spark.range(0, 100, 1, 4).toDF()
+ df.write
+ .format("delta")
+ .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "true")
+ .save(dir)
+ checkResult(df, numFileCheck = _ === 1, dir)
+ }
+ }
+
+ writeTest("non-partitioned write - data frame config trumps table config") {
+ dir =>
+ val df = spark.range(0, 100, 1, 4).toDF()
+ df.write.format("delta").option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "false").save(dir)
+ checkResult(df, numFileCheck = _ === 4, dir)
+ }
+
+ writeTest("partitioned write - table config") {
+ dir =>
+ val df = spark
+ .range(0, 100, 1, 4)
+ .withColumn("part", Symbol("id") % 5)
+
+ df.write.partitionBy("part").format("delta").save(dir)
+ checkResult(df, numFileCheck = _ <= 5, dir)
+ }
+
+ test("partitioned write - data frame config") {
+ withTempDir {
+ dir =>
+ val df = spark
+ .range(0, 100, 1, 4)
+ .withColumn("part", Symbol("id") % 5)
+
+ df.write
+ .partitionBy("part")
+ .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "true")
+ .format("delta")
+ .save(dir)
+
+ checkResult(df, numFileCheck = _ <= 5, dir)
+ }
+ }
+
+ writeTest("partitioned write - data frame config trumps table config") {
+ dir =>
+ val df = spark
+ .range(0, 100, 1, 4)
+ .withColumn("part", Symbol("id") % 5)
+
+ df.write
+ .partitionBy("part")
+ .format("delta")
+ .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "false")
+ .save(dir)
+
+ checkResult(df, numFileCheck = _ === 20, dir)
+ }
+
+ writeTest("multi-partitions - table config") {
+ dir =>
+ val df = spark
+ .range(0, 100, 1, 4)
+ .withColumn("part", Symbol("id") % 5)
+ .withColumn("part2", (Symbol("id") / 20).cast("int"))
+
+ df.write.partitionBy("part", "part2").format("delta").save(dir)
+
+ checkResult(df, numFileCheck = _ <= 25, dir)
+ }
+
+ test("multi-partitions - data frame config") {
+ withTempDir {
+ dir =>
+ val df = spark
+ .range(0, 100, 1, 4)
+ .withColumn("part", Symbol("id") % 5)
+ .withColumn("part2", (Symbol("id") / 20).cast("int"))
+
+ df.write
+ .partitionBy("part", "part2")
+ .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "true")
+ .format("delta")
+ .save(dir)
+
+ checkResult(df, numFileCheck = _ <= 25, dir)
+ }
+ }
+
+ test("optimized writes used if enabled when a stream starts") {
+ withTempDir {
+ f =>
+ // Write some data into the table so it already exists
+ Seq(1).toDF().write.format("delta").save(f)
+
+ // Use optimized writes just when starting the stream
+ val inputData = MemoryStream[Int]
+
+ val df = inputData.toDF().repartition(10)
+ var stream: StreamingQuery = null
+
+ // Start the stream with optimized writes enabled, and then reset the conf
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED.key -> "true") {
+ val checkpoint = new File(f, "checkpoint").getCanonicalPath
+ stream = df.writeStream.format("delta").option("checkpointLocation", checkpoint).start(f)
+ }
+ try {
+ inputData.addData(1 to 100)
+ stream.processAllAvailable()
+ } finally {
+ stream.stop()
+ }
+
+ val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, f)
+ assert(snapshot.numOfFiles == 2, "Optimized writes were not used")
+ }
+ }
+
+ writeTest("multi-partitions - data frame config trumps table config") {
+ dir =>
+ val df = spark
+ .range(0, 100, 1, 4)
+ .withColumn("part", Symbol("id") % 5)
+ .withColumn("part2", (Symbol("id") / 20).cast("int"))
+
+ df.write
+ .partitionBy("part", "part2")
+ .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "false")
+ .format("delta")
+ .save(dir)
+
+ checkResult(df, numFileCheck = _ > 25, dir)
+ }
+
+ writeTest("optimize should not leverage optimized writes") {
+ dir =>
+ val df = spark.range(0, 10, 1, 2)
+
+ val logs1 = Log4jUsageLogger
+ .track {
+ df.write.format("delta").mode("append").save(dir)
+ df.write.format("delta").mode("append").save(dir)
+ }
+ .filter(_.metric == "tahoeEvent")
+
+ assert(logs1.count(_.tags.get("opType") === Some("delta.optimizeWrite.planned")) === 2)
+
+ val logs2 = Log4jUsageLogger
+ .track {
+ sql(s"optimize delta.`$dir`")
+ }
+ .filter(_.metric == "tahoeEvent")
+
+ assert(logs2.count(_.tags.get("opType") === Some("delta.optimizeWrite.planned")) === 0)
+ }
+
+ writeTest("map task with more partitions than target shuffle blocks - non-partitioned") {
+ dir =>
+ val df = spark.range(0, 20, 1, 4)
+
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS.key -> "2") {
+ df.write.format("delta").mode("append").save(dir)
+ }
+
+ checkResult(df.toDF(), numFileCheck = _ === 1, dir)
+ }
+
+ writeTest("map task with more partitions than target shuffle blocks - partitioned") {
+ dir =>
+ val df = spark.range(0, 20, 1, 4).withColumn("part", Symbol("id") % 5)
+
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS.key -> "2") {
+ df.write.format("delta").partitionBy("part").mode("append").save(dir)
+ }
+
+ checkResult(df, numFileCheck = _ === 5, dir)
+ }
+
+ writeTest("zero partition dataframe write") {
+ dir =>
+ val df = spark.range(0, 20, 1, 4).withColumn("part", Symbol("id") % 5)
+ df.write.format("delta").partitionBy("part").mode("append").save(dir)
+ val schema = new StructType().add("id", LongType).add("part", LongType)
+
+ spark
+ .createDataFrame(sparkContext.emptyRDD[Row], schema)
+ .write
+ .format("delta")
+ .partitionBy("part")
+ .mode("append")
+ .save(dir)
+
+ checkResult(df, numFileCheck = _ === 5, dir)
+ }
+
+ test("OptimizedWriterBlocks is not serializable") {
+ assert(
+ !new OptimizedWriterBlocks(Array.empty).isInstanceOf[Serializable],
+ "The blocks should not be serializable so that they don't get shipped to executors."
+ )
+ }
+
+ writeTest("single partition dataframe write") {
+ dir =>
+ val df = spark.range(0, 20).repartition(1).withColumn("part", Symbol("id") % 5)
+ val logs1 = Log4jUsageLogger
+ .track {
+ df.write.format("delta").partitionBy("part").mode("append").save(dir)
+ }
+ .filter(_.metric == "tahoeEvent")
+
+ // doesn't use optimized writes
+ assert(logs1.count(_.tags.get("opType") === Some("delta.optimizeWrite.planned")) === 0)
+
+ checkResult(df, numFileCheck = _ === 5, dir)
+ }
+
+ writeTest("do not create tons of shuffle partitions during optimized writes") {
+ dir =>
+ // 50M shuffle blocks would've led to 25M shuffle partitions
+ withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS.key -> "50000000") {
+ val df = spark.range(0, 20).repartition(2).withColumn("part", Symbol("id") % 5)
+ val logs1 = Log4jUsageLogger
+ .track {
+ df.write.format("delta").partitionBy("part").mode("append").save(dir)
+ }
+ .filter(_.metric == "tahoeEvent")
+ .filter(_.tags.get("opType") === Some("delta.optimizeWrite.planned"))
+
+ assert(logs1.length === 1)
+ val blob = JsonUtils.fromJson[Map[String, Any]](logs1.head.blob)
+ assert(blob("outputPartitions") === 5)
+ assert(blob("originalPartitions") === 2)
+ assert(blob("numShuffleBlocks") === 50000000)
+ assert(
+ blob("shufflePartitions") ===
+ spark.conf.get(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS))
+
+ checkResult(df, numFileCheck = _ === 5, dir)
+ }
+ }
+}
+
+class OptimizedWritesSuite extends OptimizedWritesSuiteBase with DeltaSQLCommandTest {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]