Caizhi Weng created FLINK-25357:
-----------------------------------

             Summary: SQL planner incorrectly changes a streaming join with FLOOR(rowtime) into interval join
                 Key: FLINK-25357
                 URL: https://issues.apache.org/jira/browse/FLINK-25357
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.14.2
            Reporter: Caizhi Weng
This issue was reported on the [user mailing list|https://lists.apache.org/thread/v8omhomp58hb8m5dj4noxbr1dsyy6zjl]. Add the following test case to {{TableEnvironmentITCase}} to reproduce it.

{code:scala}
@Test
def myTest(): Unit = {
  // Three rows with the same id whose rowtimes fall into consecutive hours.
  val data = Seq(
    Row.of("1", java.time.LocalDateTime.of(2021, 12, 13, 12, 5, 8)),
    Row.of("1", java.time.LocalDateTime.of(2021, 12, 13, 13, 5, 4)),
    Row.of("1", java.time.LocalDateTime.of(2021, 12, 13, 14, 5, 6))
  )

  // Bounded source with event-time attribute b and a 60-minute watermark delay.
  tEnv.executeSql(
    s"""
       |create table T (
       |  id STRING,
       |  b TIMESTAMP(3),
       |  WATERMARK FOR b AS b - INTERVAL '60' MINUTES
       |) WITH (
       |  'connector' = 'values',
       |  'bounded' = 'true',
       |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
       |)
       |""".stripMargin)

  // Self-join: match each target row with the source rows whose b lies in
  // [FLOOR(target.b TO HOUR) + 1 hour, FLOOR(target.b TO HOUR) + 2 hours).
  tEnv.executeSql(
    """
      |SELECT
      |  source.id AS sourceid,
      |  CAST(source.b AS TIMESTAMP) AS source_startat,
      |  CAST(target.b AS TIMESTAMP) AS target_startat
      |FROM T source, T target
      |WHERE source.id = target.id
      |AND source.id IN ('1', '2', '3')
      |AND source.b >= FLOOR(target.b TO HOUR) + INTERVAL '1' HOUR AND source.b < FLOOR(target.b TO HOUR) + INTERVAL '2' HOUR
      |""".stripMargin).print()
}
{code}

The batch task produces the correct result:

{code}
+--------------------------------+----------------------------+----------------------------+
|                       sourceid |             source_startat |             target_startat |
+--------------------------------+----------------------------+----------------------------+
|                              1 | 2021-12-13 13:05:04.000000 | 2021-12-13 12:05:08.000000 |
|                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 13:05:04.000000 |
+--------------------------------+----------------------------+----------------------------+
{code}

The streaming task produces an incorrect result:

{code}
+----+--------------------------------+----------------------------+----------------------------+
| op |                       sourceid |             source_startat |             target_startat |
+----+--------------------------------+----------------------------+----------------------------+
| +I |                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 12:05:08.000000 |
| +I |                              1 | 2021-12-13 14:05:06.000000 | 2021-12-13 13:05:04.000000 |
+----+--------------------------------+----------------------------+----------------------------+
{code}

The plan for the streaming task is:

{code}
LogicalProject(sourceid=[$0], source_startat=[CAST($1):TIMESTAMP(6)], target_startat=[CAST($3):TIMESTAMP(6)])
+- LogicalFilter(condition=[AND(=($0, $2), OR(=($0, _UTF-16LE'1'), =($0, _UTF-16LE'2'), =($0, _UTF-16LE'3')), >=($1, +(FLOOR($3, FLAG(HOUR)), 3600000:INTERVAL HOUR)), <($1, +(FLOOR($3, FLAG(HOUR)), 7200000:INTERVAL HOUR)))])
   +- LogicalJoin(condition=[true], joinType=[inner])
      :- LogicalWatermarkAssigner(rowtime=[b], watermark=[-($1, 3600000:INTERVAL MINUTE)])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, T]])
      +- LogicalWatermarkAssigner(rowtime=[b], watermark=[-($1, 3600000:INTERVAL MINUTE)])
         +- LogicalTableScan(table=[[default_catalog, default_database, T]])

== Optimized Physical Plan ==
Calc(select=[id AS sourceid, CAST(CAST(b)) AS source_startat, CAST(CAST(b0)) AS target_startat])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=3600000, leftUpperBound=7199999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id, id0), >=(b, +(FLOOR(b0, FLAG(HOUR)), 3600000:INTERVAL HOUR)), <(b, +(FLOOR(b0, FLAG(HOUR)), 7200000:INTERVAL HOUR)))], select=[id, b, id0, b0])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, b], where=[SEARCH(id, Sarg[_UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'3']:CHAR(1) CHARACTER SET "UTF-16LE")])
   :     +- WatermarkAssigner(rowtime=[b], watermark=[-(b, 3600000:INTERVAL MINUTE)])
   :        +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])
   +- Exchange(distribution=[hash[id]])
      +- WatermarkAssigner(rowtime=[b], watermark=[-(b, 3600000:INTERVAL MINUTE)])
         +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])

== Optimized Execution Plan ==
Calc(select=[id AS sourceid, CAST(CAST(b)) AS source_startat, CAST(CAST(b0)) AS target_startat])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=3600000, leftUpperBound=7199999, leftTimeIndex=1, rightTimeIndex=1], where=[((id = id0) AND (b >= (FLOOR(b0, FLAG(HOUR)) + 3600000:INTERVAL HOUR)) AND (b < (FLOOR(b0, FLAG(HOUR)) + 7200000:INTERVAL HOUR)))], select=[id, b, id0, b0])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, b], where=[SEARCH(id, Sarg[_UTF-16LE'1', _UTF-16LE'2', _UTF-16LE'3']:CHAR(1) CHARACTER SET "UTF-16LE")])
   :     +- WatermarkAssigner(rowtime=[b], watermark=[(b - 3600000:INTERVAL MINUTE)])(reuse_id=[1])
   :        +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[id, b])
   +- Exchange(distribution=[hash[id]])
      +- Reused(reference_id=[1])
{code}

The planner incorrectly converts this join into an interval join. The generated condition for the interval join is also incorrect: it always returns {{true}}, so the FLOOR-based predicates are never evaluated, which causes the first row of the streaming result to be produced.

{code:java}
public class ConditionFunction$171 extends org.apache.flink.api.common.functions.AbstractRichFunction
    implements org.apache.flink.table.runtime.generated.JoinCondition {

  public ConditionFunction$171(Object[] references) throws Exception {}

  @Override
  public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {}

  @Override
  public boolean apply(
      org.apache.flink.table.data.RowData in1,
      org.apache.flink.table.data.RowData in2) throws Exception {
    return true;
  }

  @Override
  public void close() throws Exception {
    super.close();
  }
}
{code}
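To make the discrepancy concrete, here is a plain-Java sketch (not Flink code; the class {{Flink25357Check}} and its methods are hypothetical) that replays the three rows from the test against two predicates. The first is the query's own condition. The second is an assumed model of what the interval join evaluates: the bounds from the plan ({{leftLowerBound=3600000}}, {{leftUpperBound=7199999}} ms) applied to {{source.b - target.b}}, as if {{FLOOR(target.b TO HOUR)}} were simply {{target.b}}, combined with the always-true {{ConditionFunction$171}}. All rows share id "1", so the equi-join key and the IN filter are ignored.

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class Flink25357Check {
    // The three rowtime values (column b) from the reproduction data set.
    static final List<LocalDateTime> TIMES = List.of(
            LocalDateTime.of(2021, 12, 13, 12, 5, 8),
            LocalDateTime.of(2021, 12, 13, 13, 5, 4),
            LocalDateTime.of(2021, 12, 13, 14, 5, 6));

    // Query predicate:
    // source.b >= FLOOR(target.b TO HOUR) + 1h AND source.b < FLOOR(target.b TO HOUR) + 2h
    static boolean queryPredicate(LocalDateTime source, LocalDateTime target) {
        LocalDateTime floored = target.truncatedTo(ChronoUnit.HOURS);
        return !source.isBefore(floored.plusHours(1)) && source.isBefore(floored.plusHours(2));
    }

    // Assumed interval-join behaviour: source.b - target.b must lie in
    // [3600000, 7199999] ms (bounds taken from the plan), and the residual
    // join condition accepts every pair because it always returns true.
    static boolean intervalJoinPredicate(LocalDateTime source, LocalDateTime target) {
        long diffMillis = Duration.between(target, source).toMillis();
        return diffMillis >= 3_600_000L && diffMillis <= 7_199_999L;
    }

    // Nested-loop self-join over TIMES; each match is rendered as "source | target".
    static List<String> join(BiPredicate<LocalDateTime, LocalDateTime> predicate) {
        List<String> out = new ArrayList<>();
        for (LocalDateTime source : TIMES) {
            for (LocalDateTime target : TIMES) {
                if (predicate.test(source, target)) {
                    out.add(source + " | " + target);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("query predicate: " + join(Flink25357Check::queryPredicate));
        System.out.println("interval join:   " + join(Flink25357Check::intervalJoinPredicate));
    }
}
{code}

Under that assumption, the first predicate yields exactly the two batch rows and the second yields exactly the two streaming rows. The arithmetic also accounts for the missing batch row: 13:05:04 is only 59m56s after 12:05:08, below the 3600000 ms lower bound, while 14:05:06 is 1h59m58s after 12:05:08 and passes the bounds even though it lies outside [13:00, 14:00).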