[ https://issues.apache.org/jira/browse/FLINK-33722 ]


    Grzegorz Kołakowski deleted comment on FLINK-33722:
    ---------------------------------------------

was (Author: grzegorz.kolakowski):
Please assign me to the task.

> MATCH_RECOGNIZE in batch mode ignores events order
> --------------------------------------------------
>
>                 Key: FLINK-33722
>                 URL: https://issues.apache.org/jira/browse/FLINK-33722
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.17.1
>            Reporter: Grzegorz Kołakowski
>            Priority: Major
>
> MATCH_RECOGNIZE in batch mode seems to ignore ORDER BY clause. Let's consider 
> the following example:
> {code:sql}
>     FROM events
>         MATCH_RECOGNIZE (
>             PARTITION BY user_id
>             ORDER BY ts ASC
>             MEASURES
>                 FIRST(A.ts) as _start,
>                 LAST(A.ts) as _middle,
>                 LAST(B.ts) as _finish
>             ONE ROW PER MATCH
>             AFTER MATCH SKIP PAST LAST ROW
>             PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
>             DEFINE
>                 A AS active is false,
>                 B AS active is true
>         ) AS T {code}
> where _events_ is a Postgresql table containing ~10000 records.
> {code:java}
> CREATE TABLE events (
>   id INT,
>   user_id INT,
>   ts TIMESTAMP(3),
>   active BOOLEAN,
>   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:postgresql://postgres:5432/test',
>     'username' = 'test',
>     'password' = 'test',
>     'table-name' = 'events'
> ); {code}
> It can happen that _finish is smaller than _start or _middle, which is wrong.
> {noformat}
>    user_id                  _start                 _middle                 
> _finish
>          1 2023-11-23 14:34:42.346 2023-11-23 14:34:48.370 2023-11-23 
> 14:34:44.264{noformat}
>  
> Repository where I reproduced the problem: 
> [https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging]
> ----
>  
> According to [~dwysakowicz]:  In BATCH the CepOperator is always created to 
> process records in processing time:
> [https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54]
> A comparator is passed along to the operator covering the sorting on ts 
> field: 
> [https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173]
>  but this is only secondary sorting. It is applied only within records of the 
> same timestamp.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to