aokolnychyi commented on code in PR #38005: URL: https://github.com/apache/spark/pull/38005#discussion_r1081828234
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala: ########## @@ -274,6 +274,120 @@ case class ReplaceData( } } +/** + * Writes a delta of rows to an existing table during a row-level operation. + * + * This node references a query that translates a logical DELETE, UPDATE, MERGE operation into + * a set of row-level changes to be encoded in the table. Each row in the query represents either + * a delete, update or insert and stores the operation type in a special column. + * + * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE operations for data sources + * that can handle deltas of rows. + * + * @param table a plan that references a row-level operation table + * @param condition a condition that defines matching records + * @param query a query with a delta of records that should be written + * @param originalTable a plan for the original table for which the row-level command was triggered + * @param projections projections for row ID, row, metadata attributes + * @param write a logical write, if already constructed + */ +case class WriteDelta( + table: NamedRelation, + condition: Expression, + query: LogicalPlan, + originalTable: NamedRelation, + projections: WriteDeltaProjections, + write: Option[DeltaWrite] = None) extends RowLevelWrite { + + override val isByName: Boolean = false + override val stringArgs: Iterator[Any] = Iterator(table, query, write) + + override lazy val references: AttributeSet = query.outputSet + + lazy val operation: SupportsDelta = { + EliminateSubqueryAliases(table) match { + case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) => + operation.asInstanceOf[SupportsDelta] + case _ => + throw new AnalysisException(s"Cannot retrieve row-level operation from $table") + } + } + + override def outputResolved: Boolean = { + assert(table.resolved && query.resolved, + "`outputResolved` can only be called when `table` and `query` are both resolved.") + + 
operationResolved && rowAttrsResolved && rowIdAttrsResolved && metadataAttrsResolved + } + + private def operationResolved: Boolean = { + val attr = query.output.head + attr.name == RowDeltaUtils.OPERATION_COLUMN && attr.dataType == IntegerType && !attr.nullable + } + + // validates row projection output is compatible with table attributes + private def rowAttrsResolved: Boolean = { + table.skipSchemaResolution || (projections.rowProjection match { + case Some(projection) => + table.output.size == projection.schema.size && + projection.schema.zip(table.output).forall { case (field, outAttr) => + isCompatible(field, outAttr) + } + case None => + true + }) + } + + // validates row ID projection output is compatible with row ID attributes + private def rowIdAttrsResolved: Boolean = { + val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference]( + operation.rowId, + originalTable) + + val projectionSchema = projections.rowIdProjection.schema + rowIdAttrs.size == projectionSchema.size && projectionSchema.forall { field => + rowIdAttrs.exists(rowIdAttr => isCompatible(field, rowIdAttr)) + } + } + + // validates metadata projection output is compatible with metadata attributes + private def metadataAttrsResolved: Boolean = { + projections.metadataProjection match { + case Some(projection) => + val metadataAttrs = V2ExpressionUtils.resolveRefs[AttributeReference]( + operation.requiredMetadataAttributes, + originalTable) + + val projectionSchema = projection.schema + metadataAttrs.size == projectionSchema.size && projectionSchema.forall { field => + metadataAttrs.exists(metadataAttr => isCompatible(field, metadataAttr)) + } + case None => + true + } + } + + private def isCompatible(projectionField: StructField, outAttr: NamedExpression): Boolean = { + val inType = CharVarcharUtils.getRawType(projectionField.metadata).getOrElse(outAttr.dataType) Review Comment: Good catch! Let me fix it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org