Repository: spark
Updated Branches:
  refs/heads/master 4eb41879c -> 61b5df567
[SPARK-21127][SQL] Update statistics after data changing commands

## What changes were proposed in this pull request?

Update stats after the following data changing commands:
- InsertIntoHadoopFsRelationCommand
- InsertIntoHiveTable
- LoadDataCommand
- TruncateTableCommand
- AlterTableSetLocationCommand
- AlterTableDropPartitionCommand

## How was this patch tested?

Added new test cases.

Author: wangzhenhua <wangzhen...@huawei.com>
Author: Zhenhua Wang <wzh_...@163.com>

Closes #18334 from wzhfy/changeStatsForOperation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61b5df56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61b5df56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61b5df56

Branch: refs/heads/master
Commit: 61b5df567eb8ae0df4059cb0e334316fff462de9
Parents: 4eb4187
Author: wangzhenhua <wangzhen...@huawei.com>
Authored: Sat Jul 1 10:01:44 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat Jul 1 10:01:44 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  10 +
 .../sql/execution/command/CommandUtils.scala    |  17 +-
 .../spark/sql/execution/command/ddl.scala       |  15 +-
 .../spark/sql/StatisticsCollectionSuite.scala   |  77 +++++---
 .../apache/spark/sql/hive/StatisticsSuite.scala | 187 ++++++++++++-------
 5 files changed, 207 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
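For illustration, a minimal spark-shell style sketch of the new behavior (the table name and
the session are made up and not part of the patch): with the new flag enabled, a data changing
command refreshes the stored table size instead of just invalidating the stats.

    // Assumed spark-shell session; `spark` is the SparkSession.
    spark.conf.set("spark.sql.statistics.autoUpdate.size", "true")
    spark.sql("CREATE TABLE stats_demo (i INT, j STRING) USING PARQUET")
    spark.sql("ANALYZE TABLE stats_demo COMPUTE STATISTICS")

    // Previously any data changing command simply cleared the stats; with the flag on,
    // the total size is recalculated and written back (row count and column stats are
    // still cleared, since they cannot be derived from file sizes alone).
    spark.sql("INSERT INTO TABLE stats_demo SELECT 1, 'abc'")
    spark.sql("DESC TABLE EXTENDED stats_demo")
      .where("col_name = 'Statistics'")
      .show(truncate = false)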
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c641e4d..25152f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -774,6 +774,14 @@ object SQLConf {
     .doubleConf
     .createWithDefault(0.05)
 
+  val AUTO_UPDATE_SIZE =
+    buildConf("spark.sql.statistics.autoUpdate.size")
+      .doc("Enables automatic update for table size once table's data is changed. Note that if " +
+        "the total number of files of the table is very large, this can be expensive and slow " +
+        "down data change commands.")
+      .booleanConf
+      .createWithDefault(false)
+
   val CBO_ENABLED =
     buildConf("spark.sql.cbo.enabled")
       .doc("Enables CBO for estimation of plan statistics when set true.")
@@ -1083,6 +1091,8 @@ class SQLConf extends Serializable with Logging {
 
   def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
 
+  def autoUpdateSize: Boolean = getConf(SQLConf.AUTO_UPDATE_SIZE)
+
   def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
 
   def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)

http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 9239760..fce12cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -36,7 +36,14 @@ object CommandUtils extends Logging {
   def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
     if (table.stats.nonEmpty) {
       val catalog = sparkSession.sessionState.catalog
-      catalog.alterTableStats(table.identifier, None)
+      if (sparkSession.sessionState.conf.autoUpdateSize) {
+        val newTable = catalog.getTableMetadata(table.identifier)
+        val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable)
+        val newStats = CatalogStatistics(sizeInBytes = newSize)
+        catalog.alterTableStats(table.identifier, Some(newStats))
+      } else {
+        catalog.alterTableStats(table.identifier, None)
+      }
     }
   }
 
@@ -84,7 +91,9 @@ object CommandUtils extends Logging {
       size
     }
 
-    locationUri.map { p =>
+    val startTime = System.nanoTime()
+    logInfo(s"Starting to calculate the total file size under path $locationUri.")
+    val size = locationUri.map { p =>
       val path = new Path(p)
       try {
         val fs = path.getFileSystem(sessionState.newHadoopConf())
@@ -97,6 +106,10 @@ object CommandUtils extends Logging {
         0L
       }
     }.getOrElse(0L)
+    val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
+    logInfo(s"It took $durationInMs ms to calculate the total file size under path $locationUri.")
+
+    size
   }
 }
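The size that CommandUtils writes back is just the recursive sum of file lengths under the
table (or partition) location. A rough, self-contained sketch of that calculation with the
Hadoop FileSystem API (an assumed helper for illustration only; the real calculateLocationSize
also skips staging directories):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Recursively sum file sizes under a location, mirroring the idea behind
    // CommandUtils.calculateLocationSize.
    def locationSize(location: String, hadoopConf: Configuration): Long = {
      val path = new Path(location)
      val fs = path.getFileSystem(hadoopConf)
      def sizeOf(p: Path): Long = {
        val status = fs.getFileStatus(p)
        if (status.isDirectory) fs.listStatus(p).map(s => sizeOf(s.getPath)).sum
        else status.getLen
      }
      if (fs.exists(path)) sizeOf(path) else 0L
    }

    // Timing in the same style as the new logging, which is why the config doc warns
    // about tables with very many files.
    val start = System.nanoTime()
    val size = locationSize("/tmp/some_table_dir", new Configuration())
    println(s"$size bytes, took ${(System.nanoTime() - start) / (1000 * 1000)} ms")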
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ac897c1..ba7ca84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -437,7 +437,20 @@ case class AlterTableAddPartitionCommand(
     }
     catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
 
-    CommandUtils.updateTableStats(sparkSession, table)
+    if (table.stats.nonEmpty) {
+      if (sparkSession.sessionState.conf.autoUpdateSize) {
+        val addedSize = parts.map { part =>
+          CommandUtils.calculateLocationSize(sparkSession.sessionState, table.identifier,
+            part.storage.locationUri)
+        }.sum
+        if (addedSize > 0) {
+          val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
+          catalog.alterTableStats(table.identifier, Some(newStats))
+        }
+      } else {
+        catalog.alterTableStats(table.identifier, None)
+      }
+    }
     Seq.empty[Row]
   }
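Unlike the other commands, which recompute the full table size, ADD PARTITION only measures
the new partition locations and adds that delta to the existing sizeInBytes. A hedged
end-to-end sketch of the user-visible effect (table name, partition values and path are
invented):

    // Assumed spark-shell session with the flag enabled.
    spark.conf.set("spark.sql.statistics.autoUpdate.size", "true")
    spark.sql("CREATE TABLE part_demo (i INT, ds STRING) USING PARQUET PARTITIONED BY (ds)")
    spark.sql("INSERT INTO part_demo PARTITION (ds='2017-07-01') SELECT 1")
    spark.sql("ANALYZE TABLE part_demo COMPUTE STATISTICS")

    // Only the new location is scanned; its size (if any) is added to the stored sizeInBytes
    // instead of rescanning every partition of the table.
    spark.sql(
      "ALTER TABLE part_demo ADD PARTITION (ds='2017-07-02') " +
        "LOCATION '/tmp/part_demo_extra'")
    spark.sql("DESC TABLE EXTENDED part_demo")
      .where("col_name = 'Statistics'")
      .show(truncate = false)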
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b031c52..d9392de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
@@ -178,36 +178,63 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
 
   test("change stats after set location command") {
     val table = "change_stats_set_location_table"
-    withTable(table) {
-      spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
-      val fetched1 = checkTableStats(
-        table, hasSizeInBytes = true, expectedRowCounts = Some(100))
-      assert(fetched1.get.sizeInBytes > 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      // set location command
-      withTempDir { newLocation =>
-        sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'")
-        checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
+          val fetched1 = checkTableStats(
+            table, hasSizeInBytes = true, expectedRowCounts = Some(100))
+          assert(fetched1.get.sizeInBytes > 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          // set location command
+          val initLocation = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
+            .storage.locationUri.get.toString
+          withTempDir { newLocation =>
+            sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'")
+            if (autoUpdate) {
+              val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched2.get.sizeInBytes == 0)
+              assert(fetched2.get.colStats.isEmpty)
+
+              // set back to the initial location
+              sql(s"ALTER TABLE $table SET LOCATION '$initLocation'")
+              val fetched3 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes)
+            } else {
+              checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+            }
+          }
+        }
       }
     }
   }
 
   test("change stats after insert command for datasource table") {
     val table = "change_stats_insert_datasource_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
-      assert(fetched1.get.sizeInBytes == 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      // insert into command
-      sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
-      checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+          val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+          assert(fetched1.get.sizeInBytes == 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          // insert into command
+          sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
+          if (autoUpdate) {
+            val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+            assert(fetched2.get.sizeInBytes > 0)
+            assert(fetched2.get.colStats.isEmpty)
+          } else {
+            checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+          }
+        }
+      }
     }
   }
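The test changes above all follow the same pattern: run each scenario once with the flag off
and once with it on, scoping the setting with withSQLConf. Outside the test framework that
scoping can be approximated with a small helper (an assumed sketch for illustration, not a
Spark API; `spark` is the SparkSession of an assumed session):

    import org.apache.spark.sql.SparkSession

    // Temporarily set a SQL conf, run the body, then restore the previous value.
    def withConf[T](spark: SparkSession)(key: String, value: String)(body: => T): T = {
      val previous = spark.conf.getOption(key)
      spark.conf.set(key, value)
      try {
        body
      } finally {
        previous match {
          case Some(v) => spark.conf.set(key, v)
          case None    => spark.conf.unset(key)
        }
      }
    }

    Seq("false", "true").foreach { v =>
      withConf(spark)("spark.sql.statistics.autoUpdate.size", v) {
        // run the same scenario under both settings, as the new tests do
      }
    }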
http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 5fd266c..c601038 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -444,88 +444,133 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
   test("change stats after insert command for hive table") {
     val table = s"change_stats_insert_hive_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (i int, j string)")
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
-      assert(fetched1.get.sizeInBytes == 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      // insert into command
-      sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
-      assert(getStatsProperties(table).isEmpty)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (i int, j string)")
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+          val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+          assert(fetched1.get.sizeInBytes == 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          // insert into command
+          sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
+          if (autoUpdate) {
+            val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+            assert(fetched2.get.sizeInBytes > 0)
+            assert(fetched2.get.colStats.isEmpty)
+            val statsProp = getStatsProperties(table)
+            assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+          } else {
+            assert(getStatsProperties(table).isEmpty)
+          }
+        }
+      }
     }
   }
 
   test("change stats after load data command") {
     val table = "change_stats_load_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
-      assert(fetched1.get.sizeInBytes == 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      withTempDir { loadPath =>
-        // load data command
-        val file = new File(loadPath + "/data")
-        val writer = new PrintWriter(file)
-        writer.write("2,xyz")
-        writer.close()
-        sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
-        assert(getStatsProperties(table).isEmpty)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+          val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+          assert(fetched1.get.sizeInBytes == 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          withTempDir { loadPath =>
+            // load data command
+            val file = new File(loadPath + "/data")
+            val writer = new PrintWriter(file)
+            writer.write("2,xyz")
+            writer.close()
+            sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
+            if (autoUpdate) {
+              val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched2.get.sizeInBytes > 0)
+              assert(fetched2.get.colStats.isEmpty)
+              val statsProp = getStatsProperties(table)
+              assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+            } else {
+              assert(getStatsProperties(table).isEmpty)
+            }
+          }
+        }
      }
    }
  }
 
   test("change stats after add/drop partition command") {
     val table = "change_stats_part_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
-      // table has two partitions initially
-      for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
-        sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
-      }
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
-      assert(fetched1.get.sizeInBytes > 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
-        val file1 = new File(dir1 + "/data")
-        val writer1 = new PrintWriter(file1)
-        writer1.write("1,a")
-        writer1.close()
-
-        val file2 = new File(dir2 + "/data")
-        val writer2 = new PrintWriter(file2)
-        writer2.write("1,a")
-        writer2.close()
-
-        // add partition command
-        sql(
-          s"""
-             |ALTER TABLE $table ADD
-             |PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
-             |PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
-          """.stripMargin)
-        assert(getStatsProperties(table).isEmpty)
-
-        // generate stats again
-        sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-        val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(4))
-        assert(fetched2.get.sizeInBytes > 0)
-        assert(fetched2.get.colStats.size == 2)
-
-        // drop partition command
-        sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION (hr='12')")
-        // only one partition left
-        assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
-          .map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
-        assert(getStatsProperties(table).isEmpty)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, hr STRING)")
+          // table has two partitions initially
+          for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
+            sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') SELECT 1, 'a'")
+          }
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+          val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
+          assert(fetched1.get.sizeInBytes > 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
+            val file1 = new File(dir1 + "/data")
+            val writer1 = new PrintWriter(file1)
+            writer1.write("1,a")
+            writer1.close()
+
+            val file2 = new File(dir2 + "/data")
+            val writer2 = new PrintWriter(file2)
+            writer2.write("1,a")
+            writer2.close()
+
+            // add partition command
+            sql(
+              s"""
+                 |ALTER TABLE $table ADD
+                 |PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
+                 |PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
+              """.stripMargin)
+            if (autoUpdate) {
+              val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched2.get.sizeInBytes > fetched1.get.sizeInBytes)
+              assert(fetched2.get.colStats.isEmpty)
+              val statsProp = getStatsProperties(table)
+              assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+            } else {
+              assert(getStatsProperties(table).isEmpty)
+            }
+
+            // now the table has four partitions, generate stats again
+            sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+            val fetched3 = checkTableStats(
+              table, hasSizeInBytes = true, expectedRowCounts = Some(4))
+            assert(fetched3.get.sizeInBytes > 0)
+            assert(fetched3.get.colStats.size == 2)
+
+            // drop partition command
+            sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION (hr='12')")
+            assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+              .map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
+            // only one partition left
+            if (autoUpdate) {
+              val fetched4 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched4.get.sizeInBytes < fetched1.get.sizeInBytes)
+              assert(fetched4.get.colStats.isEmpty)
+              val statsProp = getStatsProperties(table)
+              assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched4.get.sizeInBytes)
+            } else {
+              assert(getStatsProperties(table).isEmpty)
+            }
+          }
+        }
      }
    }
  }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org