johanl-db commented on code in PR #55583:
URL: https://github.com/apache/spark/pull/55583#discussion_r3155127119
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
//
---------------------------------------------------------------------------
/**
- * Collapses multiple changes per row identity into the net effect.
- * Not yet implemented.
+ * Collapses multiple changes per row identity into the net effect:
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage`
pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per
partition)
Review Comment:
```suggestion
* Pipeline: Window (per-rowId aggregates, sort by version) -> Filter
(keep first/last per partition)
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
//
---------------------------------------------------------------------------
/**
- * Collapses multiple changes per row identity into the net effect.
- * Not yet implemented.
+ * Collapses multiple changes per row identity into the net effect:
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage`
pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per
partition)
+ * -> Project (relabel `_change_type` and drop helper columns).
*/
private def injectNetChangeComputation(
plan: LogicalPlan,
- cl: Changelog): LogicalPlan = {
- plan
+ cl: Changelog,
+ computeUpdates: Boolean): LogicalPlan = {
+ val windowedPlan = addNetChangesWindow(plan, cl)
+ val filteredAndRelabeledPlan =
+ removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan,
computeUpdates)
+ filteredAndRelabeledPlan
+ }
+
+ /**
+ * Adds a Window node partitioned by `rowId` and ordered by
+ * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`,
+ * `delete`) sort before post-events (`update_postimage`, `insert`) within
the same
+ * commit. Computes per-partition helper columns:
+ * - `__spark_cdc_row_number` (1..n) answers: "is this the first or last
row?".
+ * - `__spark_cdc_row_count` is the partition size which combined with
row_number is
+ * used to detect the last row.
+ * - `__spark_cdc_first_row_change_type_value` and
+ * `__spark_cdc_last_row_change_type_value` drive the first/last
classification at
+ * filter and relabel time.
+ */
+ private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog):
LogicalPlan = {
+ val changeTypeAttr = getAttribute(plan, "_change_type")
+ val rowIdExprs =
V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan)
+ val commitVersionAttr = getAttribute(plan, "_commit_version")
+ val changeTypeRank = CaseWhen(Seq(
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE))
-> Literal(0),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) ->
Literal(0),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) ->
Literal(1),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE))
-> Literal(1)),
+ Literal(2))
Review Comment:
For the 'else' branch: this should never happen. There's an expression,
`RaiseError`, that throws an exception whenever it's evaluated, which would
be a good fit here.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
//
---------------------------------------------------------------------------
/**
- * Collapses multiple changes per row identity into the net effect.
- * Not yet implemented.
+ * Collapses multiple changes per row identity into the net effect:
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage`
pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per
partition)
+ * -> Project (relabel `_change_type` and drop helper columns).
*/
private def injectNetChangeComputation(
plan: LogicalPlan,
- cl: Changelog): LogicalPlan = {
- plan
+ cl: Changelog,
+ computeUpdates: Boolean): LogicalPlan = {
+ val windowedPlan = addNetChangesWindow(plan, cl)
+ val filteredAndRelabeledPlan =
+ removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan,
computeUpdates)
+ filteredAndRelabeledPlan
+ }
+
+ /**
+ * Adds a Window node partitioned by `rowId` and ordered by
+ * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`,
+ * `delete`) sort before post-events (`update_postimage`, `insert`) within
the same
+ * commit. Computes per-partition helper columns:
+ * - `__spark_cdc_row_number` (1..n) answers: "is this the first or last
row?".
+ * - `__spark_cdc_row_count` is the partition size which combined with
row_number is
+ * used to detect the last row.
+ * - `__spark_cdc_first_row_change_type_value` and
+ * `__spark_cdc_last_row_change_type_value` drive the first/last
classification at
+ * filter and relabel time.
+ */
+ private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog):
LogicalPlan = {
+ val changeTypeAttr = getAttribute(plan, "_change_type")
+ val rowIdExprs =
V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan)
+ val commitVersionAttr = getAttribute(plan, "_commit_version")
+ val changeTypeRank = CaseWhen(Seq(
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE))
-> Literal(0),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) ->
Literal(0),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) ->
Literal(1),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE))
-> Literal(1)),
+ Literal(2))
+ val partitionByCols = rowIdExprs
+ val orderSpec = Seq(
+ SortOrder(commitVersionAttr, Ascending),
+ SortOrder(changeTypeRank, Ascending))
+ val rowNumberWindowSpec = WindowSpecDefinition(
+ partitionByCols, orderSpec,
+ UnspecifiedFrame)
+ val aggregateWindowSpec = WindowSpecDefinition(
+ partitionByCols, orderSpec,
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing))
+
+ val rowNumberAlias = Alias(
+ WindowExpression(RowNumber(), rowNumberWindowSpec),
+ NetChangesHelperColumns.RowNumber)()
+ val rowCountAlias = Alias(
+ WindowExpression(Count(Seq(Literal(1))).toAggregateExpression(),
aggregateWindowSpec),
+ NetChangesHelperColumns.RowCount)()
+ val firstRowChangeTypeValueAlias = Alias(
+ WindowExpression(
+ First(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+ aggregateWindowSpec),
+ NetChangesHelperColumns.FirstRowChangeTypeValue)()
+ val lastRowChangeTypeValueAlias = Alias(
+ WindowExpression(
+ Last(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+ aggregateWindowSpec),
+ NetChangesHelperColumns.LastRowChangeTypeValue)()
+
+ Window(
+ Seq(rowNumberAlias, rowCountAlias, firstRowChangeTypeValueAlias,
+ lastRowChangeTypeValueAlias),
+ partitionByCols, orderSpec, plan)
+ }
+
+ /**
+ * Filters and relabels the windowed plan: keeps only the first and/or last
row per
+ * `rowId` partition, then rewrites the surviving rows' `_change_type` and
drops the
+ * helper columns.
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage`
pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Helper columns (`__spark_cdc_*`) are dropped in the same Project that
does the
+ * relabel, saving a follow-up cleanup pass.
+ */
+ private def removeIntermediateChangelogEntriesAndRelabelChangeTypes(
+ windowedPlan: LogicalPlan,
+ computeUpdates: Boolean
+ ): LogicalPlan = {
+ val rowNumberAttr = getAttribute(windowedPlan,
NetChangesHelperColumns.RowNumber)
+ val rowCountAttr = getAttribute(windowedPlan,
NetChangesHelperColumns.RowCount)
+ val firstRowChangeTypeAttr =
+ getAttribute(windowedPlan,
NetChangesHelperColumns.FirstRowChangeTypeValue)
+ val lastRowChangeTypeAttr =
+ getAttribute(windowedPlan,
NetChangesHelperColumns.LastRowChangeTypeValue)
+
+ val existedBeforeVersionRange = In(firstRowChangeTypeAttr, Seq(
+ Literal(Changelog.CHANGE_TYPE_DELETE),
+ Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)))
+ val existsAfterVersionRange = In(lastRowChangeTypeAttr, Seq(
+ Literal(Changelog.CHANGE_TYPE_INSERT),
+ Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)))
+
+ val isFirst = EqualTo(rowNumberAttr, Literal(1))
+ val isLast = EqualTo(rowNumberAttr, rowCountAttr)
+
+ // only keep first and last entry per set of changes for a rowId, order of
cases is important!
+ val keep = CaseWhen(Seq(
+ // filter out if inserted and deleted within range
+ And(Not(existedBeforeVersionRange), Not(existsAfterVersionRange)) ->
Literal(false),
+ // for persisting new row keep only last state
+ And(Not(existedBeforeVersionRange), existsAfterVersionRange) -> isLast,
+ // for previously existing row keep first state
+ And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> isFirst,
+ // for persisting row keep first and last state
+ And(existedBeforeVersionRange, existsAfterVersionRange) -> Or(isFirst,
isLast)),
+ Literal(false)) // dont keep row by default
+
+ val filteredPlan = Filter(keep, windowedPlan)
+
+ val computedPreUpdateLabel =
+ if (computeUpdates) Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)
+ else Literal(Changelog.CHANGE_TYPE_DELETE)
+ val computedPostUpdateLabel =
+ if (computeUpdates) Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)
+ else Literal(Changelog.CHANGE_TYPE_INSERT)
+
+ val changeTypeAttr = getAttribute(filteredPlan, "_change_type")
+
+ val relabel = CaseWhen(Seq(
+ And(Not(existedBeforeVersionRange), isLast) ->
Literal(Changelog.CHANGE_TYPE_INSERT),
+ And(Not(existsAfterVersionRange), isFirst) ->
Literal(Changelog.CHANGE_TYPE_DELETE),
+ And(And(existedBeforeVersionRange, existsAfterVersionRange), isFirst)
+ -> computedPreUpdateLabel,
+ And(And(existedBeforeVersionRange, existsAfterVersionRange), isLast)
+ -> computedPostUpdateLabel),
+ changeTypeAttr)
Review Comment:
I think it's helpful to show here why this is needed, e.g.:
"delete" + "insert" + "update_preimage" + "update_postimage" -> only
"delete" + "update_postimage" are kept -> relabel "delete" to "update_preimage"
That is:
Case 1: "insert" + "update_preimage" + "update_postimage" -> relabel
"update_postimage" to "insert"
Case 2: "update_preimage" + "update_postimage" + "delete" -> relabel
"update_preimage" to "delete"
Case 3: "delete" + "insert" + "update_preimage" + "update_postimage" ->
relabel "delete" to "update_preimage"
Case 4: "update_preimage" + "update_postimage" + "delete" + "insert" ->
relabel "insert" to "update_postimage"
~Can you make sure you have a test for all of these?~ I see the tests
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
//
---------------------------------------------------------------------------
/**
- * Collapses multiple changes per row identity into the net effect.
- * Not yet implemented.
+ * Collapses multiple changes per row identity into the net effect:
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage`
pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per
partition)
+ * -> Project (relabel `_change_type` and drop helper columns).
*/
private def injectNetChangeComputation(
plan: LogicalPlan,
- cl: Changelog): LogicalPlan = {
- plan
+ cl: Changelog,
+ computeUpdates: Boolean): LogicalPlan = {
+ val windowedPlan = addNetChangesWindow(plan, cl)
+ val filteredAndRelabeledPlan =
+ removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan,
computeUpdates)
Review Comment:
nit:
```suggestion
removeIntermediateChanges(windowedPlan, computeUpdates)
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
//
---------------------------------------------------------------------------
/**
- * Collapses multiple changes per row identity into the net effect.
- * Not yet implemented.
+ * Collapses multiple changes per row identity into the net effect:
Review Comment:
nit
```suggestion
* Collapses multiple changes per row identity across versions into the
net effect:
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveChangelogTable.scala:
##########
@@ -277,13 +294,174 @@ object ResolveChangelogTable extends Rule[LogicalPlan] {
//
---------------------------------------------------------------------------
/**
- * Collapses multiple changes per row identity into the net effect.
- * Not yet implemented.
+ * Collapses multiple changes per row identity into the net effect:
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage`
pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Pipeline: Window (per-rowId aggregates) -> Filter (keep first/last per
partition)
+ * -> Project (relabel `_change_type` and drop helper columns).
*/
private def injectNetChangeComputation(
plan: LogicalPlan,
- cl: Changelog): LogicalPlan = {
- plan
+ cl: Changelog,
+ computeUpdates: Boolean): LogicalPlan = {
+ val windowedPlan = addNetChangesWindow(plan, cl)
+ val filteredAndRelabeledPlan =
+ removeIntermediateChangelogEntriesAndRelabelChangeTypes(windowedPlan,
computeUpdates)
+ filteredAndRelabeledPlan
+ }
+
+ /**
+ * Adds a Window node partitioned by `rowId` and ordered by
+ * `(_commit_version, change_type_rank)` where pre-events (`update_preimage`,
+ * `delete`) sort before post-events (`update_postimage`, `insert`) within
the same
+ * commit. Computes per-partition helper columns:
+ * - `__spark_cdc_row_number` (1..n) answers: "is this the first or last
row?".
+ * - `__spark_cdc_row_count` is the partition size which combined with
row_number is
+ * used to detect the last row.
+ * - `__spark_cdc_first_row_change_type_value` and
+ * `__spark_cdc_last_row_change_type_value` drive the first/last
classification at
+ * filter and relabel time.
+ */
+ private def addNetChangesWindow(plan: LogicalPlan, cl: Changelog):
LogicalPlan = {
+ val changeTypeAttr = getAttribute(plan, "_change_type")
+ val rowIdExprs =
V2ExpressionUtils.resolveRefs[NamedExpression](cl.rowId().toSeq, plan)
+ val commitVersionAttr = getAttribute(plan, "_commit_version")
+ val changeTypeRank = CaseWhen(Seq(
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE))
-> Literal(0),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_DELETE)) ->
Literal(0),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_INSERT)) ->
Literal(1),
+ EqualTo(changeTypeAttr, Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE))
-> Literal(1)),
+ Literal(2))
+ val partitionByCols = rowIdExprs
+ val orderSpec = Seq(
+ SortOrder(commitVersionAttr, Ascending),
+ SortOrder(changeTypeRank, Ascending))
+ val rowNumberWindowSpec = WindowSpecDefinition(
+ partitionByCols, orderSpec,
+ UnspecifiedFrame)
+ val aggregateWindowSpec = WindowSpecDefinition(
+ partitionByCols, orderSpec,
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing))
+
+ val rowNumberAlias = Alias(
+ WindowExpression(RowNumber(), rowNumberWindowSpec),
+ NetChangesHelperColumns.RowNumber)()
+ val rowCountAlias = Alias(
+ WindowExpression(Count(Seq(Literal(1))).toAggregateExpression(),
aggregateWindowSpec),
+ NetChangesHelperColumns.RowCount)()
+ val firstRowChangeTypeValueAlias = Alias(
+ WindowExpression(
+ First(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+ aggregateWindowSpec),
+ NetChangesHelperColumns.FirstRowChangeTypeValue)()
+ val lastRowChangeTypeValueAlias = Alias(
+ WindowExpression(
+ Last(changeTypeAttr, ignoreNulls = false).toAggregateExpression(),
+ aggregateWindowSpec),
+ NetChangesHelperColumns.LastRowChangeTypeValue)()
+
+ Window(
+ Seq(rowNumberAlias, rowCountAlias, firstRowChangeTypeValueAlias,
+ lastRowChangeTypeValueAlias),
+ partitionByCols, orderSpec, plan)
+ }
+
+ /**
+ * Filters and relabels the windowed plan: keeps only the first and/or last
row per
+ * `rowId` partition, then rewrites the surviving rows' `_change_type` and
drops the
+ * helper columns.
+ *
+ * | existedBefore | existsAfter | output |
+ * |---------------|-------------|-------------------------------------|
+ * | false | false | (cancel) |
+ * | false | true | insert |
+ * | true | false | delete |
+ * | true | true | update_preimage + update_postimage |
+ *
+ * If `computeUpdates = false`, the `update_preimage` + `update_postimage`
pair is
+ * emitted as `delete` + `insert` instead.
+ *
+ * `existedBefore` is true iff the partition's first event is `delete` or
+ * `update_preimage`. `existsAfter` is true iff the partition's last event is
+ * `insert` or `update_postimage`.
+ *
+ * Helper columns (`__spark_cdc_*`) are dropped in the same Project that
does the
+ * relabel, saving a follow-up cleanup pass.
+ */
+ private def removeIntermediateChangelogEntriesAndRelabelChangeTypes(
+ windowedPlan: LogicalPlan,
+ computeUpdates: Boolean
+ ): LogicalPlan = {
+ val rowNumberAttr = getAttribute(windowedPlan,
NetChangesHelperColumns.RowNumber)
+ val rowCountAttr = getAttribute(windowedPlan,
NetChangesHelperColumns.RowCount)
+ val firstRowChangeTypeAttr =
+ getAttribute(windowedPlan,
NetChangesHelperColumns.FirstRowChangeTypeValue)
+ val lastRowChangeTypeAttr =
+ getAttribute(windowedPlan,
NetChangesHelperColumns.LastRowChangeTypeValue)
+
+ val existedBeforeVersionRange = In(firstRowChangeTypeAttr, Seq(
+ Literal(Changelog.CHANGE_TYPE_DELETE),
+ Literal(Changelog.CHANGE_TYPE_UPDATE_PREIMAGE)))
+ val existsAfterVersionRange = In(lastRowChangeTypeAttr, Seq(
+ Literal(Changelog.CHANGE_TYPE_INSERT),
+ Literal(Changelog.CHANGE_TYPE_UPDATE_POSTIMAGE)))
+
+ val isFirst = EqualTo(rowNumberAttr, Literal(1))
+ val isLast = EqualTo(rowNumberAttr, rowCountAttr)
+
+ // only keep first and last entry per set of changes for a rowId, order of
cases is important!
+ val keep = CaseWhen(Seq(
+ // filter out if inserted and deleted within range
+ And(Not(existedBeforeVersionRange), Not(existsAfterVersionRange)) ->
Literal(false),
+ // for persisting new row keep only last state
+ And(Not(existedBeforeVersionRange), existsAfterVersionRange) -> isLast,
+ // for previously existing row keep first state
+ And(existedBeforeVersionRange, Not(existsAfterVersionRange)) -> isFirst,
+ // for persisting row keep first and last state
+ And(existedBeforeVersionRange, existsAfterVersionRange) -> Or(isFirst,
isLast)),
+ Literal(false)) // dont keep row by default
Review Comment:
```suggestion
And(existedBeforeVersionRange, Not(existsAfterVersionRange)) ->
isFirst),
// for persisting row keep first and last state
// existedBeforeVersionRange = true, existsAfterVersionRange = true
Or(isFirst, isLast))
```
No need for an unreachable else branch
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]