YannByron commented on code in PR #7144:
URL: https://github.com/apache/paimon/pull/7144#discussion_r2747050421
##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala:
##########
@@ -50,31 +52,81 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
*/
protected def generateAlignedExpressions(
attrs: Seq[Attribute],
- assignments: Seq[Assignment]): Seq[Expression] = {
+ assignments: Seq[Assignment],
+ v2Write: Boolean = false): Seq[Expression] = {
val attrUpdates = assignments.map(a => AttrUpdate(toRefSeq(a.key), a.value))
- recursiveAlignUpdates(attrs, attrUpdates)
+ recursiveAlignUpdates(attrs, attrUpdates, Nil, v2Write)
}
protected def alignAssignments(
attrs: Seq[Attribute],
- assignments: Seq[Assignment]): Seq[Assignment] = {
- generateAlignedExpressions(attrs, assignments).zip(attrs).map {
+ assignments: Seq[Assignment],
+ v2Write: Boolean = false): Seq[Assignment] = {
+ generateAlignedExpressions(attrs, assignments, v2Write).zip(attrs).map {
case (expression, field) => Assignment(field, expression)
}
}
+ /**
+ * Align assignments in a MergeAction based on the target table's output attributes.
+ * - DeleteAction: returns as-is
+ * - UpdateAction: aligns assignments for update
+ * - InsertAction: aligns assignments for insert
+ */
+ protected def alignMergeAction(
+ action: MergeAction,
+ targetOutput: Seq[Attribute],
+ v2Write: Boolean): MergeAction = {
+ action match {
+ case d @ DeleteAction(_) => d
+ case u @ UpdateAction(_, assignments) =>
+ u.copy(assignments = alignAssignments(targetOutput, assignments, v2Write))
+ case i @ InsertAction(_, assignments) =>
+ i.copy(assignments = alignAssignments(targetOutput, assignments, v2Write))
+ case _: UpdateStarAction =>
+ throw new RuntimeException("UpdateStarAction should not be here.")
+ case _: InsertStarAction =>
+ throw new RuntimeException("InsertStarAction should not be here.")
+ case _ =>
+ throw new RuntimeException(s"Can't recognize this action: $action")
+ }
+ }
+
+ /**
+ * Align all MergeActions in a MergeIntoTable based on the target table's output attributes.
+ * Returns a new MergeIntoTable with aligned matchedActions, notMatchedActions, and
+ * notMatchedBySourceActions.
+ */
+ protected def alignMergeIntoTable(
+ m: MergeIntoTable,
+ targetOutput: Seq[Attribute],
+ v2Write: Boolean): MergeIntoTable = {
+ m.copy(
+ matchedActions = m.matchedActions.map(alignMergeAction(_, targetOutput, v2Write)),
+ notMatchedActions = m.notMatchedActions.map(alignMergeAction(_, targetOutput, v2Write)),
+ notMatchedBySourceActions =
+ m.notMatchedBySourceActions.map(alignMergeAction(_, targetOutput, v2Write))
+ )
+ }
+
private def recursiveAlignUpdates(
targetAttrs: Seq[NamedExpression],
updates: Seq[AttrUpdate],
- namePrefix: Seq[String] = Nil): Seq[Expression] = {
+ namePrefix: Seq[String] = Nil,
+ v2Write: Boolean = false): Seq[Expression] = {
// build aligned updated expression for each target attr
targetAttrs.map {
targetAttr =>
val headMatchedUpdates = updates.filter(u => resolver(u.ref.head, targetAttr.name))
if (headMatchedUpdates.isEmpty) {
- // when no matched update, return the attr as is
- targetAttr
+ if (v2Write) {
+ // For V2Write, use default value or NULL for missing columns
+ getDefaultValueOrNull(targetAttr)
+ } else {
+ // For V1Write, return the attr as is
+ targetAttr
Review Comment:
Can we unify this behavior so that the V1 writer also assigns the default value here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]