johanl-db commented on code in PR #55583:
URL: https://github.com/apache/spark/pull/55583#discussion_r3155127119


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
   // 
---------------------------------------------------------------------------
 
   /**
-   * Collapses multiple changes per row identity into the net effect.
-   * Not yet implemented.
+   * Collapses multiple changes per row identity into the net effect:
+   *
+   * | existedBefore | existsAfter | output                              |
+   * |---------------|-------------|-------------------------------------|
+   * | false         | false       | (cancel)                            |
+   * | false         | true        | insert                              |
+   * | true          | false       | delete                              |
+   * | true          | true        | update_preimage + update_postimage  |
+   *
+   * If `computeUpdates = false`, the `update_preimage` + `update_postimage` 
pair is
+   * emitted as `delete` + `insert` instead.
+   *
+   * `existedBefore` is true iff the partition's first event is `delete` or
+   * `update_preimage`. `existsAfter` is true iff the partition's last event is
+   * `insert` or `update_postimage`.
+   *
+   * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per 
partition)

Review Comment:
   ```suggestion
      * Pipeline: Window (per-rowId aggregates, sort by version) -> Filter 
(keep first/last per partition)
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
   // 
---------------------------------------------------------------------------
 
   /**
-   * Collapses multiple changes per row identity into the net effect.
-   * Not yet implemented.
+   * Collapses multiple changes per row identity into the net effect:
+   *
+   * | existedBefore | existsAfter | output                              |
+   * |---------------|-------------|-------------------------------------|
+   * | false         | false       | (cancel)                            |
+   * | false         | true        | insert                              |
+   * | true          | false       | delete                              |
+   * | true          | true        | update_preimage + update_postimage  |
+   *
+   * If `computeUpdates = false`, the `update_preimage` + `update_postimage` 
pair is
+   * emitted as `delete` + `insert` instead.
+   *
+   * `existedBefore` is true iff the partition's first event is `delete` or
+   * `update_preimage`. `existsAfter` is true iff the partition's last event is
+   * `insert` or `update_postimage`.
+   *
+   * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per 
partition)
+   * -> Project (relabel `_change_type` and drop helper columns).
    */
   private def injectNetChangeComputation(
       plan: LogicalPlan,
-      cl: Changelog): LogicalPlan = {
-    plan
+      cl: Changelog,
+      computeUpdates: Boolean): LogicalPlan = {
+    val windowedPlan = addNetChangesWindow(plan, cl)
+    val filteredAndRelabeledPlan =
+      removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan, 
computeUpdates)
+    filteredAndRelabeledPlan
+  }
+
+  /**
+   * Adds a Window node partitioned by `rowId` and ordered by
+   * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`,
+   * `delete`) sort before post-events (`update_postimage`, `insert`) within 
the same
+   * commit. Computes per-partition helper columns:
+   *   - `__spark_cdc_row_number` (1..n) answers: "is this the first or last 
row?".
+   *   - `__spark_cdc_row_count` is the partition size which combined with 
row_number is
+   *     used to detect the last row.
+   *   - `__spark_cdc_first_row_change_type_value` and
+   *     `__spark_cdc_last_row_change_type_value` drive the first/last 
classification at
+   *     filter and relabel time.
+   */
+  private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog): 
LogicalPlan = {
+    val changeTypeAttr = getAttribute(plan, "_change_type")
+    val rowIdExprs = 
V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan)
+    val commitVersionAttr = getAttribute(plan, "_commit_version")
+    val changeTypeRank = CaseWhen(Seq(
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)) 
-> Literal(0),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) -> 
Literal(0),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) -> 
Literal(1),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)) 
-> Literal(1)),
+      Literal(2))

Review Comment:
   For the 'else' branch: this should never happen. There's an expression, 
`RaiseError`, that throws an exception whenever it's evaluated; it would be 
a good fit here.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
   // 
---------------------------------------------------------------------------
 
   /**
-   * Collapses multiple changes per row identity into the net effect.
-   * Not yet implemented.
+   * Collapses multiple changes per row identity into the net effect:
+   *
+   * | existedBefore | existsAfter | output                              |
+   * |---------------|-------------|-------------------------------------|
+   * | false         | false       | (cancel)                            |
+   * | false         | true        | insert                              |
+   * | true          | false       | delete                              |
+   * | true          | true        | update_preimage + update_postimage  |
+   *
+   * If `computeUpdates = false`, the `update_preimage` + `update_postimage` 
pair is
+   * emitted as `delete` + `insert` instead.
+   *
+   * `existedBefore` is true iff the partition's first event is `delete` or
+   * `update_preimage`. `existsAfter` is true iff the partition's last event is
+   * `insert` or `update_postimage`.
+   *
+   * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per 
partition)
+   * -> Project (relabel `_change_type` and drop helper columns).
    */
   private def injectNetChangeComputation(
       plan: LogicalPlan,
-      cl: Changelog): LogicalPlan = {
-    plan
+      cl: Changelog,
+      computeUpdates: Boolean): LogicalPlan = {
+    val windowedPlan = addNetChangesWindow(plan, cl)
+    val filteredAndRelabeledPlan =
+      removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan, 
computeUpdates)
+    filteredAndRelabeledPlan
+  }
+
+  /**
+   * Adds a Window node partitioned by `rowId` and ordered by
+   * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`,
+   * `delete`) sort before post-events (`update_postimage`, `insert`) within 
the same
+   * commit. Computes per-partition helper columns:
+   *   - `__spark_cdc_row_number` (1..n) answers: "is this the first or last 
row?".
+   *   - `__spark_cdc_row_count` is the partition size which combined with 
row_number is
+   *     used to detect the last row.
+   *   - `__spark_cdc_first_row_change_type_value` and
+   *     `__spark_cdc_last_row_change_type_value` drive the first/last 
classification at
+   *     filter and relabel time.
+   */
+  private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog): 
LogicalPlan = {
+    val changeTypeAttr = getAttribute(plan, "_change_type")
+    val rowIdExprs = 
V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan)
+    val commitVersionAttr = getAttribute(plan, "_commit_version")
+    val changeTypeRank = CaseWhen(Seq(
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)) 
-> Literal(0),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) -> 
Literal(0),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) -> 
Literal(1),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)) 
-> Literal(1)),
+      Literal(2))
+    val partitionByCols = rowIdExprs
+    val orderSpec = Seq(
+      SortOrder(commitVersionAttr, Ascending),
+      SortOrder(changeTypeRank, Ascending))
+    val rowNumberWindowSpec = WindowSpecDefinition(
+      partitionByCols, orderSpec,
+      UnspecifiedFrame)
+    val aggregateWindowSpec = WindowSpecDefinition(
+      partitionByCols, orderSpec,
+      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing))
+
+    val rowNumberAlias = Alias(
+      WindowExpression(RowNumber(), rowNumberWindowSpec),
+      NetChangesHelperColumns.RowNumber)()
+    val rowCountAlias = Alias(
+      WindowExpression(Count(Seq(Literal(1))).toAggregateExpression(), 
aggregateWindowSpec),
+      NetChangesHelperColumns.RowCount)()
+    val firstRowChangeTypeValueAlias = Alias(
+      WindowExpression(
+        First(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+        aggregateWindowSpec),
+      NetChangesHelperColumns.FirstRowChangeTypeValue)()
+    val lastRowChangeTypeValueAlias = Alias(
+      WindowExpression(
+        Last(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+        aggregateWindowSpec),
+      NetChangesHelperColumns.LastRowChangeTypeValue)()
+
+    Window(
+      Seq(rowNumberAlias, rowCountAlias, firstRowChangeTypeValueAlias,
+        lastRowChangeTypeValueAlias),
+      partitionByCols, orderSpec, plan)
+  }
+
+  /**
+   * Filters and relabels the windowed plan: keeps only the first and/or last 
row per
+   * `rowId` partition, then rewrites the surviving rows' `_change_type` and 
drops the
+   * helper columns.
+   *
+   * | existedBefore | existsAfter | output                              |
+   * |---------------|-------------|-------------------------------------|
+   * | false         | false       | (cancel)                            |
+   * | false         | true        | insert                              |
+   * | true          | false       | delete                              |
+   * | true          | true        | update_preimage + update_postimage  |
+   *
+   * If `computeUpdates = false`, the `update_preimage` + `update_postimage` 
pair is
+   * emitted as `delete` + `insert` instead.
+   *
+   * `existedBefore` is true iff the partition's first event is `delete` or
+   * `update_preimage`. `existsAfter` is true iff the partition's last event is
+   * `insert` or `update_postimage`.
+   *
+   * Helper columns (`__spark_cdc_*`) are dropped in the same Project that 
does the
+   * relabel, saving a follow-up cleanup pass.
+   */
+  private def removeIntermediateChangelogEntriesAndRelabelChangeTypes(
+       windowedPlan: LogicalPlan,
+       computeUpdates: Boolean
+     ): LogicalPlan = {
+    val rowNumberAttr = getAttribute(windowedPlan, 
NetChangesHelperColumns.RowNumber)
+    val rowCountAttr = getAttribute(windowedPlan, 
NetChangesHelperColumns.RowCount)
+    val firstRowChangeTypeAttr =
+      getAttribute(windowedPlan, 
NetChangesHelperColumns.FirstRowChangeTypeValue)
+    val lastRowChangeTypeAttr =
+      getAttribute(windowedPlan, 
NetChangesHelperColumns.LastRowChangeTypeValue)
+
+    val existedBeforeVersionRange = In(firstRowChangeTypeAttr, Seq(
+      Literal(Changelog.CHANGE_TYPE_DELETE),
+      Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)))
+    val existsAfterVersionRange = In(lastRowChangeTypeAttr, Seq(
+      Literal(Changelog.CHANGE_TYPE_INSERT),
+      Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)))
+
+    val isFirst = EqualTo(rowNumberAttr, Literal(1))
+    val isLast = EqualTo(rowNumberAttr, rowCountAttr)
+
+    // only keep first and last entry per set of changes for a rowId, order of 
cases is important!
+    val keep = CaseWhen(Seq(
+      // filter out if inserted and deleted within range
+      And(Not(existedBeforeVersionRange), Not(existsAfterVersionRange)) -> 
Literal(false),
+      // for persisting new row keep only last state
+      And(Not(existedBeforeVersionRange), existsAfterVersionRange) -> isLast,
+      // for previously existing row keep first state
+      And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> isFirst,
+      // for persisting row keep first and last state
+      And(existedBeforeVersionRange, existsAfterVersionRange) -> Or(isFirst, 
isLast)),
+      Literal(false)) // dont keep row by default
+
+    val filteredPlan = Filter(keep, windowedPlan)
+
+    val computedPreUpdateLabel =
+      if (computeUpdates) Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)
+      else Literal(Changelog.CHANGE_TYPE_DELETE)
+    val computedPostUpdateLabel =
+      if (computeUpdates) Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)
+      else Literal(Changelog.CHANGE_TYPE_INSERT)
+
+    val changeTypeAttr = getAttribute(filteredPlan, "_change_type")
+
+    val relabel = CaseWhen(Seq(
+      And(Not(existedBeforeVersionRange), isLast) -> 
Literal(Changelog.CHANGE_TYPE_INSERT),
+      And(Not(existsAfterVersionRange), isFirst) -> 
Literal(Changelog.CHANGE_TYPE_DELETE),
+      And(And(existedBeforeVersionRange, existsAfterVersionRange), isFirst)
+        -> computedPreUpdateLabel,
+      And(And(existedBeforeVersionRange, existsAfterVersionRange), isLast)
+        -> computedPostUpdateLabel),
+      changeTypeAttr)

Review Comment:
   I think it's helpful to show here why this is needed, e.g.:
   "delete" + "insert" + "update_preimage" + "update_postimage" -> only 
"delete" + "update_postimage" are kept -> relabel "delete" to "update_preimage"
   
   That is:
   Case 1: "insert" + "update_preimage" + "update_postimage" -> relabel 
"update_postimage" to "insert"
   Case 2: "update_preimage" + "update_postimage" + "delete" -> relabel 
"update_preimage" to "delete"
   Case 3: "delete" + "insert" + "update_preimage" + "update_postimage" -> 
relabel "delete" to "update_preimage"
   Case 4: "update_preimage" + "update_postimage" + "delete" + "insert" -> 
relabel "insert" to "update_postimage"
   
   
   ~Can you make sure you have a test for all of these?~ I see the tests



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
   // 
---------------------------------------------------------------------------
 
   /**
-   * Collapses multiple changes per row identity into the net effect.
-   * Not yet implemented.
+   * Collapses multiple changes per row identity into the net effect:
+   *
+   * | existedBefore | existsAfter | output                              |
+   * |---------------|-------------|-------------------------------------|
+   * | false         | false       | (cancel)                            |
+   * | false         | true        | insert                              |
+   * | true          | false       | delete                              |
+   * | true          | true        | update_preimage + update_postimage  |
+   *
+   * If `computeUpdates = false`, the `update_preimage` + `update_postimage` 
pair is
+   * emitted as `delete` + `insert` instead.
+   *
+   * `existedBefore` is true iff the partition's first event is `delete` or
+   * `update_preimage`. `existsAfter` is true iff the partition's last event is
+   * `insert` or `update_postimage`.
+   *
+   * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per 
partition)
+   * -> Project (relabel `_change_type` and drop helper columns).
    */
   private def injectNetChangeComputation(
       plan: LogicalPlan,
-      cl: Changelog): LogicalPlan = {
-    plan
+      cl: Changelog,
+      computeUpdates: Boolean): LogicalPlan = {
+    val windowedPlan = addNetChangesWindow(plan, cl)
+    val filteredAndRelabeledPlan =
+      removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan, 
computeUpdates)

Review Comment:
   nit:
   ```suggestion
         removeIntermediateChanges(windowedPlan, computeUpdates)
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
   // 
---------------------------------------------------------------------------
 
   /**
-   * Collapses multiple changes per row identity into the net effect.
-   * Not yet implemented.
+   * Collapses multiple changes per row identity into the net effect:

Review Comment:
   nit
   ```suggestion
      * Collapses multiple changes per row identity across versions into the 
net effect:
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
   // 
---------------------------------------------------------------------------
 
   /**
-   * Collapses multiple changes per row identity into the net effect.
-   * Not yet implemented.
+   * Collapses multiple changes per row identity into the net effect:
+   *
+   * | existedBefore | existsAfter | output                              |
+   * |---------------|-------------|-------------------------------------|
+   * | false         | false       | (cancel)                            |
+   * | false         | true        | insert                              |
+   * | true          | false       | delete                              |
+   * | true          | true        | update_preimage + update_postimage  |
+   *
+   * If `computeUpdates = false`, the `update_preimage` + `update_postimage` 
pair is
+   * emitted as `delete` + `insert` instead.
+   *
+   * `existedBefore` is true iff the partition's first event is `delete` or
+   * `update_preimage`. `existsAfter` is true iff the partition's last event is
+   * `insert` or `update_postimage`.
+   *
+   * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per 
partition)
+   * -> Project (relabel `_change_type` and drop helper columns).
    */
   private def injectNetChangeComputation(
       plan: LogicalPlan,
-      cl: Changelog): LogicalPlan = {
-    plan
+      cl: Changelog,
+      computeUpdates: Boolean): LogicalPlan = {
+    val windowedPlan = addNetChangesWindow(plan, cl)
+    val filteredAndRelabeledPlan =
+      removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan, 
computeUpdates)
+    filteredAndRelabeledPlan
+  }
+
+  /**
+   * Adds a Window node partitioned by `rowId` and ordered by
+   * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`,
+   * `delete`) sort before post-events (`update_postimage`, `insert`) within 
the same
+   * commit. Computes per-partition helper columns:
+   *   - `__spark_cdc_row_number` (1..n) answers: "is this the first or last 
row?".
+   *   - `__spark_cdc_row_count` is the partition size which combined with 
row_number is
+   *     used to detect the last row.
+   *   - `__spark_cdc_first_row_change_type_value` and
+   *     `__spark_cdc_last_row_change_type_value` drive the first/last 
classification at
+   *     filter and relabel time.
+   */
+  private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog): 
LogicalPlan = {
+    val changeTypeAttr = getAttribute(plan, "_change_type")
+    val rowIdExprs = 
V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan)
+    val commitVersionAttr = getAttribute(plan, "_commit_version")
+    val changeTypeRank = CaseWhen(Seq(
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)) 
-> Literal(0),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) -> 
Literal(0),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) -> 
Literal(1),
+      EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)) 
-> Literal(1)),
+      Literal(2))
+    val partitionByCols = rowIdExprs
+    val orderSpec = Seq(
+      SortOrder(commitVersionAttr, Ascending),
+      SortOrder(changeTypeRank, Ascending))
+    val rowNumberWindowSpec = WindowSpecDefinition(
+      partitionByCols, orderSpec,
+      UnspecifiedFrame)
+    val aggregateWindowSpec = WindowSpecDefinition(
+      partitionByCols, orderSpec,
+      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing))
+
+    val rowNumberAlias = Alias(
+      WindowExpression(RowNumber(), rowNumberWindowSpec),
+      NetChangesHelperColumns.RowNumber)()
+    val rowCountAlias = Alias(
+      WindowExpression(Count(Seq(Literal(1))).toAggregateExpression(), 
aggregateWindowSpec),
+      NetChangesHelperColumns.RowCount)()
+    val firstRowChangeTypeValueAlias = Alias(
+      WindowExpression(
+        First(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+        aggregateWindowSpec),
+      NetChangesHelperColumns.FirstRowChangeTypeValue)()
+    val lastRowChangeTypeValueAlias = Alias(
+      WindowExpression(
+        Last(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+        aggregateWindowSpec),
+      NetChangesHelperColumns.LastRowChangeTypeValue)()
+
+    Window(
+      Seq(rowNumberAlias, rowCountAlias, firstRowChangeTypeValueAlias,
+        lastRowChangeTypeValueAlias),
+      partitionByCols, orderSpec, plan)
+  }
+
+  /**
+   * Filters and relabels the windowed plan: keeps only the first and/or last 
row per
+   * `rowId` partition, then rewrites the surviving rows' `_change_type` and 
drops the
+   * helper columns.
+   *
+   * | existedBefore | existsAfter | output                              |
+   * |---------------|-------------|-------------------------------------|
+   * | false         | false       | (cancel)                            |
+   * | false         | true        | insert                              |
+   * | true          | false       | delete                              |
+   * | true          | true        | update_preimage + update_postimage  |
+   *
+   * If `computeUpdates = false`, the `update_preimage` + `update_postimage` 
pair is
+   * emitted as `delete` + `insert` instead.
+   *
+   * `existedBefore` is true iff the partition's first event is `delete` or
+   * `update_preimage`. `existsAfter` is true iff the partition's last event is
+   * `insert` or `update_postimage`.
+   *
+   * Helper columns (`__spark_cdc_*`) are dropped in the same Project that 
does the
+   * relabel, saving a follow-up cleanup pass.
+   */
+  private def removeIntermediateChangelogEntriesAndRelabelChangeTypes(
+       windowedPlan: LogicalPlan,
+       computeUpdates: Boolean
+     ): LogicalPlan = {
+    val rowNumberAttr = getAttribute(windowedPlan, 
NetChangesHelperColumns.RowNumber)
+    val rowCountAttr = getAttribute(windowedPlan, 
NetChangesHelperColumns.RowCount)
+    val firstRowChangeTypeAttr =
+      getAttribute(windowedPlan, 
NetChangesHelperColumns.FirstRowChangeTypeValue)
+    val lastRowChangeTypeAttr =
+      getAttribute(windowedPlan, 
NetChangesHelperColumns.LastRowChangeTypeValue)
+
+    val existedBeforeVersionRange = In(firstRowChangeTypeAttr, Seq(
+      Literal(Changelog.CHANGE_TYPE_DELETE),
+      Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)))
+    val existsAfterVersionRange = In(lastRowChangeTypeAttr, Seq(
+      Literal(Changelog.CHANGE_TYPE_INSERT),
+      Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)))
+
+    val isFirst = EqualTo(rowNumberAttr, Literal(1))
+    val isLast = EqualTo(rowNumberAttr, rowCountAttr)
+
+    // only keep first and last entry per set of changes for a rowId, order of 
cases is important!
+    val keep = CaseWhen(Seq(
+      // filter out if inserted and deleted within range
+      And(Not(existedBeforeVersionRange), Not(existsAfterVersionRange)) -> 
Literal(false),
+      // for persisting new row keep only last state
+      And(Not(existedBeforeVersionRange), existsAfterVersionRange) -> isLast,
+      // for previously existing row keep first state
+      And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> isFirst,
+      // for persisting row keep first and last state
+      And(existedBeforeVersionRange, existsAfterVersionRange) -> Or(isFirst, 
isLast)),
+      Literal(false)) // dont keep row by default

Review Comment:
   ```suggestion
         And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> 
isFirst),
         // for persisting row keep first and last state
         // existedBeforeVersionRange = true, existsAfterVersionRange = true
         Or(isFirst, isLast))
   ```
   
   No need for an unreachable else branch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to