Repository: spark
Updated Branches:
  refs/heads/master 4eb41879c -> 61b5df567


[SPARK-21127][SQL] Update statistics after data changing commands

## What changes were proposed in this pull request?

Update stats after the following data-changing commands (a usage sketch follows the list):

- InsertIntoHadoopFsRelationCommand
- InsertIntoHiveTable
- LoadDataCommand
- TruncateTableCommand
- AlterTableSetLocationCommand
- AlterTableDropPartitionCommand
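
A minimal usage sketch of the new behavior (the table name is made up, and the exact DESCRIBE output depends on the Spark version; with the flag off, which is the default, the now-stale stats are simply cleared instead of recomputed):

  // Sketch only, assuming a running SparkSession named `spark`.
  spark.conf.set("spark.sql.statistics.autoUpdate.size", "true")
  spark.sql("CREATE TABLE t (i INT, j STRING) USING PARQUET")
  spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")       // seed sizeInBytes / rowCount
  spark.sql("INSERT INTO TABLE t SELECT 1, 'abc'")      // a data-changing command
  // With the flag on, the stored sizeInBytes now reflects the newly written file(s);
  // one way to inspect it (output layout varies by version):
  spark.sql("DESCRIBE TABLE EXTENDED t").where("col_name = 'Statistics'").show(false)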

## How was this patch tested?
Added new test cases.

Author: wangzhenhua <wangzhen...@huawei.com>
Author: Zhenhua Wang <wzh_...@163.com>

Closes #18334 from wzhfy/changeStatsForOperation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61b5df56
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61b5df56
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61b5df56

Branch: refs/heads/master
Commit: 61b5df567eb8ae0df4059cb0e334316fff462de9
Parents: 4eb4187
Author: wangzhenhua <wangzhen...@huawei.com>
Authored: Sat Jul 1 10:01:44 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sat Jul 1 10:01:44 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  10 +
 .../sql/execution/command/CommandUtils.scala    |  17 +-
 .../spark/sql/execution/command/ddl.scala       |  15 +-
 .../spark/sql/StatisticsCollectionSuite.scala   |  77 +++++---
 .../apache/spark/sql/hive/StatisticsSuite.scala | 187 ++++++++++++-------
 5 files changed, 207 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c641e4d..25152f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -774,6 +774,14 @@ object SQLConf {
       .doubleConf
       .createWithDefault(0.05)
 
+  val AUTO_UPDATE_SIZE =
+    buildConf("spark.sql.statistics.autoUpdate.size")
+      .doc("Enables automatic update for table size once table's data is 
changed. Note that if " +
+        "the total number of files of the table is very large, this can be 
expensive and slow " +
+        "down data change commands.")
+      .booleanConf
+      .createWithDefault(false)
+
   val CBO_ENABLED =
     buildConf("spark.sql.cbo.enabled")
       .doc("Enables CBO for estimation of plan statistics when set true.")
@@ -1083,6 +1091,8 @@ class SQLConf extends Serializable with Logging {
 
   def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
 
+  def autoUpdateSize: Boolean = getConf(SQLConf.AUTO_UPDATE_SIZE)
+
   def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
 
   def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
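
The new entry is a regular (non-static) conf, so it can be flipped per session. A small sketch, assuming a live SparkSession named `spark` (internal code reads it through the `autoUpdateSize` accessor added above):

  // Sketch: toggle the flag at runtime and read it back.
  spark.conf.set("spark.sql.statistics.autoUpdate.size", "true")
  assert(spark.conf.get("spark.sql.statistics.autoUpdate.size") == "true")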

http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 9239760..fce12cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -36,7 +36,14 @@ object CommandUtils extends Logging {
   def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = {
     if (table.stats.nonEmpty) {
       val catalog = sparkSession.sessionState.catalog
-      catalog.alterTableStats(table.identifier, None)
+      if (sparkSession.sessionState.conf.autoUpdateSize) {
+        val newTable = catalog.getTableMetadata(table.identifier)
+        val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable)
+        val newStats = CatalogStatistics(sizeInBytes = newSize)
+        catalog.alterTableStats(table.identifier, Some(newStats))
+      } else {
+        catalog.alterTableStats(table.identifier, None)
+      }
     }
   }
 
@@ -84,7 +91,9 @@ object CommandUtils extends Logging {
       size
     }
 
-    locationUri.map { p =>
+    val startTime = System.nanoTime()
+    logInfo(s"Starting to calculate the total file size under path 
$locationUri.")
+    val size = locationUri.map { p =>
       val path = new Path(p)
       try {
         val fs = path.getFileSystem(sessionState.newHadoopConf())
@@ -97,6 +106,10 @@ object CommandUtils extends Logging {
           0L
       }
     }.getOrElse(0L)
+    val durationInMs = (System.nanoTime() - startTime) / (1000 * 1000)
+    logInfo(s"It took $durationInMs ms to calculate the total file size under 
path $locationUri.")
+
+    size
   }
 
 }
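
Each command listed in the description is expected to call updateTableStats once its write finishes. A hedged sketch of such a hook (the command class and its body are illustrative, not code from this patch):

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.catalyst.catalog.CatalogTable
  import org.apache.spark.sql.execution.command.{CommandUtils, RunnableCommand}

  // Illustrative only: a data-changing command refreshing stats after it writes.
  case class ExampleInsertCommand(table: CatalogTable) extends RunnableCommand {
    override def run(sparkSession: SparkSession): Seq[Row] = {
      // ... perform the actual write here ...
      // With spark.sql.statistics.autoUpdate.size=true this recomputes sizeInBytes,
      // otherwise it just clears the now-stale statistics.
      CommandUtils.updateTableStats(sparkSession, table)
      Seq.empty[Row]
    }
  }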

http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ac897c1..ba7ca84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -437,7 +437,20 @@ case class AlterTableAddPartitionCommand(
     }
     catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
 
-    CommandUtils.updateTableStats(sparkSession, table)
+    if (table.stats.nonEmpty) {
+      if (sparkSession.sessionState.conf.autoUpdateSize) {
+        val addedSize = parts.map { part =>
+          CommandUtils.calculateLocationSize(sparkSession.sessionState, table.identifier,
+            part.storage.locationUri)
+        }.sum
+        if (addedSize > 0) {
+          val newStats = CatalogStatistics(sizeInBytes = table.stats.get.sizeInBytes + addedSize)
+          catalog.alterTableStats(table.identifier, Some(newStats))
+        }
+      } else {
+        catalog.alterTableStats(table.identifier, None)
+      }
+    }
     Seq.empty[Row]
   }
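
The arithmetic of the ADD PARTITION branch above, with made-up figures: the stored size grows by the size of the files under the new partition locations, without rescanning the whole table.

  // Hypothetical numbers, only to illustrate the incremental update:
  val oldSizeInBytes = BigInt(1000)   // table.stats.get.sizeInBytes before ADD PARTITION
  val addedSize      = BigInt(200)    // sum of calculateLocationSize over the new partition dirs
  val newSizeInBytes = oldSizeInBytes + addedSize
  assert(newSizeInBytes == BigInt(1200))  // recorded via CatalogStatistics(sizeInBytes = newSizeInBytes)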
 

http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b031c52..d9392de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
@@ -178,36 +178,63 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
 
   test("change stats after set location command") {
     val table = "change_stats_set_location_table"
-    withTable(table) {
-      spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
-      val fetched1 = checkTableStats(
-        table, hasSizeInBytes = true, expectedRowCounts = Some(100))
-      assert(fetched1.get.sizeInBytes > 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      // set location command
-      withTempDir { newLocation =>
-        sql(s"ALTER TABLE $table SET LOCATION '${newLocation.toURI.toString}'")
-        checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          spark.range(100).select($"id", $"id" % 5 as "value").write.saveAsTable(table)
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS id, value")
+          val fetched1 = checkTableStats(
+            table, hasSizeInBytes = true, expectedRowCounts = Some(100))
+          assert(fetched1.get.sizeInBytes > 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          // set location command
+          val initLocation = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table))
+            .storage.locationUri.get.toString
+          withTempDir { newLocation =>
+            sql(s"ALTER TABLE $table SET LOCATION 
'${newLocation.toURI.toString}'")
+            if (autoUpdate) {
+              val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched2.get.sizeInBytes == 0)
+              assert(fetched2.get.colStats.isEmpty)
+
+              // set back to the initial location
+              sql(s"ALTER TABLE $table SET LOCATION '$initLocation'")
+              val fetched3 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes)
+            } else {
+              checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+            }
+          }
+        }
       }
     }
   }
 
   test("change stats after insert command for datasource table") {
     val table = "change_stats_insert_datasource_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
-      assert(fetched1.get.sizeInBytes == 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      // insert into command
-      sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
-      checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (i int, j string) USING PARQUET")
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+          val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+          assert(fetched1.get.sizeInBytes == 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          // insert into command
+          sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
+          if (autoUpdate) {
+            val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+            assert(fetched2.get.sizeInBytes > 0)
+            assert(fetched2.get.colStats.isEmpty)
+          } else {
+            checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None)
+          }
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/61b5df56/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 5fd266c..c601038 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -444,88 +444,133 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
   test("change stats after insert command for hive table") {
     val table = s"change_stats_insert_hive_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (i int, j string)")
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
-      assert(fetched1.get.sizeInBytes == 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      // insert into command
-      sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
-      assert(getStatsProperties(table).isEmpty)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (i int, j string)")
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+          val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+          assert(fetched1.get.sizeInBytes == 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          // insert into command
+          sql(s"INSERT INTO TABLE $table SELECT 1, 'abc'")
+          if (autoUpdate) {
+            val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+            assert(fetched2.get.sizeInBytes > 0)
+            assert(fetched2.get.colStats.isEmpty)
+            val statsProp = getStatsProperties(table)
+            assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+          } else {
+            assert(getStatsProperties(table).isEmpty)
+          }
+        }
+      }
     }
   }
 
   test("change stats after load data command") {
     val table = "change_stats_load_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
-      assert(fetched1.get.sizeInBytes == 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      withTempDir { loadPath =>
-        // load data command
-        val file = new File(loadPath + "/data")
-        val writer = new PrintWriter(file)
-        writer.write("2,xyz")
-        writer.close()
-        sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE $table")
-        assert(getStatsProperties(table).isEmpty)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (i INT, j STRING) STORED AS PARQUET")
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+          val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+          assert(fetched1.get.sizeInBytes == 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          withTempDir { loadPath =>
+            // load data command
+            val file = new File(loadPath + "/data")
+            val writer = new PrintWriter(file)
+            writer.write("2,xyz")
+            writer.close()
+            sql(s"LOAD DATA INPATH '${loadPath.toURI.toString}' INTO TABLE 
$table")
+            if (autoUpdate) {
+              val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched2.get.sizeInBytes > 0)
+              assert(fetched2.get.colStats.isEmpty)
+              val statsProp = getStatsProperties(table)
+              assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+            } else {
+              assert(getStatsProperties(table).isEmpty)
+            }
+          }
+        }
       }
     }
   }
 
   test("change stats after add/drop partition command") {
     val table = "change_stats_part_table"
-    withTable(table) {
-      sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds STRING, 
hr STRING)")
-      // table has two partitions initially
-      for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
-        sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') 
SELECT 1, 'a'")
-      }
-      // analyze to get initial stats
-      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-      val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
-      assert(fetched1.get.sizeInBytes > 0)
-      assert(fetched1.get.colStats.size == 2)
-
-      withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
-        val file1 = new File(dir1 + "/data")
-        val writer1 = new PrintWriter(file1)
-        writer1.write("1,a")
-        writer1.close()
-
-        val file2 = new File(dir2 + "/data")
-        val writer2 = new PrintWriter(file2)
-        writer2.write("1,a")
-        writer2.close()
-
-        // add partition command
-        sql(
-          s"""
-             |ALTER TABLE $table ADD
-             |PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
-             |PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
-        """.stripMargin)
-        assert(getStatsProperties(table).isEmpty)
-
-        // generate stats again
-        sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
-        val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(4))
-        assert(fetched2.get.sizeInBytes > 0)
-        assert(fetched2.get.colStats.size == 2)
-
-        // drop partition command
-        sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), PARTITION 
(hr='12')")
-        // only one partition left
-        assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
-          .map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> "11")))
-        assert(getStatsProperties(table).isEmpty)
+    Seq(false, true).foreach { autoUpdate =>
+      withSQLConf(SQLConf.AUTO_UPDATE_SIZE.key -> autoUpdate.toString) {
+        withTable(table) {
+          sql(s"CREATE TABLE $table (i INT, j STRING) PARTITIONED BY (ds 
STRING, hr STRING)")
+          // table has two partitions initially
+          for (ds <- Seq("2008-04-08"); hr <- Seq("11", "12")) {
+            sql(s"INSERT OVERWRITE TABLE $table PARTITION (ds='$ds',hr='$hr') 
SELECT 1, 'a'")
+          }
+          // analyze to get initial stats
+          sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+          val fetched1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
+          assert(fetched1.get.sizeInBytes > 0)
+          assert(fetched1.get.colStats.size == 2)
+
+          withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
+            val file1 = new File(dir1 + "/data")
+            val writer1 = new PrintWriter(file1)
+            writer1.write("1,a")
+            writer1.close()
+
+            val file2 = new File(dir2 + "/data")
+            val writer2 = new PrintWriter(file2)
+            writer2.write("1,a")
+            writer2.close()
+
+            // add partition command
+            sql(
+              s"""
+                 |ALTER TABLE $table ADD
+                 |PARTITION (ds='2008-04-09', hr='11') LOCATION '${dir1.toURI.toString}'
+                 |PARTITION (ds='2008-04-09', hr='12') LOCATION '${dir2.toURI.toString}'
+            """.stripMargin)
+            if (autoUpdate) {
+              val fetched2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched2.get.sizeInBytes > fetched1.get.sizeInBytes)
+              assert(fetched2.get.colStats.isEmpty)
+              val statsProp = getStatsProperties(table)
+              assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes)
+            } else {
+              assert(getStatsProperties(table).isEmpty)
+            }
+
+            // now the table has four partitions, generate stats again
+            sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS i, j")
+            val fetched3 = checkTableStats(
+              table, hasSizeInBytes = true, expectedRowCounts = Some(4))
+            assert(fetched3.get.sizeInBytes > 0)
+            assert(fetched3.get.colStats.size == 2)
+
+            // drop partition command
+            sql(s"ALTER TABLE $table DROP PARTITION (ds='2008-04-08'), 
PARTITION (hr='12')")
+            
assert(spark.sessionState.catalog.listPartitions(TableIdentifier(table))
+              .map(_.spec).toSet == Set(Map("ds" -> "2008-04-09", "hr" -> 
"11")))
+            // only one partition left
+            if (autoUpdate) {
+              val fetched4 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+              assert(fetched4.get.sizeInBytes < fetched1.get.sizeInBytes)
+              assert(fetched4.get.colStats.isEmpty)
+              val statsProp = getStatsProperties(table)
+              assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched4.get.sizeInBytes)
+            } else {
+              assert(getStatsProperties(table).isEmpty)
+            }
+          }
+        }
       }
     }
   }

