This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d51921d5d2b [HUDI-8898] Support INSERT SQL statement with a subset of
columns in Spark 3.5 (#12692)
d51921d5d2b is described below
commit d51921d5d2b3145a1deda19a1f7242e6d513e86c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Jan 24 18:08:41 2025 -0800
[HUDI-8898] Support INSERT SQL statement with a subset of columns in Spark
3.5 (#12692)
---
.../spark/sql/hudi/analysis/HoodieAnalysis.scala | 5 +
.../spark/sql/hudi/dml/TestInsertTable.scala | 133 ++++++++++++++++++
.../sql/hudi/analysis/HoodieSpark35Analysis.scala | 151 ++++++++++++++++++++-
3 files changed, 286 insertions(+), 3 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index 69ce7ceb130..b4eaef47914 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -75,6 +75,11 @@ object HoodieAnalysis extends SparkAdapterSupport {
// leading to all relations resolving as V2 instead of current expectation
of them being resolved as V1)
rules ++= Seq(dataSourceV2ToV1Fallback, spark3ResolveReferences)
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ rules += (_ => instantiateKlass(
+
"org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))
+ }
+
val resolveAlterTableCommandsClass =
if (HoodieSparkUtils.gteqSpark3_5) {
"org.apache.spark.sql.hudi.Spark35ResolveHudiAlterTableCommand"
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 52a9e4f20e2..3942af2b145 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -43,6 +43,139 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
class TestInsertTable extends HoodieSparkSqlTestBase {
+ test("Test Insert Into with subset of columns") {
+ // This is only supported by Spark 3.5
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ Seq("cow", "mor").foreach(tableType =>
+ Seq(true, false).foreach(isPartitioned => withTempDir { tmp =>
+ testInsertIntoWithSubsetOfColumns(
+ "hudi", tableType, s"${tmp.getCanonicalPath}/hudi_table",
isPartitioned)
+ }))
+ }
+ }
+
+ test("Test Insert Into with subset of columns on Parquet table") {
+ // This is only supported by Spark 3.5
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ // Make sure parquet tables are not affected by the custom rules for
+ // INSERT INTO statements on Hudi tables
+ Seq(true, false).foreach(isPartitioned => withTempDir { tmp =>
+ testInsertIntoWithSubsetOfColumns(
+ "parquet", "", s"${tmp.getCanonicalPath}/parquet_table",
isPartitioned)
+ })
+ }
+ }
+
+ private def testInsertIntoWithSubsetOfColumns(format: String,
+ tableType: String,
+ tablePath: String,
+ isPartitioned: Boolean): Unit
= {
+ val tableName = generateTableName
+ val createTablePartitionClause = if (isPartitioned) "partitioned by (dt)"
else ""
+ // Create a table, partitioned by dt if isPartitioned is true
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using $format
+ | tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id'
+ | )
+ | $createTablePartitionClause
+ | location '$tablePath'
+ """.stripMargin)
+
+ // INSERT INTO with all columns
+ // Same ordering of columns as the schema
+ spark.sql(
+ s"""
+ | insert into $tableName (id, name, price, ts, dt)
+ | values (1, 'a1', 10, 1000, '2025-01-01'),
+ | (2, 'a2', 20, 2000, '2025-01-02')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02")
+ )
+
+ // Different ordering of columns compared to the schema
+ spark.sql(
+ s"""
+ | insert into $tableName (dt, name, id, price, ts)
+ | values ('2025-01-03', 'a3', 3, 30, 3000)
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+ Seq(3, "a3", 30.0, 3000, "2025-01-03")
+ )
+
+ // INSERT INTO with a subset of columns
+ // Using different ordering of subset of columns in user-specified columns,
+ // and VALUES without column names
+ spark.sql(
+ s"""
+ | insert into $tableName (dt, ts, name, id)
+ | values ('2025-01-04', 4000, 'a4', 4)
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+ Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+ Seq(4, "a4", null, 4000, "2025-01-04")
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName (id, price, ts, dt)
+ | values (5, 50.0, 5000, '2025-01-05')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+ Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+ Seq(4, "a4", null, 4000, "2025-01-04"),
+ Seq(5, null, 50.0, 5000, "2025-01-05")
+ )
+
+ // Using a subset of columns in user-specified columns, and VALUES with
column names
+ spark.sql(
+ s"""
+ | insert into $tableName (dt, ts, id, name)
+ | values ('2025-01-06' as dt, 6000 as ts, 6 as id, 'a6' as name)
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+ Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+ Seq(4, "a4", null, 4000, "2025-01-04"),
+ Seq(5, null, 50.0, 5000, "2025-01-05"),
+ Seq(6, "a6", null, 6000, "2025-01-06")
+ )
+
+ if (isPartitioned) {
+ spark.sql(
+ s"""
+ | insert into $tableName partition(dt='2025-01-07') (ts, id, name)
+ | values (7000, 7, 'a7')
+ """.stripMargin)
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1", 10.0, 1000, "2025-01-01"),
+ Seq(2, "a2", 20.0, 2000, "2025-01-02"),
+ Seq(3, "a3", 30.0, 3000, "2025-01-03"),
+ Seq(4, "a4", null, 4000, "2025-01-04"),
+ Seq(5, null, 50.0, 5000, "2025-01-05"),
+ Seq(6, "a6", null, 6000, "2025-01-06"),
+ Seq(7, "a7", null, 7000, "2025-01-07")
+ )
+ }
+ }
+
test("Test table type name incase-sensitive test") {
withTempDir { tmp =>
val targetTable = generateTableName
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
index f137c9dea6c..3e5e0028cf9 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark35Analysis.scala
@@ -17,15 +17,23 @@
package org.apache.spark.sql.hudi.analysis
-import org.apache.hudi.DefaultSource
+import org.apache.hudi.{DefaultSource, EmptyRelation, HoodieBaseRelation}
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.spark.sql.{AnalysisException, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.{ResolveInsertionBase,
TableOutputResolver}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
LogicalRelation, PreprocessTableInsertion}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table
-import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.sql.sources.InsertableRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
/**
* NOTE: PLEASE READ CAREFULLY
@@ -64,3 +72,140 @@ case class
HoodieSpark35DataSourceV2ToV1Fallback(sparkSession: SparkSession) ext
LogicalRelation(relation, output, catalogTable, isStreaming = false)
}
}
+
+/**
+ * In Spark 3.5, the following Resolution rules are removed,
+ * [[ResolveUserSpecifiedColumns]] and [[ResolveDefaultColumns]]
+ * (see code changes in [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+ * from https://github.com/apache/spark/pull/41262).
+ * The same logic of resolving the user specified columns and default values,
+ * which are required for a subset of columns as user specified compared to
the table
+ * schema to work properly, is deferred to [[PreprocessTableInsertion]] for
v1 INSERT.
+ *
+ * Note that [[HoodieAnalysis]] intercepts the [[InsertIntoStatement]] after
Spark's built-in
+ * Resolution rules are applied, the logic of resolving the user specified
columns and default
+ * values may no longer be applied. To make INSERT with a subset of columns
specified by user
+ * to work, this custom resolution rule
[[HoodieSpark35ResolveColumnsForInsertInto]] is added
+ * to achieve the same, before converting [[InsertIntoStatement]] into
+ * [[InsertIntoHoodieTableCommand]].
+ *
+ * The implementation is copied and adapted from [[PreprocessTableInsertion]]
+ *
https://github.com/apache/spark/blob/d061aadf25fd258d2d3e7332a489c9c24a2b5530/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L373
+ *
+ * Also note that, the project logic in [[ResolveImplementationsEarly]] for
INSERT is still
+ * needed in the case of INSERT with all columns in a different ordering.
+ */
+case class HoodieSpark35ResolveColumnsForInsertInto() extends
ResolveInsertionBase {
+ // NOTE: This is copied from [[PreprocessTableInsertion]] with additional
handling of Hudi relations
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case i@InsertIntoStatement(table, _, _, query, _, _, _)
+ if table.resolved && query.resolved
+ && i.userSpecifiedCols.nonEmpty &&
i.table.isInstanceOf[LogicalRelation]
+ &&
sparkAdapter.isHoodieTable(i.table.asInstanceOf[LogicalRelation].catalogTable.get)
=>
+ table match {
+ case relation: HiveTableRelation =>
+ val metadata = relation.tableMeta
+ preprocess(i, metadata.identifier.quotedString,
metadata.partitionSchema,
+ Some(metadata))
+ case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) =>
+ preprocess(i, catalogTable, h.partitionSchema)
+ case LogicalRelation(_: InsertableRelation, _, catalogTable, _) =>
+ preprocess(i, catalogTable, new StructType())
+ // The two conditions below are adapted to Hudi relations
+ case LogicalRelation(_: EmptyRelation, _, catalogTable, _) =>
+ preprocess(i, catalogTable)
+ case LogicalRelation(_: HoodieBaseRelation, _, catalogTable, _) =>
+ preprocess(i, catalogTable)
+ case _ => i
+ }
+ case _ => plan
+ }
+ }
+
+ private def preprocess(insert: InsertIntoStatement,
+ catalogTable: Option[CatalogTable]):
InsertIntoStatement = {
+ preprocess(insert, catalogTable,
catalogTable.map(_.partitionSchema).getOrElse(new StructType()))
+ }
+
+ private def preprocess(insert: InsertIntoStatement,
+ catalogTable: Option[CatalogTable],
+ partitionSchema: StructType): InsertIntoStatement = {
+ val tblName =
catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
+ preprocess(insert, tblName, partitionSchema, catalogTable)
+ }
+
+ // NOTE: this is copied from [[PreprocessTableInsertion]] with additional
logic
+ // to unset user-specified columns at the end
+ private def preprocess(insert: InsertIntoStatement,
+ tblName: String,
+ partColNames: StructType,
+ catalogTable: Option[CatalogTable]):
InsertIntoStatement = {
+
+ val normalizedPartSpec = normalizePartitionSpec(
+ insert.partitionSpec, partColNames, tblName, conf.resolver)
+
+ val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
+ val expectedColumns = insert.table.output.filterNot(a =>
staticPartCols.contains(a.name))
+
+ val partitionsTrackedByCatalog = catalogTable.isDefined &&
+ catalogTable.get.partitionColumnNames.nonEmpty &&
+ catalogTable.get.tracksPartitionsInCatalog
+ if (partitionsTrackedByCatalog && normalizedPartSpec.nonEmpty) {
+ // empty partition column value
+ if (normalizedPartSpec.values.flatten.exists(v => v != null &&
v.isEmpty)) {
+ val spec = normalizedPartSpec.map(p => p._1 + "=" +
p._2).mkString("[", ", ", "]")
+ throw QueryCompilationErrors.invalidPartitionSpecError(
+ s"The spec ($spec) contains an empty partition column value")
+ }
+ }
+
+ // Create a project if this INSERT has a user-specified column list.
+ val hasColumnList = insert.userSpecifiedCols.nonEmpty
+ val query = if (hasColumnList) {
+ createProjectForByNameQuery(tblName, insert)
+ } else {
+ insert.query
+ }
+ val newQuery = try {
+ TableOutputResolver.resolveOutputColumns(
+ tblName,
+ expectedColumns,
+ query,
+ byName = hasColumnList || insert.byName,
+ conf,
+ supportColDefaultValue = true)
+ } catch {
+ case e: AnalysisException if staticPartCols.nonEmpty &&
+ (e.getErrorClass ==
"INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" ||
+ e.getErrorClass ==
"INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS") =>
+ val newException = e.copy(
+ errorClass = Some("INSERT_PARTITION_COLUMN_ARITY_MISMATCH"),
+ messageParameters = e.messageParameters ++ Map(
+ "tableColumns" -> insert.table.output.map(c =>
toSQLId(c.name)).mkString(", "),
+ "staticPartCols" -> staticPartCols.toSeq.sorted.map(c =>
toSQLId(c)).mkString(", ")
+ ))
+ newException.setStackTrace(e.getStackTrace)
+ throw newException
+ }
+ if (normalizedPartSpec.nonEmpty) {
+ if (normalizedPartSpec.size != partColNames.length) {
+ throw
QueryCompilationErrors.requestedPartitionsMismatchTablePartitionsError(
+ tblName, normalizedPartSpec, partColNames)
+ }
+
+ // NOTE: Hudi converts [[InsertIntoStatement]] to
[[InsertIntoHoodieTableCommand]]
+ // and the user-specified columns are no longer needed after resolution
+ // (`userSpecifiedCols = Seq()`)
+ insert.copy(query = newQuery, partitionSpec = normalizedPartSpec,
userSpecifiedCols = Seq())
+ } else {
+ // All partition columns are dynamic because the InsertIntoTable command
does
+ // not explicitly specify partitioning columns.
+ // NOTE: Hudi converts [[InsertIntoStatement]] to
[[InsertIntoHoodieTableCommand]]
+ // and the user-specified columns are no longer needed after resolution
+ // (`userSpecifiedCols = Seq()`)
+ insert.copy(query = newQuery, partitionSpec =
partColNames.map(_.name).map(_ -> None).toMap,
+ userSpecifiedCols = Seq())
+ }
+ }
+}