dongjoon-hyun commented on code in PR #55518:
URL: https://github.com/apache/spark/pull/55518#discussion_r3192401119


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -154,30 +206,145 @@ object RewriteUpdateTable extends RewriteRowLevelCommand 
{
       cond: Expression): WriteDelta = {
 
     val operation = operationTable.operation.asInstanceOf[SupportsDelta]
+    // Column-update support applies to the standard delta path and the 
delete+reinsert path.
+    // When representUpdateAsDeleteAndInsert is true, the REINSERT leg of the 
Expand already
+    // uses only assigned values, so the narrow effectiveRowAttrs applies 
correctly.
+    val supportsColumnUpdate = operation.supportsColumnUpdates()
 
     // resolve all needed attrs (e.g. row ID and any required metadata attrs)
-    val rowAttrs = relation.output
     val rowIdAttrs = resolveRowIdAttrs(relation, operation)
     val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
 
-    // construct a read relation and include all required metadata columns
+    // Connector-declared data attrs used to determine pass-through columns in 
the write plan.
+    val connectorDataAttrs = if (supportsColumnUpdate) {
+      resolveRequiredDataAttrs(relation, operation)
+    } else Nil
+
+    // MOR uses a full-schema scan; ColumnPruning narrows it via Project 
references.
     val readRelation = buildRelationWithAttrs(relation, operationTable, 
metadataAttrs, rowIdAttrs)
 
+    // Connector-required attrs that are NOT being assigned are added as 
pass-throughs in the
+    // plan so that ColumnPruning keeps them in the physical scan AND the 
connector receives
+    // their current values via DeltaWriter.update's row argument.
+    val assignedAttrs = if (supportsColumnUpdate) 
computeAssignedAttrs(assignments)
+                        else relation.output
+    val connectorExtraAttrs: Seq[AttributeReference] = if 
(connectorDataAttrs.nonEmpty) {
+      val assignedAttrSet = AttributeSet(assignedAttrs)
+      connectorDataAttrs.filterNot(assignedAttrSet.contains)
+    } else Nil
+
     // build a plan for updated records that match the condition
     val matchedRowsPlan = Filter(cond, readRelation)
     val rowDeltaPlan = if (operation.representUpdateAsDeleteAndInsert) {
       buildDeletesAndInserts(matchedRowsPlan, assignments, rowIdAttrs)
+    } else if (supportsColumnUpdate) {
+      buildColumnUpdateProjection(
+        matchedRowsPlan, assignments, rowIdAttrs, metadataAttrs, 
connectorExtraAttrs)
     } else {
       buildWriteDeltaUpdateProjection(matchedRowsPlan, assignments, rowIdAttrs)
     }
 
+    // Effective row write schema:
+    // - Narrow path (connectorDataAttrs declared): exactly connector-declared 
cols in declared
+    //   order.  The connector must declare ALL columns it wants to receive 
(including updated
+    //   ones).  This mirrors the metadata pattern and enables strict 
areCompatible validation.
+    // - Heuristic path (connectorDataAttrs empty): only the assigned 
(changed) columns.
+    // - Full path (no column-update support): full table output.
+    val effectiveRowAttrs = if (supportsColumnUpdate && 
connectorDataAttrs.nonEmpty) {
+      connectorDataAttrs
+    } else if (supportsColumnUpdate) {
+      assignedAttrs
+    } else {
+      relation.output
+    }
+
     // build a plan to write the row delta to the table
     val writeRelation = relation.copy(table = operationTable)
-    val projections = buildWriteDeltaProjections(rowDeltaPlan, rowAttrs, 
rowIdAttrs, metadataAttrs)
+    val projections = buildWriteDeltaProjections(
+      rowDeltaPlan, effectiveRowAttrs, rowIdAttrs, metadataAttrs)
     val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
     WriteDelta(writeRelation, cond, rowDeltaPlan, relation, projections, 
groupFilterCond)
   }
 
+  // Builds the row delta projection for the column update path.
+  //
+  // The resulting Project references only:
+  //   - assigned column values (new values being written)
+  //   - connector pass-through values (connector declared but not assigned)
+  //   - metadata columns (nulled or preserved)
+  //   - row ID columns (for delta identification)
+  //   - original row ID values (only when a row ID column is being reassigned)
+  //
+  // ColumnPruning observes exactly these references and narrows the physical 
scan accordingly.
+  // Connectors that need additional columns in the scan (e.g., partition 
columns for
+  // distribution) should declare them in requiredDataAttributes().
+  //
+  // Note: AlignUpdateAssignments guarantees all assignment keys are top-level

Review Comment:
   Do we have a test coverage for this, `AlignUpdateAssignments` contract?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to