[ https://issues.apache.org/jira/browse/FLINK-33722 ]
Grzegorz Kołakowski deleted comment on FLINK-33722:
---------------------------------------------

    was (Author: grzegorz.kolakowski):
Please assign me to the task.

> MATCH_RECOGNIZE in batch mode ignores events order
> --------------------------------------------------
>
>                 Key: FLINK-33722
>                 URL: https://issues.apache.org/jira/browse/FLINK-33722
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.17.1
>            Reporter: Grzegorz Kołakowski
>            Priority: Major
>
> MATCH_RECOGNIZE in batch mode seems to ignore the ORDER BY clause. Consider
> the following example:
> {code:sql}
> SELECT *
> FROM events
> MATCH_RECOGNIZE (
>     PARTITION BY user_id
>     ORDER BY ts ASC
>     MEASURES
>         FIRST(A.ts) AS _start,
>         LAST(A.ts) AS _middle,
>         LAST(B.ts) AS _finish
>     ONE ROW PER MATCH
>     AFTER MATCH SKIP PAST LAST ROW
>     PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
>     DEFINE
>         A AS active IS FALSE,
>         B AS active IS TRUE
> ) AS T {code}
> where _events_ is a PostgreSQL table containing ~10000 records.
> {code:sql}
> CREATE TABLE events (
>     id INT,
>     user_id INT,
>     ts TIMESTAMP(3),
>     active BOOLEAN,
>     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
>     PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:postgresql://postgres:5432/test',
>     'username' = 'test',
>     'password' = 'test',
>     'table-name' = 'events'
> ); {code}
> It can happen that _finish is earlier than _start or _middle, which is wrong.
> {noformat}
> user_id  _start                   _middle                  _finish
> 1        2023-11-23 14:34:42.346  2023-11-23 14:34:48.370  2023-11-23 14:34:44.264
> {noformat}
>
> Repository where I reproduced the problem:
> [https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging]
> ----
>
> According to [~dwysakowicz]: In BATCH mode the CepOperator is always created to
> process records in processing time:
> [https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54]
> A comparator covering the sort on the ts field is passed along to the
> operator:
> [https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173]
> but this is only a secondary sort. It is applied only among records that
> share the same timestamp.
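
To illustrate why a secondary sort alone cannot guarantee the ORDER BY ts order, here is a minimal toy model in Java. It is not Flink's CepOperator code: the class SecondarySortSketch, the Event type, and the fields eventTs and procTs are hypothetical names made up for this sketch, and the assumption is simply that records are grouped by their processing-time stamp and the comparator runs only inside each group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SecondarySortSketch {

    /** Hypothetical event: eventTs stands for the ORDER BY column, procTs for the processing-time stamp. */
    static final class Event {
        final long eventTs;
        final long procTs;

        Event(long eventTs, long procTs) {
            this.eventTs = eventTs;
            this.procTs = procTs;
        }
    }

    /** Toy model: bucket by processing time, apply the comparator only inside each bucket. */
    static List<Long> emitOrder(List<Event> input) {
        // TreeMap keeps buckets ordered by processing time (the primary order).
        Map<Long, List<Event>> buckets = new TreeMap<>();
        for (Event e : input) {
            buckets.computeIfAbsent(e.procTs, k -> new ArrayList<>()).add(e);
        }
        List<Long> emitted = new ArrayList<>();
        for (List<Event> bucket : buckets.values()) {
            // Secondary sort: only records sharing the same procTs are compared.
            bucket.sort(Comparator.comparingLong((Event e) -> e.eventTs));
            for (Event e : bucket) {
                emitted.add(e.eventTs);
            }
        }
        return emitted;
    }

    /** Assumed input: two records share processing time 1, a third arrives at processing time 2. */
    static List<Event> sampleInput() {
        return Arrays.asList(
                new Event(48L, 1L),
                new Event(42L, 1L),
                new Event(44L, 2L));
    }

    public static void main(String[] args) {
        System.out.println(emitOrder(sampleInput())); // prints [42, 48, 44]
    }
}
```

Within the first bucket the comparator restores the order 42, 48, but the record with eventTs 44 lands in a later bucket and is emitted last, so the overall sequence is not sorted by eventTs. This mirrors the shape of the wrong result above, where _finish is earlier than _middle.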