[ https://issues.apache.org/jira/browse/FLINK-29849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Godfrey He closed FLINK-29849.
------------------------------
    Resolution: Fixed

Fixed in master: eb44ac01c9969cb22ab832b6b2155b109f015b06

> Event time temporal join on an upsert source may produce incorrect execution
> plan
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-29849
>                 URL: https://issues.apache.org/jira/browse/FLINK-29849
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.16.0, 1.15.3
>            Reporter: lincoln lee
>            Assignee: lincoln lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> In the current implementation, the execution plan is incorrect for an event
> time temporal join on an upsert source. There are two problems:
> 1. For an upsert source, we should not add a ChangelogNormalize node under a
> temporal join input, or it will corrupt the versions of the versioned table.
> Versioned tables use a single-temporal mechanism that relies on the sequential
> records of the same key to determine the valid period of each version. If
> ChangelogNormalize is added, it produces a UB (update-before) message based on
> the previous UA (update-after) or Insert message, with all columns identical,
> including the event time, e.g.,
> original upsert input
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> the versioned data should be:
> {code}
> v1 [~, '2022-11-02 10:00:00')
> v2 ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> after ChangelogNormalize's processing, the output will be:
> {code}
> +I (key1, '2022-11-02 10:00:00', a1)
> -U (key1, '2022-11-02 10:00:00', a1)
> +U (key1, '2022-11-02 10:01:03', a2)
> {code}
> and the versions are incorrect:
> {code}
> v1 ['2022-11-02 10:00:00', '2022-11-02 10:00:00') // invalid period
> v2 ['2022-11-02 10:00:00', '2022-11-02 10:01:03')
> {code}
> 2. Semantically, a filter cannot be pushed into an event time temporal join;
> otherwise, the filter may also corrupt the versioned table.
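
To make the first problem concrete, here is a minimal Python sketch of why the extra -U record is harmful. It is a toy model, not Flink's operator code: the helper names (`derive_periods`, `empty_periods`) are hypothetical, and the interval convention (each record is valid from its own event time until the event time of the next record for the same key) is an assumption made for illustration. Under that assumption, the -U emitted by ChangelogNormalize carries the same event time as the preceding +I, so the +I version ends up with an empty validity period.

```python
from datetime import datetime


def parse(ts):
    """Parse the timestamp format used in the issue description."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")


def derive_periods(changelog):
    """Toy model (assumption): a record is valid from its event time until
    the event time of the next record. All records are assumed to share
    one key and to arrive in order."""
    periods = []
    for i, (kind, _key, ts, value) in enumerate(changelog):
        start = parse(ts)
        # The last record has an open-ended period, modelled as None.
        end = parse(changelog[i + 1][2]) if i + 1 < len(changelog) else None
        periods.append((kind, value, start, end))
    return periods


def empty_periods(periods):
    """Return the (kind, value) of records whose period [start, end) is empty."""
    return [
        (kind, value)
        for kind, value, start, end in periods
        if end is not None and start >= end
    ]


# Original upsert input from the issue description.
upsert_input = [
    ("+I", "key1", "2022-11-02 10:00:00", "a1"),
    ("+U", "key1", "2022-11-02 10:01:03", "a2"),
]

# The same input after ChangelogNormalize has added the -U record.
normalized = [
    ("+I", "key1", "2022-11-02 10:00:00", "a1"),
    ("-U", "key1", "2022-11-02 10:00:00", "a1"),
    ("+U", "key1", "2022-11-02 10:01:03", "a2"),
]

print(empty_periods(derive_periods(upsert_input)))  # []
print(empty_periods(derive_periods(normalized)))    # [('+I', 'a1')]
```

In this model the raw upsert stream yields no empty periods, while the normalized stream collapses the a1 version to ['2022-11-02 10:00:00', '2022-11-02 10:00:00'), which corresponds to the invalid period shown in the description.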