Xuannan Su created FLINK-28988:
----------------------------------

             Summary: Incorrect result for filter after temporal join
                 Key: FLINK-28988
                 URL: https://issues.apache.org/jira/browse/FLINK-28988
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.15.1
            Reporter: Xuannan Su
The following code reproduces the issue:

{code:java}
import static org.apache.flink.table.api.Expressions.$;

import java.time.Instant;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TemporalJoinSQLExample1 {
    public static void main(String[] args) throws Exception {
        // set up the Java DataStream API
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // set up the Java Table API
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // State changes for id 0: online at 0 ms, offline at 10 ms, online again at 20 ms
        final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                env.fromElements(
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                        new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));

        final Table table =
                tableEnv.fromDataStream(
                                ds,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f2", "f2 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);

        // Deduplicate to the latest row per id; the result is used as the versioned table
        final Table dedupeTable =
                tableEnv.sqlQuery(
                        "SELECT * FROM ("
                                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
                                + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        // Probe-side events for id 0, one every 5 ms
        DataStreamSource<Tuple2<Integer, Instant>> event =
                env.fromElements(
                        new Tuple2<>(0, Instant.ofEpochMilli(0)),
                        new Tuple2<>(0, Instant.ofEpochMilli(5)),
                        new Tuple2<>(0, Instant.ofEpochMilli(10)),
                        new Tuple2<>(0, Instant.ofEpochMilli(15)),
                        new Tuple2<>(0, Instant.ofEpochMilli(20)),
                        new Tuple2<>(0, Instant.ofEpochMilli(25)));

        final Table eventTable =
                tableEnv.fromDataStream(
                                event,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f1", "f1 - INTERVAL '2' SECONDS")
                                        .build())
                        .as("id", "ts");
        tableEnv.createTemporaryView("event_table", eventTable);

        // Event-time temporal join of the events against the versioned table
        final Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM event_table"
                                + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                                + " ON event_table.id = versioned_table.id");
        result.execute().print();

        // The same join result, filtered on the versioned-table column "state"
        result.filter($("state").isEqual("online")).execute().print();
    }
}
{code}

The result of the temporal join is the following:

+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| op | id          | ts                      | id0         | state                          | ts0                     | row_num              |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| +I | 0           | 1970-01-01 08:00:00.000 | 0           | online                         | 1970-01-01 08:00:00.000 | 1                    |
| +I | 0           | 1970-01-01 08:00:00.005 | 0           | online                         | 1970-01-01 08:00:00.000 | 1                    |
| +I | 0           | 1970-01-01 08:00:00.010 | 0           | offline                        | 1970-01-01 08:00:00.010 | 1                    |
| +I | 0           | 1970-01-01 08:00:00.015 | 0           | offline                        | 1970-01-01 08:00:00.010 | 1                    |
| +I | 0           | 1970-01-01 08:00:00.020 | 0           | online                         | 1970-01-01 08:00:00.020 | 1                    |
| +I | 0           | 1970-01-01 08:00:00.025 | 0           | online                         | 1970-01-01 08:00:00.020 | 1                    |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+

After filtering with the predicate state = 'online', I expected only the two rows with state 'offline' to be filtered out, leaving four rows. Instead, I got the following result, in which the rows for the events at 08:00:00.000 and 08:00:00.005 (both joined with the 'online' version at 08:00:00.000) are also missing:

+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| op | id          | ts                      | id0         | state                          | ts0                     | row_num              |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| +I | 0           | 1970-01-01 08:00:00.020 | 0           | online                         | 1970-01-01 08:00:00.020 | 1                    |
| +I | 0           | 1970-01-01 08:00:00.025 | 0           | online                         | 1970-01-01 08:00:00.020 | 1                    |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
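For reference, the expected result can be modeled without Flink at all. The sketch below is a plain-Java illustration, not Flink's implementation: it assumes an event-time temporal join returns, for each event, the latest version whose version time is less than or equal to the event time, and that the filter is then applied to that joined output. Times are epoch milliseconds (the printed tables above show them in a UTC+8 session time zone). The class `ExpectedTemporalJoin`, its `joinThenFilter` method and the `JoinedRow` type are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (no Flink) of the expected query semantics:
// event-time temporal join first, then a filter on the joined "state" column.
public class ExpectedTemporalJoin {

    /** One joined row: event time, matched version time, and matched state. */
    public static final class JoinedRow {
        public final long eventTs;
        public final long versionTs;
        public final String state;

        JoinedRow(long eventTs, long versionTs, String state) {
            this.eventTs = eventTs;
            this.versionTs = versionTs;
            this.state = state;
        }

        @Override
        public String toString() {
            return "eventTs=" + eventTs + ", versionTs=" + versionTs + ", state=" + state;
        }
    }

    public static List<JoinedRow> joinThenFilter(String wantedState) {
        // Versions of id 0 from the reproduction, keyed by version time (epoch millis).
        TreeMap<Long, String> versions = new TreeMap<>();
        versions.put(0L, "online");
        versions.put(10L, "offline");
        versions.put(20L, "online");

        // Event times of id 0 from the reproduction.
        long[] eventTimes = {0L, 5L, 10L, 15L, 20L, 25L};

        List<JoinedRow> out = new ArrayList<>();
        for (long ts : eventTimes) {
            // Assumed AS OF semantics: latest version with version time <= event time.
            Map.Entry<Long, String> version = versions.floorEntry(ts);
            // With a LEFT JOIN an unmatched event would carry a NULL state, which
            // the equality filter drops, so skipping it here gives the same result.
            if (version == null) {
                continue;
            }
            // The filter runs on the join output: it removes rows but never
            // changes which version an event was joined with.
            if (wantedState.equals(version.getValue())) {
                out.add(new JoinedRow(ts, version.getKey(), version.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (JoinedRow row : joinThenFilter("online")) {
            System.out.println(row);
        }
    }
}
```

Running `main` prints four rows, for the events at 0, 5, 20 and 25 ms (matched with the versions at 0, 0, 20 and 20 ms respectively), which corresponds to the four 'online' rows of the unfiltered join output rather than the two rows in the reported filtered output. `TreeMap.floorEntry` is used because it directly expresses "latest key less than or equal to the lookup time".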