This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d51921d5d2b [HUDI-8898] Support INSERT SQL statement with a subset of 
columns in Spark 3.5 (#12692)
d51921d5d2b is described below

commit d51921d5d2b3145a1deda19a1f7242e6d513e86c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Jan 24 18:08:41 2025 -0800

    [HUDI-8898] Support INSERT SQL statement with a subset of columns in Spark 
3.5 (#12692)
---
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |   5 +
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 133 ++++++++++++++++++
 .../sql/hudi/analysis/HoodieSpark35Analysis.scala  | 151 ++++++++++++++++++++-
 3 files changed, 286 insertions(+), 3 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 69ce7ceb130..b4eaef47914 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -75,6 +75,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
     // leading to all relations resolving as V2 instead of current expectation 
of them being resolved as V1)
     rules ++= Seq(dataSourceV2ToV1Fallback, spark3ResolveReferences)
 
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      rules += (_ => instantiateKlass(
+        
"org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))
+    }
+
     val resolveAlterTableCommandsClass =
       if (HoodieSparkUtils.gteqSpark3_5) {
         "org.apache.spark.sql.hudi.Spark35ResolveHudiAlterTableCommand"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 52a9e4f20e2..3942af2b145 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -43,6 +43,139 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 class TestInsertTable extends HoodieSparkSqlTestBase {
 
+  test("Test Insert Into with subset of columns") {
+    // This is only supported by Spark 3.5
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      Seq("cow", "mor").foreach(tableType =>
+        Seq(true, false).foreach(isPartitioned => withTempDir { tmp =>
+          testInsertIntoWithSubsetOfColumns(
+            "hudi", tableType, s"${tmp.getCanonicalPath}/hudi_table", 
isPartitioned)
+        }))
+    }
+  }
+
+  test("Test Insert Into with subset of columns on Parquet table") {
+    // This is only supported by Spark 3.5
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      // Make sure parquet tables are not affected by the custom rules for
+      // INSERT INTO statements on Hudi tables
+      Seq(true, false).foreach(isPartitioned => withTempDir { tmp =>
+        testInsertIntoWithSubsetOfColumns(
+          "parquet", "", s"${tmp.getCanonicalPath}/parquet_table", 
isPartitioned)
+      })
+    }
+  }
+
+  private def testInsertIntoWithSubsetOfColumns(format: String,
+                                                tableType: String,
+                                                tablePath: String,
+                                                isPartitioned: Boolean): Unit 
= {
+    val tableName = generateTableName
+    val createTablePartitionClause = if (isPartitioned) "partitioned by (dt)" 
else ""
+    // Create a table, partitioned by `dt` when isPartitioned is true
+    spark.sql(
+      s"""
+         |create table $tableName (
+         |  id int,
+         |  dt string,
+         |  name string,
+         |  price double,
+         |  ts long
+         |) using $format
+         | tblproperties (
+         | type = '$tableType',
+         | primaryKey = 'id'
+         | )
+         | $createTablePartitionClause
+         | location '$tablePath'
+       """.stripMargin)
+
+    // INSERT INTO with all columns
+    // Same ordering of columns as the schema
+    spark.sql(
+      s"""
+         | insert into $tableName (id, name, price, ts, dt)
+         | values (1, 'a1', 10, 1000, '2025-01-01'),
+         | (2, 'a2', 20, 2000, '2025-01-02')
+        """.stripMargin)
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+      Seq(2, "a2", 20.0, 2000, "2025-01-02")
+    )
+
+    // Different ordering of columns compared to the schema
+    spark.sql(
+      s"""
+         | insert into $tableName (dt, name, id, price, ts)
+         | values ('2025-01-03', 'a3', 3, 30, 3000)
+        """.stripMargin)
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+      Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+      Seq(3, "a3", 30.0, 3000, "2025-01-03")
+    )
+
+    // INSERT INTO with a subset of columns
+    // Using different ordering of subset of columns in user-specified columns,
+    // and VALUES without column names
+    spark.sql(
+      s"""
+         | insert into $tableName (dt, ts, name, id)
+         | values ('2025-01-04', 4000, 'a4', 4)
+        """.stripMargin)
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+      Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+      Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+      Seq(4, "a4", null, 4000, "2025-01-04")
+    )
+
+    spark.sql(
+      s"""
+         | insert into $tableName (id, price, ts, dt)
+         | values (5, 50.0, 5000, '2025-01-05')
+        """.stripMargin)
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+      Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+      Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+      Seq(4, "a4", null, 4000, "2025-01-04"),
+      Seq(5, null, 50.0, 5000, "2025-01-05")
+    )
+
+    // Using a subset of columns in user-specified columns, and VALUES with 
column names
+    spark.sql(
+      s"""
+         | insert into $tableName (dt, ts, id, name)
+         | values ('2025-01-06' as dt, 6000 as ts, 6 as id, 'a6' as name)
+        """.stripMargin)
+    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+      Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+      Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+      Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+      Seq(4, "a4", null, 4000, "2025-01-04"),
+      Seq(5, null, 50.0, 5000, "2025-01-05"),
+      Seq(6, "a6", null, 6000, "2025-01-06")
+    )
+
+    if (isPartitioned) {
+      spark.sql(
+        s"""
+           | insert into $tableName partition(dt='2025-01-07') (ts, id, name)
+           | values (7000, 7, 'a7')
+        """.stripMargin)
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+        Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+        Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+        Seq(4, "a4", null, 4000, "2025-01-04"),
+        Seq(5, null, 50.0, 5000, "2025-01-05"),
+        Seq(6, "a6", null, 6000, "2025-01-06"),
+        Seq(7, "a7", null, 7000, "2025-01-07")
+      )
+    }
+  }
+
   test("Test table type name incase-sensitive test") {
     withTempDir { tmp =>
       val targetTable = generateTableName
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
index f137c9dea6c..3e5e0028cf9 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
@@ -17,15 +17,23 @@
 
 package org.apache.spark.sql.hudi.analysis
 
-import org.apache.hudi.DefaultSource
+import org.apache.hudi.{DefaultSource, EmptyRelation, HoodieBaseRelation}
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
 
+import org.apache.spark.sql.{AnalysisException, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.{ResolveInsertionBase, 
TableOutputResolver}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation, PreprocessTableInsertion}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
 import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 
 /**
  * NOTE: PLEASE READ CAREFULLY
@@ -64,3 +72,140 @@ case class 
HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
     LogicalRelation(relation, output, catalogTable, isStreaming = false)
   }
 }
+
+/**
+ * In Spark 3.5, the following Resolution rules are removed,
+ * [[ResolveUserSpecifiedColumns]] and [[ResolveDefaultColumns]]
+ * (see code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The same logic of resolving the user specified columns and default values,
+ * which are required for a subset of columns as user specified compared to 
the table
+ * schema to work properly, is deferred to [[PreprocessTableInsertion]] for 
v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after 
Spark's built-in
+ * Resolution rules are applied, the logic of resolving the user-specified 
columns and default
+ * values may no longer be applied. To make INSERT with a subset of columns 
specified by user
+ * to work, this custom resolution rule 
[[HoodieSpark35ResolveColumnsForInsertInto]] is added
+ * to achieve the same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * The implementation is copied and adapted from [[PreprocessTableInsertion]]
+ * 
https://github.com/apache/spark/blob/d061aadf25fd258d2d3e7332a489c9c24a2b5530/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L373
+ *
+ * Also note that, the project logic in [[ResolveImplementationsEarly]] for 
INSERT is still
+ * needed in the case of INSERT with all columns in a different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends 
ResolveInsertionBase {
+  // NOTE: This is copied from [[PreprocessTableInsertion]] with additional 
handling of Hudi relations
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case i@InsertIntoStatement(table, _, _, query, _, _, _)
+        if table.resolved && query.resolved
+          && i.userSpecifiedCols.nonEmpty && 
i.table.isInstanceOf[LogicalRelation]
+          && 
sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get)
 =>
+        table match {
+          case relation: HiveTableRelation =>
+            val metadata = relation.tableMeta
+            preprocess(i, metadata.identifier.quotedString, 
metadata.partitionSchema,
+              Some(metadata))
+          case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
+            preprocess(i, catalogTable, h.partitionSchema)
+          case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
+            preprocess(i, catalogTable, new StructType())
+          // The two conditions below are adapted to Hudi relations
+          case LogicalRelation(_: EmptyRelation, _, catalogTable, _) =>
+            preprocess(i, catalogTable)
+          case LogicalRelation(_: HoodieBaseRelation, _, catalogTable, _) =>
+            preprocess(i, catalogTable)
+          case _ => i
+        }
+      case _ => plan
+    }
+  }
+
+  private def preprocess(insert: InsertIntoStatement,
+                         catalogTable: Option[CatalogTable]): 
InsertIntoStatement = {
+    preprocess(insert, catalogTable, 
catalogTable.map(_.partitionSchema).getOrElse(new StructType()))
+  }
+
+  private def preprocess(insert: InsertIntoStatement,
+                         catalogTable: Option[CatalogTable],
+                         partitionSchema: StructType): InsertIntoStatement = {
+    val tblName = 
catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
+    preprocess(insert, tblName, partitionSchema, catalogTable)
+  }
+
+  // NOTE: this is copied from [[PreprocessTableInsertion]] with additional 
logic
+  // to unset user-specified columns at the end
+  private def preprocess(insert: InsertIntoStatement,
+                         tblName: String,
+                         partColNames: StructType,
+                         catalogTable: Option[CatalogTable]): 
InsertIntoStatement = {
+
+    val normalizedPartSpec = normalizePartitionSpec(
+      insert.partitionSpec, partColNames, tblName, conf.resolver)
+
+    val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
+    val expectedColumns = insert.table.output.filterNot(a => 
staticPartCols.contains(a.name))
+
+    val partitionsTrackedByCatalog = catalogTable.isDefined &&
+      catalogTable.get.partitionColumnNames.nonEmpty &&
+      catalogTable.get.tracksPartitionsInCatalog
+    if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
+      // empty partition column value
+      if (normalizedPartSpec.values.flatten.exists(v => v != null && 
v.isEmpty)) {
+        val spec = normalizedPartSpec.map(p => p._1 + "=" + 
p._2).mkString("[", ", ", "]")
+        throw QueryCompilationErrors.invalidPartitionSpecError(
+          s"The spec ($spec) contains an empty partition column value")
+      }
+    }
+
+    // Create a project if this INSERT has a user-specified column list.
+    val hasColumnList = insert.userSpecifiedCols.nonEmpty
+    val query = if (hasColumnList) {
+      createProjectForByNameQuery(tblName, insert)
+    } else {
+      insert.query
+    }
+    val newQuery = try {
+      TableOutputResolver.resolveOutputColumns(
+        tblName,
+        expectedColumns,
+        query,
+        byName = hasColumnList || insert.byName,
+        conf,
+        supportColDefaultValue = true)
+    } catch {
+      case e: AnalysisException if staticPartCols.nonEmpty &&
+        (e.getErrorClass == 
"INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" ||
+          e.getErrorClass == 
"INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS") =>
+        val newException = e.copy(
+          errorClass = Some("INSERT_PARTITION_COLUMN_ARITY_MISMATCH"),
+          messageParameters = e.messageParameters ++ Map(
+            "tableColumns" -> insert.table.output.map(c => 
toSQLId(c.name)).mkString(", "),
+            "staticPartCols" -> staticPartCols.toSeq.sorted.map(c => 
toSQLId(c)).mkString(", ")
+          ))
+        newException.setStackTrace(e.getStackTrace)
+        throw newException
+    }
+    if (normalizedPartSpec.nonEmpty) {
+      if (normalizedPartSpec.size != partColNames.length) {
+        throw 
QueryCompilationErrors.requestedPartitionsMismatchTablePartitionsError(
+          tblName, normalizedPartSpec, partColNames)
+      }
+
+      // NOTE: Hudi converts [[InsertIntoStatement]] to 
[[InsertIntoHoodieTableCommand]]
+      // and the user-specified columns are no longer needed after resolution
+      // (`userSpecifiedCols = Seq()`)
+      insert.copy(query = newQuery, partitionSpec = normalizedPartSpec, 
userSpecifiedCols = Seq())
+    } else {
+      // All partition columns are dynamic because the InsertIntoTable command 
does
+      // not explicitly specify partitioning columns.
+      // NOTE: Hudi converts [[InsertIntoStatement]] to 
[[InsertIntoHoodieTableCommand]]
+      // and the user-specified columns are no longer needed after resolution
+      // (`userSpecifiedCols = Seq()`)
+      insert.copy(query = newQuery, partitionSpec = 
partColNames.map(_.name).map(_ -> None).toMap,
+        userSpecifiedCols = Seq())
+    }
+  }
+}

Reply via email to