Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22514#discussion_r238707304
  
    --- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
    @@ -95,9 +77,127 @@ case class CreateHiveTableAsSelectCommand(
         Seq.empty[Row]
       }
     
    +  // Returns `DataWritingCommand` used to write data when the table exists.
    +  def writingCommandForExistingTable(
    +    catalog: SessionCatalog,
    +    tableDesc: CatalogTable): DataWritingCommand
    +
    +  // Returns `DataWritingCommand` used to write data when the table 
doesn't exist.
    +  def writingCommandForNewTable(
    +    catalog: SessionCatalog,
    +    tableDesc: CatalogTable): DataWritingCommand
    +
       override def argString: String = {
         s"[Database:${tableDesc.database}, " +
         s"TableName: ${tableDesc.identifier.table}, " +
         s"InsertIntoHiveTable]"
       }
     }
    +
    +/**
    + * Create table and insert the query result into it.
    + *
    + * @param tableDesc the table description, which may contain serde, 
storage handler etc.
    + * @param query the query whose result will be insert into the new relation
    + * @param mode SaveMode
    + */
    +case class CreateHiveTableAsSelectCommand(
    +    tableDesc: CatalogTable,
    +    query: LogicalPlan,
    +    outputColumnNames: Seq[String],
    +    mode: SaveMode)
    +  extends CreateHiveTableAsSelectBase {
    +
    +  override def writingCommandForExistingTable(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): DataWritingCommand = {
    +    InsertIntoHiveTable(
    +      tableDesc,
    +      Map.empty,
    +      query,
    +      overwrite = false,
    +      ifPartitionNotExists = false,
    +      outputColumnNames = outputColumnNames)
    +  }
    +
    +  override def writingCommandForNewTable(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): DataWritingCommand = {
    +    // For CTAS, there is no static partition values to insert.
    +    val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
    +    InsertIntoHiveTable(
    +      tableDesc,
    +      partition,
    +      query,
    +      overwrite = true,
    +      ifPartitionNotExists = false,
    +      outputColumnNames = outputColumnNames)
    +  }
    +}
    +
    +/**
    + * Create table and insert the query result into it. This creates Hive 
table but inserts
    + * the query result into it by using data source.
    + *
    + * @param tableDesc the table description, which may contain serde, 
storage handler etc.
    + * @param query the query whose result will be insert into the new relation
    + * @param mode SaveMode
    + */
    +case class OptimizedCreateHiveTableAsSelectCommand(
    +    tableDesc: CatalogTable,
    +    query: LogicalPlan,
    +    outputColumnNames: Seq[String],
    +    mode: SaveMode)
    +  extends CreateHiveTableAsSelectBase {
    +
    +  private def getHadoopRelation(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): HadoopFsRelation = {
    +    val metastoreCatalog = 
catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
    +    val hiveTable = DDLUtils.readHiveTable(tableDesc)
    +
    +    metastoreCatalog.convert(hiveTable) match {
    +      case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
    +      case _ => throw new AnalysisException(s"$tableIdentifier should be 
converted to " +
    +        "HadoopFsRelation.")
    +    }
    +  }
    +
    +  override def writingCommandForExistingTable(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): DataWritingCommand = {
    +    val hadoopRelation = getHadoopRelation(catalog, tableDesc)
    +    InsertIntoHadoopFsRelationCommand(
    +      hadoopRelation.location.rootPaths.head,
    +      Map.empty, // We don't support to convert partitioned table.
    +      false,
    +      Seq.empty, // We don't support to convert partitioned table.
    +      hadoopRelation.bucketSpec,
    +      hadoopRelation.fileFormat,
    +      hadoopRelation.options,
    +      query,
    +      mode,
    +      Some(tableDesc),
    +      Some(hadoopRelation.location),
    +      query.output.map(_.name))
    +  }
    +
    +  override def writingCommandForNewTable(
    +      catalog: SessionCatalog,
    +      tableDesc: CatalogTable): DataWritingCommand = {
    +    val hadoopRelation = getHadoopRelation(catalog, tableDesc)
    +    InsertIntoHadoopFsRelationCommand(
    +      hadoopRelation.location.rootPaths.head,
    +      Map.empty, // We don't support to convert partitioned table.
    +      false,
    +      Seq.empty, // We don't support to convert partitioned table.
    +      hadoopRelation.bucketSpec,
    +      hadoopRelation.fileFormat,
    +      hadoopRelation.options,
    +      query,
    +      SaveMode.Overwrite,
    --- End diff --
    
    ok :)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to