Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18421#discussion_r128269600
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
    @@ -1028,25 +994,115 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         currentFullPath
       }
     
    +  private def statsToProperties(
    +      stats: CatalogStatistics,
    +      schema: StructType): Map[String, String] = {
    +
    +    var statsProperties: Map[String, String] =
    +      Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
    +    if (stats.rowCount.isDefined) {
    +      statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
    +    }
    +
    +    val colNameTypeMap: Map[String, DataType] =
    +      schema.fields.map(f => (f.name, f.dataType)).toMap
    +    stats.colStats.foreach { case (colName, colStat) =>
    +      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
    +        statsProperties += (columnStatKeyPropName(colName, k) -> v)
    +      }
    +    }
    +
    +    statsProperties
    +  }
    +
    +  private def statsFromProperties(
    +      properties: Map[String, String],
    +      table: String,
    +      schema: StructType): Option[CatalogStatistics] = {
    +
    +    val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
    +    if (statsProps.isEmpty) {
    +      None
    +    } else {
    +
    +      val colStats = new mutable.HashMap[String, ColumnStat]
    +
    +      // For each column, recover its column stats. Note that this is currently an O(n^2) operation,
    +      // but given the number of columns is usually not enormous, this is probably OK as a start.
    +      // If we want to make this a linear operation, we'd need a stronger contract on the
    +      // naming convention used for serialization.
    +      schema.foreach { field =>
    +        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
    +          // If "version" field is defined, then the column stat is defined.
    +          val keyPrefix = columnStatKeyPropName(field.name, "")
    +          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
    +            (k.drop(keyPrefix.length), v)
    +          }
    +
    +          ColumnStat.fromMap(table, field, colStatMap).foreach {
    +            colStat => colStats += field.name -> colStat
    +          }
    +        }
    +      }
    +
    +      Some(CatalogStatistics(
    +        sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)),
    +        rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
    +        colStats = colStats.toMap))
    +    }
    +  }
    +
       override def alterPartitions(
           db: String,
           table: String,
           newParts: Seq[CatalogTablePartition]): Unit = withClient {
         val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
    +
    +    val rawTable = getRawTable(db, table)
    +
    +    // convert partition statistics to properties so that we can persist them through the Hive API
    +    val withStatsProps = lowerCasedParts.map(p => {
    +      if (p.stats.isDefined) {
    +        val statsProperties = statsToProperties(p.stats.get, rawTable.schema)
    +        p.copy(parameters = p.parameters ++ statsProperties)
    +      } else {
    +        p
    +      }
    +    })
    +
         // Note: Before altering table partitions in Hive, you *must* set the current database
         // to the one that contains the table of interest. Otherwise you will end up with the
         // most helpful error message ever: "Unable to alter partition. alter is not possible."
         // See HIVE-2742 for more detail.
         client.setCurrentDatabase(db)
    -    client.alterPartitions(db, table, lowerCasedParts)
    +    client.alterPartitions(db, table, withStatsProps)
       }
     
       override def getPartition(
           db: String,
           table: String,
           spec: TablePartitionSpec): CatalogTablePartition = withClient {
         val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
    -    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
    +    restorePartitionMetadata(part, getTable(db, table))
    +  }
    +
    +  private def restorePartitionMetadata(
    +      partition: CatalogTablePartition,
    +      table: CatalogTable): CatalogTablePartition = {
    +    val restoredSpec = restorePartitionSpec(partition.spec, table.partitionColumnNames)
    +
    +    // construct Spark's statistics from information in Hive metastore
    --- End diff --
    
    Here, we only respect Spark's own statistics. Please also clarify that here.
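    
    To illustrate the point (this is only a sketch, not part of the patch): the restore path only picks up properties written under Spark's own statistics prefix, so statistics that Hive itself maintains in the partition parameters are never read back. Assuming `STATISTICS_PREFIX` resolves to `"spark.sql.statistics."` as defined elsewhere in `HiveExternalCatalog`, and using made-up partition parameters, the following self-contained snippet shows which keys survive the filter:
    
        // Hypothetical illustration only; the Hive-maintained property names
        // ("totalSize", "numRows") and the Spark prefix are assumptions based on
        // how HiveExternalCatalog names its statistics keys.
        object SparkStatsFilterSketch {
          def main(args: Array[String]): Unit = {
            val sparkStatsPrefix = "spark.sql.statistics."
            val partitionParams = Map(
              "totalSize" -> "123456",                       // written by Hive, ignored on restore
              "numRows" -> "42",                             // written by Hive, ignored on restore
              "spark.sql.statistics.totalSize" -> "123456",  // written by Spark, restored
              "spark.sql.statistics.numRows" -> "42")        // written by Spark, restored
            val sparkOnly = partitionParams.filterKeys(_.startsWith(sparkStatsPrefix))
            // Only the two "spark.sql.statistics.*" entries remain.
            sparkOnly.foreach { case (k, v) => println(s"$k -> $v") }
          }
        }
    
    A comment to that effect on `restorePartitionMetadata` (or `statsFromProperties`) would make this behavior explicit.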


