fresh-borzoni opened a new pull request, #27371:
URL: https://github.com/apache/flink/pull/27371

## What is the purpose of the change

   Linked issue: closes https://issues.apache.org/jira/browse/FLINK-38579
   
   This pull request fixes incorrect changelog mode inference when a pushed-down 
filter or a non-equi join condition references non-upsert key columns. Without 
this fix, Flink incorrectly drops UPDATE_BEFORE events in these scenarios, 
leading to phantom rows in the output.
   
   **Problem**: When a filter like `c < 2` (where `c` is a non-upsert key 
column) is pushed down to a changelog source, and a row with `c=1` is updated 
to `c=2`, the old row `c=1` matches the filter but the new row `c=2` doesn't. 
The planner incorrectly allowed the `DropUpdateBefore` optimization, so the 
UPDATE_BEFORE event was lost. Without the UPDATE_BEFORE, downstream operators 
couldn't retract the old `c=1` row, leaving it incorrectly in the result.
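
The scenario above can be reproduced in a small standalone simulation. This is only an illustrative sketch, not Flink code: the `PhantomRowDemo` class, its `Event` type, and the `materialize` method are hypothetical names, and the downstream operator is simplified to a map keyed by the upsert key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation; none of these types exist in Flink.
class PhantomRowDemo {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Event {
        final Kind kind;
        final int key; // upsert key column
        final int c;   // non-upsert key column referenced by the filter

        Event(Kind kind, int key, int c) {
            this.kind = kind;
            this.key = key;
            this.c = c;
        }
    }

    // A row with key=1 is inserted with c=1 and then updated to c=2.
    static List<Event> sampleChangelog() {
        return Arrays.asList(
                new Event(Kind.INSERT, 1, 1),
                new Event(Kind.UPDATE_BEFORE, 1, 1),
                new Event(Kind.UPDATE_AFTER, 1, 2));
    }

    // Applies the filter `c < 2` and materializes the result by upsert key.
    static Map<Integer, Integer> materialize(List<Event> changelog, boolean dropUpdateBefore) {
        Map<Integer, Integer> result = new LinkedHashMap<>();
        for (Event e : changelog) {
            if (dropUpdateBefore && e.kind == Kind.UPDATE_BEFORE) {
                continue; // simulated DropUpdateBefore: the retraction never arrives
            }
            if (e.c >= 2) {
                continue; // pushed-down filter `c < 2` rejects this event
            }
            if (e.kind == Kind.INSERT || e.kind == Kind.UPDATE_AFTER) {
                result.put(e.key, e.c);
            } else {
                result.remove(e.key); // UPDATE_BEFORE or DELETE retracts the row
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(materialize(sampleChangelog(), true));  // prints {1=1}
        System.out.println(materialize(sampleChangelog(), false)); // prints {}
    }
}
```

With UPDATE_BEFORE dropped, the UPDATE_AFTER for `c=2` is filtered out and nothing ever retracts the `c=1` row. With UPDATE_BEFORE preserved, the old row is retracted and the result is empty.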
   
   **Solution**: The fix prevents the `ONLY_UPDATE_AFTER` and `DELETE_BY_KEY` 
changelog modes when filters or non-equi join conditions reference non-upsert 
key columns, ensuring UPDATE_BEFORE events are preserved for correct retraction 
semantics.
   
   ## Brief change log
   
     - Added `referencesNonUpsertKeyColumns()` helper to check if RexNodes 
reference non-upsert key columns
     - Added `hasNonUpsertKeyFilterPushedDown()` to detect filters on 
non-upsert keys in TableSourceScan
     - Added `hasNonUpsertKeyNonEquiCondition()` to detect non-equi join 
conditions on non-upsert keys, with precise left/right input analysis
     - Modified `SatisfyUpdateKindTraitVisitor` to reject `ONLY_UPDATE_AFTER` 
for TableSourceScan when filter references non-upsert keys
     - Modified `SatisfyUpdateKindTraitVisitor` to reject `ONLY_UPDATE_AFTER` 
for StreamPhysicalJoin when non-equi condition references non-upsert keys
     - Added comprehensive IT tests covering filter pushdown and join scenarios
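
The idea behind the column checks in this list can be sketched without any planner types. The code below is an assumption-laden illustration, not the code in this pull request: the real helpers operate on `RexNode`s, while this sketch uses plain `BitSet`s of column indexes, and the names `NonUpsertKeyCheck`, `referencesColumnsOutsideKey`, and `joinSideTouchesNonKey` are hypothetical. It also assumes that a join condition addresses left input fields first and right input fields after them, offset by the left field count.

```java
import java.util.BitSet;

// Hypothetical sketch; the pull request's helpers work on RexNodes instead.
class NonUpsertKeyCheck {
    static BitSet bits(int... indexes) {
        BitSet set = new BitSet();
        for (int i : indexes) {
            set.set(i);
        }
        return set;
    }

    // True if at least one referenced column is not part of the upsert key.
    static boolean referencesColumnsOutsideKey(BitSet referenced, BitSet upsertKey) {
        BitSet outside = (BitSet) referenced.clone();
        outside.andNot(upsertKey);
        return !outside.isEmpty();
    }

    // Keeps only the condition references that belong to one join input,
    // shifts right-side indexes back to that input's local numbering,
    // and checks them against that input's upsert key.
    static boolean joinSideTouchesNonKey(
            BitSet conditionRefs, int leftFieldCount, boolean leftSide, BitSet sideUpsertKey) {
        BitSet sideRefs = new BitSet();
        for (int i = conditionRefs.nextSetBit(0); i >= 0; i = conditionRefs.nextSetBit(i + 1)) {
            if (leftSide && i < leftFieldCount) {
                sideRefs.set(i);
            } else if (!leftSide && i >= leftFieldCount) {
                sideRefs.set(i - leftFieldCount);
            }
        }
        return referencesColumnsOutsideKey(sideRefs, sideUpsertKey);
    }

    public static void main(String[] args) {
        // Scan with columns (a, b, c), upsert key {a}, filter on c (index 2).
        System.out.println(referencesColumnsOutsideKey(bits(2), bits(0))); // prints true

        // Join of left (a, b, c) and right (d, e, f); the non-equi condition
        // references left c (index 2) and right d (index 3).
        BitSet condition = bits(2, 3);
        System.out.println(joinSideTouchesNonKey(condition, 3, true, bits(0)));  // prints true
        System.out.println(joinSideTouchesNonKey(condition, 3, false, bits(0))); // prints false
    }
}
```

In this sketch the left input would have to keep UPDATE_BEFORE, because the condition touches its non-key column `c`, while the right input is referenced only through its key column `d`.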
   
## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - Added `ChangelogSourceITCase.testFilterPushedDownOnNonUpsertKey()` - 
Tests filter pushed down on non-upsert key column with UPDATE_BEFORE 
preservation (7 parameterized configurations: 6 pass, 1 is skipped because the 
CDC duplicate + MiniBatch combination is incompatible)
     - Added 
`ChangelogSourceITCase.testJoinWithNonEquivConditionOnNonUpsertKey()` - Tests 
non-equi join condition on left side non-upsert key column
     - Added 
`ChangelogSourceITCase.testJoinWithNonEquivConditionOnRightNonUpsertKey()` - 
Tests non-equi join condition on right side non-upsert key column to validate 
left/right split logic
     - All tests verify that without the fix, wrong results occur (phantom 
rows), and with the fix, correct empty results are produced
     - Tests use changelog data with INSERT, UPDATE_BEFORE, UPDATE_AFTER events 
to simulate real CDC scenarios
   
## Does this pull request potentially affect one of the following parts:
   
       - Dependencies (does it add or upgrade a dependency): **no**
       - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
       - The serializers: **no**
       - The runtime per-record code paths (performance sensitive): **no** 
(only affects plan optimization phase)
       - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
       - The S3 file system connector: **no**
   
## Documentation
   
       - Does this pull request introduce a new feature? **no** (bug fix)
       - If yes, how is the feature documented? **not applicable**
   

