[https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17600135#comment-17600135]

Shuiqiang Chen edited comment on FLINK-28988 at 9/5/22 3:51 AM:
----------------------------------------------------------------

TL;DR: The filters in aboveFilter should not be pushed down into the right table when the join is a temporal join.

When there is no filter after the temporal join, the query is explained as below:

{code:xml}
== Abstract Syntax Tree ==
LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5])
+- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
   :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')])
   :  +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)])
   :     +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]])
   +- LogicalFilter(condition=[=($cor0.id, $0)])
      +- LogicalSnapshot(period=[$cor0.ts])
         +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3])
            +- LogicalFilter(condition=[=($3, 1)])
               +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)])
                  +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)])
                     +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]])

== Optimized Physical Plan ==
Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1:BIGINT AS row_num])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])

== Optimized Execution Plan ==
Calc(select=[id, ts, id0, state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, row_num])
+- TemporalJoin(joinType=[LeftOuterJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, state, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $1 AS state, $2 AS ts, 1 AS row_num])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])
{code}

After the FlinkChangelogModeInferenceProgram runs, the UpdateKindTrait of Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) becomes [ONLY_UPDATE_AFTER]. Therefore, at runtime, the rightSortedState in TemporalRowTimeJoinOperator contains the following rows:
[+I, 0, online, 1970-01-01 08:00:00.000]
[+I, 0, offline, 1970-01-01 08:00:00.010]
[+I, 0, online, 1970-01-01 08:00:00.020]
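To make the ONLY_UPDATE_AFTER case concrete, here is a toy Java model of keep-last-row deduplication. The class and method names are hypothetical, and this is not Flink's Deduplicate operator. The model assumes +I for a key's first row and +U for every newer row, and it prints rowtimes as epoch millis instead of TIMESTAMP_LTZ values. The exact row kinds are a modelling assumption. What matters is that no UPDATE_BEFORE is produced, so every version, including offline, reaches the right side of the join:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical names) of keep-last-row deduplication in ONLY_UPDATE_AFTER mode. */
class DedupeOnlyUpdateAfter {

    /** One row of source_table: id, state and rowtime in epoch millis. */
    static final class Row {
        final int id;
        final String state;
        final long ts;

        Row(int id, String state, long ts) {
            this.id = id;
            this.state = state;
            this.ts = ts;
        }
    }

    /** Emits "+I" for the first row of a key and "+U" for each newer row; never "-U". */
    static List<String> dedupe(List<Row> input) {
        Map<Integer, Row> lastRowPerKey = new HashMap<>();
        List<String> changelog = new ArrayList<>();
        for (Row row : input) {
            Row previous = lastRowPerKey.get(row.id);
            if (previous != null && row.ts < previous.ts) {
                continue; // ORDER BY ts DESC: an older row never replaces a newer one
            }
            lastRowPerKey.put(row.id, row);
            String kind = previous == null ? "+I" : "+U";
            changelog.add("[" + kind + ", " + row.id + ", " + row.state + ", " + row.ts + "]");
        }
        return changelog;
    }

    public static void main(String[] args) {
        List<Row> source = List.of(
                new Row(0, "online", 0L), new Row(0, "offline", 10L), new Row(0, "online", 20L));
        // Prints [+I, 0, online, 0], [+U, 0, offline, 10], [+U, 0, online, 20], one per line
        dedupe(source).forEach(System.out::println);
    }
}
```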

So we get the expected temporal join result:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.010,0,offline,1970-01-01 08:00:00.010,1]
[+I,0,1970-01-01 08:00:00.015,0,offline,1970-01-01 08:00:00.010,1]
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]
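These rows follow from event-time temporal join semantics: each left row is joined with the latest right version whose rowtime is not after the left row's rowtime. Below is a minimal sketch of that lookup. It assumes a single key and uses TreeMap.floorEntry as a stand-in for the sorted right state. The names are hypothetical and are not TemporalRowTimeJoinOperator internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (hypothetical names) of an event-time temporal LEFT JOIN for a single key. */
class TemporalLookupSketch {

    /** Joins each left rowtime with the latest version whose rowtime is <= the left rowtime. */
    static List<String> leftJoin(long[] leftTimes, TreeMap<Long, String> versions) {
        List<String> out = new ArrayList<>();
        for (long ts : leftTimes) {
            Map.Entry<Long, String> version = versions.floorEntry(ts);
            if (version == null) {
                out.add(ts + ",null,null"); // LEFT JOIN: no version is valid yet
            } else {
                out.add(ts + "," + version.getValue() + "," + version.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> versions = new TreeMap<>(); // version rowtime -> state
        versions.put(0L, "online");
        versions.put(10L, "offline");
        versions.put(20L, "online");
        long[] events = {0L, 5L, 10L, 15L, 20L, 25L};
        // One "ts,state,ts0" line per event: 0,online,0 / 5,online,0 / 10,offline,10 / ...
        leftJoin(events, versions).forEach(System.out::println);
    }
}
```

floorEntry returns the entry with the greatest key less than or equal to the probe time. In this simplified model, that is the "version as of ts" lookup.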

However, if the filter is pushed down into the right table, the right sorted state becomes:
[+I, 0, online, 1970-01-01 08:00:00.000]
[+I, 0, online, 1970-01-01 08:00:00.020]

and the temporal join result becomes:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.010,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.015,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]

while the expected result is:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]

*So if records in the right table are filtered before the join, the temporal join result can be wrong.*
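The same kind of toy model shows that a filter on the versioned side does not commute with the temporal join. It uses hypothetical names, a single key, and TreeMap.floorEntry as the version lookup. Applied after the join, state = 'online' keeps four rows. Applied to the versions before the join, it keeps six, because the events at 10 and 15 fall back to the older online version:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Toy model (hypothetical names) comparing a filter above vs. below a temporal join. */
class FilterPlacementSketch {

    /** Temporal lookup: each left rowtime joins the latest version at or before it. */
    static List<String[]> join(long[] leftTimes, TreeMap<Long, String> versions) {
        List<String[]> out = new ArrayList<>();
        for (long ts : leftTimes) {
            Map.Entry<Long, String> v = versions.floorEntry(ts);
            if (v != null) { // unmatched rows would carry a NULL state, which the filter rejects anyway
                out.add(new String[] {String.valueOf(ts), v.getValue(), String.valueOf(v.getKey())});
            }
        }
        return out;
    }

    /** Expected semantics: join against all versions, then filter the joined rows. */
    static List<String> filterAfterJoin(long[] left, TreeMap<Long, String> versions, Predicate<String> onState) {
        List<String> out = new ArrayList<>();
        for (String[] row : join(left, versions)) {
            if (onState.test(row[1])) {
                out.add(String.join(",", row));
            }
        }
        return out;
    }

    /** Pushed-down filter: drop versions first, then join against what is left. */
    static List<String> filterBeforeJoin(long[] left, TreeMap<Long, String> versions, Predicate<String> onState) {
        TreeMap<Long, String> kept = new TreeMap<>();
        versions.forEach((t, state) -> {
            if (onState.test(state)) {
                kept.put(t, state);
            }
        });
        List<String> out = new ArrayList<>();
        for (String[] row : join(left, kept)) {
            out.add(String.join(",", row));
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> versions = new TreeMap<>(Map.of(0L, "online", 10L, "offline", 20L, "online"));
        long[] events = {0L, 5L, 10L, 15L, 20L, 25L};
        Predicate<String> online = "online"::equals;
        System.out.println(filterAfterJoin(events, versions, online));  // 4 rows
        System.out.println(filterBeforeJoin(events, versions, online)); // 6 rows: 10 and 15 match version 0
    }
}
```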

However, when we add the filter after the temporal join, the actual output is:
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]

This does not even match the pushed-down result above:
[+I,0,1970-01-01 08:00:00.000,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.005,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.010,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.015,0,online,1970-01-01 08:00:00.000,1]
[+I,0,1970-01-01 08:00:00.020,0,online,1970-01-01 08:00:00.020,1]
[+I,0,1970-01-01 08:00:00.025,0,online,1970-01-01 08:00:00.020,1]

*Why?*
The query with the filter is explained as below:

{code:xml}
== Abstract Syntax Tree ==
LogicalFilter(condition=[=($3, _UTF-16LE'online')])
+- LogicalProject(id=[$0], ts=[$1], id0=[$2], state=[$3], ts0=[$4], row_num=[$5])
   +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
      :- LogicalProject(id=[AS($0, _UTF-16LE'id')], ts=[AS($1, _UTF-16LE'ts')])
      :  +- LogicalWatermarkAssigner(rowtime=[f1], watermark=[-($1, 2000:INTERVAL SECOND)])
      :     +- LogicalTableScan(table=[[*anonymous_datastream_source$2*]])
      +- LogicalFilter(condition=[=($cor0.id, $0)])
         +- LogicalSnapshot(period=[$cor0.ts])
            +- LogicalProject(id=[$0], state=[$1], ts=[$2], row_num=[$3])
               +- LogicalFilter(condition=[=($3, 1)])
                  +- LogicalProject(id=[AS($0, _UTF-16LE'id')], state=[AS($1, _UTF-16LE'state')], ts=[AS($2, _UTF-16LE'ts')], row_num=[ROW_NUMBER() OVER (PARTITION BY AS($0, _UTF-16LE'id') ORDER BY AS($2, _UTF-16LE'ts') DESC NULLS LAST)])
                     +- LogicalWatermarkAssigner(rowtime=[f2], watermark=[-($2, 2000:INTERVAL SECOND)])
                        +- LogicalTableScan(table=[[*anonymous_datastream_source$1*]])

== Optimized Physical Plan ==
Calc(select=[id, ts, id0, CAST(_UTF-16LE'online':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, CAST(row_num AS BIGINT) AS row_num])
+- TemporalJoin(joinType=[InnerJoin], where=[AND(=(id, id0), __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[-(f1, 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $2 AS ts, 1:BIGINT AS row_num], where=[=($1, _UTF-16LE'online')])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[-(f2, 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])

== Optimized Execution Plan ==
Calc(select=[id, ts, id0, CAST('online' AS VARCHAR(2147483647)) AS state, CAST(ts0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)) AS ts0, CAST(row_num AS BIGINT) AS row_num])
+- TemporalJoin(joinType=[InnerJoin], where=[((id = id0) AND __TEMPORAL_JOIN_CONDITION(ts, ts0, __TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(id0), __TEMPORAL_JOIN_LEFT_KEY(id), __TEMPORAL_JOIN_RIGHT_KEY(id0)))], select=[id, ts, id0, ts0, row_num])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[f0 AS id, f1 AS ts])
   :     +- WatermarkAssigner(rowtime=[f1], watermark=[(f1 - 2000:INTERVAL SECOND)])
   :        +- TableSourceScan(table=[[*anonymous_datastream_source$2*]], fields=[f0, f1])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[$0 AS id, $2 AS ts, 1 AS row_num], where=[($1 = 'online')])
         +- Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME])
            +- Exchange(distribution=[hash[$0]])
               +- Calc(select=[f0 AS $0, f1 AS $1, f2 AS $2])
                  +- WatermarkAssigner(rowtime=[f2], watermark=[(f2 - 2000:INTERVAL SECOND)])
                     +- TableSourceScan(table=[[*anonymous_datastream_source$1*]], fields=[f0, f1, f2])
{code}

There is a Calc(select=[$0 AS id, $2 AS ts, 1 AS row_num], where=[($1 = 'online')]) between Exchange(distribution=[hash[id]]) and Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]). According to [FLINK-9528|https://issues.apache.org/jira/browse/FLINK-9528] and [FLINK-16887|https://issues.apache.org/jira/browse/FLINK-16887], a Calc with a filter condition cannot consume ONLY_UPDATE_AFTER input. So after the FlinkChangelogModeInferenceProgram runs, the UpdateKindTrait of Deduplicate(keep=[LastRow], key=[$0], order=[ROWTIME]) becomes [BEFORE_AND_AFTER], which means the Deduplicate operator also emits UPDATE_BEFORE messages. As a result, the rightSortedState becomes:
[-U, 0, online, 1970-01-01 08:00:00.000]
#[-U, 0, offline, 1970-01-01 08:00:00.010] (filtered out by the Calc)
[+U, 0, online, 1970-01-01 08:00:00.020]
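Below is a toy model of the messages that reach the join in this plan. The names are hypothetical. It assumes that keep-last-row deduplication in BEFORE_AND_AFTER mode emits a -U for the previous row right before each +U. The model shows how the pushed-down Calc lets the retraction of the first online version through while dropping both offline messages:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical names): keep-last-row dedup emitting UPDATE_BEFORE, then the pushed-down filter. */
class DedupeBeforeAndAfterSketch {

    /** A change message: kind (+I, -U, +U), id, state and rowtime in epoch millis. */
    static final class Change {
        final String kind;
        final int id;
        final String state;
        final long ts;

        Change(String kind, int id, String state, long ts) {
            this.kind = kind;
            this.id = id;
            this.state = state;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "[" + kind + ", " + id + ", " + state + ", " + ts + "]";
        }
    }

    /** Retracts the previous row of the key (-U) before emitting the newer row (+U). */
    static List<Change> dedupe(int[] ids, String[] states, long[] times) {
        Map<Integer, Change> lastPerKey = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            Change previous = lastPerKey.get(ids[i]);
            if (previous != null && times[i] < previous.ts) {
                continue; // an older row never replaces a newer one
            }
            if (previous != null) {
                out.add(new Change("-U", previous.id, previous.state, previous.ts));
            }
            Change current = new Change(previous == null ? "+I" : "+U", ids[i], states[i], times[i]);
            out.add(current);
            lastPerKey.put(ids[i], current);
        }
        return out;
    }

    /** The Calc pushed below the join: where state = 'online', applied to every message kind. */
    static List<Change> filterOnline(List<Change> changes) {
        List<Change> out = new ArrayList<>();
        for (Change c : changes) {
            if ("online".equals(c.state)) {
                out.add(c);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> changelog = dedupe(
                new int[] {0, 0, 0}, new String[] {"online", "offline", "online"}, new long[] {0L, 10L, 20L});
        // Filtered changelog: [+I, 0, online, 0], [-U, 0, online, 0], [+U, 0, online, 20]
        System.out.println(filterOnline(changelog));
    }
}
```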

Finally, only [+U, 0, online, 1970-01-01 08:00:00.020] is joined, which matches the actual output exactly.
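A toy model with hypothetical names reproduces the two-row output. It rests on two assumptions implied by the state listing above rather than taken from the TemporalRowTimeJoinOperator source. First, the right state keeps one entry per rowtime, so the -U at 0 overwrites the +I at 0. Second, a left row whose latest version is a retraction finds no match. Since the join is now an InnerJoin in the optimized plan, such rows are dropped:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model (hypothetical names) of an inner temporal join whose right state keeps retractions. */
class RetractionLookupSketch {

    /** Right state keyed by rowtime; a later message with the same rowtime overwrites the earlier one. */
    static TreeMap<Long, String[]> buildRightState(List<String[]> changes) {
        TreeMap<Long, String[]> state = new TreeMap<>();
        for (String[] change : changes) { // change = {kind, state, rowtime}
            state.put(Long.parseLong(change[2]), new String[] {change[0], change[1]});
        }
        return state;
    }

    /** A left row matches only if the latest version at or before its rowtime is not a retraction. */
    static List<String> innerJoin(long[] leftTimes, TreeMap<Long, String[]> rightState) {
        List<String> out = new ArrayList<>();
        for (long ts : leftTimes) {
            Map.Entry<Long, String[]> version = rightState.floorEntry(ts);
            boolean retracted = version != null && version.getValue()[0].startsWith("-");
            if (version != null && !retracted) {
                out.add(ts + "," + version.getValue()[1] + "," + version.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The filtered changelog that reaches the join when the filter is pushed down.
        List<String[]> changes = List.of(
                new String[] {"+I", "online", "0"},
                new String[] {"-U", "online", "0"},
                new String[] {"+U", "online", "20"});
        long[] events = {0L, 5L, 10L, 15L, 20L, 25L};
        // Only the events at 20 and 25 are joined, as in the actual output.
        System.out.println(innerJoin(events, buildRightState(changes)));
    }
}
```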

Overall, the solution is to not push the filters in aboveFilter down into the right table when the join is a temporal join.
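The proposed rule can be sketched as a conjunct-splitting step. Everything here is hypothetical: the types, the names and the Side classification are not Flink's or Calcite's FilterJoinRule API. The sketch assumes an inner join, as in the optimized plan above. Each conjunct of the filter above the join is classified by the input it references, and right-only conjuncts stay above the join when it is a temporal join:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of splitting a filter above a join; not Flink's or Calcite's actual rule code. */
class TemporalJoinPushdownSketch {

    /** Which join input(s) a conjunct references. */
    enum Side { LEFT_ONLY, RIGHT_ONLY, BOTH }

    /** One conjunct of the filter above the join, e.g. state = 'online'. */
    static final class Conjunct {
        final String text;
        final Side side;

        Conjunct(String text, Side side) {
            this.text = text;
            this.side = side;
        }
    }

    /** Where each conjunct ends up after the split. */
    static final class Split {
        final List<String> pushToLeft = new ArrayList<>();
        final List<String> pushToRight = new ArrayList<>();
        final List<String> keepAbove = new ArrayList<>();
    }

    /** Assumes an inner join; outer-join handling is omitted from this sketch. */
    static Split split(List<Conjunct> conjuncts, boolean isTemporalJoin) {
        Split split = new Split();
        for (Conjunct c : conjuncts) {
            switch (c.side) {
                case LEFT_ONLY:
                    split.pushToLeft.add(c.text);
                    break;
                case RIGHT_ONLY:
                    // Filtering a versioned table changes which version is "latest" for a probe row,
                    // so for a temporal join the conjunct must stay above the join.
                    if (isTemporalJoin) {
                        split.keepAbove.add(c.text);
                    } else {
                        split.pushToRight.add(c.text);
                    }
                    break;
                default:
                    split.keepAbove.add(c.text); // conjuncts on both inputs stay above in this sketch
            }
        }
        return split;
    }

    public static void main(String[] args) {
        List<Conjunct> filter = List.of(new Conjunct("state = 'online'", Side.RIGHT_ONLY));
        System.out.println(split(filter, true).keepAbove);    // [state = 'online']
        System.out.println(split(filter, false).pushToRight); // [state = 'online']
    }
}
```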





> Incorrect result for filter after temporal join
> -----------------------------------------------
>
>                 Key: FLINK-28988
>                 URL: https://issues.apache.org/jira/browse/FLINK-28988
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.1
>            Reporter: Xuannan Su
>            Assignee: Shuiqiang Chen
>            Priority: Major
>
> The following code reproduces the issue:
>  
> {code:java}
> import static org.apache.flink.table.api.Expressions.$;
>
> import java.time.Instant;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class TemporalJoinSQLExample1 {
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         // versioned side: id 0 is online at 0 ms, offline at 10 ms, online again at 20 ms
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
>         // keep only the latest row per id; this deduplicated view is the versioned table
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
>         // probe side: one event for id 0 every 5 ms
>         DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
>         // event-time temporal join
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         // without a filter: six rows, as expected
>         result.execute().print();
>         // with state = 'online': four rows expected, but only two are printed
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> }
> {code}
>  
> The result of the temporal join is the following:
> |op|         id|                     ts|        id0|                         state|                    ts0|             row_num|
> |+I|          0|1970-01-01 08:00:00.000|          0|                        online|1970-01-01 08:00:00.000|                   1|
> |+I|          0|1970-01-01 08:00:00.005|          0|                        online|1970-01-01 08:00:00.000|                   1|
> |+I|          0|1970-01-01 08:00:00.010|          0|                       offline|1970-01-01 08:00:00.010|                   1|
> |+I|          0|1970-01-01 08:00:00.015|          0|                       offline|1970-01-01 08:00:00.010|                   1|
> |+I|          0|1970-01-01 08:00:00.020|          0|                        online|1970-01-01 08:00:00.020|                   1|
> |+I|          0|1970-01-01 08:00:00.025|          0|                        online|1970-01-01 08:00:00.020|                   1|
>  
> After filtering with the predicate state = 'online', I expect only the two rows with state offline to be filtered out. But I got the following result:
> |op|         id|                     ts|        id0|                         state|                    ts0|             row_num|
> |+I|          0|1970-01-01 08:00:00.020|          0|                        online|1970-01-01 08:00:00.020|                   1|
> |+I|          0|1970-01-01 08:00:00.025|          0|                        online|1970-01-01 08:00:00.020|                   1|
>  
>  
>  
>  


