dongjoon-hyun commented on code in PR #55518:
URL: https://github.com/apache/spark/pull/55518#discussion_r3192364149
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala:
##########
@@ -50,20 +51,34 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
protected def buildOperationTable(
table: SupportsRowLevelOperations,
command: Command,
- options: CaseInsensitiveStringMap): RowLevelOperationTable = {
- val info = RowLevelOperationInfoImpl(command, options)
+ options: CaseInsensitiveStringMap,
+ updatedColumns: Seq[NamedReference] = Nil): RowLevelOperationTable = {
+ val info = RowLevelOperationInfoImpl(command, options, updatedColumns)
val operation = table.newRowLevelOperationBuilder(info).build()
RowLevelOperationTable(table, operation)
}
+ // Builds a DataSourceV2Relation for a row-level operation, optionally
narrowing its output.
+ //
+ // When dataAttrs is non-empty, the relation output is narrowed to include
only columns
+ // required for a column-update write. When dataAttrs is empty, the full
relation.output is
+ // preserved.
Review Comment:
For function descriptions, please follow the community style used in the other
code paths, i.e. a Scaladoc block comment:
```
/**
* ...
*/
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala:
##########
@@ -106,38 +104,92 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
val remainingRowsPlan = addOperationColumn(COPY_OPERATION,
Filter(remainingRowFilter, readRelation))
- // the new state is a union of updated and copied over records
- val query = Union(updatedRowsPlan, remainingRowsPlan)
+ val updatedAndRemainingRowsPlan = Union(updatedRowsPlan, remainingRowsPlan)
- // build a plan to replace read groups in the table
val writeRelation = relation.copy(table = operationTable)
- val projections = buildReplaceDataProjections(query, relation.output,
metadataAttrs)
+ val query = updatedAndRemainingRowsPlan
+ val metadataAttrs = resolveRequiredMetadataAttrs(relation,
operationTable.operation)
+ val projections = buildReplaceDataProjections(query, rowAttrs,
metadataAttrs)
val groupFilterCond = if (groupFilterEnabled) Some(cond) else None
ReplaceData(writeRelation, cond, query, relation, projections,
groupFilterCond)
}
+ // Common read-relation setup shared by both CoW plan builders.
+ //
+ // When the connector supports column updates and declares required data
attributes,
+ // the read relation is narrowed at analysis time so that
+ // GroupBasedRowLevelOperationScanPlanning uses only the needed columns for
the scan.
+ // Otherwise the full relation output is used.
Review Comment:
For function descriptions, please follow the community style used in the other
code paths, i.e. a Scaladoc block comment:
```
/**
* ...
*/
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]