Repository: spark
Updated Branches:
  refs/heads/master 682eb4f2e -> 7453ab024


[SPARK-22745][SQL] read partition stats from Hive

## What changes were proposed in this pull request?

Currently Spark can read table stats (e.g. `totalSize`, `numRows`) from Hive; this patch reuses the same logic to also read partition stats from Hive.
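
The shared logic is a `readHiveStats` helper that turns Hive's `totalSize` / `rawDataSize` / `numRows` parameters into `CatalogStatistics`, and is now applied to both tables and partitions. A minimal sketch of that helper (condensed from the patch below; comments slightly reworded):

```scala
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Hive stores stats as string parameters on the table/partition;
// a value of zero means the statistic was never collected.
def readHiveStats(properties: Map[String, String]): Option[CatalogStatistics] = {
  val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
  val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
  val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_))
  if (totalSize.exists(_ > 0)) {
    // Prefer totalSize; only keep a positive row count.
    Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
  } else if (rawDataSize.exists(_ > 0)) {
    // For external tables totalSize is zero, so fall back to rawDataSize.
    Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0)))
  } else {
    None
  }
}
```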

## How was this patch tested?

Added a new test case and modified an existing test case.
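
For reference, the added test asserts roughly the behavior sketched below; this is illustrative only, assuming a `SparkSession` named `spark` and the partitioned table created in the test (exact byte sizes depend on the `src` test table):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Partition stats are now read back from the Hive metastore parameters.
val part = spark.sessionState.catalog
  .getPartition(TableIdentifier("hive_stats_part_table"), Map("ds" -> "2017-01-01"))
// After a plain INSERT, Hive records totalSize but no row count:
assert(part.stats.get.sizeInBytes > 0 && part.stats.get.rowCount.isEmpty)
// After running `ANALYZE TABLE ... PARTITION (ds='2017-01-01') COMPUTE STATISTICS`
// through Hive, rowCount is populated as well.
```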

Author: Zhenhua Wang <wangzhen...@huawei.com>
Author: Zhenhua Wang <wzh_...@163.com>

Closes #19932 from wzhfy/read_hive_partition_stats.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7453ab02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7453ab02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7453ab02

Branch: refs/heads/master
Commit: 7453ab0243dc04db1b586b0e5f588f9cdc9f72dd
Parents: 682eb4f
Author: Zhenhua Wang <wangzhen...@huawei.com>
Authored: Wed Dec 13 16:27:29 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Dec 13 16:27:29 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    |  6 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  | 66 +++++++++++---------
 .../apache/spark/sql/hive/StatisticsSuite.scala | 32 +++++++---
 3 files changed, 62 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7453ab02/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 44e680d..632e3e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -668,7 +668,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val schema = restoreTableMetadata(rawTable).schema
 
     // convert table statistics to properties so that we can persist them through hive client
-    var statsProperties =
+    val statsProperties =
       if (stats.isDefined) {
         statsToProperties(stats.get, schema)
       } else {
@@ -1098,14 +1098,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val schema = restoreTableMetadata(rawTable).schema
 
     // convert partition statistics to properties so that we can persist them through hive api
-    val withStatsProps = lowerCasedParts.map(p => {
+    val withStatsProps = lowerCasedParts.map { p =>
       if (p.stats.isDefined) {
         val statsProperties = statsToProperties(p.stats.get, schema)
         p.copy(parameters = p.parameters ++ statsProperties)
       } else {
         p
       }
-    })
+    }
 
     // Note: Before altering table partitions in Hive, you *must* set the current database
     // to the one that contains the table of interest. Otherwise you will end up with the

http://git-wip-us.apache.org/repos/asf/spark/blob/7453ab02/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 08eb5c7..7233944 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -414,32 +414,6 @@ private[hive] class HiveClientImpl(
       }
       val comment = properties.get("comment")
 
-      // Here we are reading statistics from Hive.
-      // Note that this statistics could be overridden by Spark's statistics if that's available.
-      val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
-      val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
-      val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_))
-      // TODO: check if this estimate is valid for tables after partition pruning.
-      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-      // relatively cheap if parameters for the table are populated into the metastore.
-      // Currently, only totalSize, rawDataSize, and rowCount are used to build the field `stats`
-      // TODO: stats should include all the other two fields (`numFiles` and `numPartitions`).
-      // (see StatsSetupConst in Hive)
-      val stats =
-        // When table is external, `totalSize` is always zero, which will influence join strategy.
-        // So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
-        // return None.
-        // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always
-        // zero after INSERT command. So they are used here only if they are larger than zero.
-        if (totalSize.isDefined && totalSize.get > 0L) {
-          Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
-        } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
-          Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0)))
-        } else {
-          // TODO: still fill the rowCount even if sizeInBytes is empty. Might break anything?
-          None
-        }
-
       CatalogTable(
         identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
@@ -478,7 +452,7 @@ private[hive] class HiveClientImpl(
         // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
         // in the function toHiveTable.
         properties = filteredProperties,
-        stats = stats,
+        stats = readHiveStats(properties),
         comment = comment,
         // In older versions of Spark(before 2.2.0), we expand the view original text and store
         // that into `viewExpandedText`, and that should be used in view resolution. So we get
@@ -1011,6 +985,11 @@ private[hive] object HiveClientImpl {
    */
   def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
+    val properties: Map[String, String] = if (hp.getParameters != null) {
+      hp.getParameters.asScala.toMap
+    } else {
+      Map.empty
+    }
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
@@ -1021,8 +1000,37 @@ private[hive] object HiveClientImpl {
         compressed = apiPartition.getSd.isCompressed,
         properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
           .map(_.asScala.toMap).orNull),
-        parameters =
-          if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
+      parameters = properties,
+      stats = readHiveStats(properties))
+  }
+
+  /**
+   * Reads statistics from Hive.
+   * Note that this statistics could be overridden by Spark's statistics if that's available.
+   */
+  private def readHiveStats(properties: Map[String, String]): Option[CatalogStatistics] = {
+    val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
+    val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
+    val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_))
+    // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+    // relatively cheap if parameters for the table are populated into the metastore.
+    // Currently, only totalSize, rawDataSize, and rowCount are used to build the field `stats`
+    // TODO: stats should include all the other two fields (`numFiles` and `numPartitions`).
+    // (see StatsSetupConst in Hive)
+
+    // When table is external, `totalSize` is always zero, which will influence join strategy.
+    // So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
+    // return None.
+    // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always
+    // zero after INSERT command. So they are used here only if they are larger than zero.
+    if (totalSize.isDefined && totalSize.get > 0L) {
+      Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
+    } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
+      Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0)))
+    } else {
+      // TODO: still fill the rowCount even if sizeInBytes is empty. Might break anything?
+      None
+      None
+    }
   }
 
   // Below is the key of table properties for storing Hive-generated statistics

http://git-wip-us.apache.org/repos/asf/spark/blob/7453ab02/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 13f06a2..3af8af0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -213,6 +213,27 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-22745 - read Hive's statistics for partition") {
+    val tableName = "hive_stats_part_table"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2017-01-01') SELECT * FROM src")
+      var partition = spark.sessionState.catalog
+        .getPartition(TableIdentifier(tableName), Map("ds" -> "2017-01-01"))
+
+      assert(partition.stats.get.sizeInBytes == 5812)
+      assert(partition.stats.get.rowCount.isEmpty)
+
+      hiveClient
+        .runSqlHive(s"ANALYZE TABLE $tableName PARTITION (ds='2017-01-01') COMPUTE STATISTICS")
+      partition = spark.sessionState.catalog
+        .getPartition(TableIdentifier(tableName), Map("ds" -> "2017-01-01"))
+
+      assert(partition.stats.get.sizeInBytes == 5812)
+      assert(partition.stats.get.rowCount == Some(500))
+    }
+  }
+
   test("SPARK-21079 - analyze table with location different than that of 
individual partitions") {
     val tableName = "analyzeTable_part"
     withTable(tableName) {
@@ -353,15 +374,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       createPartition("2010-01-02", 11,
         "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
 
-      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN")
-
-      assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
-      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
-      assert(queryStats("2010-01-02", "10") === None)
-      assert(queryStats("2010-01-02", "11") === None)
-
-      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN")
-
       assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
       assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
       assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000)
@@ -631,7 +643,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
        """.stripMargin)
     sql(s"INSERT INTO TABLE $tabName SELECT * FROM src")
     if (analyzedBySpark) sql(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
-    // This is to mimic the scenario in which Hive genrates statistics before we reading it
+    // This is to mimic the scenario in which Hive generates statistics before we read it
     if (analyzedByHive) hiveClient.runSqlHive(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
     val describeResult1 = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
 

