xushiyan commented on code in PR #6213:
URL: https://github.com/apache/hudi/pull/6213#discussion_r932726718


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala:
##########
@@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig {
    * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession,
-      table: CatalogTable,
-      query: LogicalPlan,
-      insertPartitions: Map[String, Option[String]],
-      overwrite: Boolean,
-      refreshTable: Boolean = true,
-      extraOptions: Map[String, String] = Map.empty): Boolean = {
-
-    val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, 
overwrite, insertPartitions, extraOptions)
-
-    val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+          table: CatalogTable,
+          query: LogicalPlan,
+          partitionSpec: Map[String, Option[String]],
+          overwrite: Boolean,
+          refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
+    val catalogTable = new HoodieCatalogTable(sparkSession, table)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, 
overwrite, partitionSpec, extraOptions)
+
+    // NOTE: In case of partitioned table we override specified "overwrite" 
parameter
+    //       to instead append to the dataset
+    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
       SaveMode.Overwrite
     } else {
-      // for insert into or insert overwrite partition we use append mode.
       SaveMode.Append
     }
-    val conf = sparkSession.sessionState.conf
-    val alignedQuery = alignOutputFields(query, hoodieCatalogTable, 
insertPartitions, conf)
-    // If we create dataframe using the Dataset.ofRows(sparkSession, 
alignedQuery),
-    // The nullable attribute of fields will lost.
-    // In order to pass the nullable attribute to the inputDF, we specify the 
schema
-    // of the rdd.
-    val inputDF = sparkSession.createDataFrame(
-      Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
-    val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
inputDF)._1
-    if (success) {
-      if (refreshTable) {
-        sparkSession.catalog.refreshTable(table.identifier.unquotedString)
-      }
-      true
-    } else {
-      false
+
+    val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, 
sparkSession.sessionState.conf)
+
+    val (success, _, _, _, _, _) = 
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
Dataset.ofRows(sparkSession, alignedQuery))
+
+    if (success && refreshTable) {
+      sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
+
+    success
   }
 
   /**
-   * Aligned the type and name of query's output fields with the result 
table's fields.
-   * @param query The insert query which to aligned.
-   * @param hoodieCatalogTable The result hoodie catalog table.
-   * @param insertPartitions The insert partition map.
-   * @param conf The SQLConf.
-   * @return
+   * Align provided [[query]]'s output with the expected [[catalogTable]] 
schema by
+   *
+   * <ul>
+   *   <li>Performing type coercion (casting corresponding outputs, where 
needed)</li>
+   *   <li>Adding aliases (matching column names) to corresponding outputs 
</li>
+   * </ul>
+   *
+   * @param query target query whose output is to be inserted
+   * @param catalogTable catalog table
+   * @param partitionsSpec partition spec specifying static/dynamic partition 
values
+   * @param conf Spark's [[SQLConf]]
    */
-  private def alignOutputFields(
-    query: LogicalPlan,
-    hoodieCatalogTable: HoodieCatalogTable,
-    insertPartitions: Map[String, Option[String]],
-    conf: SQLConf): LogicalPlan = {
-
-    val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
-    val staticPartitionValues = insertPartitions.filter(p => 
p._2.isDefined).mapValues(_.get)
-    assert(staticPartitionValues.isEmpty ||
-      insertPartitions.size == targetPartitionSchema.size,
-      s"Required partition columns is: ${targetPartitionSchema.json}, Current 
input partitions " +
-        s"is: ${staticPartitionValues.mkString("," + "")}")
-
-    val queryOutputWithoutMetaFields = removeMetaFields(query.output)
-    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
-      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
-      s"Required select columns count: 
${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
-        s"Current select columns(including static partition column) count: " +
-        s"${staticPartitionValues.size + 
queryOutputWithoutMetaFields.size},columns: " +
-        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ 
staticPartitionValues.keys).mkString(",")})")
-
-    val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
-      hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name)))
-    val dataProjectsWithoutMetaFields = 
getTableFieldsAlias(queryOutputWithoutMetaFields,
-      dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
-    val partitionProjects = targetPartitionSchema.fields.filter(f => 
staticPartitionValues.contains(f.name))
-      .map(f => {
-        val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
-          s"Missing static partition value for: ${f.name}")
-        val castAttr = castIfNeeded(Literal.create(staticPartitionValue), 
f.dataType, conf)
-        Alias(castAttr, f.name)()
-      })
+  private def alignQueryOutput(query: LogicalPlan,
+                               catalogTable: HoodieCatalogTable,
+                               partitionsSpec: Map[String, Option[String]],
+                               conf: SQLConf): LogicalPlan = {
+
+    val targetPartitionSchema = catalogTable.partitionSchema
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+    validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+    // Make sure we strip out meta-fields from the incoming dataset (these 
will have to be discarded anyway)
+    val cleanedQuery = stripMetaFields(query)
+    // To validate and align properly output of the query, we simply filter 
out partition columns with already
+    // provided static values from the table's schema
+    //
+    // NOTE: This is a crucial step: since coercion might rely on either of a) 
name-based or b) positional-based
+    //       matching it's important to strip out partition columns, having 
static values provided in the partition spec,
+    //       since such columns wouldn't be otherwise specified w/in the query 
itself and therefore couldn't be matched
+    //       positionally for example
+    val expectedQueryColumns = 
catalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name))
+    val coercedQueryOutput = 
coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, 
catalogTable, conf)
+
+    val staticPartitionValuesExprs = 
createStaticPartitionValuesExpressions(staticPartitionValues, 
targetPartitionSchema, conf)
+
+    Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, 
coercedQueryOutput)
+  }
+
+  private def coerceQueryOutputColumns(expectedSchema: StructType,
+                                       query: LogicalPlan,
+                                       catalogTable: HoodieCatalogTable,
+                                       conf: SQLConf): LogicalPlan = {
+    val planUtils = sparkAdapter.getCatalystPlanUtils
+    try {
+      planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = true, conf)
+    } catch {
+      // NOTE: In case matching by name didn't match the query output, we will 
attempt positional matching
+      case ae: AnalysisException if ae.getMessage().startsWith("Cannot write 
incompatible data to table") =>
+        planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = false, conf)
+    }
+  }
 
-    Project(dataProjectsWithoutMetaFields ++ partitionProjects, query)
+  private def validate(queryOutputSchema: StructType, partitionsSpec: 
Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = {
+    // Validate that partition-spec has proper format (it could be empty if 
all of the partition values are dynamic,
+    // ie there are no static partition-values specified)
+    if (partitionsSpec.nonEmpty && partitionsSpec.size != 
catalogTable.partitionSchema.size) {

Review Comment:
   `org.apache.hudi.common.util.ValidationUtils`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to