Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16636#discussion_r99458457
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala ---
    @@ -455,4 +462,133 @@ private[spark] object HiveUtils extends Logging {
         case (decimal, DecimalType()) => decimal.toString
         case (other, tpe) if primitiveTypes contains tpe => other.toString
       }
    +
    +  /** Converts the native StructField to Hive's FieldSchema. */
    +  private def toHiveColumn(c: StructField): FieldSchema = {
    +    val typeString = if (c.metadata.contains(HiveUtils.hiveTypeString)) {
    +      c.metadata.getString(HiveUtils.hiveTypeString)
    +    } else {
    +      c.dataType.catalogString
    +    }
    +    new FieldSchema(c.name, typeString, c.getComment.orNull)
    +  }
    +
    +  /** Builds the native StructField from Hive's FieldSchema. */
    +  private def fromHiveColumn(hc: FieldSchema): StructField = {
    +    val columnType = try {
    +      CatalystSqlParser.parseDataType(hc.getType)
    +    } catch {
    +      case e: ParseException =>
    +        throw new SparkException("Cannot recognize hive type string: " + 
hc.getType, e)
    +    }
    +
    +    val metadata = new 
MetadataBuilder().putString(HiveUtils.hiveTypeString, hc.getType).build()
    +    val field = StructField(
    +      name = hc.getName,
    +      dataType = columnType,
    +      nullable = true,
    +      metadata = metadata)
    +    Option(hc.getComment).map(field.withComment).getOrElse(field)
    +  }
    +
    +  // TODO: merge this with HiveClientImpl#toHiveTable
    +  /** Converts the native table metadata representation format 
CatalogTable to Hive's Table. */
    +  def toHiveTable(catalogTable: CatalogTable): HiveTable = {
    +    // We start by constructing an API table as Hive performs several important
    +    // transformations internally when converting an API table to a QL table.
    +    val tTable = new org.apache.hadoop.hive.metastore.api.Table()
    +    tTable.setTableName(catalogTable.identifier.table)
    +    tTable.setDbName(catalogTable.database)
    +
    +    val tableParameters = new java.util.HashMap[String, String]()
    +    tTable.setParameters(tableParameters)
    +    catalogTable.properties.foreach { case (k, v) => 
tableParameters.put(k, v) }
    +
    +    tTable.setTableType(catalogTable.tableType match {
    +      case CatalogTableType.EXTERNAL => 
HiveTableType.EXTERNAL_TABLE.toString
    +      case CatalogTableType.MANAGED => HiveTableType.MANAGED_TABLE.toString
    +      case CatalogTableType.VIEW => HiveTableType.VIRTUAL_VIEW.toString
    +    })
    +
    +    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
    +    tTable.setSd(sd)
    +
    +    // Note: In Hive the schema and partition columns must be disjoint sets
    +    val (partCols, schema) = 
catalogTable.schema.map(toHiveColumn).partition { c =>
    +      catalogTable.partitionColumnNames.contains(c.getName)
    +    }
    +    sd.setCols(schema.asJava)
    +    tTable.setPartitionKeys(partCols.asJava)
    +
    +    catalogTable.storage.locationUri.foreach(sd.setLocation)
    +    catalogTable.storage.inputFormat.foreach(sd.setInputFormat)
    +    catalogTable.storage.outputFormat.foreach(sd.setOutputFormat)
    +
    +    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
    +    catalogTable.storage.serde.foreach(serdeInfo.setSerializationLib)
    +    sd.setSerdeInfo(serdeInfo)
    +
    +    val serdeParameters = new java.util.HashMap[String, String]()
    +    catalogTable.storage.properties.foreach { case (k, v) => 
serdeParameters.put(k, v) }
    +    serdeInfo.setParameters(serdeParameters)
    +
    +    new HiveTable(tTable)
    +  }
    +
    +  /**
    +   * Converts the native partition metadata representation format 
CatalogTablePartition to
    +   * Hive's Partition.
    +   */
    +  def toHivePartition(
    +      catalogTable: CatalogTable,
    +      hiveTable: HiveTable,
    +      partition: CatalogTablePartition): HivePartition = {
    +    val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
    +    tPartition.setDbName(catalogTable.database)
    +    tPartition.setTableName(catalogTable.identifier.table)
    +    
tPartition.setValues(catalogTable.partitionColumnNames.map(partition.spec(_)).asJava)
    +
    +    val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
    +    tPartition.setSd(sd)
    +
    +    // Note: In Hive the schema and partition columns must be disjoint sets
    +    val schema = catalogTable.schema.map(toHiveColumn).filter { c =>
    +      !catalogTable.partitionColumnNames.contains(c.getName)
    +    }
    +    sd.setCols(schema.asJava)
    +
    +    partition.storage.locationUri.foreach(sd.setLocation)
    +    partition.storage.inputFormat.foreach(sd.setInputFormat)
    +    partition.storage.outputFormat.foreach(sd.setOutputFormat)
    +
    +    val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
    +    sd.setSerdeInfo(serdeInfo)
    +    // maps and lists should be set only after all elements are ready (see HIVE-7975)
    +    partition.storage.serde.foreach(serdeInfo.setSerializationLib)
    +
    +    val serdeParameters = new java.util.HashMap[String, String]()
    +    catalogTable.storage.properties.foreach { case (k, v) => 
serdeParameters.put(k, v) }
    +    partition.storage.properties.foreach { case (k, v) => 
serdeParameters.put(k, v) }
    +    serdeInfo.setParameters(serdeParameters)
    +
    +    new HivePartition(hiveTable, tPartition)
    +  }
    +
    +  /**
    +   * Infers the schema for Hive serde tables and returns the CatalogTable 
with the inferred schema.
    +   * When the tables are data source tables or the schema already exists, 
returns the original
    +   * CatalogTable.
    +   */
    +  def inferSchema(table: CatalogTable): CatalogTable = {
    +    if (DDLUtils.isDatasourceTable(table) || table.schema.nonEmpty) {
    +      table
    +    } else {
    +      val hiveTable = toHiveTable(table)
    +      // Note: Hive separates partition columns and the schema, but for us the
    +      // partition columns are part of the schema
    +      val partCols = hiveTable.getPartCols.asScala.map(fromHiveColumn)
    +      val schema = 
StructType(hiveTable.getCols.asScala.map(fromHiveColumn) ++ partCols)
    +      table.copy(schema = schema, partitionColumnNames = 
partCols.map(_.name))
    --- End diff --
    
    I think it should not be changed. Let me remove it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to