GitHub user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16313#discussion_r92960480
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---
    @@ -157,39 +156,71 @@ case class CreateDataSourceTableAsSelectCommand(
               // Since the table already exists and the save mode is Ignore, we will just return.
               return Seq.empty[Row]
             case SaveMode.Append =>
    +          val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)
    +
    +          if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
    +            throw new AnalysisException(s"Saving data in the Hive serde 
table $tableName is " +
    +              s"not supported yet. Please use the insertInto() API as an 
alternative.")
    +          }
    +
               // Check if the specified data source matches the data source of the existing table.
    -          val existingProvider = DataSource.lookupDataSource(provider)
    +          val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
    +          val specifiedProvider = DataSource.lookupDataSource(table.provider.get)
               // TODO: Check that options from the resolved relation match the relation that we are
               // inserting into (i.e. using the same compression).
    +          if (existingProvider != specifiedProvider) {
    +            throw new AnalysisException(s"The format of the existing table 
$tableName is " +
    +              s"`${existingProvider.getSimpleName}`. It doesn't match the 
specified format " +
    +              s"`${specifiedProvider.getSimpleName}`.")
    +          }
    +
    +          if (query.schema.length != existingTable.schema.length) {
    +            throw new AnalysisException(
    +              s"The column number of the existing table $tableName" +
    +                s"(${existingTable.schema.catalogString}) doesn't match 
the data schema" +
    +                s"(${query.schema.catalogString})")
    +          }
     
    -          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
    -          // views unexpectedly.
    -          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
    -            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
    -              // check if the file formats match
    -              l.relation match {
    -                case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
    -                  throw new AnalysisException(
    -                    s"The file format of the existing table $tableName is 
" +
    -                      s"`${r.fileFormat.getClass.getName}`. It doesn't 
match the specified " +
    -                      s"format `$provider`")
    -                case _ =>
    -              }
    -              if (query.schema.size != l.schema.size) {
    -                throw new AnalysisException(
    -                  s"The column number of the existing schema[${l.schema}] 
" +
    -                    s"doesn't match the data schema[${query.schema}]'s")
    -              }
    -              existingSchema = Some(l.schema)
    -            case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
    -              existingSchema = Some(s.metadata.schema)
    -            case c: CatalogRelation if c.catalogTable.provider == Some(DDLUtils.HIVE_PROVIDER) =>
    -              throw new AnalysisException("Saving data in the Hive serde 
table " +
    -                s"${c.catalogTable.identifier} is not supported yet. 
Please use the " +
    -                "insertInto() API as an alternative..")
    -            case o =>
    -              throw new AnalysisException(s"Saving data in ${o.toString} 
is not supported.")
    +          val resolver = sessionState.conf.resolver
    +          val tableCols = existingTable.schema.map(_.name)
    +
    +          reorderedColumns = Some(existingTable.schema.map { f =>
    +            query.resolve(Seq(f.name), resolver).getOrElse {
    +              val inputColumns = query.schema.map(_.name).mkString(", ")
    +              throw new AnalysisException(
    +                s"cannot resolve '${f.name}' given input columns: 
[$inputColumns]")
    +            }
    +          })
    +
    +          // Check if the specified partition columns match the existing table.
    +          val specifiedPartCols = CatalogUtils.normalizePartCols(
    +            tableName, tableCols, table.partitionColumnNames, resolver)
    +          if (specifiedPartCols != existingTable.partitionColumnNames) {
    +            throw new AnalysisException(
    +              s"""
    +                |Specified partitioning does not match the existing table $tableName.
    +                |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
    +                |Existing partition columns: [${existingTable.partitionColumnNames.mkString(", ")}]
    +              """.stripMargin)
    +          }
    +
    +          // Check if the specified bucketing matches the existing table.
    +          val specifiedBucketSpec = table.bucketSpec.map { bucketSpec =>
    +            CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
    +          }
    +          if (specifiedBucketSpec != existingTable.bucketSpec) {
    +            val specifiedBucketString =
    +              specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
    +            val existingBucketString =
    +              existingTable.bucketSpec.map(_.toString).getOrElse("not 
bucketed")
    +            throw new AnalysisException(
    +              s"""
    +                |Specified bucketing does not match the existing table $tableName.
    --- End diff ---
    
    Nit: The same here.
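
    As context for these checks, a minimal sketch of how the new validation surfaces to users (illustrative only, not part of the PR: it assumes a spark-shell session, a parquet table named `t1`, and the message wording from the diff above):

        // Create a partitioned data source table, then append without
        // redeclaring the partitioning; the validation added above rejects it.
        spark.range(10).selectExpr("id", "id % 2 AS p")
          .write.format("parquet").partitionBy("p").saveAsTable("t1")

        spark.range(10).selectExpr("id", "id % 2 AS p")
          .write.format("parquet").mode("append").saveAsTable("t1")
        // => AnalysisException:
        //    Specified partitioning does not match the existing table t1.
        //    Specified partition columns: []
        //    Existing partition columns: [p]

    An append that declares a different bucketBy() than the table was created with would hit the bucketing branch quoted above in the same way.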

