[ 
https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-28988:
--------------------------------
           Flags: Important
    Release Note: 
After FLINK-28988 is applied, the filter will no longer be pushed down into both 
inputs of the join.
Note that this may cause incompatible plan changes compared to 1.16.0, e.g., when 
the left input is an upsert source (using the upsert-kafka connector), the query 
plan will no longer contain the ChangelogNormalize node that appeared in 1.16.0.
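
A minimal sketch (not part of the release note itself) of how the plan change could be 
checked, assuming the event_table and versioned_table views from the reproduction below 
are already registered; the WHERE clause stands in for the Table API filter:
{code:java}
// Hedged illustration: print the optimized plan of a filtered temporal join.
// After the fix, the filter is expected to stay above the temporal join rather
// than being pushed into its inputs.
String plan =
        tableEnv.explainSql(
                "SELECT * FROM event_table"
                        + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
                        + " ON event_table.id = versioned_table.id"
                        + " WHERE state = 'online'");
System.out.println(plan);
{code}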

> Incorrect result for filter after temporal join
> -----------------------------------------------
>
>                 Key: FLINK-28988
>                 URL: https://issues.apache.org/jira/browse/FLINK-28988
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.1
>            Reporter: Xuannan Su
>            Assignee: Shuiqiang Chen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.16.1
>
>
> The following code reproduces the issue:
>  
> {code:java}
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> 
> import java.time.Instant;
> 
> import static org.apache.flink.table.api.Expressions.$;
> 
> public class TemporalJoinSQLExample1 {
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> 
>         // versioned side: id, state, and an event-time attribute
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
> 
>         // deduplicate by id to build a versioned table
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
> 
>         // probe side: id and an event-time attribute
>         DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
> 
>         // event-time temporal join, then a filter on the versioned side's state
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         result.execute().print();
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> }
> {code}
>  
> The result of the temporal join is the following:
> | op | id | ts                      | id0 | state   | ts0                     | row_num |
> | +I | 0  | 1970-01-01 08:00:00.000 | 0   | online  | 1970-01-01 08:00:00.000 | 1       |
> | +I | 0  | 1970-01-01 08:00:00.005 | 0   | online  | 1970-01-01 08:00:00.000 | 1       |
> | +I | 0  | 1970-01-01 08:00:00.010 | 0   | offline | 1970-01-01 08:00:00.010 | 1       |
> | +I | 0  | 1970-01-01 08:00:00.015 | 0   | offline | 1970-01-01 08:00:00.010 | 1       |
> | +I | 0  | 1970-01-01 08:00:00.020 | 0   | online  | 1970-01-01 08:00:00.020 | 1       |
> | +I | 0  | 1970-01-01 08:00:00.025 | 0   | online  | 1970-01-01 08:00:00.020 | 1       |
>  
> After filtering with the predicate state = 'online', I expected that only the two 
> rows with state 'offline' would be filtered out, but I got the following result:
> | op | id | ts                      | id0 | state   | ts0                     | row_num |
> | +I | 0  | 1970-01-01 08:00:00.020 | 0   | online  | 1970-01-01 08:00:00.020 | 1       |
> | +I | 0  | 1970-01-01 08:00:00.025 | 0   | online  | 1970-01-01 08:00:00.020 | 1       |
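> 
> A possible way to see why those rows disappear (a hedged diagnostic, not part of the 
> original report): print the optimized plan of the filtered table to check where the 
> planner placed the state = 'online' predicate relative to the temporal join. This 
> fragment assumes the result table and the static import of $ from the code above.
> {code:java}
> // Hypothetical diagnostic: show where the state = 'online' predicate ends up
> // in the optimized plan (above the temporal join, or pushed into its inputs).
> System.out.println(result.filter($("state").isEqual("online")).explain());
> {code}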
> 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
