Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22514#discussion_r238706120 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -95,9 +77,127 @@ case class CreateHiveTableAsSelectCommand( Seq.empty[Row] } + // Returns `DataWritingCommand` used to write data when the table exists. + def writingCommandForExistingTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand + + // Returns `DataWritingCommand` used to write data when the table doesn't exist. + def writingCommandForNewTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand + override def argString: String = { s"[Database:${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" } } + +/** + * Create table and insert the query result into it. + * + * @param tableDesc the table description, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class CreateHiveTableAsSelectCommand( + tableDesc: CatalogTable, + query: LogicalPlan, + outputColumnNames: Seq[String], + mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + override def writingCommandForExistingTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { + InsertIntoHiveTable( + tableDesc, + Map.empty, + query, + overwrite = false, + ifPartitionNotExists = false, + outputColumnNames = outputColumnNames) + } + + override def writingCommandForNewTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { + // For CTAS, there is no static partition values to insert. 
+ val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap + InsertIntoHiveTable( + tableDesc, + partition, + query, + overwrite = true, + ifPartitionNotExists = false, + outputColumnNames = outputColumnNames) + } +} + +/** + * Create table and insert the query result into it. This creates Hive table but inserts + * the query result into it by using data source. + * + * @param tableDesc the table description, which may contain serde, storage handler etc. + * @param query the query whose result will be insert into the new relation + * @param mode SaveMode + */ +case class OptimizedCreateHiveTableAsSelectCommand( + tableDesc: CatalogTable, + query: LogicalPlan, + outputColumnNames: Seq[String], + mode: SaveMode) + extends CreateHiveTableAsSelectBase { + + private def getHadoopRelation( + catalog: SessionCatalog, + tableDesc: CatalogTable): HadoopFsRelation = { + val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog + val hiveTable = DDLUtils.readHiveTable(tableDesc) + + metastoreCatalog.convert(hiveTable) match { + case LogicalRelation(t: HadoopFsRelation, _, _, _) => t + case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " + + "HadoopFsRelation.") + } + } + + override def writingCommandForExistingTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { + val hadoopRelation = getHadoopRelation(catalog, tableDesc) + InsertIntoHadoopFsRelationCommand( + hadoopRelation.location.rootPaths.head, + Map.empty, // We don't support to convert partitioned table. + false, + Seq.empty, // We don't support to convert partitioned table. 
+ hadoopRelation.bucketSpec, + hadoopRelation.fileFormat, + hadoopRelation.options, + query, + mode, + Some(tableDesc), + Some(hadoopRelation.location), + query.output.map(_.name)) + } + + override def writingCommandForNewTable( + catalog: SessionCatalog, + tableDesc: CatalogTable): DataWritingCommand = { + val hadoopRelation = getHadoopRelation(catalog, tableDesc) + InsertIntoHadoopFsRelationCommand( + hadoopRelation.location.rootPaths.head, + Map.empty, // We don't support to convert partitioned table. + false, + Seq.empty, // We don't support to convert partitioned table. + hadoopRelation.bucketSpec, + hadoopRelation.fileFormat, + hadoopRelation.options, + query, + SaveMode.Overwrite, --- End diff -- if the only difference is this `mode`, maybe we can further deduplicate the code.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org