yihua commented on code in PR #12587:
URL: https://github.com/apache/hudi/pull/12587#discussion_r1907926527
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala:
##########
@@ -19,20 +19,56 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
import org.apache.hudi.SparkAdapterSupport
-
import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
Project, UpdateTable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
LogicalPlan, Project, UpdateTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
case class UpdateHoodieTableCommand(ut: UpdateTable) extends
HoodieLeafRunnableCommand
with SparkAdapterSupport with ProvidesHoodieConfig {
+ private var sparkSession: SparkSession = _
+
+ private lazy val hoodieCatalogTable =
sparkAdapter.resolveHoodieTable(ut.table) match {
+ case Some(catalogTable) => HoodieCatalogTable(sparkSession, catalogTable)
+ case _ =>
+ failAnalysis(s"Failed to resolve update statement into the Hudi table.
Got instead: ${ut.table}")
+ }
+
+ /**
+ * Validate there is no assignment clause for the given attribute in the
given table.
+ *
+ * @param resolver The resolver to use
+ * @param fields The fields from the target table who should not have any
assignment clause
+ * @param tableId Table identifier (for error messages)
+ * @param fieldType Type of the attribute to be validated (for error
messages)
+ * @param assignments The assignments clause of the merge into action
Review Comment:
   nit: let's vertically align the parameter descriptions in the doc comment
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala:
##########
@@ -19,20 +19,56 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
import org.apache.hudi.SparkAdapterSupport
-
Review Comment:
nit: keep the import grouping
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala:
##########
@@ -19,20 +19,56 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
import org.apache.hudi.SparkAdapterSupport
-
import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
Project, UpdateTable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
LogicalPlan, Project, UpdateTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
case class UpdateHoodieTableCommand(ut: UpdateTable) extends
HoodieLeafRunnableCommand
with SparkAdapterSupport with ProvidesHoodieConfig {
+ private var sparkSession: SparkSession = _
+
+ private lazy val hoodieCatalogTable =
sparkAdapter.resolveHoodieTable(ut.table) match {
+ case Some(catalogTable) => HoodieCatalogTable(sparkSession, catalogTable)
+ case _ =>
+ failAnalysis(s"Failed to resolve update statement into the Hudi table.
Got instead: ${ut.table}")
+ }
+
+ /**
+ * Validate there is no assignment clause for the given attribute in the
given table.
+ *
+ * @param resolver The resolver to use
+ * @param fields The fields from the target table who should not have any
assignment clause
+ * @param tableId Table identifier (for error messages)
+ * @param fieldType Type of the attribute to be validated (for error
messages)
+ * @param assignments The assignments clause of the merge into action
+ *
+ *
+ * @throws AnalysisException if assignment clause for the given target table
attribute is found
+ */
+ private def validateNoAssignmentsToTargetTableAttr(resolver: Resolver,
+ fields: Seq[String],
+ tableId: String,
+ fieldType: String,
+ assignments:
Seq[(AttributeReference, Expression)]
+ ): Unit = {
+ fields.foreach(field => if (assignments.exists {
+ case (attr, _) => resolver(attr.name, field)
+ }) {
+ throw new AnalysisException(s"Detected update query with disallowed
assignment clause for $fieldType " +
+ s"`$field` for table `$tableId`")
Review Comment:
```suggestion
throw new AnalysisException(s"Detected disallowed assignment clause in
UPDATE statement for $fieldType " +
s"`$field` for table `$tableId`. Please remove the assignment clause
to avoid the error.")
```
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala:
##########
@@ -19,20 +19,56 @@ package org.apache.spark.sql.hudi.command
import org.apache.hudi.DataSourceWriteOptions.{SPARK_SQL_OPTIMIZED_WRITES,
SPARK_SQL_WRITES_PREPPED_KEY}
import org.apache.hudi.SparkAdapterSupport
-
import org.apache.spark.sql.HoodieCatalystExpressionUtils.attributeEquals
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
Project, UpdateTable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
LogicalPlan, Project, UpdateTable}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.failAnalysis
case class UpdateHoodieTableCommand(ut: UpdateTable) extends
HoodieLeafRunnableCommand
with SparkAdapterSupport with ProvidesHoodieConfig {
+ private var sparkSession: SparkSession = _
+
+ private lazy val hoodieCatalogTable =
sparkAdapter.resolveHoodieTable(ut.table) match {
+ case Some(catalogTable) => HoodieCatalogTable(sparkSession, catalogTable)
+ case _ =>
+ failAnalysis(s"Failed to resolve update statement into the Hudi table.
Got instead: ${ut.table}")
+ }
+
+ /**
+ * Validate there is no assignment clause for the given attribute in the
given table.
+ *
+ * @param resolver The resolver to use
+ * @param fields The fields from the target table who should not have any
assignment clause
+ * @param tableId Table identifier (for error messages)
+ * @param fieldType Type of the attribute to be validated (for error
messages)
+ * @param assignments The assignments clause of the merge into action
Review Comment:
```suggestion
* @param resolver The resolver to use
   * @param fields The fields from the target table that should not have any
assignment clause
* @param tableId Table identifier (for error messages)
* @param fieldType Type of the attribute to be validated (for error
messages)
* @param assignments The assignments clause
```
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala:
##########
@@ -45,6 +81,24 @@ case class UpdateHoodieTableCommand(ut: UpdateTable) extends
HoodieLeafRunnableC
case Assignment(attr: AttributeReference, value) => attr -> value
}
+ // We don't support update queries changing partition column value.
+ validateNoAssignmentsToTargetTableAttr(
+ sparkSession.sessionState.conf.resolver,
+ hoodieCatalogTable.tableConfig.getPartitionFields.orElse(Array.empty),
+ tableId,
+ "partition field",
+ assignedAttributes
+ )
+
+ // We don't support update queries changing the primary key column value.
+ validateNoAssignmentsToTargetTableAttr(
+ sparkSession.sessionState.conf.resolver,
+ hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
+ tableId,
+ "primaryKey field",
Review Comment:
```suggestion
"record key field",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]