This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 9756fbaa8c9 [SPARK-38573][SQL] Support Auto Partition Statistics Collection 9756fbaa8c9 is described below commit 9756fbaa8c9c4648e8c40a2e687295502d7b1196 Author: Kazuyuki Tanimura <ktanim...@apple.com> AuthorDate: Fri Apr 15 09:18:07 2022 -0700 [SPARK-38573][SQL] Support Auto Partition Statistics Collection ### What changes were proposed in this pull request? Currently, https://issues.apache.org/jira/browse/SPARK-21127 supports automatically storing aggregated stats at the table level when the config `spark.sql.statistics.size.autoUpdate.enabled` is enabled. This PR proposes to also update partition-level statistics automatically, at the same time, whenever `spark.sql.statistics.size.autoUpdate.enabled` is enabled. ### Why are the changes needed? Partition-level stats are useful for identifying outlier (skewed) partitions, and the query optimizer works better with partition-level stats when partition pruning applies. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Updated unit tests. Closes #36067 from kazuyukitanimura/SPARK-38573. 
Authored-by: Kazuyuki Tanimura <ktanim...@apple.com> Signed-off-by: Chao Sun <sunc...@apple.com> --- .../execution/command/AnalyzeColumnCommand.scala | 2 +- .../spark/sql/execution/command/CommandUtils.scala | 28 +++++--- .../apache/spark/sql/execution/command/ddl.scala | 1 + .../apache/spark/sql/hive/StatisticsSuite.scala | 83 +++++++++++++++++++++- 4 files changed, 103 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 5cb347868b1..88bba7f5ec9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -109,7 +109,7 @@ case class AnalyzeColumnCommand( throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError() } } else { - val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta) + val (sizeInBytes, _) = CommandUtils.calculateTotalSize(sparkSession, tableMeta) val relation = sparkSession.table(tableIdent).logicalPlan val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 312f17543ce..2154a5893dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType} +import 
org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -57,12 +57,15 @@ object CommandUtils extends Logging { val catalog = sparkSession.sessionState.catalog if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { val newTable = catalog.getTableMetadata(table.identifier) - val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable) + val (newSize, newPartitions) = CommandUtils.calculateTotalSize(sparkSession, newTable) val isNewStats = newTable.stats.map(newSize != _.sizeInBytes).getOrElse(true) if (isNewStats) { val newStats = CatalogStatistics(sizeInBytes = newSize) catalog.alterTableStats(table.identifier, Some(newStats)) } + if (newPartitions.nonEmpty) { + catalog.alterPartitions(table.identifier, newPartitions) + } } else if (table.stats.nonEmpty) { catalog.alterTableStats(table.identifier, None) } else { @@ -71,22 +74,29 @@ object CommandUtils extends Logging { } } - def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable): BigInt = { + def calculateTotalSize( + spark: SparkSession, + catalogTable: CatalogTable): (BigInt, Seq[CatalogTablePartition]) = { val sessionState = spark.sessionState val startTime = System.nanoTime() - val totalSize = if (catalogTable.partitionColumnNames.isEmpty) { - calculateSingleLocationSize(sessionState, catalogTable.identifier, - catalogTable.storage.locationUri) + val (totalSize, newPartitions) = if (catalogTable.partitionColumnNames.isEmpty) { + (calculateSingleLocationSize(sessionState, catalogTable.identifier, + catalogTable.storage.locationUri), Seq()) } else { // Calculate table size as a sum of the visible partitions. 
See SPARK-21079 val partitions = sessionState.catalog.listPartitions(catalogTable.identifier) logInfo(s"Starting to calculate sizes for ${partitions.length} partitions.") val paths = partitions.map(_.storage.locationUri) - calculateMultipleLocationSizes(spark, catalogTable.identifier, paths).sum + val sizes = calculateMultipleLocationSizes(spark, catalogTable.identifier, paths) + val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) => + val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx), None) + newStats.map(_ => p.copy(stats = newStats)) + } + (sizes.sum, newPartitions) } logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to calculate" + s" the total size for table ${catalogTable.identifier}.") - totalSize + (totalSize, newPartitions) } def calculateSingleLocationSize( @@ -222,7 +232,7 @@ object CommandUtils extends Logging { } } else { // Compute stats for the whole table - val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta) + val (newTotalSize, _) = CommandUtils.calculateTotalSize(sparkSession, tableMeta) val newRowCount = if (noScan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 14d0e9753f2..e9ec98e6d0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -480,6 +480,7 @@ case class AlterTableAddPartitionCommand( if (addedSize > 0) { val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize) catalog.alterTableStats(table.identifier, Some(newStats)) + catalog.alterPartitions(table.identifier, parts) } } else { // Re-calculating of table size including all partitions diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 46acc9b2f0a..c689682a46b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -201,7 +201,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto .getTableMetadata(TableIdentifier(checkSizeTable)) HiveCatalogMetrics.reset() assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) - val size = CommandUtils.calculateTotalSize(spark, tableMeta) + val (size, _) = CommandUtils.calculateTotalSize(spark, tableMeta) assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1) assert(size === BigInt(17436)) } @@ -984,6 +984,16 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(fetched2.get.colStats.isEmpty) val statsProp = getStatsProperties(table) assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes) + + // SPARK-38573: Support Partition Level Statistics Collection + val partStats1 = getPartitionStats(table, Map("ds" -> "2008-04-08", "hr" -> "11")) + assert(partStats1.sizeInBytes > 0) + val partStats2 = getPartitionStats(table, Map("ds" -> "2008-04-08", "hr" -> "12")) + assert(partStats2.sizeInBytes > 0) + val partStats3 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "11")) + assert(partStats3.sizeInBytes > 0) + val partStats4 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "12")) + assert(partStats4.sizeInBytes > 0) } else { assert(getStatsProperties(table).isEmpty) } @@ -1007,6 +1017,10 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(fetched4.get.colStats.isEmpty) val statsProp = getStatsProperties(table) assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched4.get.sizeInBytes) + + // SPARK-38573: Support Partition Level Statistics Collection + val partStats3 = getPartitionStats(table, 
Map("ds" -> "2008-04-09", "hr" -> "11")) + assert(partStats3.sizeInBytes > 0) } else { assert(getStatsProperties(table).isEmpty) } @@ -1529,4 +1543,71 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } } + + test("SPARK-38573: partition stats auto update for dynamic partitions") { + val table = "partition_stats_dynamic_partition" + Seq("hive", "parquet").foreach { source => + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") { + withTable(table) { + sql(s"CREATE TABLE $table (id INT, sp INT, dp INT) USING $source PARTITIONED BY (sp, dp)") + sql(s"INSERT INTO $table PARTITION (sp=0, dp) VALUES (0, 0)") + sql(s"INSERT OVERWRITE TABLE $table PARTITION (sp=0, dp) SELECT id, id FROM range(5)") + for (i <- 0 until 5) { + val partStats = getPartitionStats(table, Map("sp" -> s"0", "dp" -> s"$i")) + assert(partStats.sizeInBytes > 0) + } + } + } + } + } + + test("SPARK-38573: change partition stats after load/set/truncate data command") { + val table = "partition_stats_load_set_truncate" + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") { + withTable(table) { + sql(s"CREATE TABLE $table (i INT, j STRING) USING hive " + + "PARTITIONED BY (ds STRING, hr STRING)") + + withTempPaths(numPaths = 2) { case Seq(dir1, dir2) => + val partDir1 = new File(new File(dir1, "ds=2008-04-09"), "hr=11") + val file1 = new File(partDir1, "data") + file1.getParentFile.mkdirs() + Utils.tryWithResource(new PrintWriter(file1)) { writer => + writer.write("1,a") + } + + val partDir2 = new File(new File(dir2, "ds=2008-04-09"), "hr=12") + val file2 = new File(partDir2, "data") + file2.getParentFile.mkdirs() + Utils.tryWithResource(new PrintWriter(file2)) { writer => + writer.write("1,a") + } + + sql(s""" + |LOAD DATA INPATH '${file1.toURI.toString}' INTO TABLE $table + |PARTITION (ds='2008-04-09', hr='11') + """.stripMargin) + sql(s"ALTER TABLE $table ADD PARTITION (ds='2008-04-09', hr='12')") + sql(s""" + |ALTER TABLE $table PARTITION 
(ds='2008-04-09', hr='12') + |SET LOCATION '${partDir2.toURI.toString}' + |""".stripMargin) + val partStats1 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "11")) + assert(partStats1.sizeInBytes > 0) + val partStats2 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "12")) + assert(partStats2.sizeInBytes > 0) + + + sql(s"TRUNCATE TABLE $table PARTITION (ds='2008-04-09', hr='11')") + val partStats3 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "11")) + assert(partStats3.sizeInBytes == 0) + val partStats4 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "12")) + assert(partStats4.sizeInBytes > 0) + sql(s"TRUNCATE TABLE $table") + val partStats5 = getPartitionStats(table, Map("ds" -> "2008-04-09", "hr" -> "12")) + assert(partStats5.sizeInBytes == 0) + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org