SanJSp commented on code in PR #55583:
URL: https://github.com/apache/spark/pull/55583#discussion_r3161090610
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
// ---------------------------------------------------------------------------
/**
- * Collapses multiple changes per row identity into the net effect.
- * Not yet implemented.
+ * Collapses multiple changes per row identity into the net effect:
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage` pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per partition)
+ * -> Project (relabel `_change_type` and drop helper columns).
*/
private def injectNetChangeComputation(
plan: LogicalPlan,
- cl: Changelog): LogicalPlan = {
- plan
+ cl: Changelog,
+ computeUpdates: Boolean): LogicalPlan = {
+ val windowedPlan = addNetChangesWindow(plan, cl)
+ val filteredAndRelabeledPlan =
+ removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan, computeUpdates)
Review Comment:
I'd prefer to keep the longer name: it makes both responsibilities of the function explicit (filter + relabel).
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
// ---------------------------------------------------------------------------
/**
- * Collapses multiple changes per row identity into the net effect.
- * Not yet implemented.
+ * Collapses multiple changes per row identity into the net effect:
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage` pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per partition)
+ * -> Project (relabel `_change_type` and drop helper columns).
*/
private def injectNetChangeComputation(
plan: LogicalPlan,
- cl: Changelog): LogicalPlan = {
- plan
+ cl: Changelog,
+ computeUpdates: Boolean): LogicalPlan = {
+ val windowedPlan = addNetChangesWindow(plan, cl)
+ val filteredAndRelabeledPlan =
+ removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan, computeUpdates)
+ filteredAndRelabeledPlan
+ }
+
+ /**
+ * Adds a Window node partitioned by `rowId` and ordered by
+ * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`,
+ * `delete`) sort before post-events (`update_postimage`, `insert`) within the same
+ * commit. Computes per-partition helper columns:
+ * - `__spark_cdc_row_number` (1..n) answers: "is this the first or last row?".
+ * - `__spark_cdc_row_count` is the partition size which combined with row_number is
+ * used to detect the last row.
+ * - `__spark_cdc_first_row_change_type_value` and
+ * `__spark_cdc_last_row_change_type_value` drive the first/last classification at
+ * filter and relabel time.
+ */
+ private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog): LogicalPlan = {
+ val changeTypeAttr = getAttribute(plan, "_change_type")
+ val rowIdExprs = V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan)
+ val commitVersionAttr = getAttribute(plan, "_commit_version")
+ val changeTypeRank = CaseWhen(Seq(
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)) -> Literal(0),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) -> Literal(0),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) -> Literal(1),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)) -> Literal(1)),
+ Literal(2))
+ val partitionByCols = rowIdExprs
+ val orderSpec = Seq(
+ SortOrder(commitVersionAttr, Ascending),
+ SortOrder(changeTypeRank, Ascending))
+ val rowNumberWindowSpec = WindowSpecDefinition(
+ partitionByCols, orderSpec,
+ UnspecifiedFrame)
+ val aggregateWindowSpec = WindowSpecDefinition(
+ partitionByCols, orderSpec,
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing))
+
+ val rowNumberAlias = Alias(
+ WindowExpression(RowNumber(), rowNumberWindowSpec),
+ NetChangesHelperColumns.RowNumber)()
+ val rowCountAlias = Alias(
+ WindowExpression(Count(Seq(Literal(1))).toAggregateExpression(), aggregateWindowSpec),
+ NetChangesHelperColumns.RowCount)()
+ val firstRowChangeTypeValueAlias = Alias(
+ WindowExpression(
+ First(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+ aggregateWindowSpec),
+ NetChangesHelperColumns.FirstRowChangeTypeValue)()
+ val lastRowChangeTypeValueAlias = Alias(
+ WindowExpression(
+ Last(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+ aggregateWindowSpec),
+ NetChangesHelperColumns.LastRowChangeTypeValue)()
+
+ Window(
+ Seq(rowNumberAlias, rowCountAlias, firstRowChangeTypeValueAlias,
+ lastRowChangeTypeValueAlias),
+ partitionByCols, orderSpec, plan)
+ }
+
+ /**
+ * Filters and relabels the windowed plan: keeps only the first and/or last row per
+ * `rowId` partition, then rewrites the surviving rows' `_change_type` and drops the
+ * helper columns.
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage` pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Helper columns (`__spark_cdc_*`) are dropped in the same Project that does the
+ * relabel, saving a follow-up cleanup pass.
+ */
+ private def removeIntermediateChangelogEntriesAndRelabelChangeTypes(
+ windowedPlan: LogicalPlan,
+ computeUpdates: Boolean
+ ): LogicalPlan = {
+ val rowNumberAttr = getAttribute(windowedPlan, NetChangesHelperColumns.RowNumber)
+ val rowCountAttr = getAttribute(windowedPlan, NetChangesHelperColumns.RowCount)
+ val firstRowChangeTypeAttr =
+ getAttribute(windowedPlan, NetChangesHelperColumns.FirstRowChangeTypeValue)
+ val lastRowChangeTypeAttr =
+ getAttribute(windowedPlan, NetChangesHelperColumns.LastRowChangeTypeValue)
+
+ val existedBeforeVersionRange = In(firstRowChangeTypeAttr, Seq(
+ Literal(Changelog.CHANGE_TYPE_DELETE),
+ Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)))
+ val existsAfterVersionRange = In(lastRowChangeTypeAttr, Seq(
+ Literal(Changelog.CHANGE_TYPE_INSERT),
+ Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)))
+
+ val isFirst = EqualTo(rowNumberAttr, Literal(1))
+ val isLast = EqualTo(rowNumberAttr, rowCountAttr)
+
+ // only keep first and last entry per set of changes for a rowId, order of cases is important!
+ val keep = CaseWhen(Seq(
+ // filter out if inserted and deleted within range
+ And(Not(existedBeforeVersionRange), Not(existsAfterVersionRange)) -> Literal(false),
+ // for persisting new row keep only last state
+ And(Not(existedBeforeVersionRange), existsAfterVersionRange) -> isLast,
+ // for previously existing row keep first state
+ And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> isFirst,
+ // for persisting row keep first and last state
+ And(existedBeforeVersionRange, existsAfterVersionRange) -> Or(isFirst, isLast)),
+ Literal(false)) // dont keep row by default
Review Comment:
Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]