[ https://issues.apache.org/jira/browse/FLINK-28988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xuannan Su updated FLINK-28988:
-------------------------------
    Description: 
The following code reproduces the issue:

 
{code:java}
public class TemporalJoinSQLExample1 {

    public static void main(String[] args) throws Exception {

        // set up the Java DataStream API
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the Java Table API
        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);

        final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                env.fromElements(
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                        new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));

        final Table table =
                tableEnv.fromDataStream(
                                ds,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", 
DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f2", "f2 - INTERVAL '2' 
SECONDS")
                                        .build())
                        .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);
        final Table dedupeTable =
                tableEnv.sqlQuery(
                        "SELECT * FROM ("
                                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
id ORDER BY ts DESC) AS row_num FROM source_table"
                                + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        DataStreamSource<Tuple2<Integer, Instant>> event =
                env.fromElements(
                        new Tuple2<>(0, Instant.ofEpochMilli(0)),
                        new Tuple2<>(0, Instant.ofEpochMilli(5)),
                        new Tuple2<>(0, Instant.ofEpochMilli(10)),
                        new Tuple2<>(0, Instant.ofEpochMilli(15)),
                        new Tuple2<>(0, Instant.ofEpochMilli(20)),
                        new Tuple2<>(0, Instant.ofEpochMilli(25)));

        final Table eventTable =
                tableEnv.fromDataStream(
                                event,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", 
DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f1", "f1 - INTERVAL '2' 
SECONDS")
                                        .build())
                        .as("id", "ts");

        tableEnv.createTemporaryView("event_table", eventTable);

        final Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM event_table"
                                + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
AS OF event_table.ts"
                                + " ON event_table.id = versioned_table.id");
        result.execute().print();

        result.filter($("state").isEqual("online")).execute().print();
    }
} {code}
 

The result of the temporal join is the following:
|op|         id|                     ts|        id0|                         state|                    ts0|             row_num|
|+I|          0|1970-01-01 08:00:00.000|          0|                        online|1970-01-01 08:00:00.000|                   1|
|+I|          0|1970-01-01 08:00:00.005|          0|                        online|1970-01-01 08:00:00.000|                   1|
|+I|          0|1970-01-01 08:00:00.010|          0|                       offline|1970-01-01 08:00:00.010|                   1|
|+I|          0|1970-01-01 08:00:00.015|          0|                       offline|1970-01-01 08:00:00.010|                   1|
|+I|          0|1970-01-01 08:00:00.020|          0|                        online|1970-01-01 08:00:00.020|                   1|
|+I|          0|1970-01-01 08:00:00.025|          0|                        online|1970-01-01 08:00:00.020|                   1|

 

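After filtering with the predicate state = 'online', I expect only the two rows
with state 'offline' to be filtered out, leaving four rows. Instead, I got only the
following two rows; the 'online' rows for ts 08:00:00.000 and 08:00:00.005 are missing: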

|op|         id|                     ts|        id0|                         state|                    ts0|             row_num|
|+I|          0|1970-01-01 08:00:00.020|          0|                        online|1970-01-01 08:00:00.020|                   1|
|+I|          0|1970-01-01 08:00:00.025|          0|                        online|1970-01-01 08:00:00.020|                   1|

 
 
 

 

  was:
The following code reproduces the issue:

 
{code:java}
public class TemporalJoinSQLExample1 {

    public static void main(String[] args) throws Exception {

        // set up the Java DataStream API
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        // set up the Java Table API
        final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);

        final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
                env.fromElements(
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
                        new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
                        new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));

        final Table table =
                tableEnv.fromDataStream(
                                ds,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", DataTypes.STRING())
                                        .column("f2", 
DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f2", "f2 - INTERVAL '2' 
SECONDS")
                                        .build())
                        .as("id", "state", "ts");
        tableEnv.createTemporaryView("source_table", table);
        final Table dedupeTable =
                tableEnv.sqlQuery(
                        "SELECT * FROM ("
                                + " SELECT *, ROW_NUMBER() OVER (PARTITION BY 
id ORDER BY ts DESC) AS row_num FROM source_table"
                                + ") WHERE row_num = 1");
        tableEnv.createTemporaryView("versioned_table", dedupeTable);

        DataStreamSource<Tuple2<Integer, Instant>> event =
                env.fromElements(
                        new Tuple2<>(0, Instant.ofEpochMilli(0)),
                        new Tuple2<>(0, Instant.ofEpochMilli(5)),
                        new Tuple2<>(0, Instant.ofEpochMilli(10)),
                        new Tuple2<>(0, Instant.ofEpochMilli(15)),
                        new Tuple2<>(0, Instant.ofEpochMilli(20)),
                        new Tuple2<>(0, Instant.ofEpochMilli(25)));

        final Table eventTable =
                tableEnv.fromDataStream(
                                event,
                                Schema.newBuilder()
                                        .column("f0", DataTypes.INT())
                                        .column("f1", 
DataTypes.TIMESTAMP_LTZ(3))
                                        .watermark("f1", "f1 - INTERVAL '2' 
SECONDS")
                                        .build())
                        .as("id", "ts");

        tableEnv.createTemporaryView("event_table", eventTable);

        final Table result =
                tableEnv.sqlQuery(
                        "SELECT * FROM event_table"
                                + " LEFT JOIN versioned_table FOR SYSTEM_TIME 
AS OF event_table.ts"
                                + " ON event_table.id = versioned_table.id");
        result.execute().print();

        result.filter($("state").isEqual("online")).execute().print();
    }
} {code}
 

The result of the temporal join is the following:

+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| op |          id |                      ts |         id0 |                          state |                     ts0 |              row_num |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| +I |           0 | 1970-01-01 08:00:00.000 |           0 |                         online | 1970-01-01 08:00:00.000 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.005 |           0 |                         online | 1970-01-01 08:00:00.000 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.010 |           0 |                        offline | 1970-01-01 08:00:00.010 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.015 |           0 |                        offline | 1970-01-01 08:00:00.010 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.020 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.025 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+

 

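After filtering with the predicate state = 'online', I expect only the two rows
with state 'offline' to be filtered out, leaving four rows. Instead, I got only the
following two rows; the 'online' rows for ts 08:00:00.000 and 08:00:00.005 are missing: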

+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| op |          id |                      ts |         id0 |                          state |                     ts0 |              row_num |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+
| +I |           0 | 1970-01-01 08:00:00.020 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
| +I |           0 | 1970-01-01 08:00:00.025 |           0 |                         online | 1970-01-01 08:00:00.020 |                    1 |
+----+-------------+-------------------------+-------------+--------------------------------+-------------------------+----------------------+


> Incorrect result for filter after temporal join
> -----------------------------------------------
>
>                 Key: FLINK-28988
>                 URL: https://issues.apache.org/jira/browse/FLINK-28988
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.15.1
>            Reporter: Xuannan Su
>            Priority: Major
>
> The following code reproduces the issue:
>  
> {code:java}
> /**
>  * Reproducer for FLINK-28988: a filter applied after an event-time temporal join returns
>  * incorrect results.
>  *
>  * <p>{@code source_table} holds three versions of the state for id 0 ("online" at 0 ms,
>  * "offline" at 10 ms, "online" at 20 ms). It is deduplicated to the latest row per id
>  * ({@code ROW_NUMBER() ... ORDER BY ts DESC} with {@code row_num = 1}) and registered as
>  * {@code versioned_table}. {@code event_table} (id 0 every 5 ms from 0 ms to 25 ms) is then
>  * LEFT JOINed against it {@code FOR SYSTEM_TIME AS OF event_table.ts}.
>  *
>  * <p>The program prints the unfiltered join result and then the same result filtered on
>  * {@code state = 'online'}.
>  *
>  * <p>Imports are omitted. Besides {@code java.time.Instant}, the snippet needs the Flink
>  * DataStream/Table API classes it references and a static import of
>  * {@code org.apache.flink.table.api.Expressions.$}.
>  */
> public class TemporalJoinSQLExample1 {
>     public static void main(String[] args) throws Exception {
>         // set up the Java DataStream API
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // set up the Java Table API
>         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         // Versioned side: the state of id 0 changes at 0 ms, 10 ms and 20 ms.
>         final DataStreamSource<Tuple3<Integer, String, Instant>> ds =
>                 env.fromElements(
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(0)),
>                         new Tuple3<>(0, "offline", Instant.ofEpochMilli(10)),
>                         new Tuple3<>(0, "online", Instant.ofEpochMilli(20)));
>         final Table table =
>                 tableEnv.fromDataStream(
>                                 ds,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.STRING())
>                                         .column("f2", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f2", "f2 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "state", "ts");
>         tableEnv.createTemporaryView("source_table", table);
>         // Keep only the latest row per id so the view can be used as a versioned table.
>         final Table dedupeTable =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM ("
>                                 + " SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC)"
>                                 + " AS row_num FROM source_table"
>                                 + ") WHERE row_num = 1");
>         tableEnv.createTemporaryView("versioned_table", dedupeTable);
>         // Probe side: one event for id 0 every 5 ms from 0 ms to 25 ms.
>         final DataStreamSource<Tuple2<Integer, Instant>> event =
>                 env.fromElements(
>                         new Tuple2<>(0, Instant.ofEpochMilli(0)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(5)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(10)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(15)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(20)),
>                         new Tuple2<>(0, Instant.ofEpochMilli(25)));
>         final Table eventTable =
>                 tableEnv.fromDataStream(
>                                 event,
>                                 Schema.newBuilder()
>                                         .column("f0", DataTypes.INT())
>                                         .column("f1", DataTypes.TIMESTAMP_LTZ(3))
>                                         .watermark("f1", "f1 - INTERVAL '2' SECONDS")
>                                         .build())
>                         .as("id", "ts");
>         tableEnv.createTemporaryView("event_table", eventTable);
>         final Table result =
>                 tableEnv.sqlQuery(
>                         "SELECT * FROM event_table"
>                                 + " LEFT JOIN versioned_table FOR SYSTEM_TIME AS OF event_table.ts"
>                                 + " ON event_table.id = versioned_table.id");
>         // Unfiltered join: each event is paired with the version valid at its timestamp.
>         result.execute().print();
>         // Expected: the four "online" rows (events at 0, 5, 20 and 25 ms).
>         // Observed in FLINK-28988: only the rows for the events at 20 and 25 ms are returned.
>         result.filter($("state").isEqual("online")).execute().print();
>     }
> } {code}
>  
> The result of the temporal join is the following:
> |op|         id|                     ts|        id0|                         state|                    ts0|             row_num|
> |+I|          0|1970-01-01 08:00:00.000|          0|                        online|1970-01-01 08:00:00.000|                   1|
> |+I|          0|1970-01-01 08:00:00.005|          0|                        online|1970-01-01 08:00:00.000|                   1|
> |+I|          0|1970-01-01 08:00:00.010|          0|                       offline|1970-01-01 08:00:00.010|                   1|
> |+I|          0|1970-01-01 08:00:00.015|          0|                       offline|1970-01-01 08:00:00.010|                   1|
> |+I|          0|1970-01-01 08:00:00.020|          0|                        online|1970-01-01 08:00:00.020|                   1|
> |+I|          0|1970-01-01 08:00:00.025|          0|                        online|1970-01-01 08:00:00.020|                   1|
>  
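> After filtering with the predicate state = 'online', I expect only the two rows
> with state 'offline' to be filtered out, leaving four rows. Instead, I got only the
> following two rows; the 'online' rows for ts 08:00:00.000 and 08:00:00.005 are missing: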
> |op|         id|                     ts|        id0|                         state|                    ts0|             row_num|
> |+I|          0|1970-01-01 08:00:00.020|          0|                        online|1970-01-01 08:00:00.020|                   1|
> |+I|          0|1970-01-01 08:00:00.025|          0|                        online|1970-01-01 08:00:00.020|                   1|
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to