fresh-borzoni opened a new pull request, #27371:
URL: https://github.com/apache/flink/pull/27371
## What is the purpose of the change
Linked issue: close https://issues.apache.org/jira/browse/FLINK-38579
This pull request fixes incorrect changelog mode inference when pushed-down
filters or non-equi join conditions reference non-upsert key columns. In these
cases, Flink currently drops UPDATE_BEFORE events, which leaves phantom rows in
the output.
**Problem**: Suppose a filter such as `c < 2`, where `c` is not part of the
upsert key, is pushed down into a changelog source, and a row is updated from
`c=1` to `c=2`. The old row (`c=1`) matches the filter, but the new row (`c=2`)
does not. The planner nevertheless allowed the `DropUpdateBefore` optimization,
so the UPDATE_BEFORE event was lost, while the filter discarded the
UPDATE_AFTER event. With neither event reaching them, downstream operators
could not retract the old `c=1` row, and it incorrectly remained in the result.
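The failure can be reproduced outside Flink with a small simulation. This is a minimal sketch under simplifying assumptions: a changelog record is modeled as a hypothetical `Change(kind, k, c)` with upsert key `k`, and the downstream result is a map keyed by `k`. It does not use Flink's `RowData` or `RowKind` types. The sketch applies the pushed-down filter `c < 2` to the log both with and without UPDATE_BEFORE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntPredicate;

public class PhantomRowDemo {
    // Simplified stand-in for Flink's row kinds (hypothetical, not the real RowKind enum).
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Simplified changelog record: row kind, upsert key k, non-key column c.
    record Change(Kind kind, int k, int c) {}

    // Removes UPDATE_BEFORE messages, as an ONLY_UPDATE_AFTER changelog would.
    static List<Change> dropUpdateBefore(List<Change> log) {
        List<Change> out = new ArrayList<>();
        for (Change ch : log) {
            if (ch.kind() != Kind.UPDATE_BEFORE) {
                out.add(ch);
            }
        }
        return out;
    }

    // Applies the filter on c, then materializes surviving changes into a table keyed by k.
    static Map<Integer, Integer> materialize(List<Change> log, IntPredicate filterOnC) {
        Map<Integer, Integer> table = new TreeMap<>();
        for (Change ch : log) {
            if (!filterOnC.test(ch.c())) {
                continue; // the pushed-down filter discards this change
            }
            switch (ch.kind()) {
                case INSERT, UPDATE_AFTER -> table.put(ch.k(), ch.c());
                case UPDATE_BEFORE, DELETE -> table.remove(ch.k());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Row k=1 is inserted with c=1 and then updated to c=2.
        List<Change> log = List.of(
                new Change(Kind.INSERT, 1, 1),
                new Change(Kind.UPDATE_BEFORE, 1, 1),
                new Change(Kind.UPDATE_AFTER, 1, 2));
        IntPredicate cLessThan2 = c -> c < 2;

        System.out.println(materialize(log, cLessThan2));                   // {}
        System.out.println(materialize(dropUpdateBefore(log), cLessThan2)); // {1=1}
    }
}
```

When UPDATE_BEFORE is kept, it passes the filter and retracts `k=1`. When it is dropped, the only change that could replace the old row is the UPDATE_AFTER with `c=2`, and the filter discards it, so the stale `c=1` row survives.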
**Solution**: The fix stops the planner from choosing the `ONLY_UPDATE_AFTER`
and `DELETE_BY_KEY` changelog modes when filters or non-equi join conditions
reference non-upsert key columns. UPDATE_BEFORE events are therefore preserved,
and retractions stay correct.
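The decision rule for a scan with a pushed-down filter can be sketched as follows. This is a hypothetical simplification, not the planner's code. Columns are plain field indices instead of Calcite `RexNode`s. `UpdateKind` is a local stand-in for the update-kind trait. The helper only mirrors the idea behind `referencesNonUpsertKeyColumns()`. The sketch covers the update side only. According to this PR, `DELETE_BY_KEY` is restricted under the same condition.

```java
import java.util.Set;

public class UpdateKindDecision {
    // Local stand-in for the planner's update-kind trait values (assumption).
    enum UpdateKind { ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    // Hypothetical helper: true if any referenced column index lies outside the upsert key.
    static boolean referencesNonUpsertKeyColumns(Set<Integer> referencedColumns, Set<Integer> upsertKey) {
        for (int column : referencedColumns) {
            if (!upsertKey.contains(column)) {
                return true;
            }
        }
        return false;
    }

    // The old and new rows of an update share the upsert key, so a filter on key columns alone
    // gives the same verdict for both. Only then is it safe to drop UPDATE_BEFORE.
    static UpdateKind requiredUpdateKind(Set<Integer> filterColumns, Set<Integer> upsertKey) {
        return referencesNonUpsertKeyColumns(filterColumns, upsertKey)
                ? UpdateKind.BEFORE_AND_AFTER
                : UpdateKind.ONLY_UPDATE_AFTER;
    }

    public static void main(String[] args) {
        Set<Integer> upsertKey = Set.of(0); // column 0 = k, column 1 = c
        System.out.println(requiredUpdateKind(Set.of(1), upsertKey)); // filter on c: BEFORE_AND_AFTER
        System.out.println(requiredUpdateKind(Set.of(0), upsertKey)); // filter on k: ONLY_UPDATE_AFTER
    }
}
```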
## Brief change log
- Added a `referencesNonUpsertKeyColumns()` helper that checks whether
`RexNode`s reference non-upsert key columns
- Added `hasNonUpsertKeyFilterPushedDown()` to detect pushed-down filters on
non-upsert key columns in a `TableSourceScan`
- Added `hasNonUpsertKeyNonEquiCondition()` to detect non-equi join
conditions on non-upsert key columns, analyzing the left and right inputs
separately
- Modified `SatisfyUpdateKindTraitVisitor` to reject `ONLY_UPDATE_AFTER`
for a `TableSourceScan` when the pushed-down filter references non-upsert keys
- Modified `SatisfyUpdateKindTraitVisitor` to reject `ONLY_UPDATE_AFTER`
for a `StreamPhysicalJoin` when the non-equi condition references non-upsert keys
- Added IT tests covering the filter pushdown and join scenarios
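A join condition addresses the concatenated row of both inputs. Fields `0..leftFieldCount-1` come from the left input, and the remaining fields come from the right input. The per-side analysis can therefore be sketched as below. The method names and the plain-index representation are hypothetical assumptions for illustration and are not the PR's implementation.

```java
import java.util.Set;
import java.util.TreeSet;

public class JoinConditionSplit {
    // Referenced columns that belong to the left input (indices below leftFieldCount).
    static Set<Integer> leftColumns(Set<Integer> conditionColumns, int leftFieldCount) {
        Set<Integer> left = new TreeSet<>();
        for (int column : conditionColumns) {
            if (column < leftFieldCount) {
                left.add(column);
            }
        }
        return left;
    }

    // Referenced columns of the right input, shifted back to the right input's own indices.
    static Set<Integer> rightColumns(Set<Integer> conditionColumns, int leftFieldCount) {
        Set<Integer> right = new TreeSet<>();
        for (int column : conditionColumns) {
            if (column >= leftFieldCount) {
                right.add(column - leftFieldCount);
            }
        }
        return right;
    }

    // True if any of the given columns lies outside the input's upsert key.
    static boolean referencesNonUpsertKeyColumns(Set<Integer> columns, Set<Integer> upsertKey) {
        return !upsertKey.containsAll(columns);
    }

    public static void main(String[] args) {
        // Left input (k, c, x) with upsert key {0}; right input (id, d) with upsert key {0}.
        int leftFieldCount = 3;
        Set<Integer> keyOnly = Set.of(0);

        // L.c < R.d references concatenated fields 1 and 4: both sides are affected.
        Set<Integer> cond1 = Set.of(1, 4);
        System.out.println(referencesNonUpsertKeyColumns(leftColumns(cond1, leftFieldCount), keyOnly));  // true
        System.out.println(referencesNonUpsertKeyColumns(rightColumns(cond1, leftFieldCount), keyOnly)); // true

        // L.k < R.d references fields 0 and 4: only the right side is affected.
        Set<Integer> cond2 = Set.of(0, 4);
        System.out.println(referencesNonUpsertKeyColumns(leftColumns(cond2, leftFieldCount), keyOnly));  // false
        System.out.println(referencesNonUpsertKeyColumns(rightColumns(cond2, leftFieldCount), keyOnly)); // true
    }
}
```

Splitting by side means only the input whose non-key columns appear in the condition has to keep UPDATE_BEFORE. The other input can still drop it.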
## Verifying this change
This change added tests and can be verified as follows:
- Added `ChangelogSourceITCase.testFilterPushedDownOnNonUpsertKey()`, which
tests a filter pushed down on a non-upsert key column and checks that
UPDATE_BEFORE is preserved. It runs in 7 parameterized configurations: 6 pass,
and 1 is skipped because the CDC-duplicate + MiniBatch combination is
incompatible
- Added
`ChangelogSourceITCase.testJoinWithNonEquivConditionOnNonUpsertKey()`, which
tests a non-equi join condition on a left-side non-upsert key column
- Added
`ChangelogSourceITCase.testJoinWithNonEquivConditionOnRightNonUpsertKey()`,
which tests a non-equi join condition on a right-side non-upsert key column to
validate the left/right split logic
- Without the fix, every test produces wrong results (phantom rows); with the
fix, each produces the correct, empty result
- The tests use changelog data with INSERT, UPDATE_BEFORE, and UPDATE_AFTER
events to simulate real CDC scenarios
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
(only affects plan optimization phase)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no** (bug fix)
- If yes, how is the feature documented? **not applicable**