This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9756fbaa8c9 [SPARK-38573][SQL] Support Auto Partition Statistics Collection
9756fbaa8c9 is described below

commit 9756fbaa8c9c4648e8c40a2e687295502d7b1196
Author: Kazuyuki Tanimura <ktanim...@apple.com>
AuthorDate: Fri Apr 15 09:18:07 2022 -0700

    [SPARK-38573][SQL] Support Auto Partition Statistics Collection
    
    ### What changes were proposed in this pull request?
    Currently, https://issues.apache.org/jira/browse/SPARK-21127 supports automatically
    storing aggregated statistics at the table level when the config
    `spark.sql.statistics.size.autoUpdate.enabled` is enabled.
    
    This PR proposes to also update partition-level statistics automatically whenever
    the `spark.sql.statistics.size.autoUpdate.enabled` config is enabled.
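    
    For illustration, a minimal spark-shell style sketch of the intended behavior (the table name `t` is hypothetical and the exact DESC output layout may vary):
    
    ```scala
    // With auto size update enabled, a write to a partitioned table is expected to
    // refresh the per-partition size statistics along with the table-level size.
    spark.sql("SET spark.sql.statistics.size.autoUpdate.enabled=true")
    spark.sql("CREATE TABLE t (id INT) USING parquet PARTITIONED BY (p INT)")
    spark.sql("INSERT INTO t PARTITION (p=1) VALUES (1), (2)")
    // The stored partition size should now appear in the catalog metadata.
    spark.sql("DESC EXTENDED t PARTITION (p=1)").show(truncate = false)
    ```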
    
    ### Why are the changes needed?
    Partition-level stats are useful for identifying outlier (skewed) partitions, and
    the query optimizer produces better plans with partition-level stats when partition
    pruning applies.
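    
    As a hedged example of how partition-level stats could be used (hypothetical table `t` partitioned by `p`; the "Partition Statistics" row is shown only once stats are stored):
    
    ```scala
    // Compare the stored size of each partition to spot an outlier (skewed) partition.
    Seq(1, 2, 3).foreach { p =>
      spark.sql(s"DESC EXTENDED t PARTITION (p=$p)")
        .where("col_name = 'Partition Statistics'")
        .show(truncate = false)
    }
    ```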
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Updated unit tests
    
    Closes #36067 from kazuyukitanimura/SPARK-38573.
    
    Authored-by: Kazuyuki Tanimura <ktanim...@apple.com>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../execution/command/AnalyzeColumnCommand.scala   |  2 +-
 .../spark/sql/execution/command/CommandUtils.scala | 28 +++++---
 .../apache/spark/sql/execution/command/ddl.scala   |  1 +
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 83 +++++++++++++++++++++-
 4 files changed, 103 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 5cb347868b1..88bba7f5ec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -109,7 +109,7 @@ case class AnalyzeColumnCommand(
         throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError()
       }
     } else {
-      val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+      val (sizeInBytes, _) = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
       val relation = sparkSession.table(tableIdent).logicalPlan
       val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 312f17543ce..2154a5893dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -57,12 +57,15 @@ object CommandUtils extends Logging {
     val catalog = sparkSession.sessionState.catalog
     if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
       val newTable = catalog.getTableMetadata(table.identifier)
-      val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
+      val (newSize, newPartitions) = CommandUtils.calculateTotalSize(sparkSession, newTable)
       val isNewStats = newTable.stats.map(newSize != _.sizeInBytes).getOrElse(true)
       if (isNewStats) {
         val newStats = CatalogStatistics(sizeInBytes = newSize)
         catalog.alterTableStats(table.identifier, Some(newStats))
       }
+      if (newPartitions.nonEmpty) {
+        catalog.alterPartitions(table.identifier, newPartitions)
+      }
     } else if (table.stats.nonEmpty) {
       catalog.alterTableStats(table.identifier, None)
     } else {
@@ -71,22 +74,29 @@ object CommandUtils extends Logging {
     }
   }
 
-  def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = {
+  def calculateTotalSize(
+      spark: SparkSession,
+      catalogTable: CatalogTable): (BigInt, Seq[CatalogTablePartition]) = {
     val sessionState = spark.sessionState
     val startTime = System.nanoTime()
-    val totalSize = if (catalogTable.partitionColumnNames.isEmpty) {
-      calculateSingleLocationSize(sessionState, catalogTable.identifier,
-        catalogTable.storage.locationUri)
+    val (totalSize, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) {
+      (calculateSingleLocationSize(sessionState, catalogTable.identifier,
+        catalogTable.storage.locationUri), Seq())
     } else {
       // Calculate table size as a sum of the visible partitions. See SPARK-21079
       val partitions = sessionState.catalog.listPartitions(catalogTable.identifier)
       logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.")
       val paths = partitions.map(_.storage.locationUri)
-      calculateMultipleLocationSizes(spark, catalogTable.identifier, paths).sum
+      val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths)
+      val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
+        val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), None)
+        newStats.map(_ => p.copy(stats = newStats))
+      }
+      (sizes.sum, newPartitions)
     }
     logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to 
calculate" +
       s" the total size for table ${catalogTable.identifier}.")
-    totalSize
+    (totalSize, newPartitions)
   }
 
   def calculateSingleLocationSize(
@@ -222,7 +232,7 @@ object CommandUtils extends Logging {
       }
     } else {
       // Compute stats for the whole table
-      val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+      val (newTotalSize, _) = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
       val newRowCount =
         if (noScan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 14d0e9753f2..e9ec98e6d0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -480,6 +480,7 @@ case class AlterTableAddPartitionCommand(
       if (addedSize > 0) {
        val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
         catalog.alterTableStats(table.identifier, Some(newStats))
+        catalog.alterPartitions(table.identifier, parts)
       }
     } else {
       // Re-calculating of table size including all partitions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 46acc9b2f0a..c689682a46b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -201,7 +201,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
             .getTableMetadata(TableIdentifier(checkSizeTable))
          HiveCatalogMetrics.reset()
          assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
-          val size = CommandUtils.calculateTotalSize(spark, tableMeta)
+          val (size, _) = CommandUtils.calculateTotalSize(spark, tableMeta)
          assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
           assert(size === BigInt(17436))
       }
@@ -984,6 +984,16 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
               assert(fetched2.get.colStats.isEmpty)
               val statsProp = getStatsProperties(table)
              assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+
+              // SPARK-38573: Support Partition Level Statistics Collection
+              val partStats1 = getPartitionStats(table, Map("ds" -> "2008-04-08", "hr" -> "11"))
+              assert(partStats1.sizeInBytes > 0)
+              val partStats2 = getPartitionStats(table, Map("ds" -> "2008-04-08", "hr" -> "12"))
+              assert(partStats2.sizeInBytes > 0)
+              val partStats3 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "11"))
+              assert(partStats3.sizeInBytes > 0)
+              val partStats4 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "12"))
+              assert(partStats4.sizeInBytes > 0)
             } else {
               assert(getStatsProperties(table).isEmpty)
             }
@@ -1007,6 +1017,10 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
               assert(fetched4.get.colStats.isEmpty)
               val statsProp = getStatsProperties(table)
              assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched4.get.sizeInBytes)
+
+              // SPARK-38573: Support Partition Level Statistics Collection
+              val partStats3 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "11"))
+              assert(partStats3.sizeInBytes > 0)
             } else {
               assert(getStatsProperties(table).isEmpty)
             }
@@ -1529,4 +1543,71 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
     }
   }
+
+  test("SPARK-38573: partition stats auto update for dynamic partitions") {
+    val table = "partition_stats_dynamic_partition"
+    Seq("hive", "parquet").foreach { source =>
+      withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (id INT, sp INT, dp INT) USING $source 
PARTITIONED BY (sp, dp)")
+          sql(s"INSERT INTO $table PARTITION (sp=0, dp) VALUES (0, 0)")
+          sql(s"INSERT OVERWRITE TABLE $table PARTITION (sp=0, dp) SELECT id, 
id FROM range(5)")
+          for (i <- 0 until 5) {
+            val partStats = getPartitionStats(table, Map("sp" -> s"0", "dp" -> 
s"$i"))
+            assert(partStats.sizeInBytes > 0)
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-38573: change partition stats after load/set/truncate data 
command") {
+    val table = "partition_stats_load_set_truncate"
+    withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") {
+      withTable(table) {
+        sql(s"CREATE TABLE $table (i INT, j STRING) USING hive " +
+          "PARTITIONED BY (ds STRING, hr STRING)")
+
+        withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
+          val partDir1 = new File(new File(dir1, "ds=2008-04-09"), "hr=11")
+          val file1 = new File(partDir1, "data")
+          file1.getParentFile.mkdirs()
+          Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+            writer.write("1,a")
+          }
+
+          val partDir2 = new File(new File(dir2, "ds=2008-04-09"), "hr=12")
+          val file2 = new File(partDir2, "data")
+          file2.getParentFile.mkdirs()
+          Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+            writer.write("1,a")
+          }
+
+          sql(s"""
+            |LOAD DATA INPATH '${file1.toURI.toString}' INTO TABLE $table
+            |PARTITION (ds='2008-04-09', hr='11')
+            """.stripMargin)
+          sql(s"ALTER TABLE $table ADD PARTITION (ds='2008-04-09', hr='12')")
+          sql(s"""
+            |ALTER TABLE $table PARTITION (ds='2008-04-09', hr='12')
+            |SET LOCATION '${partDir2.toURI.toString}'
+            |""".stripMargin)
+          val partStats1 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "11"))
+          assert(partStats1.sizeInBytes > 0)
+          val partStats2 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "12"))
+          assert(partStats2.sizeInBytes > 0)
+
+
+          sql(s"TRUNCATE TABLE $table PARTITION (ds='2008-04-09', hr='11')")
+          val partStats3 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "11"))
+          assert(partStats3.sizeInBytes == 0)
+          val partStats4 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "12"))
+          assert(partStats4.sizeInBytes > 0)
+          sql(s"TRUNCATE TABLE $table")
+          val partStats5 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "12"))
+          assert(partStats5.sizeInBytes == 0)
+        }
+      }
+    }
+  }
 }

