This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5382556a87 [GLUTEN-11106][VL] Spark 3.5 / Delta 3.3: Add several suites for delta optimizations (#11430)
5382556a87 is described below

commit 5382556a87e97b90a0187b04921cd8cfc9be845f
Author: Hongze Zhang <[email protected]>
AuthorDate: Tue Jan 20 10:27:44 2026 +0000

    [GLUTEN-11106][VL] Spark 3.5 / Delta 3.3: Add several suites for delta optimizations (#11430)
---
 .../org/apache/spark/sql/delta/DeltaSuite.scala    |   42 +-
 .../perf/OptimizeMetadataOnlyDeltaQuerySuite.scala | 1121 ++++++++++++++++++++
 .../sql/delta/perf/OptimizedWritesSuite.scala      |  369 +++++++
 3 files changed, 1512 insertions(+), 20 deletions(-)

diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index e41df6180f..7f9e3db230 100644
--- a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -99,7 +99,7 @@ class DeltaSuite
         // Generate two files in two partitions
         spark
           .range(2)
-          .withColumn("part", $"id" % 2)
+          .withColumn("part", Symbol("id") % 2)
           .write
           .format("delta")
           .partitionBy("part")
@@ -227,7 +227,7 @@ class DeltaSuite
         val basePath = dir.getAbsolutePath
         spark
           .range(10)
-          .withColumn("part", 'id % 3)
+          .withColumn("part", Symbol("id") % 3)
           .write
           .format("delta")
           .partitionBy("part")
@@ -1287,7 +1287,7 @@ class DeltaSuite
 
         spark
           .range(100)
-          .select('id, ('id % 4).as("by4"), ('id % 8).as("by8"))
+          .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8"))
           .write
           .format("delta")
           .partitionBy("by4", "by8")
@@ -1310,7 +1310,7 @@ class DeltaSuite
 
         spark
           .range(100)
-          .select('id, ('id % 4).as("by4"))
+          .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
           .write
           .format("delta")
           .partitionBy("by4")
@@ -1341,7 +1341,7 @@ class DeltaSuite
 
         val dfw = spark
           .range(100)
-          .select('id, ('id % 4).as("by,4"))
+          .select(Symbol("id"), (Symbol("id") % 4).as("by,4"))
           .write
           .format("delta")
           .partitionBy("by,4")
@@ -1372,7 +1372,7 @@ class DeltaSuite
 
         spark
           .range(100)
-          .select('id, ('id % 4).as("by4"), ('id % 8).as("by8"))
+          .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8"))
           .write
           .format("delta")
           .partitionBy("by4", "by8")
@@ -1381,7 +1381,7 @@ class DeltaSuite
         val e = intercept[AnalysisException] {
           spark
             .range(100)
-            .select('id, ('id % 4).as("by4"))
+            .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
             .write
             .format("delta")
             .partitionBy("by4")
@@ -1401,7 +1401,7 @@ class DeltaSuite
 
         spark
           .range(100)
-          .select('id, ('id * 3).cast("string").as("value"))
+          .select(Symbol("id"), (Symbol("id") * 3).cast("string").as("value"))
           .write
           .format("delta")
           .save(tempDir.toString)
@@ -1409,7 +1409,7 @@ class DeltaSuite
         val e = intercept[AnalysisException] {
           spark
             .range(100)
-            .select('id, ('id * 3).as("value"))
+            .select(Symbol("id"), (Symbol("id") * 3).as("value"))
             .write
             .format("delta")
             .mode("append")
@@ -1431,7 +1431,7 @@ class DeltaSuite
 
         spark
           .range(100)
-          .select('id, ('id % 4).as("by4"))
+          .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
           .write
           .format("delta")
           .partitionBy("by4")
@@ -1444,7 +1444,7 @@ class DeltaSuite
 
         spark
           .range(101, 200)
-          .select('id, ('id % 4).as("by4"), ('id % 8).as("by8"))
+          .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8"))
           .write
           .format("delta")
           .option(DeltaOptions.MERGE_SCHEMA_OPTION, "true")
@@ -1453,7 +1453,9 @@ class DeltaSuite
 
         checkAnswer(
           spark.read.format("delta").load(tempDir.toString),
-          spark.range(101, 200).select('id, ('id % 4).as("by4"), ('id % 8).as("by8")))
+          spark
+            .range(101, 200)
+            .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8")))
     }
   }
 
@@ -1466,7 +1468,7 @@ class DeltaSuite
 
         spark
           .range(100)
-          .select('id, ('id % 4).as("by4"))
+          .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
           .write
           .format("delta")
           .partitionBy("by4")
@@ -1480,7 +1482,7 @@ class DeltaSuite
         val e = intercept[AnalysisException] {
           spark
             .range(101, 200)
-            .select('id, ('id % 4).as("by4"), ('id % 8).as("by8"))
+            .select(Symbol("id"), (Symbol("id") % 4).as("by4"), (Symbol("id") % 8).as("by8"))
             .write
             .format("delta")
             .partitionBy("by4", "by8")
@@ -1504,7 +1506,7 @@ class DeltaSuite
             val e = intercept[AnalysisException] {
               spark
                 .range(100)
-                .select('id, ('id % 4).as("by4"))
+                .select(Symbol("id"), (Symbol("id") % 4).as("by4"))
                 .write
                 .format("delta")
                 .partitionBy("by4", "id")
@@ -1702,14 +1704,14 @@ class DeltaSuite
           }
 
           withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-            testDf('aBc)
+            testDf(Symbol("aBc"))
 
             intercept[AnalysisException] {
-              testDf('abc)
+              testDf(Symbol("abc"))
             }
           }
-          testDf('aBc)
-          testDf('abc)
+          testDf(Symbol("aBc"))
+          testDf(Symbol("abc"))
       }
     }
   }
@@ -3303,7 +3305,7 @@ class DeltaNameColumnMappingSuite extends DeltaSuite with DeltaColumnMappingEnab
         // create partitioned table
         spark
           .range(100)
-          .withColumn("part", 'id % 10)
+          .withColumn("part", Symbol("id") % 10)
           .write
           .format("delta")
           .partitionBy("part")
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuerySuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuerySuite.scala
new file mode 100644
index 0000000000..d2265c8a15
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizeMetadataOnlyDeltaQuerySuite.scala
@@ -0,0 +1,1121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.perf
+
+import org.apache.spark.sql.{DataFrame, Dataset, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.delta.{DeletionVectorsTestUtils, DeltaColumnMappingEnableIdMode, DeltaColumnMappingEnableNameMode, DeltaLog, DeltaTestUtils}
+import org.apache.spark.sql.delta.catalog.DeltaTableV2
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.stats.PrepareDeltaScanBase
+import org.apache.spark.sql.delta.stats.StatisticsCollection
+import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.test.DeltaTestImplicits._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+
+import io.delta.tables.DeltaTable
+import org.apache.hadoop.fs.Path
+import org.scalatest.{BeforeAndAfterAll, Ignore}
+
+import scala.collection.mutable
+
+class OptimizeMetadataOnlyDeltaQuerySuite
+  extends QueryTest
+  with SharedSparkSession
+  with BeforeAndAfterAll
+  with DeltaSQLCommandTest
+  with DeletionVectorsTestUtils {
+  val testTableName = "table_basic"
+  val noStatsTableName = " table_nostats"
+  val mixedStatsTableName = " table_mixstats"
+
+  var dfPart1: DataFrame = null
+  var dfPart2: DataFrame = null
+
+  var totalRows: Long = -1
+  var minId: Long = -1
+  var maxId: Long = -1
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    dfPart1 = generateRowsDataFrame(spark.range(1L, 6L))
+    dfPart2 = generateRowsDataFrame(spark.range(6L, 11L))
+
+    withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false") {
+      dfPart1.write.format("delta").mode(SaveMode.Overwrite).saveAsTable(noStatsTableName)
+      dfPart1.write.format("delta").mode(SaveMode.Overwrite).saveAsTable(mixedStatsTableName)
+
+      spark.sql(s"DELETE FROM $noStatsTableName WHERE id = 1")
+      spark.sql(s"DELETE FROM $mixedStatsTableName WHERE id = 1")
+
+      dfPart2.write.format("delta").mode("append").saveAsTable(noStatsTableName)
+    }
+
+    withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> "true") {
+      dfPart1.write.format("delta").mode(SaveMode.Overwrite).saveAsTable(testTableName)
+
+      spark.sql(s"DELETE FROM $testTableName WHERE id = 1")
+
+      dfPart2.write.format("delta").mode(SaveMode.Append).saveAsTable(testTableName)
+      dfPart2.write.format("delta").mode(SaveMode.Append).saveAsTable(mixedStatsTableName)
+
+      // Run updates to generate more Delta Log and trigger a checkpoint
+      // and make sure stats works after checkpoints
+      for (a <- 1 to 10) {
+        spark.sql(s"UPDATE $testTableName SET data='$a' WHERE id = 7")
+      }
+      spark.sql(s"UPDATE $testTableName SET data=NULL WHERE id = 7")
+
+      // Creates an empty (numRecords == 0) AddFile record
+      generateRowsDataFrame(spark.range(11L, 12L)).write
+        .format("delta")
+        .mode("append")
+        .saveAsTable(testTableName)
+      spark.sql(s"DELETE FROM $testTableName WHERE id = 11")
+    }
+
+    withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false") {
+      val result = spark.sql(s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName").head
+      totalRows = result.getLong(0)
+      minId = result.getLong(1)
+      maxId = result.getLong(2)
+    }
+  }
+
+  /** Class to hold test parameters */
+  case class ScalaTestParams(name: String, queryScala: () => DataFrame, expectedPlan: String)
+
+  Seq(
+    new ScalaTestParams(
+      name = "count - simple query",
+      queryScala = () =>
+        spark.read
+          .format("delta")
+          .table(testTableName)
+          .agg(count(col("*"))),
+      expectedPlan = "LocalRelation [none#0L]"
+    ),
+    new ScalaTestParams(
+      name = "min-max - simple query",
+      queryScala = () =>
+        spark.read
+          .format("delta")
+          .table(testTableName)
+          .agg(
+            min(col("id")),
+            max(col("id")),
+            min(col("TinyIntColumn")),
+            max(col("TinyIntColumn")),
+            min(col("SmallIntColumn")),
+            max(col("SmallIntColumn")),
+            min(col("IntColumn")),
+            max(col("IntColumn")),
+            min(col("BigIntColumn")),
+            max(col("BigIntColumn")),
+            min(col("FloatColumn")),
+            max(col("FloatColumn")),
+            min(col("DoubleColumn")),
+            max(col("DoubleColumn")),
+            min(col("DateColumn")),
+            max(col("DateColumn"))
+          ),
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2, none#3, none#4, 
none#5, none#6" +
+        ", none#7, none#8L, none#9L, none#10, none#11, none#12, none#13, 
none#14, none#15]"
+    )
+  )
+    .foreach {
+      testParams =>
+        test(s"optimization supported - Scala - ${testParams.name}") {
+          checkResultsAndOptimizedPlan(testParams.queryScala, testParams.expectedPlan)
+        }
+    }
+
+  /** Class to hold test parameters */
+  case class SqlTestParams(
+      name: String,
+      querySql: String,
+      expectedPlan: String,
+      querySetup: Option[Seq[String]] = None)
+
+  Seq(
+    new SqlTestParams(
+      name = "count - simple query",
+      querySql = s"SELECT COUNT(*) FROM $testTableName",
+      expectedPlan = "LocalRelation [none#0L]"),
+    new SqlTestParams(
+      name = "min-max - simple query",
+      querySql = s"SELECT MIN(id), MAX(id)" +
+        s", MIN(TinyIntColumn), MAX(TinyIntColumn)" +
+        s", MIN(SmallIntColumn), MAX(SmallIntColumn)" +
+        s", MIN(IntColumn), MAX(IntColumn)" +
+        s", MIN(BigIntColumn), MAX(BigIntColumn)" +
+        s", MIN(FloatColumn), MAX(FloatColumn)" +
+        s", MIN(DoubleColumn), MAX(DoubleColumn)" +
+        s", MIN(DateColumn), MAX(DateColumn)" +
+        s"FROM $testTableName",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2, none#3, none#4, 
none#5, none#6" +
+        ", none#7, none#8L, none#9L, none#10, none#11, none#12, none#13, 
none#14, none#15]"
+    ),
+    new SqlTestParams(
+      name = "min-max - column name non-matching case",
+      querySql = s"SELECT MIN(ID), MAX(iD)" +
+        s", MIN(tINYINTCOLUMN), MAX(tinyintcolumN)" +
+        s", MIN(sMALLINTCOLUMN), MAX(smallintcolumN)" +
+        s", MIN(iNTCOLUMN), MAX(intcolumN)" +
+        s", MIN(bIGINTCOLUMN), MAX(bigintcolumN)" +
+        s", MIN(fLOATCOLUMN), MAX(floatcolumN)" +
+        s", MIN(dOUBLECOLUMN), MAX(doublecolumN)" +
+        s", MIN(dATECOLUMN), MAX(datecolumN)" +
+        s"FROM $testTableName",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2, none#3, none#4, 
none#5, none#6" +
+        ", none#7, none#8L, none#9L, none#10, none#11, none#12, none#13, 
none#14, none#15]"
+    ),
+    new SqlTestParams(
+      name = "count with column name alias",
+      querySql = s"SELECT COUNT(*) as MyCount FROM $testTableName",
+      expectedPlan = "LocalRelation [none#0L]"),
+    new SqlTestParams(
+      name = "count-min-max with column name alias",
+      querySql = s"SELECT COUNT(*) as MyCount, MIN(id) as MyMinId, MAX(id) as 
MyMaxId" +
+        s" FROM $testTableName",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+    ),
+    new SqlTestParams(
+      name = "count-min-max - table name with alias",
+      querySql = s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName 
MyTable",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+    ),
+    new SqlTestParams(
+      name = "count-min-max - query using time travel - version 0",
+      querySql = s"SELECT COUNT(*), MIN(id), MAX(id) " +
+        s"FROM $testTableName VERSION AS OF 0",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+    ),
+    new SqlTestParams(
+      name = "count-min-max - query using time travel - version 1",
+      querySql = s"SELECT COUNT(*), MIN(id), MAX(id) " +
+        s"FROM $testTableName VERSION AS OF 1",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+    ),
+    new SqlTestParams(
+      name = "count-min-max - query using time travel - version 2",
+      querySql = s"SELECT COUNT(*), MIN(id), MAX(id) " +
+        s"FROM $testTableName VERSION AS OF 2",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+    ),
+    new SqlTestParams(
+      name = "count - sub-query",
+      querySql = s"SELECT (SELECT COUNT(*) FROM $testTableName)",
+      expectedPlan = "Project [scalar-subquery#0 [] AS #0L]\n" +
+        ":  +- LocalRelation [none#0L]\n+- OneRowRelation"
+    ),
+    new SqlTestParams(
+      name = "min - sub-query",
+      querySql = s"SELECT (SELECT MIN(id) FROM $testTableName)",
+      expectedPlan = "Project [scalar-subquery#0 [] AS #0L]\n" +
+        ":  +- LocalRelation [none#0L]\n+- OneRowRelation"
+    ),
+    new SqlTestParams(
+      name = "max - sub-query",
+      querySql = s"SELECT (SELECT MAX(id) FROM $testTableName)",
+      expectedPlan = "Project [scalar-subquery#0 [] AS #0L]\n" +
+        ":  +- LocalRelation [none#0L]\n+- OneRowRelation"
+    ),
+    new SqlTestParams(
+      name = "count - sub-query filter",
+      querySql = s"SELECT 'ABC' WHERE" +
+        s" (SELECT COUNT(*) FROM $testTableName) = $totalRows",
+      expectedPlan = "Project [ABC AS #0]\n+- Filter (scalar-subquery#0 [] = " 
+
+        totalRows + ")\n   :  +- LocalRelation [none#0L]\n   +- OneRowRelation"
+    ),
+    new SqlTestParams(
+      name = "min - sub-query filter",
+      querySql = s"SELECT 'ABC' WHERE" +
+        s" (SELECT MIN(id) FROM $testTableName) = $minId",
+      expectedPlan = "Project [ABC AS #0]\n+- Filter (scalar-subquery#0 [] = " 
+
+        minId + ")\n   :  +- LocalRelation [none#0L]\n   +- OneRowRelation"
+    ),
+    new SqlTestParams(
+      name = "max - sub-query filter",
+      querySql = s"SELECT 'ABC' WHERE" +
+        s" (SELECT MAX(id) FROM $testTableName) = $maxId",
+      expectedPlan = "Project [ABC AS #0]\n+- Filter (scalar-subquery#0 [] = " 
+
+        maxId + ")\n   :  +- LocalRelation [none#0L]\n   +- OneRowRelation"
+    ),
+    // Limit doesn't affect aggregation results
+    new SqlTestParams(
+      name = "count-min-max - query with limit",
+      querySql = s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName LIMIT 
3",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2L]"
+    ),
+    new SqlTestParams(
+      name = "count-min-max - duplicated functions",
+      querySql = s"SELECT COUNT(*), COUNT(*), MIN(id), MIN(id), MAX(id), 
MAX(id)" +
+        s" FROM $testTableName",
+      expectedPlan = "LocalRelation [none#0L, none#1L, none#2L, none#3L, 
none#4L, none#5L]"
+    ),
+    new SqlTestParams(
+      name = "count - empty table",
+      querySetup = Some(Seq("CREATE TABLE TestEmpty (c1 int) USING DELTA")),
+      querySql = "SELECT COUNT(*) FROM TestEmpty",
+      expectedPlan = "LocalRelation [none#0L]"
+    ),
+    /**
+     * Dates are stored as Int in literals. This test make sure Date columns works and NULL are
+     * handled correctly
+     */
+    new SqlTestParams(
+      name = "min-max - date columns",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestDateValues" +
+            " (Column1 DATE, Column2 DATE, Column3 DATE) USING DELTA;",
+          "INSERT INTO TestDateValues" +
+            " (Column1, Column2, Column3) VALUES (NULL, current_date(), 
current_date());",
+          "INSERT INTO TestDateValues" +
+            " (Column1, Column2, Column3) VALUES (NULL, NULL, current_date());"
+        )),
+      querySql = "SELECT COUNT(*), MIN(Column1), MAX(Column1), MIN(Column2)" +
+        ", MAX(Column2), MIN(Column3), MAX(Column3) FROM TestDateValues",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, 
none#5, none#6]"
+    ),
+    new SqlTestParams(
+      name = "min-max - floating point infinity",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestFloatInfinity (FloatColumn Float, DoubleColumn 
Double) USING DELTA",
+          "INSERT INTO TestFloatInfinity (FloatColumn, DoubleColumn) VALUES 
(1, 1);",
+          "INSERT INTO TestFloatInfinity (FloatColumn, DoubleColumn) VALUES 
(NULL, NULL);",
+          "INSERT INTO TestFloatInfinity (FloatColumn, DoubleColumn) VALUES " +
+            "(float('inf'), double('inf'))" +
+            ", (float('+inf'), double('+inf'))" +
+            ", (float('infinity'), double('infinity'))" +
+            ", (float('+infinity'), double('+infinity'))" +
+            ", (float('-inf'), double('-inf'))" +
+            ", (float('-infinity'), double('-infinity'))"
+        )),
+      querySql = "SELECT COUNT(*), MIN(FloatColumn), MAX(FloatColumn), 
MIN(DoubleColumn)" +
+        ", MAX(DoubleColumn) FROM TestFloatInfinity",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+    ),
+    // NaN is larger than any other value, including Infinity
+    new SqlTestParams(
+      name = "min-max - floating point NaN values",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestFloatNaN (FloatColumn Float, DoubleColumn Double) 
USING DELTA",
+          "INSERT INTO TestFloatNaN (FloatColumn, DoubleColumn) VALUES (1, 
1);",
+          "INSERT INTO TestFloatNaN (FloatColumn, DoubleColumn) VALUES (NULL, 
NULL);",
+          "INSERT INTO TestFloatNaN (FloatColumn, DoubleColumn) VALUES " +
+            "(float('inf'), double('inf'))" +
+            ", (float('+inf'), double('+inf'))" +
+            ", (float('infinity'), double('infinity'))" +
+            ", (float('+infinity'), double('+infinity'))" +
+            ", (float('-inf'), double('-inf'))" +
+            ", (float('-infinity'), double('-infinity'))",
+          "INSERT INTO TestFloatNaN (FloatColumn, DoubleColumn) VALUES " +
+            "(float('NaN'), double('NaN'));"
+        )),
+      querySql = "SELECT COUNT(*), MIN(FloatColumn), MAX(FloatColumn), 
MIN(DoubleColumn)" +
+        ", MAX(DoubleColumn) FROM TestFloatNaN",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+    ),
+    new SqlTestParams(
+      name = "min-max - floating point min positive value",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestFloatPrecision (FloatColumn Float, DoubleColumn 
Double) USING DELTA",
+          "INSERT INTO TestFloatPrecision (FloatColumn, DoubleColumn) VALUES " 
+
+            "(CAST('1.4E-45' as FLOAT), CAST('4.9E-324' as DOUBLE))" +
+            ", (CAST('-1.4E-45' as FLOAT), CAST('-4.9E-324' as DOUBLE))" +
+            ", (0, 0);"
+        )),
+      querySql = "SELECT COUNT(*), MIN(FloatColumn), MAX(FloatColumn), 
MIN(DoubleColumn)" +
+        ", MAX(DoubleColumn) FROM TestFloatPrecision",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+    ),
+    new SqlTestParams(
+      name = "min-max - NULL and non-NULL values",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestNullValues (Column1 INT, Column2 INT, Column3 INT) 
USING DELTA",
+          "INSERT INTO TestNullValues (Column1, Column2, Column3) VALUES 
(NULL, 1, 1);",
+          "INSERT INTO TestNullValues (Column1, Column2, Column3) VALUES 
(NULL, NULL, 1);"
+        )),
+      querySql = "SELECT COUNT(*), MIN(Column1), MAX(Column1)," +
+        "MIN(Column2), MAX(Column2) FROM TestNullValues",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+    ),
+    new SqlTestParams(
+      name = "min-max - only NULL values",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestOnlyNullValues (Column1 INT, Column2 INT, Column3 
INT) USING DELTA",
+          "INSERT INTO TestOnlyNullValues (Column1, Column2, Column3) VALUES 
(NULL, NULL, 1);",
+          "INSERT INTO TestOnlyNullValues (Column1, Column2, Column3) VALUES 
(NULL, NULL, 2);"
+        )),
+      querySql = "SELECT COUNT(*), MIN(Column1), MAX(Column1), MIN(Column2), 
MAX(Column2), " +
+        "MIN(Column3), MAX(Column3) FROM TestOnlyNullValues",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, 
none#5, none#6]"
+    ),
+    new SqlTestParams(
+      name = "min-max - all supported data types",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestMinMaxValues (" +
+            "TINYINTColumn TINYINT, SMALLINTColumn SMALLINT, INTColumn INT, 
BIGINTColumn BIGINT, " +
+            "FLOATColumn FLOAT, DOUBLEColumn DOUBLE, DATEColumn DATE) USING 
DELTA",
+          "INSERT INTO TestMinMaxValues (TINYINTColumn, SMALLINTColumn, 
INTColumn, BIGINTColumn," +
+            " FLOATColumn, DOUBLEColumn, DATEColumn)" +
+            " VALUES (-128, -32768, -2147483648, -9223372036854775808," +
+            " -3.4028235E38, -1.7976931348623157E308, CAST('1582-10-15' AS 
DATE));",
+          "INSERT INTO TestMinMaxValues (TINYINTColumn, SMALLINTColumn, 
INTColumn, BIGINTColumn," +
+            " FLOATColumn, DOUBLEColumn, DATEColumn)" +
+            " VALUES (127, 32767, 2147483647, 9223372036854775807," +
+            " 3.4028235E38, 1.7976931348623157E308, CAST('9999-12-31' AS 
DATE));"
+        )),
+      querySql = "SELECT COUNT(*)," +
+        "MIN(TINYINTColumn), MAX(TINYINTColumn)" +
+        ", MIN(SMALLINTColumn), MAX(SMALLINTColumn)" +
+        ", MIN(INTColumn), MAX(INTColumn)" +
+        ", MIN(BIGINTColumn), MAX(BIGINTColumn)" +
+        ", MIN(FLOATColumn), MAX(FLOATColumn)" +
+        ", MIN(DOUBLEColumn), MAX(DOUBLEColumn)" +
+        ", MIN(DATEColumn), MAX(DATEColumn)" +
+        " FROM TestMinMaxValues",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, 
none#5, none#6" +
+        ", none#7L, none#8L, none#9, none#10, none#11, none#12, none#13, 
none#14]"
+    ),
+    new SqlTestParams(
+      name = "count-min-max - partitioned table - simple query",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestPartitionedTable (Column1 INT, Column2 INT, 
Column3 INT, Column4 INT)" +
+            " USING DELTA PARTITIONED BY (Column2, Column3)",
+          "INSERT INTO TestPartitionedTable" +
+            " (Column1, Column2, Column3, Column4) VALUES (1, 2, 3, 4);",
+          "INSERT INTO TestPartitionedTable" +
+            " (Column1, Column2, Column3, Column4) VALUES (2, 2, 3, 5);",
+          "INSERT INTO TestPartitionedTable" +
+            " (Column1, Column2, Column3, Column4) VALUES (3, 3, 2, 6);",
+          "INSERT INTO TestPartitionedTable" +
+            " (Column1, Column2, Column3, Column4) VALUES (4, 3, 2, 7);"
+        )),
+      querySql = "SELECT COUNT(*)" +
+        ", MIN(Column1), MAX(Column1)" +
+        ", MIN(Column2), MAX(Column2)" +
+        ", MIN(Column3), MAX(Column3)" +
+        ", MIN(Column4), MAX(Column4)" +
+        " FROM TestPartitionedTable",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3," +
+        " none#4, none#5, none#6, none#7, none#8]"
+    ),
+    /**
+     * Partitioned columns should be able to return MIN and MAX data even when there are no column
+     * stats
+     */
+    new SqlTestParams(
+      name = "count-min-max - partitioned table - no stats",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestPartitionedTableNoStats" +
+            " (Column1 INT, Column2 INT, Column3 INT, Column4 INT)" +
+            " USING DELTA PARTITIONED BY (Column2, Column3)" +
+            " TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 0)",
+          "INSERT INTO TestPartitionedTableNoStats" +
+            " (Column1, Column2, Column3, Column4) VALUES (1, 2, 3, 4);",
+          "INSERT INTO TestPartitionedTableNoStats" +
+            " (Column1, Column2, Column3, Column4) VALUES (2, 2, 3, 5);",
+          "INSERT INTO TestPartitionedTableNoStats" +
+            " (Column1, Column2, Column3, Column4) VALUES (3, 3, 2, 6);",
+          "INSERT INTO TestPartitionedTableNoStats" +
+            " (Column1, Column2, Column3, Column4) VALUES (4, 3, 2, 7);"
+        )),
+      querySql = "SELECT COUNT(*)" +
+        ", MIN(Column2), MAX(Column2)" +
+        ", MIN(Column3), MAX(Column3)" +
+        " FROM TestPartitionedTableNoStats",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+    ),
+    new SqlTestParams(
+      name = "min-max - partitioned table - all supported data types",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestAllTypesPartitionedTable (" +
+            "TINYINTColumn TINYINT, SMALLINTColumn SMALLINT, INTColumn INT, 
BIGINTColumn BIGINT, " +
+            "FLOATColumn FLOAT, DOUBLEColumn DOUBLE, DATEColumn DATE, Data 
INT) USING DELTA" +
+            "  PARTITIONED BY (TINYINTColumn, SMALLINTColumn, INTColumn, 
BIGINTColumn," +
+            " FLOATColumn, DOUBLEColumn, DATEColumn)",
+          "INSERT INTO TestAllTypesPartitionedTable" +
+            " (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+            " FLOATColumn, DOUBLEColumn, DATEColumn, Data)" +
+            " VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);",
+          "INSERT INTO TestAllTypesPartitionedTable" +
+            " (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+            " FLOATColumn, DOUBLEColumn, DATEColumn, Data)" +
+            " VALUES (-128, -32768, -2147483648, -9223372036854775808," +
+            " -3.4028235E38, -1.7976931348623157E308, CAST('1582-10-15' AS 
DATE), 1);"
+        )),
+      querySql = "SELECT COUNT(*)," +
+        "MIN(TINYINTColumn), MAX(TINYINTColumn)" +
+        ", MIN(SMALLINTColumn), MAX(SMALLINTColumn)" +
+        ", MIN(INTColumn), MAX(INTColumn)" +
+        ", MIN(BIGINTColumn), MAX(BIGINTColumn)" +
+        ", MIN(FLOATColumn), MAX(FLOATColumn)" +
+        ", MIN(DOUBLEColumn), MAX(DOUBLEColumn)" +
+        ", MIN(DATEColumn), MAX(DATEColumn)" +
+        ", MIN(Data), MAX(Data)" +
+        " FROM TestAllTypesPartitionedTable",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, 
none#5, none#6, " +
+        "none#7L, none#8L, none#9, none#10, none#11, none#12, none#13, 
none#14, none#15, none#16]"
+    ),
+    new SqlTestParams(
+      name = "min-max - partitioned table - only NULL values",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestOnlyNullValuesPartitioned (" +
+            "TINYINTColumn TINYINT, SMALLINTColumn SMALLINT, INTColumn INT, 
BIGINTColumn BIGINT, " +
+            "FLOATColumn FLOAT, DOUBLEColumn DOUBLE, DATEColumn DATE, Data 
INT) USING DELTA" +
+            "  PARTITIONED BY (TINYINTColumn, SMALLINTColumn, INTColumn, 
BIGINTColumn," +
+            " FLOATColumn, DOUBLEColumn, DATEColumn)",
+          "INSERT INTO TestOnlyNullValuesPartitioned" +
+            " (TINYINTColumn, SMALLINTColumn, INTColumn, BIGINTColumn," +
+            " FLOATColumn, DOUBLEColumn, DATEColumn, Data)" +
+            " VALUES (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);"
+        )),
+      querySql = "SELECT COUNT(*)," +
+        "MIN(TINYINTColumn), MAX(TINYINTColumn)" +
+        ", MIN(SMALLINTColumn), MAX(SMALLINTColumn)" +
+        ", MIN(INTColumn), MAX(INTColumn)" +
+        ", MIN(BIGINTColumn), MAX(BIGINTColumn)" +
+        ", MIN(FLOATColumn), MAX(FLOATColumn)" +
+        ", MIN(DOUBLEColumn), MAX(DOUBLEColumn)" +
+        ", MIN(DATEColumn), MAX(DATEColumn)" +
+        ", MIN(Data), MAX(Data)" +
+        " FROM TestOnlyNullValuesPartitioned",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, 
none#5, none#6, " +
+        "none#7L, none#8L, none#9, none#10, none#11, none#12, none#13, 
none#14, none#15, none#16]"
+    ),
+    new SqlTestParams(
+      name = "min-max - partitioned table - NULL and NON-NULL values",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestNullPartitioned (Column1 INT, Column2 INT, Column3 
INT)" +
+            " USING DELTA PARTITIONED BY (Column2, Column3)",
+          "INSERT INTO TestNullPartitioned (Column1, Column2, Column3) VALUES 
(NULL, NULL, 1);",
+          "INSERT INTO TestNullPartitioned (Column1, Column2, Column3) VALUES 
(NULL, NULL, NULL);",
+          "INSERT INTO TestNullPartitioned (Column1, Column2, Column3) VALUES 
(NULL, NULL, 2);"
+        )),
+      querySql = "SELECT COUNT(*), MIN(Column1), MAX(Column1), MIN(Column2), 
MAX(Column2), " +
+        "MIN(Column3), MAX(Column3) FROM TestNullPartitioned",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3, none#4, 
none#5, none#6]"
+    ),
+    new SqlTestParams(
+      name = "min-max - column name containing punctuation",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestPunctuationColumnName (`My.!?Column` INT) USING 
DELTA",
+          "INSERT INTO TestPunctuationColumnName (`My.!?Column`) VALUES (1), 
(2), (3);"
+        )),
+      querySql = "SELECT COUNT(*), MIN(`My.!?Column`), MAX(`My.!?Column`)" +
+        " FROM TestPunctuationColumnName",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2]"
+    ),
+    new SqlTestParams(
+      name = "min-max - partitioned table - column name containing 
punctuation",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestPartitionedPunctuationColumnName (`My.!?Column` 
INT, Data INT)" +
+            " USING DELTA PARTITIONED BY (`My.!?Column`)",
+          "INSERT INTO TestPartitionedPunctuationColumnName" +
+            " (`My.!?Column`, Data) VALUES (1, 1), (2, 1), (3, 1);"
+        )),
+      querySql = "SELECT COUNT(*), MIN(`My.!?Column`), MAX(`My.!?Column`)" +
+        " FROM TestPartitionedPunctuationColumnName",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2]"
+    ),
+    new SqlTestParams(
+      name = "min-max - partitioned table - special characters in column name",
+      querySetup = Some(
+        Seq(
+          "CREATE TABLE TestColumnMappingPartitioned" +
+            " (Column1 INT, Column2 INT, `Column3 .,;{}()\n\t=` INT, Column4 
INT)" +
+            " USING DELTA PARTITIONED BY (Column2, `Column3 .,;{}()\n\t=`)" +
+            " TBLPROPERTIES('delta.columnMapping.mode' = 'name')",
+          "INSERT INTO TestColumnMappingPartitioned" +
+            " (Column1, Column2, `Column3 .,;{}()\n\t=`, Column4)" +
+            " VALUES (1, 2, 3, 4);",
+          "INSERT INTO TestColumnMappingPartitioned" +
+            " (Column1, Column2, `Column3 .,;{}()\n\t=`, Column4)" +
+            " VALUES (2, 2, 3, 5);",
+          "INSERT INTO TestColumnMappingPartitioned" +
+            " (Column1, Column2, `Column3 .,;{}()\n\t=`, Column4)" +
+            " VALUES (3, 3, 2, 6);",
+          "INSERT INTO TestColumnMappingPartitioned" +
+            " (Column1, Column2, `Column3 .,;{}()\n\t=`, Column4)" +
+            " VALUES (4, 3, 2, 7);"
+        )),
+      querySql = "SELECT COUNT(*)" +
+        ", MIN(Column1), MAX(Column1)" +
+        ", MIN(Column2), MAX(Column2)" +
+        ", MIN(`Column3 .,;{}()\n\t=`), MAX(`Column3 .,;{}()\n\t=`)" +
+        ", MIN(Column4), MAX(Column4)" +
+        " FROM TestColumnMappingPartitioned",
+      expectedPlan = "LocalRelation [none#0L, none#1, none#2, none#3," +
+        " none#4, none#5, none#6, none#7, none#8]"
+    )
+  )
+    .foreach {
+      testParams =>
+        test(s"optimization supported - SQL - ${testParams.name}") {
+          if (testParams.querySetup.isDefined) {
+            testParams.querySetup.get.foreach(spark.sql)
+          }
+          checkResultsAndOptimizedPlan(testParams.querySql, testParams.expectedPlan)
+        }
+    }
+
+  test("count-min-max - external table") {
+    withTempDir {
+      dir =>
+        val testTablePath = dir.getCanonicalPath
+        dfPart1.write.format("delta").mode("overwrite").save(testTablePath)
+        DeltaTable.forPath(spark, testTablePath).delete("id = 1")
+        dfPart2.write.format("delta").mode(SaveMode.Append).save(testTablePath)
+
+        checkResultsAndOptimizedPlan(
+          s"SELECT COUNT(*), MIN(id), MAX(id) FROM delta.`$testTablePath`",
+          "LocalRelation [none#0L, none#1L, none#2L]")
+    }
+  }
+
+  test("min-max - partitioned column stats disabled") {
+    withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false") {
+      val tableName = "TestPartitionedNoStats"
+
+      spark.sql(
+        s"CREATE TABLE $tableName (Column1 INT, Column2 INT)" +
+          " USING DELTA PARTITIONED BY (Column2)")
+
+      spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 3);")
+      spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (2, 4);")
+
+      // Has no stats, including COUNT
+      checkOptimizationIsNotTriggered(
+        s"SELECT COUNT(*), MIN(Column2), MAX(Column2) FROM $tableName")
+
+      // Should work for partitioned columns even without stats
+      checkResultsAndOptimizedPlan(
+        s"SELECT MIN(Column2), MAX(Column2) FROM $tableName",
+        "LocalRelation [none#0, none#1]")
+    }
+  }
+
+  test("min-max - recompute column missing stats") {
+    val tableName = "TestRecomputeMissingStat"
+
+    spark.sql(
+      s"CREATE TABLE $tableName (Column1 INT, Column2 INT) USING DELTA" +
+        s" TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 0)")
+
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 4);")
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (2, 5);")
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (3, 6);")
+
+    checkOptimizationIsNotTriggered(s"SELECT COUNT(*), MIN(Column1), 
MAX(Column1) FROM $tableName")
+
+    spark.sql(s"ALTER TABLE $tableName SET 
TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 1);")
+
+    StatisticsCollection.recompute(
+      spark,
+      DeltaLog.forTable(spark, TableIdentifier(tableName)),
+      DeltaTableV2(spark, TableIdentifier(tableName)).catalogTable)
+
+    checkResultsAndOptimizedPlan(
+      s"SELECT COUNT(*), MIN(Column1), MAX(Column1) FROM $tableName",
+      "LocalRelation [none#0L, none#1, none#2]")
+
+    checkOptimizationIsNotTriggered(s"SELECT COUNT(*), MIN(Column2), 
MAX(Column2) FROM $tableName")
+
+    spark.sql(s"ALTER TABLE $tableName SET 
TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 2);")
+
+    StatisticsCollection.recompute(
+      spark,
+      DeltaLog.forTable(spark, TableIdentifier(tableName)),
+      DeltaTableV2(spark, TableIdentifier(tableName)).catalogTable)
+
+    checkResultsAndOptimizedPlan(
+      s"SELECT COUNT(*), MIN(Column2), MAX(Column2) FROM $tableName",
+      "LocalRelation [none#0L, none#1, none#2]")
+  }
+
+  test("min-max - recompute added column") {
+    val tableName = "TestRecomputeAddedColumn"
+
+    spark.sql(s"CREATE TABLE $tableName (Column1 INT) USING DELTA")
+    spark.sql(s"INSERT INTO $tableName (Column1) VALUES (1);")
+
+    spark.sql(s"ALTER TABLE $tableName ADD COLUMN (Column2 INT)")
+
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (2, 5);")
+
+    checkResultsAndOptimizedPlan(
+      s"SELECT COUNT(*), MIN(Column1), MAX(Column1) FROM $tableName",
+      "LocalRelation [none#0L, none#1, none#2]")
+
+    checkOptimizationIsNotTriggered(
+      s"SELECT COUNT(*), " +
+        s"MIN(Column1), MAX(Column1), MIN(Column2), MAX(Column2) FROM 
$tableName")
+
+    StatisticsCollection.recompute(
+      spark,
+      DeltaLog.forTable(spark, TableIdentifier(tableName)),
+      DeltaTableV2(spark, TableIdentifier(tableName)).catalogTable)
+
+    checkResultsAndOptimizedPlan(
+      s"SELECT COUNT(*), " +
+        s"MIN(Column1), MAX(Column1), MIN(Column2), MAX(Column2) FROM 
$tableName",
+      "LocalRelation [none#0L, none#1, none#2, none#3, none#4]"
+    )
+  }
+
+  test("Select Count: snapshot isolation") {
+    sql(s"CREATE TABLE TestSnapshotIsolation (c1 int) USING DELTA")
+    spark.sql("INSERT INTO TestSnapshotIsolation VALUES (1)")
+
+    val scannedVersions = mutable.ArrayBuffer[Long]()
+    val query = "SELECT (SELECT COUNT(*) FROM TestSnapshotIsolation), " +
+      "(SELECT COUNT(*) FROM TestSnapshotIsolation)"
+
+    checkResultsAndOptimizedPlan(
+      query,
+      "Project [scalar-subquery#0 [] AS #0L, scalar-subquery#0 [] AS #1L]\n" +
+        ":  :- LocalRelation [none#0L]\n" +
+        ":  +- LocalRelation [none#0L]\n" +
+        "+- OneRowRelation"
+    )
+
+    PrepareDeltaScanBase.withCallbackOnGetDeltaScanGenerator(
+      scanGenerator => {
+        // Record the scanned version and make changes to the table. We will verify changes in the
+        // middle of the query are not visible to the query.
+        scannedVersions += scanGenerator.snapshotToScan.version
+        // Insert a row after each call to get scanGenerator
+        // to test if the count doesn't change in the same query
+        spark.sql("INSERT INTO TestSnapshotIsolation VALUES (1)")
+      }) {
+      val result = spark.sql(query).collect()(0)
+      val c1 = result.getLong(0)
+      val c2 = result.getLong(1)
+      assertResult(c1, "Snapshot isolation should guarantee the results are always the same")(c2)
+      assert(
+        scannedVersions.toSet.size == 1,
+        s"Scanned multiple versions of the same table in one query: 
${scannedVersions.toSet}")
+    }
+  }
+
+  test(".collect() and .show() both use this optimization") {
+    var resultRow: Row = null
+    withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false") {
+      resultRow = spark.sql(s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName").head
+    }
+
+    val totalRows = resultRow.getLong(0)
+    val minId = resultRow.getLong(1)
+    val maxId = resultRow.getLong(2)
+
+    val collectPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+      spark.sql(s"SELECT COUNT(*) FROM $testTableName").collect()
+    }
+    val collectResultData = collectPlans.collect { case x: LocalRelation => x.data }
+    assert(collectResultData.size === 1)
+    assert(collectResultData.head.head.getLong(0) === totalRows)
+
+    val showPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+      spark.sql(s"SELECT COUNT(*) FROM $testTableName").show()
+    }
+    val showResultData = showPlans.collect { case x: LocalRelation => x.data }
+    assert(showResultData.size === 1)
+    assert(showResultData.head.head.getString(0).toLong === totalRows)
+
+    val showMultAggPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+      spark.sql(s"SELECT COUNT(*), MIN(id), MAX(id) FROM $testTableName").show()
+    }
+
+    val showMultipleAggResultData = showMultAggPlans.collect { case x: LocalRelation => x.data }
+    assert(showMultipleAggResultData.size === 1)
+    val firstRow = showMultipleAggResultData.head.head
+    assert(firstRow.getString(0).toLong === totalRows)
+    assert(firstRow.getString(1).toLong === minId)
+    assert(firstRow.getString(2).toLong === maxId)
+  }
+
+  test("min-max .show() - only NULL values") {
+    val tableName = "TestOnlyNullValuesShow"
+
+    spark.sql(s"CREATE TABLE $tableName (Column1 INT) USING DELTA")
+    spark.sql(s"INSERT INTO $tableName (Column1) VALUES (NULL);")
+
+    val showMultAggPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+      spark.sql(s"SELECT MIN(Column1), MAX(Column1) FROM $tableName").show()
+    }
+
+    val showMultipleAggResultData = showMultAggPlans.collect { case x: LocalRelation => x.data }
+    assert(showMultipleAggResultData.size === 1)
+    val firstRow = showMultipleAggResultData.head.head
+    assert(firstRow.getString(0) === "NULL")
+    assert(firstRow.getString(1) === "NULL")
+  }
+
+  test("min-max .show() - Date Columns") {
+    val tableName = "TestDateColumnsShow"
+
+    spark.sql(s"CREATE TABLE $tableName (Column1 DATE, Column2 DATE) USING 
DELTA")
+    spark.sql(
+      s"INSERT INTO $tableName (Column1, Column2) VALUES " +
+        s"(CAST('1582-10-15' AS DATE), NULL);")
+
+    val showMultAggPlans = DeltaTestUtils.withLogicalPlansCaptured(spark, optimizedPlan = true) {
+      spark.sql(s"SELECT MIN(Column1), MIN(Column2) FROM $tableName").show()
+    }
+
+    val showMultipleAggResultData = showMultAggPlans.collect { case x: LocalRelation => x.data }
+    assert(showMultipleAggResultData.size === 1)
+    val firstRow = showMultipleAggResultData.head.head
+    assert(firstRow.getString(0) === "1582-10-15")
+    assert(firstRow.getString(1) === "NULL")
+  }
+
+  test("count - dv-enabled") {
+    withTempDir {
+      dir =>
+        val tempPath = dir.getCanonicalPath
+        spark.range(1, 10, 1, 1).write.format("delta").save(tempPath)
+
+        enableDeletionVectorsInTable(new Path(tempPath), true)
+        DeltaTable.forPath(spark, tempPath).delete("id = 1")
+        assert(!getFilesWithDeletionVectors(DeltaLog.forTable(spark, new Path(tempPath))).isEmpty)
+
+        checkResultsAndOptimizedPlan(
+          s"SELECT COUNT(*) FROM delta.`$tempPath`",
+          "LocalRelation [none#0L]")
+    }
+  }
+
+  test("count - zero rows AddFile") {
+    withTempDir {
+      dir =>
+        val tempPath = dir.getCanonicalPath
+        val df = spark.range(1, 10)
+        val expectedResult = df.count()
+        df.write.format("delta").save(tempPath)
+
+        // Creates AddFile entries with non-existing files
+        // The query should read only the delta log and not the parquet files
+        val log = DeltaLog.forTable(spark, tempPath)
+        val txn = log.startTransaction()
+        txn.commitManually(
+          DeltaTestUtils.createTestAddFile(
+            encodedPath = "1.parquet",
+            stats = "{\"numRecords\": 0}"),
+          DeltaTestUtils.createTestAddFile(
+            encodedPath = "2.parquet",
+            stats = "{\"numRecords\": 0}"),
+          DeltaTestUtils.createTestAddFile(encodedPath = "3.parquet", stats = 
"{\"numRecords\": 0}")
+        )
+
+        withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "true") {
+          val queryDf = spark.sql(s"SELECT COUNT(*) FROM delta.`$tempPath`")
+          val optimizedPlan = queryDf.queryExecution.optimizedPlan.canonicalized.toString()
+
+          assert(queryDf.head().getLong(0) === expectedResult)
+
+          assertResult("LocalRelation [none#0L]") {
+            optimizedPlan.trim
+          }
+        }
+    }
+  }
+
+  // Tests to validate the optimizer won't incorrectly change queries it can't correctly handle
+
+  Seq(
+    (s"SELECT COUNT(*) FROM $mixedStatsTableName", "missing stats"),
+    (s"SELECT COUNT(*) FROM $noStatsTableName", "missing stats"),
+    (s"SELECT MIN(id), MAX(id) FROM $mixedStatsTableName", "missing stats"),
+    (s"SELECT MIN(id), MAX(id) FROM $noStatsTableName", "missing stats"),
+    (s"SELECT group, COUNT(*) FROM $testTableName GROUP BY group", "group by"),
+    (s"SELECT group, MIN(id), MAX(id) FROM $testTableName GROUP BY group", 
"group by"),
+    (s"SELECT COUNT(*) + 1 FROM $testTableName", "plus literal"),
+    (s"SELECT MAX(id) + 1 FROM $testTableName", "plus literal"),
+    (s"SELECT COUNT(DISTINCT data) FROM $testTableName", "distinct count"),
+    (s"SELECT COUNT(*) FROM $testTableName WHERE id > 0", "filter"),
+    (s"SELECT MAX(id) FROM $testTableName WHERE id > 0", "filter"),
+    (s"SELECT (SELECT COUNT(*) FROM $testTableName WHERE id > 0)", "sub-query 
with filter"),
+    (s"SELECT (SELECT MAX(id) FROM $testTableName WHERE id > 0)", "sub-query 
with filter"),
+    (s"SELECT COUNT(ALL data) FROM $testTableName", "count non-null"),
+    (s"SELECT COUNT(data) FROM $testTableName", "count non-null"),
+    (s"SELECT COUNT(*) FROM $testTableName A, $testTableName B", "join"),
+    (s"SELECT MAX(A.id) FROM $testTableName A, $testTableName B", "join"),
+    (s"SELECT COUNT(*) OVER() FROM $testTableName LIMIT 1", "over"),
+    (s"SELECT MAX(id) OVER() FROM $testTableName LIMIT 1", "over")
+  )
+    .foreach {
+      case (query, desc) =>
+        test(s"optimization not supported - $desc - $query") {
+          checkOptimizationIsNotTriggered(query)
+        }
+    }
+
+  test("optimization not supported - min-max unsupported data types") {
+    val tableName = "TestUnsupportedTypes"
+
+    spark.sql(
+      s"CREATE TABLE $tableName " +
+        s"(STRINGColumn STRING, DECIMALColumn DECIMAL(38,0)" +
+        s", TIMESTAMPColumn TIMESTAMP, BINARYColumn BINARY, " +
+        s"BOOLEANColumn BOOLEAN, ARRAYColumn ARRAY<INT>, MAPColumn MAP<INT, 
INT>, " +
+        s"STRUCTColumn STRUCT<Id: INT, Name: STRING>) USING DELTA")
+
+    spark.sql(s"INSERT INTO $tableName" +
+      s" (STRINGColumn, DECIMALColumn, TIMESTAMPColumn, BINARYColumn" +
+      s", BOOLEANColumn, ARRAYColumn, MAPColumn, STRUCTColumn) VALUES " +
+      s"('A', -99999999999999999999999999999999999999, CAST('1900-01-01 
00:00:00.0' AS TIMESTAMP)" +
+      s", X'1ABF', TRUE, ARRAY(1, 2, 3), MAP(1, 10, 2, 20), STRUCT(1, 
'Spark'));")
+
+    val columnNames = List(
+      "STRINGColumn",
+      "DECIMALColumn",
+      "TIMESTAMPColumn",
+      "BINARYColumn",
+      "BOOLEANColumn",
+      "ARRAYColumn",
+      "STRUCTColumn")
+
+    columnNames.foreach(
+      colName => checkOptimizationIsNotTriggered(s"SELECT MAX($colName) FROM $tableName"))
+  }
+
+  test("optimization not supported - min-max column without stats") {
+    val tableName = "TestColumnWithoutStats"
+
+    spark.sql(
+      s"CREATE TABLE $tableName (Column1 INT, Column2 INT) USING DELTA" +
+        s" TBLPROPERTIES('delta.dataSkippingNumIndexedCols' = 1)")
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 2);")
+
+    checkOptimizationIsNotTriggered(s"SELECT MAX(Column2) FROM $tableName")
+  }
+
+  // For empty tables the stats won't be found and the query should not be optimized
+  test("optimization not supported - min-max empty table") {
+    val tableName = "TestMinMaxEmptyTable"
+
+    spark.sql(s"CREATE TABLE $tableName (Column1 INT) USING DELTA")
+
+    checkOptimizationIsNotTriggered(s"SELECT MIN(Column1), MAX(Column1) FROM 
$tableName")
+  }
+
+  test("optimization not supported - min-max dv-enabled") {
+    withTempDir {
+      dir =>
+        val tempPath = dir.getCanonicalPath
+        spark.range(1, 10, 1, 1).write.format("delta").save(tempPath)
+        val querySql = s"SELECT MIN(id), MAX(id) FROM delta.`$tempPath`"
+        checkResultsAndOptimizedPlan(querySql, "LocalRelation [none#0L, none#1L]")
+
+        enableDeletionVectorsInTable(new Path(tempPath), true)
+        DeltaTable.forPath(spark, tempPath).delete("id = 1")
+        assert(!getFilesWithDeletionVectors(DeltaLog.forTable(spark, new Path(tempPath))).isEmpty)
+        checkOptimizationIsNotTriggered(querySql)
+    }
+  }
+
+  test("optimization not supported - filter on partitioned column") {
+    val tableName = "TestPartitionedFilter"
+
+    spark.sql(
+      s"CREATE TABLE $tableName (Column1 INT, Column2 INT)" +
+        " USING DELTA PARTITIONED BY (Column2)")
+
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (1, 2);")
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (2, 2);")
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (3, 3);")
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2) VALUES (4, 3);")
+
+    // Filter by partition column
+    checkOptimizationIsNotTriggered(
+      "SELECT COUNT(*)" +
+        ", MIN(Column1), MAX(Column1)" +
+        ", MIN(Column2), MAX(Column2)" +
+        s" FROM $tableName WHERE Column2 = 2")
+
+    // Filter both partition and data columns
+    checkOptimizationIsNotTriggered(
+      "SELECT COUNT(*)" +
+        ", MIN(Column1), MAX(Column1)" +
+        ", MIN(Column2), MAX(Column2)" +
+        s" FROM $tableName WHERE Column1 = 2 AND Column2 = 2")
+  }
+
+  test("optimization not supported - sub-query with column alias") {
+    val tableName = "TestColumnAliasSubQuery"
+
+    spark.sql(s"CREATE TABLE $tableName (Column1 INT, Column2 INT, Column3 
INT) USING DELTA")
+
+    spark.sql(s"INSERT INTO $tableName (Column1, Column2, Column3) VALUES (1, 
2, 3);")
+
+    checkOptimizationIsNotTriggered(
+      s"SELECT MAX(Column2) FROM (SELECT Column1 AS Column2 FROM $tableName)")
+
+    checkOptimizationIsNotTriggered(
+      s"SELECT MAX(Column1), MAX(Column2), MAX(Column3) FROM " +
+        s"(SELECT Column1 AS Column2, Column2 AS Column3, Column3 AS Column1 
FROM $tableName)")
+  }
+
+  test("optimization not supported - nested columns") {
+    val tableName = "TestNestedColumns"
+
+    spark.sql(
+      s"CREATE TABLE $tableName " +
+        s"(Column1 STRUCT<Id: INT>, " +
+        s"`Column1.Id` INT) USING DELTA")
+
+    spark.sql(
+      s"INSERT INTO $tableName" +
+        s" (Column1, `Column1.Id`) VALUES " +
+        s"(STRUCT(1), 2);")
+
+    // Nested Column
+    checkOptimizationIsNotTriggered(s"SELECT MAX(Column1.Id) FROM $tableName")
+
+    checkOptimizationIsNotTriggered(s"SELECT MAX(Column1.Id) AS XYZ FROM 
$tableName")
+
+    // Validate the scenario where all the columns are read
+    // since it creates a different query plan
+    checkOptimizationIsNotTriggered(
+      s"SELECT MAX(Column1.Id), " +
+        s"MAX(`Column1.Id`) FROM $tableName")
+
+    // The optimization for columns with dots should still work
+    checkResultsAndOptimizedPlan(
+      s"SELECT MAX(`Column1.Id`) FROM $tableName",
+      "LocalRelation [none#0]")
+  }
+
+  private def generateRowsDataFrame(source: Dataset[java.lang.Long]): DataFrame = {
+    import testImplicits._
+
+    source.select(
+      Symbol("id"),
+      Symbol("id").cast("tinyint").as(Symbol("TinyIntColumn")),
+      Symbol("id").cast("smallint").as(Symbol("SmallIntColumn")),
+      Symbol("id").cast("int").as(Symbol("IntColumn")),
+      Symbol("id").cast("bigint").as(Symbol("BigIntColumn")),
+      (Symbol("id") / 3.3).cast("float").as(Symbol("FloatColumn")),
+      (Symbol("id") / 3.3).cast("double").as(Symbol("DoubleColumn")),
+      date_add(lit("2022-08-31").cast("date"), 
col("id").cast("int")).as(Symbol("DateColumn")),
+      (Symbol("id") % 2).cast("integer").as(Symbol("group")),
+      Symbol("id").cast("string").as(Symbol("data"))
+    )
+  }
+
+  /**
+   * Validate the results of the query is the same with the flag
+   * DELTA_OPTIMIZE_METADATA_QUERY_ENABLED enabled and disabled. And the expected Optimized Query
+   * Plan with the flag enabled
+   */
+  private def checkResultsAndOptimizedPlan(query: String, expectedOptimizedPlan: String): Unit = {
+    checkResultsAndOptimizedPlan(() => spark.sql(query), expectedOptimizedPlan)
+  }
+
+  /**
+   * Validate the results of the query is the same with the flag
+   * DELTA_OPTIMIZE_METADATA_QUERY_ENABLED enabled and disabled. And the expected Optimized Query
+   * Plan with the flag enabled.
+   */
+  private def checkResultsAndOptimizedPlan(
+      generateQueryDf: () => DataFrame,
+      expectedOptimizedPlan: String): Unit = {
+    var expectedAnswer: scala.Seq[Row] = null
+    withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false") {
+      expectedAnswer = generateQueryDf().collect()
+    }
+
+    withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "true") {
+      val queryDf = generateQueryDf()
+      val optimizedPlan = queryDf.queryExecution.optimizedPlan.canonicalized.toString()
+
+      assert(queryDf.collect().sameElements(expectedAnswer))
+
+      assertResult(expectedOptimizedPlan.trim) {
+        optimizedPlan.trim
+      }
+    }
+  }
+
+  /**
+   * Verify the query plans and results are the same with/without metadata query optimization. This
+   * method can be used to verify cases that we shouldn't trigger optimization or cases that we can
+   * potentially improve.
+   * @param query
+   */
+  private def checkOptimizationIsNotTriggered(query: String): Unit = {
+    var expectedOptimizedPlan: String = null
+    var expectedAnswer: scala.Seq[Row] = null
+
+    withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false") {
+
+      val generateQueryDf = spark.sql(query)
+      expectedOptimizedPlan = generateQueryDf.queryExecution.optimizedPlan.canonicalized.toString()
+      expectedAnswer = generateQueryDf.collect()
+    }
+
+    withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "true") {
+
+      val generateQueryDf = spark.sql(query)
+      val optimizationEnabledQueryPlan =
+        generateQueryDf.queryExecution.optimizedPlan.canonicalized.toString()
+
+      assert(generateQueryDf.collect().sameElements(expectedAnswer))
+
+      assertResult(expectedOptimizedPlan) {
+        optimizationEnabledQueryPlan
+      }
+    }
+  }
+}
+
+trait OptimizeMetadataOnlyDeltaQueryColumnMappingSuiteBase
+  extends DeltaColumnMappingSelectedTestMixin {
+  override protected def runAllTests = true
+}
+
+@Ignore // FIXME: ID-based mapping.
+class OptimizeMetadataOnlyDeltaQueryIdColumnMappingSuite
+  extends OptimizeMetadataOnlyDeltaQuerySuite
+  with DeltaColumnMappingEnableIdMode
+  with OptimizeMetadataOnlyDeltaQueryColumnMappingSuiteBase
+
+class OptimizeMetadataOnlyDeltaQueryNameColumnMappingSuite
+  extends OptimizeMetadataOnlyDeltaQuerySuite
+  with DeltaColumnMappingEnableNameMode
+  with OptimizeMetadataOnlyDeltaQueryColumnMappingSuiteBase
diff --git a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizedWritesSuite.scala b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizedWritesSuite.scala
new file mode 100644
index 0000000000..d0156e8d6f
--- /dev/null
+++ b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/perf/OptimizedWritesSuite.scala
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.perf
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaOptions, DeltaTestUtils}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, StructType}
+
+import com.databricks.spark.util.Log4jUsageLogger
+
+import java.io.File
+
+import scala.language.implicitConversions
+
+abstract class OptimizedWritesSuiteBase extends QueryTest with SharedSparkSession {
+
+  import testImplicits._
+
+  protected def writeTest(testName: String)(f: String => Unit): Unit = {
+    test(testName) {
+      withTempDir {
+        dir =>
+          withSQLConf(DeltaConfigs.OPTIMIZE_WRITE.defaultTablePropertyKey -> "true") {
+            f(dir.getCanonicalPath)
+          }
+      }
+    }
+  }
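+
+  // Illustrative expansion of the helper above: a call such as
+  //   writeTest("some test") { dir => ... }
+  // behaves like a plain test() body wrapped in withTempDir and withSQLConf, so optimized
+  // writes are enabled via the default table property for every table created under `dir`.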
+
+  protected def checkResult(df: DataFrame, numFileCheck: Long => Boolean, dir: String): Unit = {
+    val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, dir)
+    val files = snapshot.numOfFiles
+    assert(numFileCheck(files), s"file check failed: received $files")
+
+    checkAnswer(
+      spark.read.format("delta").load(dir),
+      df
+    )
+  }
+
+  implicit protected def fileToPathString(dir: File): String = dir.getCanonicalPath
+
+  writeTest("non-partitioned write - table config") {
+    dir =>
+      val df = spark.range(0, 100, 1, 4).toDF()
+      df.write.format("delta").save(dir)
+      checkResult(df, numFileCheck = _ === 1, dir)
+  }
+
+  test("non-partitioned write - table config compatibility") {
+    withTempDir {
+      tempDir =>
+        val dir = tempDir.getCanonicalPath
+        // When the table property is not set, the session conf value is used.
+        // With OW enabled, this writes 1 file instead of 4.
+        withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED.key -> "true") {
+          val df = spark.range(0, 100, 1, 4).toDF()
+          val commitStats = Log4jUsageLogger
+            .track {
+              df.write.format("delta").mode("append").save(dir)
+            }
+            .filter(_.tags.get("opType") === Some("delta.commit.stats"))
+          assert(commitStats.length >= 1)
+          checkResult(df, numFileCheck = _ === 1, dir)
+        }
+    }
+
+    // Test order of precedence between table property "delta.autoOptimize.optimizeWrite" and
+    // session conf.
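+    // As exercised below, an explicitly set session conf takes precedence over the table
+    // property: the expected file count depends only on sqlConf (2 files when OW is on,
+    // 5 when it is off, counting the initial single-file write).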
+    for {
+      sqlConf <- DeltaTestUtils.BOOLEAN_DOMAIN
+      tableProperty <- DeltaTestUtils.BOOLEAN_DOMAIN
+    } {
+      withTempDir {
+        tempDir =>
+          withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED.key -> sqlConf.toString) {
+            val dir = tempDir.getCanonicalPath
+            // Write one file to be able to set tblproperties
+            spark
+              .range(10)
+              .coalesce(1)
+              .write
+              .format("delta")
+              .mode("append")
+              .save(dir)
+
+            sql(
+              s"ALTER TABLE delta.`$dir` SET TBLPROPERTIES" +
+                s" (delta.autoOptimize.optimizeWrite = 
${tableProperty.toString})")
+
+            val df = spark.range(0, 100, 1, 4).toDF()
+            // OW adds one file vs non-OW adds 4 files
+            val expectedNumberOfFiles = if (sqlConf) 2 else 5
+            df.write.format("delta").mode("append").save(dir)
+            checkResult(
+              df.union(spark.range(10).toDF()),
+              numFileCheck = _ === expectedNumberOfFiles,
+              dir)
+          }
+      }
+    }
+  }
+
+  test("non-partitioned write - data frame config") {
+    withTempDir {
+      dir =>
+        val df = spark.range(0, 100, 1, 4).toDF()
+        df.write
+          .format("delta")
+          .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "true")
+          .save(dir)
+        checkResult(df, numFileCheck = _ === 1, dir)
+    }
+  }
+
+  writeTest("non-partitioned write - data frame config trumps table config") {
+    dir =>
+      val df = spark.range(0, 100, 1, 4).toDF()
+      df.write.format("delta").option(DeltaOptions.OPTIMIZE_WRITE_OPTION, 
"false").save(dir)
+      checkResult(df, numFileCheck = _ === 4, dir)
+  }
+
+  writeTest("partitioned write - table config") {
+    dir =>
+      val df = spark
+        .range(0, 100, 1, 4)
+        .withColumn("part", Symbol("id") % 5)
+
+      df.write.partitionBy("part").format("delta").save(dir)
+      checkResult(df, numFileCheck = _ <= 5, dir)
+  }
+
+  test("partitioned write - data frame config") {
+    withTempDir {
+      dir =>
+        val df = spark
+          .range(0, 100, 1, 4)
+          .withColumn("part", Symbol("id") % 5)
+
+        df.write
+          .partitionBy("part")
+          .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "true")
+          .format("delta")
+          .save(dir)
+
+        checkResult(df, numFileCheck = _ <= 5, dir)
+    }
+  }
+
+  writeTest("partitioned write - data frame config trumps table config") {
+    dir =>
+      val df = spark
+        .range(0, 100, 1, 4)
+        .withColumn("part", Symbol("id") % 5)
+
+      df.write
+        .partitionBy("part")
+        .format("delta")
+        .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "false")
+        .save(dir)
+
+      checkResult(df, numFileCheck = _ === 20, dir)
+  }
+
+  writeTest("multi-partitions - table config") {
+    dir =>
+      val df = spark
+        .range(0, 100, 1, 4)
+        .withColumn("part", Symbol("id") % 5)
+        .withColumn("part2", (Symbol("id") / 20).cast("int"))
+
+      df.write.partitionBy("part", "part2").format("delta").save(dir)
+
+      checkResult(df, numFileCheck = _ <= 25, dir)
+  }
+
+  test("multi-partitions - data frame config") {
+    withTempDir {
+      dir =>
+        val df = spark
+          .range(0, 100, 1, 4)
+          .withColumn("part", Symbol("id") % 5)
+          .withColumn("part2", (Symbol("id") / 20).cast("int"))
+
+        df.write
+          .partitionBy("part", "part2")
+          .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "true")
+          .format("delta")
+          .save(dir)
+
+        checkResult(df, numFileCheck = _ <= 25, dir)
+    }
+  }
+
+  test("optimized writes used if enabled when a stream starts") {
+    withTempDir {
+      f =>
+        // Write some data into the table so it already exists
+        Seq(1).toDF().write.format("delta").save(f)
+
+        // Use optimized writes just when starting the stream
+        val inputData = MemoryStream[Int]
+
+        val df = inputData.toDF().repartition(10)
+        var stream: StreamingQuery = null
+
+        // Start the stream with optimized writes enabled, and then reset the conf
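+        // The setting should be captured when the stream starts, so the micro-batch written
+        // after the conf is reset is still expected to produce a single file (hence the
+        // 2-file assertion below: one from the initial write plus one from the batch).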
+        withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED.key -> "true") {
+          val checkpoint = new File(f, "checkpoint").getCanonicalPath
+          stream = df.writeStream.format("delta").option("checkpointLocation", checkpoint).start(f)
+        }
+        try {
+          inputData.addData(1 to 100)
+          stream.processAllAvailable()
+        } finally {
+          stream.stop()
+        }
+
+        val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, f)
+        assert(snapshot.numOfFiles == 2, "Optimized writes were not used")
+    }
+  }
+
+  writeTest("multi-partitions - data frame config trumps table config") {
+    dir =>
+      val df = spark
+        .range(0, 100, 1, 4)
+        .withColumn("part", Symbol("id") % 5)
+        .withColumn("part2", (Symbol("id") / 20).cast("int"))
+
+      df.write
+        .partitionBy("part", "part2")
+        .option(DeltaOptions.OPTIMIZE_WRITE_OPTION, "false")
+        .format("delta")
+        .save(dir)
+
+      checkResult(df, numFileCheck = _ > 25, dir)
+  }
+
+  writeTest("optimize should not leverage optimized writes") {
+    dir =>
+      val df = spark.range(0, 10, 1, 2)
+
+      val logs1 = Log4jUsageLogger
+        .track {
+          df.write.format("delta").mode("append").save(dir)
+          df.write.format("delta").mode("append").save(dir)
+        }
+        .filter(_.metric == "tahoeEvent")
+
+      assert(logs1.count(_.tags.get("opType") === Some("delta.optimizeWrite.planned")) === 2)
+
+      val logs2 = Log4jUsageLogger
+        .track {
+          sql(s"optimize delta.`$dir`")
+        }
+        .filter(_.metric == "tahoeEvent")
+
+      assert(logs2.count(_.tags.get("opType") === Some("delta.optimizeWrite.planned")) === 0)
+  }
+
+  writeTest("map task with more partitions than target shuffle blocks - 
non-partitioned") {
+    dir =>
+      val df = spark.range(0, 20, 1, 4)
+
+      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS.key -> "2") {
+        df.write.format("delta").mode("append").save(dir)
+      }
+
+      checkResult(df.toDF(), numFileCheck = _ === 1, dir)
+  }
+
+  writeTest("map task with more partitions than target shuffle blocks - 
partitioned") {
+    dir =>
+      val df = spark.range(0, 20, 1, 4).withColumn("part", Symbol("id") % 5)
+
+      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS.key -> "2") {
+        df.write.format("delta").partitionBy("part").mode("append").save(dir)
+      }
+
+      checkResult(df, numFileCheck = _ === 5, dir)
+  }
+
+  writeTest("zero partition dataframe write") {
+    dir =>
+      val df = spark.range(0, 20, 1, 4).withColumn("part", Symbol("id") % 5)
+      df.write.format("delta").partitionBy("part").mode("append").save(dir)
+      val schema = new StructType().add("id", LongType).add("part", LongType)
+
+      spark
+        .createDataFrame(sparkContext.emptyRDD[Row], schema)
+        .write
+        .format("delta")
+        .partitionBy("part")
+        .mode("append")
+        .save(dir)
+
+      checkResult(df, numFileCheck = _ === 5, dir)
+  }
+
+  test("OptimizedWriterBlocks is not serializable") {
+    assert(
+      !new OptimizedWriterBlocks(Array.empty).isInstanceOf[Serializable],
+      "The blocks should not be serializable so that they don't get shipped to 
executors."
+    )
+  }
+
+  writeTest("single partition dataframe write") {
+    dir =>
+      val df = spark.range(0, 20).repartition(1).withColumn("part", Symbol("id") % 5)
+      val logs1 = Log4jUsageLogger
+        .track {
+          df.write.format("delta").partitionBy("part").mode("append").save(dir)
+        }
+        .filter(_.metric == "tahoeEvent")
+
+      // doesn't use optimized writes
+      assert(logs1.count(_.tags.get("opType") === Some("delta.optimizeWrite.planned")) === 0)
+
+      checkResult(df, numFileCheck = _ === 5, dir)
+  }
+
+  writeTest("do not create tons of shuffle partitions during optimized 
writes") {
+    dir =>
+      // 50M shuffle blocks would've led to 25M shuffle partitions
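+      // (roughly numShuffleBlocks / originalPartitions = 50M / 2; the write is expected to
+      // be capped at DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS, which the
+      // `shufflePartitions` assertion below verifies)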
+      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS.key -> "50000000") {
+        val df = spark.range(0, 20).repartition(2).withColumn("part", Symbol("id") % 5)
+        val logs1 = Log4jUsageLogger
+          .track {
+            df.write.format("delta").partitionBy("part").mode("append").save(dir)
+          }
+          .filter(_.metric == "tahoeEvent")
+          .filter(_.tags.get("opType") === Some("delta.optimizeWrite.planned"))
+
+        assert(logs1.length === 1)
+        val blob = JsonUtils.fromJson[Map[String, Any]](logs1.head.blob)
+        assert(blob("outputPartitions") === 5)
+        assert(blob("originalPartitions") === 2)
+        assert(blob("numShuffleBlocks") === 50000000)
+        assert(
+          blob("shufflePartitions") ===
+            spark.conf.get(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS))
+
+        checkResult(df, numFileCheck = _ === 5, dir)
+      }
+  }
+}
+
+class OptimizedWritesSuite extends OptimizedWritesSuiteBase with DeltaSQLCommandTest {}

