[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3154: [HUDI-1884] MergeInto Support Partial Update For COW
xiarixiaoyao commented on a change in pull request #3154:
URL: https://github.com/apache/hudi/pull/3154#discussion_r670211077

## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala ##

@@ -164,7 +165,47 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
   case UpdateAction(condition, assignments) =>
     val (resolvedCondition, resolvedAssignments) = resolveConditionAssignments(condition, assignments)
-    UpdateAction(resolvedCondition, resolvedAssignments)
+
+    // Get the target table type and pre-combine field.
+    val targetTableId = getMergeIntoTargetTableId(mergeInto)
+    val targetTable =
+      sparkSession.sessionState.catalog.getTableMetadata(targetTableId)
+    val targetTableType = HoodieOptionConfig.getTableType(targetTable.storage.properties)
+    val preCombineField = HoodieOptionConfig.getPreCombineField(targetTable.storage.properties)
+
+    // Get the map of target attribute to value of the update assignments.
+    val target2Values = resolvedAssignments.map {
+      case Assignment(attr: AttributeReference, value) =>
+        attr.name -> value
+      case o => throw new IllegalArgumentException(s"Assignment key must be an attribute, current is: ${o.key}")
+    }.toMap
+
+    // Validate if there are incorrect target attributes.
+    val unKnowTargets = target2Values.keys
+      .filterNot(removeMetaFields(target.output).map(_.name).contains(_))
+    if (unKnowTargets.nonEmpty) {
+      throw new AnalysisException(s"Cannot find target attributes: ${unKnowTargets.mkString(",")}.")
+    }
+
+    // Fill the missing target attribute in the update action for COW table to support partial update.
+    // e.g. If the update action missing 'id' attribute, we fill a "id = target.id" to the update action.
+    val newAssignments = removeMetaFields(target.output)
+      .map(attr => {
+        // TODO support partial update for MOR.
+        if (!target2Values.contains(attr.name) && targetTableType == MOR_TABLE_TYPE_OPT_VAL) {
+          throw new AnalysisException(s"Missing specify the value for target field: '${attr.name}' in merge into update action" +
+            s" for MOR table. Currently we cannot support partial update for MOR," +
+            s" please complete all the target fields just like '...update set id = s0.id, name = s0.name '")
+        }
+        if (preCombineField.isDefined && preCombineField.get.equalsIgnoreCase(attr.name)

Review comment:
Why should we have to specify a value for the preCombineField in the merge-into update action? Maybe we can delete this check.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
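For readers following the hunk above, the fill step it describes for COW tables can be sketched without Spark. This is only an illustration under stated assumptions, not the code in the pull request: `PartialUpdateSketch`, `fillMissingAssignments`, the use of plain strings in place of Catalyst `Assignment` and `AttributeReference` values, and the `target.` prefix are all hypothetical stand-ins.

```scala
object PartialUpdateSketch {
  // Hypothetical stand-in for Catalyst expressions: a value is just its SQL text.
  def fillMissingAssignments(
      targetFields: Seq[String],
      assignments: Map[String, String]): Seq[(String, String)] = {
    // Reject assignments to fields that do not exist in the target table.
    val unknown = assignments.keys.filterNot(k => targetFields.contains(k)).toSeq
    require(unknown.isEmpty, s"Cannot find target attributes: ${unknown.mkString(",")}.")
    // For every target field without an explicit assignment, keep the current value,
    // i.e. fill in "field = target.field".
    targetFields.map { field =>
      field -> assignments.getOrElse(field, s"target.$field")
    }
  }
}
```

Called with `Seq("id", "name", "price")` and `Map("price" -> "s0.price")`, the sketch fills `id` and `name` from the target and leaves the explicit `price` assignment untouched. It does not model the MOR restriction or the preCombineField condition that the review comment questions.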
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #3154: [HUDI-1884] MergeInto Support Partial Update For COW
xiarixiaoyao commented on a change in pull request #3154:
URL: https://github.com/apache/hudi/pull/3154#discussion_r670208968

## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala ##

@@ -102,7 +103,7 @@ case class HoodieResolveReferences(sparkSession: SparkSession) extends Rule[Logi
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
     // Resolve merge into
-    case MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)
+    case mergeInto@MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)

Review comment:
It may be better to keep the same style as Spark:
case mergeInto @ MergeIntoTable(target, source, mergeCondition, matchedActions, notMatchedActions)
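The suggestion above concerns Scala's pattern binder, which binds the whole matched value to a name while still destructuring it. A minimal sketch, assuming a toy `Plan` hierarchy (`MergeInto` and `Other` are hypothetical and unrelated to Spark's `MergeIntoTable`):

```scala
object BinderStyleSketch {
  sealed trait Plan
  final case class MergeInto(target: String, source: String) extends Plan
  final case class Other(name: String) extends Plan

  // `mergeInto @ MergeInto(...)` names the whole node and extracts its fields.
  def describe(plan: Plan): String = plan match {
    case mergeInto @ MergeInto(target, source) =>
      s"$mergeInto merges $source into $target"
    case other =>
      s"unhandled: $other"
  }
}
```

Writing the binder as `mergeInto@MergeInto(...)` compiles to the same thing; the spacing around `@` is purely a style choice, which is the point of the comment.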