Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14155#discussion_r75431426
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 ---
    @@ -233,226 +229,21 @@ case class CreateDataSourceTableAsSelectCommand(
           // We will use the schema of resolved.relation as the schema of the 
table (instead of
           // the schema of df). It is important since the nullability may be 
changed by the relation
           // provider (for example, see 
org.apache.spark.sql.parquet.DefaultSource).
    -      CreateDataSourceTableUtils.createDataSourceTable(
    -        sparkSession = sparkSession,
    -        tableIdent = tableIdent,
    -        schema = result.schema,
    -        partitionColumns = partitionColumns,
    -        bucketSpec = bucketSpec,
    -        provider = provider,
    -        options = optionsWithPath,
    -        isExternal = isExternal)
    +      val schema = result.schema
    +      val table = CatalogTable(
    +        identifier = tableIdent,
    +        tableType = if (isExternal) CatalogTableType.EXTERNAL else 
CatalogTableType.MANAGED,
    +        storage = CatalogStorageFormat.empty.copy(properties = 
optionsWithPath),
    +        schema = schema,
    +        provider = Some(provider),
    +        partitionColumnNames = partitionColumns,
    +        bucketSpec = bucketSpec
    +      )
    +      sessionState.catalog.createTable(table, ignoreIfExists = false)
         }
     
         // Refresh the cache of the table in the catalog.
         sessionState.catalog.refreshTable(tableIdent)
         Seq.empty[Row]
       }
     }
    -
    -
    -object CreateDataSourceTableUtils extends Logging {
    -
    -  val DATASOURCE_PREFIX = "spark.sql.sources."
    -  val DATASOURCE_PROVIDER = DATASOURCE_PREFIX + "provider"
    -  val DATASOURCE_WRITEJOBUUID = DATASOURCE_PREFIX + "writeJobUUID"
    -  val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
    -  val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
    -  val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
    -  val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
    -  val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + 
"numPartCols"
    -  val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + 
"numSortCols"
    -  val DATASOURCE_SCHEMA_NUMBUCKETS = DATASOURCE_SCHEMA_PREFIX + 
"numBuckets"
    -  val DATASOURCE_SCHEMA_NUMBUCKETCOLS = DATASOURCE_SCHEMA_PREFIX + 
"numBucketCols"
    -  val DATASOURCE_SCHEMA_PART_PREFIX = DATASOURCE_SCHEMA_PREFIX + "part."
    -  val DATASOURCE_SCHEMA_PARTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + 
"partCol."
    -  val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + 
"bucketCol."
    -  val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + 
"sortCol."
    -
    -  def createDataSourceTable(
    -      sparkSession: SparkSession,
    -      tableIdent: TableIdentifier,
    -      schema: StructType,
    -      partitionColumns: Array[String],
    -      bucketSpec: Option[BucketSpec],
    -      provider: String,
    -      options: Map[String, String],
    -      isExternal: Boolean): Unit = {
    -    val tableProperties = new mutable.HashMap[String, String]
    -    tableProperties.put(DATASOURCE_PROVIDER, provider)
    -
    -    // Serialized JSON schema string may be too long to be stored into a 
single metastore table
    -    // property. In this case, we split the JSON string and store each 
part as a separate table
    -    // property.
    -    val threshold = 
sparkSession.sessionState.conf.schemaStringLengthThreshold
    -    val schemaJsonString = schema.json
    -    // Split the JSON string.
    -    val parts = schemaJsonString.grouped(threshold).toSeq
    -    tableProperties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
    -    parts.zipWithIndex.foreach { case (part, index) =>
    -      tableProperties.put(s"$DATASOURCE_SCHEMA_PART_PREFIX$index", part)
    -    }
    -
    -    if (partitionColumns.length > 0) {
    -      tableProperties.put(DATASOURCE_SCHEMA_NUMPARTCOLS, 
partitionColumns.length.toString)
    -      partitionColumns.zipWithIndex.foreach { case (partCol, index) =>
    -        tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", 
partCol)
    -      }
    -    }
    -
    -    if (bucketSpec.isDefined) {
    -      val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = 
bucketSpec.get
    -
    -      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETS, 
numBuckets.toString)
    -      tableProperties.put(DATASOURCE_SCHEMA_NUMBUCKETCOLS, 
bucketColumnNames.length.toString)
    -      bucketColumnNames.zipWithIndex.foreach { case (bucketCol, index) =>
    -        tableProperties.put(s"$DATASOURCE_SCHEMA_BUCKETCOL_PREFIX$index", 
bucketCol)
    -      }
    -
    -      if (sortColumnNames.nonEmpty) {
    -        tableProperties.put(DATASOURCE_SCHEMA_NUMSORTCOLS, 
sortColumnNames.length.toString)
    -        sortColumnNames.zipWithIndex.foreach { case (sortCol, index) =>
    -          tableProperties.put(s"$DATASOURCE_SCHEMA_SORTCOL_PREFIX$index", 
sortCol)
    -        }
    -      }
    -    }
    -
    -    val tableType = if (isExternal) {
    -      tableProperties.put("EXTERNAL", "TRUE")
    -      CatalogTableType.EXTERNAL
    -    } else {
    -      tableProperties.put("EXTERNAL", "FALSE")
    -      CatalogTableType.MANAGED
    -    }
    -
    -    val maybeSerDe = HiveSerDe.sourceToSerDe(provider, 
sparkSession.sessionState.conf)
    -    val dataSource =
    -      DataSource(
    -        sparkSession,
    -        userSpecifiedSchema = Some(schema),
    -        partitionColumns = partitionColumns,
    -        bucketSpec = bucketSpec,
    -        className = provider,
    -        options = options)
    -
    -    def newSparkSQLSpecificMetastoreTable(): CatalogTable = {
    -      CatalogTable(
    -        identifier = tableIdent,
    -        tableType = tableType,
    -        schema = new StructType,
    -        provider = Some(provider),
    -        storage = CatalogStorageFormat(
    -          locationUri = None,
    -          inputFormat = None,
    -          outputFormat = None,
    -          serde = None,
    -          compressed = false,
    -          properties = options
    -        ),
    -        properties = tableProperties.toMap)
    -    }
    -
    -    def newHiveCompatibleMetastoreTable(
    -        relation: HadoopFsRelation,
    -        serde: HiveSerDe): CatalogTable = {
    -      assert(partitionColumns.isEmpty)
    -      assert(relation.partitionSchema.isEmpty)
    -
    -      CatalogTable(
    -        identifier = tableIdent,
    -        tableType = tableType,
    -        storage = CatalogStorageFormat(
    -          locationUri = 
Some(relation.location.paths.map(_.toUri.toString).head),
    -          inputFormat = serde.inputFormat,
    -          outputFormat = serde.outputFormat,
    -          serde = serde.serde,
    -          compressed = false,
    -          properties = options
    -        ),
    -        schema = relation.schema,
    -        provider = Some(provider),
    -        properties = tableProperties.toMap,
    -        viewText = None)
    -    }
    -
    -    // TODO: Support persisting partitioned data source relations in Hive 
compatible format
    -    val qualifiedTableName = tableIdent.quotedString
    -    val skipHiveMetadata = options.getOrElse("skipHiveMetadata", 
"false").toBoolean
    -    val resolvedRelation = dataSource.resolveRelation(checkPathExist = 
false)
    -    val (hiveCompatibleTable, logMessage) = (maybeSerDe, resolvedRelation) 
match {
    -      case _ if skipHiveMetadata =>
    -        val message =
    -          s"Persisting partitioned data source relation 
$qualifiedTableName into " +
    -            "Hive metastore in Spark SQL specific format, which is NOT 
compatible with Hive."
    -        (None, message)
    -
    -      case (Some(serde), relation: HadoopFsRelation) if 
relation.location.paths.length == 1 &&
    -        relation.partitionSchema.isEmpty && relation.bucketSpec.isEmpty =>
    -        val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
    -        val message =
    -          s"Persisting data source relation $qualifiedTableName with a 
single input path " +
    -            s"into Hive metastore in Hive compatible format. Input path: " 
+
    -            s"${relation.location.paths.head}."
    -        (Some(hiveTable), message)
    -
    -      case (Some(serde), relation: HadoopFsRelation) if 
relation.partitionSchema.nonEmpty =>
    --- End diff --
    
    I am unable to find this case in the latest code changes. Does that mean we 
already support persisting partitioned data source relations in a 
Hive-compatible format?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to