This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cfd0c1ee34 [HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS 
performance gap (#6213)
cfd0c1ee34 is described below

commit cfd0c1ee34460332053491fd1e68c2607c14e958
Author: Alexey Kudinkin <ale...@infinilake.com>
AuthorDate: Thu Jul 28 15:36:03 2022 -0700

    [HUDI-4081][HUDI-4472] Addressing Spark SQL vs Spark DS performance gap 
(#6213)
---
 .../spark/sql/HoodieCatalystPlansUtils.scala       |  23 +-
 .../hudi/common/table/TableSchemaResolver.java     |  21 ++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  87 ++++----
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |   4 +-
 .../command/InsertIntoHoodieTableCommand.scala     | 243 +++++++++++++--------
 .../spark/sql/hudi/HoodieSparkSqlTestBase.scala    |  18 +-
 .../apache/spark/sql/hudi/TestDeleteTable.scala    |  39 +++-
 .../apache/spark/sql/hudi/TestInsertTable.scala    |  31 +--
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala |   2 +-
 .../apache/spark/sql/hudi/TestShowPartitions.scala |  20 +-
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  |  29 ++-
 .../apache/spark/sql/hudi/TestUpdateTable.scala    |  28 ++-
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  |  12 +-
 .../spark/sql/HoodieSpark3CatalystPlanUtils.scala  |  12 +-
 14 files changed, 381 insertions(+), 188 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index c277dcb3e6..7566458b1b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -19,12 +19,33 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.internal.SQLConf
 
 trait HoodieCatalystPlansUtils {
 
+  /**
+   * Resolves output of the provided [[query]] against the [[expected]] list 
of [[Attribute]],
+   * and returns new (reshaped) instance of the [[LogicalPlan]]
+   *
+   * @param tableName used purely for more human-readable error output (if any)
+   * @param expected list of attributes output of the query has to adhere to
+   * @param query query whose output has to be reshaped
+   * @param byName whether the matching should occur by-name or positionally
+   * @param conf instance of [[SQLConf]]
+   * @return [[LogicalPlan]] which output is aligned to match to that of 
[[expected]]
+   */
+  def resolveOutputColumns(tableName: String,
+                           expected: Seq[Attribute],
+                           query: LogicalPlan,
+                           byName: Boolean,
+                           conf: SQLConf): LogicalPlan
+
+  /**
+   * Instantiates an [[Explain]] command
+   */
   def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 5fc989e2e5..4ada97e35c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -316,6 +316,9 @@ public class TableSchemaResolver {
    * @param oldSchema Older schema to check.
    * @param newSchema Newer schema to check.
    * @return True if the schema validation is successful
+   *
+   * TODO revisit this method: it's implemented incorrectly as it might be 
applying different criteria
+   *      to top-level record and nested record (for ex, if that nested record 
is contained w/in an array)
    */
   public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) 
{
     if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == 
Schema.Type.RECORD) {
@@ -366,13 +369,31 @@ public class TableSchemaResolver {
     return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new 
Schema.Parser().parse(newSchema));
   }
 
+  /**
+   * Returns table's latest Avro {@link Schema} iff table is non-empty (ie 
there's at least
+   * a single commit)
+   *
+   * This method differs from {@link #getTableAvroSchema(boolean)} in that it 
won't fallback
+   * to use table's schema used at creation
+   */
+  public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean 
includeMetadataFields) throws Exception {
+    if (metaClient.isTimelineNonEmpty()) {
+      return Option.of(getTableAvroSchemaInternal(includeMetadataFields, 
Option.empty()));
+    }
+
+    return Option.empty();
+  }
+
   /**
    * Get latest schema either from incoming schema or table schema.
    * @param writeSchema incoming batch's write schema.
    * @param convertTableSchemaToAddNamespace {@code true} if table schema 
needs to be converted. {@code false} otherwise.
    * @param converterFn converter function to be called over table schema (to 
add namespace may be). Each caller can decide if any conversion is required.
    * @return the latest schema.
+   *
+   * @deprecated will be removed (HUDI-4472)
    */
+  @Deprecated
   public Schema getLatestSchema(Schema writeSchema, boolean 
convertTableSchemaToAddNamespace,
       Function1<Schema, Schema> converterFn) {
     Schema latestSchema = writeSchema;
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 7324a5ca5b..167001863d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
 import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
@@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.common.util.{CommitUtils, StringUtils}
+import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, 
INDEX_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
@@ -72,8 +72,7 @@ object HoodieSparkSqlWriter {
             hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
             hoodieWriteClient: 
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
             asyncCompactionTriggerFn: 
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = 
Option.empty,
-            asyncClusteringTriggerFn: 
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty
-           )
+            asyncClusteringTriggerFn: 
Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = 
Option.empty)
   : (Boolean, common.util.Option[String], common.util.Option[String], 
common.util.Option[String],
     SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
 
@@ -241,39 +240,49 @@ object HoodieSparkSqlWriter {
             sparkContext.getConf.registerKryoClasses(
               Array(classOf[org.apache.avro.generic.GenericData],
                 classOf[org.apache.avro.Schema]))
-            var schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
-            val lastestSchema = getLatestTableSchema(fs, basePath, 
sparkContext, schema)
+
+            // TODO(HUDI-4472) revisit and simplify schema handling
+            val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
+            val latestTableSchema = getLatestTableSchema(fs, basePath, 
sparkContext).getOrElse(sourceSchema)
+
+            val schemaEvolutionEnabled = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
             var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
-            if (reconcileSchema && 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
-              && internalSchemaOpt.isEmpty) {
-              // force apply full schema evolution.
-              internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(schema))
-            }
-            if (reconcileSchema) {
-              schema = lastestSchema
-            }
-            if (internalSchemaOpt.isDefined) {
-              // Apply schema evolution.
-              val mergedSparkSchema = if (!reconcileSchema) {
-                
AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema,
 lastestSchema))
+
+            val writerSchema: Schema =
+              if (reconcileSchema) {
+                // In case we need to reconcile the schema and schema 
evolution is enabled,
+                // we will force-apply schema evolution to the writer's schema
+                if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
+                  internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
+                }
+
+                if (internalSchemaOpt.isDefined) {
+                  // Apply schema evolution, by auto-merging write schema and 
read schema
+                  val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
+                  AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
+                } else if 
(TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
+                  // In case schema reconciliation is enabled and source and 
latest table schemas
+                  // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]], then we will
+                  // pick latest table's schema as the writer's schema
+                  latestTableSchema
+                } else {
+                  // Otherwise fallback to original source's schema
+                  sourceSchema
+                }
               } else {
-                // Auto merge write schema and read schema.
-                val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get)
-                
AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema,
 lastestSchema.getName))
+                // In case reconciliation is disabled, we still have to do 
nullability attributes
+                // (minor) reconciliation, making sure schema of the incoming 
batch is in-line with
+                // the data already committed in the table
+                
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
               }
-              schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, 
structName, nameSpace)
-            }
 
-            if (reconcileSchema && internalSchemaOpt.isEmpty) {
-              schema = lastestSchema
-            }
-            validateSchemaForHoodieIsDeleted(schema)
-            sparkContext.getConf.registerAvroSchemas(schema)
-            log.info(s"Registered avro schema : ${schema.toString(true)}")
+            validateSchemaForHoodieIsDeleted(writerSchema)
+            sparkContext.getConf.registerAvroSchemas(writerSchema)
+            log.info(s"Registered avro schema : 
${writerSchema.toString(true)}")
 
             // Convert to RDD[HoodieRecord]
             val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-              org.apache.hudi.common.util.Option.of(schema))
+              org.apache.hudi.common.util.Option.of(writerSchema))
             val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
               operation.equals(WriteOperationType.UPSERT) ||
               
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
@@ -295,10 +304,10 @@ object HoodieSparkSqlWriter {
               hoodieRecord
             }).toJavaRDD()
 
-            val writeSchema = if (dropPartitionColumns) 
generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema
+            val writerDataSchema = if (dropPartitionColumns) 
generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else 
writerSchema
             // Create a HoodieWriteClient & issue the write.
 
-            val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, 
writeSchema.toString, path,
+            val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, 
writerDataSchema.toString, path,
               tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, 
internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
 
@@ -388,14 +397,18 @@ object HoodieSparkSqlWriter {
    * @param schema       incoming record's schema.
    * @return Pair of(boolean, table schema), where first entry will be true 
only if schema conversion is required.
    */
-  def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: 
SparkContext, schema: Schema): Schema = {
-    var latestSchema: Schema = schema
+  def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: 
SparkContext): Option[Schema] = {
     if (FSUtils.isTableExists(basePath.toString, fs)) {
-      val tableMetaClient = 
HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build()
+      val tableMetaClient = HoodieTableMetaClient.builder
+        .setConf(sparkContext.hadoopConfiguration)
+        .setBasePath(basePath.toString)
+        .build()
       val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-      latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null)
+
+      
toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
+    } else {
+      None
     }
-    latestSchema
   }
 
   def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: 
SparkContext, df: Dataset[Row],
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 8328882239..b02881bc3d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -317,8 +317,8 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
   def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): 
Expression = {
     child match {
       case Literal(nul, NullType) => Literal(nul, dataType)
-      case _ => if (child.dataType != dataType)
-        Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child
+      case expr if child.dataType != dataType => Cast(expr, dataType, 
Option(conf.sessionLocalTimeZone))
+      case _ => child
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index be1ad8e9b8..8bd81df3d2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -17,27 +17,39 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.HoodieSparkSqlWriter
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, 
NamedExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql._
 
 /**
- * Command for insert into hoodie table.
+ * Command for insert into Hudi table.
+ *
+ * This is correspondent to Spark's native [[InsertIntoStatement]]
+ *
+ * @param logicalRelation the [[LogicalRelation]] representing the table to be 
writing into.
+ * @param query           the logical plan representing data to be written
+ * @param partitionSpec   a map from the partition key to the partition value 
(optional).
+ *                        If the value is missing, dynamic partition insert 
will be performed.
+ *                        As an example, `INSERT INTO tbl PARTITION (a=1, b=2) 
AS` would have
+ *                        Map('a' -> Some('1'), 'b' -> Some('2')),
+ *                        and `INSERT INTO tbl PARTITION (a=1, b) AS ...`
+ *                        would have Map('a' -> Some('1'), 'b' -> None).
+ * @param overwrite       overwrite existing table or partitions.
  */
-case class InsertIntoHoodieTableCommand(
-    logicalRelation: LogicalRelation,
-    query: LogicalPlan,
-    partition: Map[String, Option[String]],
-    overwrite: Boolean)
+case class InsertIntoHoodieTableCommand(logicalRelation: LogicalRelation,
+                                        query: LogicalPlan,
+                                        partitionSpec: Map[String, 
Option[String]],
+                                        overwrite: Boolean)
   extends HoodieLeafRunnableCommand {
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
@@ -45,18 +57,19 @@ case class InsertIntoHoodieTableCommand(
     assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")
 
     val table = logicalRelation.catalogTable.get
-    InsertIntoHoodieTableCommand.run(sparkSession, table, query, partition, 
overwrite)
+    InsertIntoHoodieTableCommand.run(sparkSession, table, query, 
partitionSpec, overwrite)
     Seq.empty[Row]
   }
 }
 
-object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {
+object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig 
with SparkAdapterSupport {
+
   /**
    * Run the insert query. We support both dynamic partition insert and static 
partition insert.
    * @param sparkSession The spark session.
    * @param table The insert table.
    * @param query The insert query.
-   * @param insertPartitions The specified insert partition map.
+   * @param partitionSpec The specified insert partition map.
    *                         e.g. "insert into h(dt = '2021') select id, name 
from src"
    *                         "dt" is the key in the map and "2021" is the 
partition value. If the
    *                         partition value has not specified(in the case of 
dynamic partition)
@@ -66,103 +79,139 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig {
    * @param extraOptions Extra options for insert.
    */
   def run(sparkSession: SparkSession,
-      table: CatalogTable,
-      query: LogicalPlan,
-      insertPartitions: Map[String, Option[String]],
-      overwrite: Boolean,
-      refreshTable: Boolean = true,
-      extraOptions: Map[String, String] = Map.empty): Boolean = {
-
-    val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table)
-    val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, 
overwrite, insertPartitions, extraOptions)
-
-    val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) {
-      // insert overwrite non-partition table
+          table: CatalogTable,
+          query: LogicalPlan,
+          partitionSpec: Map[String, Option[String]],
+          overwrite: Boolean,
+          refreshTable: Boolean = true,
+          extraOptions: Map[String, String] = Map.empty): Boolean = {
+    val catalogTable = new HoodieCatalogTable(sparkSession, table)
+    val config = buildHoodieInsertConfig(catalogTable, sparkSession, 
overwrite, partitionSpec, extraOptions)
+
+    // NOTE: In case of partitioned table we override specified "overwrite" 
parameter
+    //       to instead append to the dataset
+    val mode = if (overwrite && catalogTable.partitionFields.isEmpty) {
       SaveMode.Overwrite
     } else {
-      // for insert into or insert overwrite partition we use append mode.
       SaveMode.Append
     }
-    val conf = sparkSession.sessionState.conf
-    val alignedQuery = alignOutputFields(query, hoodieCatalogTable, 
insertPartitions, conf)
-    // If we create dataframe using the Dataset.ofRows(sparkSession, 
alignedQuery),
-    // The nullable attribute of fields will lost.
-    // In order to pass the nullable attribute to the inputDF, we specify the 
schema
-    // of the rdd.
-    val inputDF = sparkSession.createDataFrame(
-      Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema)
-    val success =
-      HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
inputDF)._1
-    if (success) {
-      if (refreshTable) {
-        sparkSession.catalog.refreshTable(table.identifier.unquotedString)
-      }
-      true
-    } else {
-      false
+
+    val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, 
sparkSession.sessionState.conf)
+
+    val (success, _, _, _, _, _) = 
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, 
Dataset.ofRows(sparkSession, alignedQuery))
+
+    if (success && refreshTable) {
+      sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
+
+    success
   }
 
   /**
-   * Aligned the type and name of query's output fields with the result 
table's fields.
-   * @param query The insert query which to aligned.
-   * @param hoodieCatalogTable The result hoodie catalog table.
-   * @param insertPartitions The insert partition map.
-   * @param conf The SQLConf.
-   * @return
+   * Align provided [[query]]'s output with the expected [[catalogTable]] 
schema by
+   *
+   * <ul>
+   *   <li>Performing type coercion (casting corresponding outputs, where 
needed)</li>
+   *   <li>Adding aliases (matching column names) to corresponding outputs 
</li>
+   * </ul>
+   *
+   * @param query target query whose output is to be inserted
+   * @param catalogTable catalog table
+   * @param partitionsSpec partition spec specifying static/dynamic partition 
values
+   * @param conf Spark's [[SQLConf]]
    */
-  private def alignOutputFields(
-    query: LogicalPlan,
-    hoodieCatalogTable: HoodieCatalogTable,
-    insertPartitions: Map[String, Option[String]],
-    conf: SQLConf): LogicalPlan = {
-
-    val targetPartitionSchema = hoodieCatalogTable.partitionSchema
-
-    val staticPartitionValues = insertPartitions.filter(p => 
p._2.isDefined).mapValues(_.get)
-    assert(staticPartitionValues.isEmpty ||
-      insertPartitions.size == targetPartitionSchema.size,
-      s"Required partition columns is: ${targetPartitionSchema.json}, Current 
input partitions " +
-        s"is: ${staticPartitionValues.mkString("," + "")}")
-
-    val queryOutputWithoutMetaFields = removeMetaFields(query.output)
-    assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size
-      == hoodieCatalogTable.tableSchemaWithoutMetaFields.size,
-      s"Required select columns count: 
${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " +
-        s"Current select columns(including static partition column) count: " +
-        s"${staticPartitionValues.size + 
queryOutputWithoutMetaFields.size},columns: " +
-        s"(${(queryOutputWithoutMetaFields.map(_.name) ++ 
staticPartitionValues.keys).mkString(",")})")
-
-    val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType(
-      hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name)))
-    val dataProjectsWithoutMetaFields = 
getTableFieldsAlias(queryOutputWithoutMetaFields,
-      dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf)
-
-    val partitionProjects = targetPartitionSchema.fields.filter(f => 
staticPartitionValues.contains(f.name))
-      .map(f => {
-        val staticPartitionValue = staticPartitionValues.getOrElse(f.name,
-          s"Missing static partition value for: ${f.name}")
-        val castAttr = castIfNeeded(Literal.create(staticPartitionValue), 
f.dataType, conf)
-        Alias(castAttr, f.name)()
-      })
+  private def alignQueryOutput(query: LogicalPlan,
+                               catalogTable: HoodieCatalogTable,
+                               partitionsSpec: Map[String, Option[String]],
+                               conf: SQLConf): LogicalPlan = {
+
+    val targetPartitionSchema = catalogTable.partitionSchema
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+
+    validate(removeMetaFields(query.schema), partitionsSpec, catalogTable)
+    // Make sure we strip out meta-fields from the incoming dataset (these 
will have to be discarded anyway)
+    val cleanedQuery = stripMetaFields(query)
+    // To validate and align properly output of the query, we simply filter 
out partition columns with already
+    // provided static values from the table's schema
+    //
+    // NOTE: This is a crucial step: since coercion might rely on either of a) 
name-based or b) positional-based
+    //       matching it's important to strip out partition columns, having 
static values provided in the partition spec,
+    //       since such columns wouldn't be otherwise specified w/in the query 
itself and therefore couldn't be matched
+    //       positionally for example
+    val expectedQueryColumns = 
catalogTable.tableSchemaWithoutMetaFields.filterNot(f => 
staticPartitionValues.contains(f.name))
+    val coercedQueryOutput = 
coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, 
catalogTable, conf)
+
+    val staticPartitionValuesExprs = 
createStaticPartitionValuesExpressions(staticPartitionValues, 
targetPartitionSchema, conf)
+
+    Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, 
coercedQueryOutput)
+  }
+
+  private def coerceQueryOutputColumns(expectedSchema: StructType,
+                                       query: LogicalPlan,
+                                       catalogTable: HoodieCatalogTable,
+                                       conf: SQLConf): LogicalPlan = {
+    val planUtils = sparkAdapter.getCatalystPlanUtils
+    try {
+      planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = true, conf)
+    } catch {
+      // NOTE: In case matching by name didn't match the query output, we will 
attempt positional matching
+      case ae: AnalysisException if ae.getMessage().startsWith("Cannot write 
incompatible data to table") =>
+        planUtils.resolveOutputColumns(catalogTable.catalogTableName, 
expectedSchema.toAttributes, query, byName = false, conf)
+    }
+  }
 
-    Project(dataProjectsWithoutMetaFields ++ partitionProjects, query)
+  private def validate(queryOutputSchema: StructType, partitionsSpec: 
Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = {
+    // Validate that partition-spec has proper format (it could be empty if 
all of the partition values are dynamic,
+    // ie there are no static partition-values specified)
+    if (partitionsSpec.nonEmpty && partitionsSpec.size != 
catalogTable.partitionSchema.size) {
+      throw new HoodieException(s"Required partition schema is: 
${catalogTable.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " +
+        s"partition spec is: ${partitionsSpec.mkString("[", ", ", "]")}")
+    }
+
+    val staticPartitionValues = filterStaticPartitionValues(partitionsSpec)
+    val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ 
staticPartitionValues.keys.map(StructField(_, StringType)))
+
+    // Assert that query provides all the required columns
+    if (!conforms(fullQueryOutputSchema, 
catalogTable.tableSchemaWithoutMetaFields)) {
+      throw new HoodieException(s"Expected table's schema: 
${catalogTable.tableSchemaWithoutMetaFields.fields.mkString("[", ", ", "]")}, " 
+
+        s"query's output (including static partition values): 
${fullQueryOutputSchema.fields.mkString("[", ", ", "]")}")
+    }
+  }
+
+  private def createStaticPartitionValuesExpressions(staticPartitionValues: 
Map[String, String],
+                                                     partitionSchema: 
StructType,
+                                                     conf: SQLConf): 
Seq[NamedExpression] = {
+    partitionSchema.fields
+      .filter(pf => staticPartitionValues.contains(pf.name))
+      .map(pf => {
+        val staticPartitionValue = staticPartitionValues(pf.name)
+        val castExpr = castIfNeeded(Literal.create(staticPartitionValue), 
pf.dataType, conf)
+
+        Alias(castExpr, pf.name)()
+      })
   }
 
-  private def getTableFieldsAlias(
-     queryOutputWithoutMetaFields: Seq[Attribute],
-     schemaWithoutMetaFields: Seq[StructField],
-     conf: SQLConf): Seq[Alias] = {
-    queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case 
(dataAttr, dataField) =>
-      val targetAttrOption = if (dataAttr.name.startsWith("col")) {
-        None
-      } else {
-        queryOutputWithoutMetaFields.find(_.name.equals(dataField.name))
+  private def conforms(sourceSchema: StructType, targetSchema: StructType): 
Boolean = {
+    if (sourceSchema.fields.length != targetSchema.fields.length) {
+      false
+    } else {
+      targetSchema.fields.zip(sourceSchema).forall {
+        case (targetColumn, sourceColumn) =>
+          // Make sure we can cast source column to the target column type
+          Cast.canCast(sourceColumn.dataType, targetColumn.dataType)
       }
-      val targetAttr = targetAttrOption.getOrElse(dataAttr)
-      val castAttr = 
castIfNeeded(targetAttr.withNullability(dataField.nullable),
-        dataField.dataType, conf)
-      Alias(castAttr, dataField.name)()
     }
   }
+
+  def stripMetaFields(query: LogicalPlan): LogicalPlan = {
+    val filteredOutput = query.output.filterNot(attr => isMetaField(attr.name))
+    if (filteredOutput == query.output) {
+      query
+    } else {
+      Project(filteredOutput, query)
+    }
+  }
+
+  private def filterStaticPartitionValues(partitionsSpec: Map[String, 
Option[String]]): Map[String, String] =
+    partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get)
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index 5e2afd7490..e7848320ff 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -145,11 +145,23 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     assertResult(true)(hasException)
   }
 
+  def dropTypeLiteralPrefix(value: Any): Any = {
+    value match {
+      case s: String =>
+        s.stripPrefix("DATE").stripPrefix("TIMESTAMP").stripPrefix("X")
+      case _ => value
+    }
+  }
 
-  protected def removeQuotes(value: Any): Any = {
+  protected def extractRawValue(value: Any): Any = {
     value match {
-      case s: String => s.stripPrefix("'").stripSuffix("'")
-      case _=> value
+      case s: String =>
+        // We need to strip out data-type prefixes like "DATE", "TIMESTAMP"
+        dropTypeLiteralPrefix(s)
+          .asInstanceOf[String]
+          .stripPrefix("'")
+          .stripSuffix("'")
+      case _ => value
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
index 4c7c626966..3ab52a0bac 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkUtils.isSpark2
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.keygen.SimpleKeyGenerator
 import org.apache.spark.sql.SaveMode
@@ -93,11 +94,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
           """.stripMargin)
 
         // insert data to table
-        spark.sql(
-          s"""
-             |insert into $tableName
-             |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 
30.0, 1000)
-          """.stripMargin)
+        if (isSpark2) {
+          spark.sql(
+            s"""
+               |insert into $tableName
+               |values (1, 'a1', cast(10.0 as double), 1000), (2, 'a2', 
cast(20.0 as double), 1000), (3, 'a2', cast(30.0 as double), 1000)
+               |""".stripMargin)
+        } else {
+          spark.sql(
+            s"""
+               |insert into $tableName
+               |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 
30.0, 1000)
+               |""".stripMargin)
+        }
+
         checkAnswer(s"select id, name, price, ts from $tableName")(
           Seq(1, "a1", 10.0, 1000),
           Seq(2, "a2", 20.0, 1000),
@@ -132,11 +142,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase {
           """.stripMargin)
 
         // insert data to table
-        spark.sql(
-          s"""
-             |insert into $ptTableName
-             |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, 
"2021"), (3, 'a2', 30.0, 1000, "2022")
-          """.stripMargin)
+        if (isSpark2) {
+          spark.sql(
+            s"""
+               |insert into $ptTableName
+               |values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 
'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 
1000, "2022")
+               |""".stripMargin)
+        } else {
+          spark.sql(
+            s"""
+               |insert into $ptTableName
+               |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, 
"2021"), (3, 'a2', 30.0, 1000, "2022")
+               |""".stripMargin)
+        }
+
         checkAnswer(s"select id, name, price, ts, pt from $ptTableName")(
           Seq(1, "a1", 10.0, 1000, "2021"),
           Seq(2, "a2", 20.0, 1000, "2021"),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 8d21fe32ea..ced6fef72d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.internal.SQLConf
 
 import java.io.File
 
@@ -396,8 +397,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
         ("string", "'1000'"),
         ("int", 1000),
         ("bigint", 10000),
-        ("timestamp", "'2021-05-20 00:00:00'"),
-        ("date", "'2021-05-20'")
+        ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"),
+        ("date", "DATE'2021-05-20'")
       )
       typeAndValue.foreach { case (partitionType, partitionValue) =>
         val tableName = generateTableName
@@ -409,8 +410,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   test("Test TimestampType Partition Column With Consistent Logical Timestamp 
Enabled") {
     withTempDir { tmp =>
       val typeAndValue = Seq(
-        ("timestamp", "'2021-05-20 00:00:00'"),
-        ("date", "'2021-05-20'")
+        ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"),
+        ("date", "DATE'2021-05-20'")
       )
       typeAndValue.foreach { case (partitionType, partitionValue) =>
         val tableName = generateTableName
@@ -433,11 +434,12 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
          | partitioned by (dt)
          | location '${tmp.getCanonicalPath}/$tableName'
        """.stripMargin)
-    spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 
1, 'a1', 10")
+    // NOTE: We have to drop type-literal prefix since Spark doesn't parse 
type literals appropriately
+    spark.sql(s"insert into $tableName partition(dt = 
${dropTypeLiteralPrefix(partitionValue)}) select 1, 'a1', 10")
     spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue")
     checkAnswer(s"select id, name, price, cast(dt as string) from $tableName 
order by id")(
-      Seq(1, "a1", 10, removeQuotes(partitionValue).toString),
-      Seq(2, "a2", 10, removeQuotes(partitionValue).toString)
+      Seq(1, "a1", 10, extractRawValue(partitionValue).toString),
+      Seq(2, "a2", 10, extractRawValue(partitionValue).toString)
     )
   }
 
@@ -481,14 +483,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
          | tblproperties (primaryKey = 'id')
          | partitioned by (dt)
        """.stripMargin)
-    checkException(s"insert into $tableName partition(dt = '2021-06-20')" +
-      s" select 1, 'a1', 10, '2021-06-20'") (
-      "assertion failed: Required select columns count: 4, Current select 
columns(including static partition column)" +
-        " count: 5,columns: (1,a1,10,2021-06-20,dt)"
+    checkException(s"insert into $tableName partition(dt = '2021-06-20') 
select 1, 'a1', 10, '2021-06-20'") (
+      "Expected table's schema: " +
+        "[StructField(id,IntegerType,true), StructField(name,StringType,true), 
StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
+        "query's output (including static partition values): " +
+        "[StructField(1,IntegerType,false), StructField(a1,StringType,false), 
StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false), 
StructField(dt,StringType,true)]"
     )
     checkException(s"insert into $tableName select 1, 'a1', 10")(
-      "assertion failed: Required select columns count: 4, Current select 
columns(including static partition column)" +
-        " count: 3,columns: (1,a1,10)"
+      "Expected table's schema: " +
+        "[StructField(id,IntegerType,true), StructField(name,StringType,true), 
StructField(price,DoubleType,true), StructField(dt,StringType,true)], " +
+        "query's output (including static partition values): " +
+        "[StructField(1,IntegerType,false), StructField(a1,StringType,false), 
StructField(10,IntegerType,false)]"
     )
     spark.sql("set hoodie.sql.bulk.insert.enable = true")
     spark.sql("set hoodie.sql.insert.mode = strict")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index ac11f83d53..58c808d28a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -908,7 +908,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
              | when not matched then insert *
              |""".stripMargin)
         checkAnswer(s"select id, name, cast(value as string), ts from 
$tableName")(
-          Seq(1, "a1", removeQuotes(dataValue), 1000)
+          Seq(1, "a1", extractRawValue(dataValue), 1000)
         )
       }
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
index 005d5fed71..59ee642861 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.HoodieSparkUtils.isSpark2
 import 
org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
 
 class TestShowPartitions extends HoodieSparkSqlTestBase {
@@ -84,11 +85,22 @@ class TestShowPartitions extends HoodieSparkSqlTestBase {
     checkAnswer(s"show partitions $tableName 
partition(dt='2021-01-02')")(Seq("dt=2021-01-02"))
 
     // Insert into null partition
-    spark.sql(
-      s"""
-         | insert into $tableName
-         | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt
+    if (isSpark2) {
+      // Spark 2 isn't able to convert NullType to any other type w/ 
appropriate nullability, so
+      // explicit cast is required
+      spark.sql(
+        s"""
+           | insert into $tableName
+           | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, cast(null 
as string) as dt
         """.stripMargin)
+    } else {
+      spark.sql(
+        s"""
+           | insert into $tableName
+           | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt
+        """.stripMargin)
+    }
+
     checkAnswer(s"show partitions $tableName")(
       Seq("dt=2021-01-01"), Seq("dt=2021-01-02"), 
Seq("dt=%s".format(DEFAULT_PARTITION_PATH))
     )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index b64d386f1f..65357b903b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -55,11 +55,11 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     spark.sql(
       s"""
          | insert into $tableName values
-         | 
(1,1,11,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25 
12:01:01',true,'a01','2021-12-25'),
-         | 
(2,2,12,100002,102.02,1002.0002,100002.0002,'a000002','2021-12-25','2021-12-25 
12:02:02',true,'a02','2021-12-25'),
-         | 
(3,3,13,100003,103.03,1003.0003,100003.0003,'a000003','2021-12-25','2021-12-25 
12:03:03',false,'a03','2021-12-25'),
-         | 
(4,4,14,100004,104.04,1004.0004,100004.0004,'a000004','2021-12-26','2021-12-26 
12:04:04',true,'a04','2021-12-26'),
-         | 
(5,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26 
12:05:05',false,'a05','2021-12-26')
+         | 
(1,1,11,100001,101.01,1001.0001,100001.0001,'a000001',DATE'2021-12-25',TIMESTAMP'2021-12-25
 12:01:01',true,X'a01',TIMESTAMP'2021-12-25'),
+         | 
(2,2,12,100002,102.02,1002.0002,100002.0002,'a000002',DATE'2021-12-25',TIMESTAMP'2021-12-25
 12:02:02',true,X'a02',TIMESTAMP'2021-12-25'),
+         | 
(3,3,13,100003,103.03,1003.0003,100003.0003,'a000003',DATE'2021-12-25',TIMESTAMP'2021-12-25
 12:03:03',false,X'a03',TIMESTAMP'2021-12-25'),
+         | 
(4,4,14,100004,104.04,1004.0004,100004.0004,'a000004',DATE'2021-12-26',TIMESTAMP'2021-12-26
 12:04:04',true,X'a04',TIMESTAMP'2021-12-26'),
+         | 
(5,5,15,100005,105.05,1005.0005,100005.0005,'a000005',DATE'2021-12-26',TIMESTAMP'2021-12-26
 12:05:05',false,X'a05',TIMESTAMP'2021-12-26')
          |""".stripMargin)
   }
 
@@ -70,6 +70,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
         if (HoodieSparkUtils.gteqSpark3_1) {
           spark.sql("set hoodie.schema.on.read.enable=true")
+          // NOTE: This is required since as this tests use type coercions 
which were only permitted in Spark 2.x
+          //       and are disallowed now by default in Spark 3.x
+          spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
           createAndPreparePartitionTable(spark, tableName, tablePath, 
tableType)
           // date -> string -> date
           spark.sql(s"alter table $tableName alter column col6 type String")
@@ -138,6 +141,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
         if (HoodieSparkUtils.gteqSpark3_1) {
           spark.sql("set hoodie.schema.on.read.enable=true")
+          // NOTE: This is required since as this tests use type coercions 
which were only permitted in Spark 2.x
+          //       and are disallowed now by default in Spark 3.x
+          spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
           createAndPreparePartitionTable(spark, tableName, tablePath, 
tableType)
           // float -> double -> decimal -> String
           spark.sql(s"alter table $tableName alter column col2 type double")
@@ -172,6 +178,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
         if (HoodieSparkUtils.gteqSpark3_1) {
           spark.sql("set hoodie.schema.on.read.enable=true")
+          // NOTE: This is required since as this tests use type coercions 
which were only permitted in Spark 2.x
+          //       and are disallowed now by default in Spark 3.x
+          spark.sql("set spark.sql.storeAssignmentPolicy=legacy")
           createAndPreparePartitionTable(spark, tableName, tablePath, 
tableType)
 
           // test set properties
@@ -402,7 +411,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
           spark.sql(s"alter table $tableName alter column members.value.a 
first")
 
-          spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1', 
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStruct', 29, 100), 
1000)")
+          spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1', 
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStruct', 29, 100), 
1000)")
 
           // rename column
           spark.sql(s"alter table ${tableName} rename column user to userx")
@@ -424,7 +433,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           checkAnswer(spark.sql(s"select name, userx.name, userx.score from 
${tableName}").collect())(Seq(null, null, null))
 
           // insert again
-          spark.sql(s"insert into ${tableName} values(2 , map('k1', 
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 
101), 'jacknew', 1000)")
+          spark.sql(s"insert into ${tableName} values(2 , map('k1', 
struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 
101), 'jacknew', 1000)")
 
           // check again
           checkAnswer(spark.sql(s"select name, userx.name as uxname, 
userx.score as uxs from ${tableName} order by id").collect())(
@@ -440,9 +449,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
             Seq(291, 2, "jacknew"))
           // test map value type change
           spark.sql(s"alter table ${tableName} add columns(mxp map<String, 
int>)")
-          spark.sql(s"insert into ${tableName} values(2 , map('k1', 
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 
101), 'jacknew', 1000, map('t1', 9))")
+          spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 
1000, map('t1', 9))")
           spark.sql(s"alter table ${tableName} alter column mxp.value type 
double")
-          spark.sql(s"insert into ${tableName} values(2 , map('k1', 
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 
101), 'jacknew', 1000, map('t1', 10))")
+          spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 
1000, map('t1', 10))")
           spark.sql(s"select * from $tableName").show(false)
           checkAnswer(spark.sql(s"select mxp from ${tableName} order by 
id").collect())(
             Seq(null),
@@ -453,7 +462,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
           spark.sql(s"alter table ${tableName} rename column userx to us")
           spark.sql(s"alter table ${tableName} rename column us.age to age1")
 
-          spark.sql(s"insert into ${tableName} values(2 , map('k1', 
struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 
101), 'jacknew', 1000, map('t1', 10))")
+          spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 
'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 
1000, map('t1', 10))")
           spark.sql(s"select mem.value.nn, us.age1 from $tableName order by 
id").show()
           checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName 
order by id").collect())(
             Seq(null, 29),
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
index 8c709ab37a..2d8d6ceca7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.HoodieSparkUtils.isSpark2
+
 class TestUpdateTable extends HoodieSparkSqlTestBase {
 
   test("Test Update Table") {
@@ -84,7 +86,12 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
        """.stripMargin)
 
         // insert data to table
-        spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 
'a2', 20.0, 1000)")
+        if (isSpark2) {
+          spark.sql(s"insert into $tableName values (1, 'a1', cast(10.0 as 
double), 1000), (2, 'a2', cast(20.0 as double), 1000)")
+        } else {
+          spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 
'a2', 20.0, 1000)")
+        }
+
         checkAnswer(s"select id, name, price, ts from $tableName")(
           Seq(1, "a1", 10.0, 1000),
           Seq(2, "a2", 20.0, 1000)
@@ -119,11 +126,20 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
           """.stripMargin)
 
         // insert data to table
-        spark.sql(
-          s"""
-             |insert into $ptTableName
-             |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, 
"2021"), (3, 'a2', 30.0, 1000, "2022")
-          """.stripMargin)
+        if (isSpark2) {
+          spark.sql(
+            s"""
+               |insert into $ptTableName
+               |values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 
'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 
1000, "2022")
+               |""".stripMargin)
+        } else {
+          spark.sql(
+            s"""
+               |insert into $ptTableName
+               |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, 
"2021"), (3, 'a2', 30.0, 1000, "2022")
+               |""".stripMargin)
+        }
+
         checkAnswer(s"select id, name, price, ts, pt from $ptTableName")(
           Seq(1, "a1", 10.0, 1000, "2021"),
           Seq(2, "a2", 20.0, 1000, "2021"),
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 2797b8caa1..cf54504d0d 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -18,14 +18,22 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
+import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, 
UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, 
LogicalPlan}
 import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.internal.SQLConf
 
 object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
 
+  def resolveOutputColumns(tableName: String,
+                           expected: Seq[Attribute],
+                           query: LogicalPlan,
+                           byName: Boolean,
+                           conf: SQLConf): LogicalPlan =
+    SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, 
expected, query, byName)
+
   def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
     ExplainCommand(plan, extended = extended)
 
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
index 0cdf5782c0..abece34dea 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala
@@ -18,17 +18,25 @@
 package org.apache.spark.sql
 
 import org.apache.hudi.spark3.internal.ReflectUtil
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.{Expression, Like}
+import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, 
UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, 
JoinHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode}
+import org.apache.spark.sql.internal.SQLConf
 
 abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils {
 
+  def resolveOutputColumns(tableName: String,
+                           expected: Seq[Attribute],
+                           query: LogicalPlan,
+                           byName: Boolean,
+                           conf: SQLConf): LogicalPlan =
+    TableOutputResolver.resolveOutputColumns(tableName, expected, query, 
byName, conf)
+
   def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan =
     ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode)
 

Reply via email to