Repository: spark
Updated Branches:
  refs/heads/master 682eb4f2e -> 7453ab024


[SPARK-22745][SQL] read partition stats from Hive

## What changes were proposed in this pull request?

Currently Spark can read table stats (e.g. `totalSize`, `numRows`) from Hive; we can also support reading partition stats from Hive using the same logic.

## How was this patch tested?

Added a new test case and modified an existing test case.
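
As a rough sketch of the user-visible effect (modeled on the new test case below; the table name is illustrative and the exact `sizeInBytes` depends on the data and file format):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Create a Hive partitioned table and load one partition; Hive records
// partition parameters such as totalSize (and numRows after ANALYZE).
sql("CREATE TABLE part_tbl (key STRING) PARTITIONED BY (ds STRING)")
sql("INSERT INTO TABLE part_tbl PARTITION (ds='2017-01-01') SELECT 'a'")

val partition = spark.sessionState.catalog
  .getPartition(TableIdentifier("part_tbl"), Map("ds" -> "2017-01-01"))

// With this patch, Hive-written stats surface as CatalogStatistics on the
// partition metadata instead of partition.stats always being None.
partition.stats.foreach(s => println(s"size=${s.sizeInBytes}, rows=${s.rowCount}"))
```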

Author: Zhenhua Wang <wangzhen...@huawei.com>
Author: Zhenhua Wang <wzh_...@163.com>

Closes #19932 from wzhfy/read_hive_partition_stats.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7453ab02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7453ab02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7453ab02

Branch: refs/heads/master
Commit: 7453ab0243dc04db1b586b0e5f588f9cdc9f72dd
Parents: 682eb4f
Author: Zhenhua Wang <wangzhen...@huawei.com>
Authored: Wed Dec 13 16:27:29 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Dec 13 16:27:29 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    |  6 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  | 66 +++++++++++---------
 .../apache/spark/sql/hive/StatisticsSuite.scala | 32 +++++++---
 3 files changed, 62 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7453ab02/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 44e680d..632e3e0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -668,7 +668,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val schema = restoreTableMetadata(rawTable).schema
 
     // convert table statistics to properties so that we can persist them through hive client
-    var statsProperties =
+    val statsProperties =
       if (stats.isDefined) {
         statsToProperties(stats.get, schema)
       } else {
@@ -1098,14 +1098,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val schema = restoreTableMetadata(rawTable).schema
 
     // convert partition statistics to properties so that we can persist them through hive api
-    val withStatsProps = lowerCasedParts.map(p => {
+    val withStatsProps = lowerCasedParts.map { p =>
       if (p.stats.isDefined) {
         val statsProperties = statsToProperties(p.stats.get, schema)
         p.copy(parameters = p.parameters ++ statsProperties)
       } else {
         p
       }
-    })
+    }
 
     // Note: Before altering table partitions in Hive, you *must* set the current database
     // to the one that contains the table of interest. Otherwise you will end up with the
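
The `statsToProperties` call above serializes a `CatalogStatistics` into plain string parameters so the stats can round-trip through the Hive metastore. A minimal sketch of the idea, assuming the `spark.sql.statistics.` key prefix that `HiveExternalCatalog` uses (the `statsToProps` helper itself is hypothetical and ignores column-level stats):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Serialize the size and optional row count as string properties; the real
// statsToProperties also serializes per-column statistics from the schema.
def statsToProps(stats: CatalogStatistics): Map[String, String] =
  Map("spark.sql.statistics.totalSize" -> stats.sizeInBytes.toString) ++
    stats.rowCount.map(c => "spark.sql.statistics.numRows" -> c.toString)
```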

http://git-wip-us.apache.org/repos/asf/spark/blob/7453ab02/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 08eb5c7..7233944 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -414,32 +414,6 @@ private[hive] class HiveClientImpl(
     }
     val comment = properties.get("comment")
 
-    // Here we are reading statistics from Hive.
-    // Note that this statistics could be overridden by Spark's statistics if that's available.
-    val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
-    val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
-    val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_))
-    // TODO: check if this estimate is valid for tables after partition pruning.
-    // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-    // relatively cheap if parameters for the table are populated into the metastore.
-    // Currently, only totalSize, rawDataSize, and rowCount are used to build the field `stats`
-    // TODO: stats should include all the other two fields (`numFiles` and `numPartitions`).
-    // (see StatsSetupConst in Hive)
-    val stats =
-      // When table is external, `totalSize` is always zero, which will influence join strategy.
-      // So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
-      // return None.
-      // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always
-      // zero after INSERT command. So they are used here only if they are larger than zero.
-      if (totalSize.isDefined && totalSize.get > 0L) {
-        Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
-      } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
-        Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0)))
-      } else {
-        // TODO: still fill the rowCount even if sizeInBytes is empty. Might break anything?
-        None
-      }
-
     CatalogTable(
       identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
       tableType = h.getTableType match {
@@ -478,7 +452,7 @@ private[hive] class HiveClientImpl(
       // For EXTERNAL_TABLE, the table properties has a particular field "EXTERNAL". This is added
       // in the function toHiveTable.
       properties = filteredProperties,
-      stats = stats,
+      stats = readHiveStats(properties),
       comment = comment,
       // In older versions of Spark(before 2.2.0), we expand the view original text and store
       // that into `viewExpandedText`, and that should be used in view resolution. So we get
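
As the removed comments note, Hive-sourced stats are only a fallback: statistics that Spark itself generated take precedence when the table metadata is restored. Schematically (a hypothetical helper, not the actual restore code):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics

// Spark-generated stats, when present in the table properties, win over
// whatever Hive recorded; Hive's numbers are used only as a fallback.
def effectiveStats(
    sparkStats: Option[CatalogStatistics],
    hiveStats: Option[CatalogStatistics]): Option[CatalogStatistics] =
  sparkStats.orElse(hiveStats)
```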

@@ -1011,6 +985,11 @@ private[hive] object HiveClientImpl {
    */
   def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
     val apiPartition = hp.getTPartition
+    val properties: Map[String, String] = if (hp.getParameters != null) {
+      hp.getParameters.asScala.toMap
+    } else {
+      Map.empty
+    }
     CatalogTablePartition(
       spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = CatalogStorageFormat(
@@ -1021,8 +1000,37 @@ private[hive] object HiveClientImpl {
         compressed = apiPartition.getSd.isCompressed,
         properties = Option(apiPartition.getSd.getSerdeInfo.getParameters)
           .map(_.asScala.toMap).orNull),
-      parameters =
-        if (hp.getParameters() != null) hp.getParameters().asScala.toMap else Map.empty)
+      parameters = properties,
+      stats = readHiveStats(properties))
+  }
+
+  /**
+   * Reads statistics from Hive.
+   * Note that this statistics could be overridden by Spark's statistics if that's available.
+   */
+  private def readHiveStats(properties: Map[String, String]): Option[CatalogStatistics] = {
+    val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE).map(BigInt(_))
+    val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE).map(BigInt(_))
+    val rowCount = properties.get(StatsSetupConst.ROW_COUNT).map(BigInt(_))
+    // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+    // relatively cheap if parameters for the table are populated into the metastore.
+    // Currently, only totalSize, rawDataSize, and rowCount are used to build the field `stats`
+    // TODO: stats should include all the other two fields (`numFiles` and `numPartitions`).
+    // (see StatsSetupConst in Hive)
+
+    // When table is external, `totalSize` is always zero, which will influence join strategy.
+    // So when `totalSize` is zero, use `rawDataSize` instead. When `rawDataSize` is also zero,
+    // return None.
+    // In Hive, when statistics gathering is disabled, `rawDataSize` and `numRows` is always
+    // zero after INSERT command. So they are used here only if they are larger than zero.
+    if (totalSize.isDefined && totalSize.get > 0L) {
+      Some(CatalogStatistics(sizeInBytes = totalSize.get, rowCount = rowCount.filter(_ > 0)))
+    } else if (rawDataSize.isDefined && rawDataSize.get > 0) {
+      Some(CatalogStatistics(sizeInBytes = rawDataSize.get, rowCount = rowCount.filter(_ > 0)))
+    } else {
+      // TODO: still fill the rowCount even if sizeInBytes is empty. Might break anything?
+      None
+    }
   }
 
   // Below is the key of table properties for storing Hive-generated statistics
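
The size fallback in `readHiveStats` is easy to misread, so here is a self-contained rehearsal of the same branching, with a stand-in `Stats` case class instead of `CatalogStatistics` and the raw property keys that `StatsSetupConst.TOTAL_SIZE`, `RAW_DATA_SIZE`, and `ROW_COUNT` resolve to:

```scala
case class Stats(sizeInBytes: BigInt, rowCount: Option[BigInt])

// A positive totalSize wins; otherwise fall back to a positive rawDataSize;
// zeros (Hive's output when stats gathering is disabled) are treated as absent.
def readStats(props: Map[String, String]): Option[Stats] = {
  val rowCount = props.get("numRows").map(BigInt(_)).filter(_ > 0)
  val totalSize = props.get("totalSize").map(BigInt(_)).filter(_ > 0)
  val rawDataSize = props.get("rawDataSize").map(BigInt(_)).filter(_ > 0)
  totalSize.orElse(rawDataSize).map(Stats(_, rowCount))
}

assert(readStats(Map("totalSize" -> "1024", "numRows" -> "500")) ==
  Some(Stats(1024, Some(BigInt(500)))))                       // totalSize preferred
assert(readStats(Map("totalSize" -> "0", "rawDataSize" -> "900")) ==
  Some(Stats(900, None)))                                     // fall back to rawDataSize
assert(readStats(Map("totalSize" -> "0", "rawDataSize" -> "0")).isEmpty) // no usable size
```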

http://git-wip-us.apache.org/repos/asf/spark/blob/7453ab02/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 13f06a2..3af8af0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -213,6 +213,27 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("SPARK-22745 - read Hive's statistics for partition") {
+    val tableName = "hive_stats_part_table"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2017-01-01') SELECT * FROM src")
+      var partition = spark.sessionState.catalog
+        .getPartition(TableIdentifier(tableName), Map("ds" -> "2017-01-01"))
+
+      assert(partition.stats.get.sizeInBytes == 5812)
+      assert(partition.stats.get.rowCount.isEmpty)
+
+      hiveClient
+        .runSqlHive(s"ANALYZE TABLE $tableName PARTITION (ds='2017-01-01') COMPUTE STATISTICS")
+      partition = spark.sessionState.catalog
+        .getPartition(TableIdentifier(tableName), Map("ds" -> "2017-01-01"))
+
+      assert(partition.stats.get.sizeInBytes == 5812)
+      assert(partition.stats.get.rowCount == Some(500))
+    }
+  }
+
   test("SPARK-21079 - analyze table with location different than that of individual partitions") {
     val tableName = "analyzeTable_part"
     withTable(tableName) {
@@ -353,15 +374,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       createPartition("2010-01-02", 11,
        "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
 
-      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN")
-
-      assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
-      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
-      assert(queryStats("2010-01-02", "10") === None)
-      assert(queryStats("2010-01-02", "11") === None)
-
-      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN")
-
       assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
       assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
       assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000)
@@ -631,7 +643,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
         """.stripMargin)
       sql(s"INSERT INTO TABLE $tabName SELECT * FROM src")
       if (analyzedBySpark) sql(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
-      // This is to mimic the scenario in which Hive genrates statistics before we reading it
+      // This is to mimic the scenario in which Hive generates statistics before we read it
       if (analyzedByHive) hiveClient.runSqlHive(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
       val describeResult1 = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org