Grzegorz Kołakowski created FLINK-33722:
-------------------------------------------

             Summary: MATCH_RECOGNIZE in batch mode ignores events order
                 Key: FLINK-33722
                 URL: https://issues.apache.org/jira/browse/FLINK-33722
             Project: Flink
          Issue Type: Bug
          Components: Library / CEP
    Affects Versions: 1.17.1
            Reporter: Grzegorz Kołakowski


MATCH_RECOGNIZE in batch mode seems to ignore the ORDER BY clause. Consider the following example:
{code:sql}
    SELECT *
    FROM events
        MATCH_RECOGNIZE (
            PARTITION BY user_id
            ORDER BY ts ASC
            MEASURES
                FIRST(A.ts) as _start,
                LAST(A.ts) as _middle,
                LAST(B.ts) as _finish
            ONE ROW PER MATCH
            AFTER MATCH SKIP PAST LAST ROW
            PATTERN (A{2} B) WITHIN INTERVAL '2' HOURS
            DEFINE
                A AS active is false,
                B AS active is true
        ) AS T {code}
where _events_ is a PostgreSQL table containing about 10,000 records, defined as:
{code:sql}
CREATE TABLE events (
  id INT,
  user_id INT,
  ts TIMESTAMP(3),
  active BOOLEAN,
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres:5432/test',
    'username' = 'test',
    'password' = 'test',
    'table-name' = 'events'
); {code}
It can happen that {{_finish}} is earlier than {{_start}} or {{_middle}}, which is wrong: with ORDER BY ts ASC, the B row must come after both A rows.
{noformat}
   user_id                  _start                 _middle                 _finish
         1 2023-11-23 14:34:42.346 2023-11-23 14:34:48.370 2023-11-23 14:34:44.264{noformat}
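The invariant that the row above violates can be checked mechanically. The following is a small sketch in plain Java (the class and method names are hypothetical, not part of Flink). It assumes the timestamps are rendered as in the output above and tests FIRST(A.ts) <= LAST(A.ts) <= LAST(B.ts):
{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: checks the ordering implied by PATTERN (A{2} B) with ORDER BY ts ASC. */
public class MatchRowCheck {

    // Matches the TIMESTAMP(3) rendering in the output, e.g. 2023-11-23 14:34:42.346
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Returns true if start <= middle <= finish. */
    static boolean isOrdered(String start, String middle, String finish) {
        LocalDateTime s = LocalDateTime.parse(start, FMT);
        LocalDateTime m = LocalDateTime.parse(middle, FMT);
        LocalDateTime f = LocalDateTime.parse(finish, FMT);
        return !s.isAfter(m) && !m.isAfter(f);
    }

    public static void main(String[] args) {
        // The reported row: _finish (14:34:44.264) precedes _middle (14:34:48.370).
        boolean ok = isOrdered("2023-11-23 14:34:42.346",
                               "2023-11-23 14:34:48.370",
                               "2023-11-23 14:34:44.264");
        System.out.println(ok ? "ordered" : "out of order"); // prints "out of order"
    }
}
{code}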
 

Repository where I reproduced the problem: 
https://github.com/grzegorz8/flink-match-recognize-in-batch-debugging
----
 

According to [~dwysakowicz]: in BATCH mode, the CepOperator is always created to process records in processing time:
https://github.com/apache/flink/blob/7f7bee70e3ac0d9fb27d7e09b41d6396b748dada/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java#L54
A comparator covering the sort on the ts field is passed to the operator:
https://github.com/apache/flink/blob/fea9ffedecf81a97de5c31519ade3bab8228e743/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java#L173
but it provides only secondary sorting: it is applied only among records with the same timestamp. In processing time, that timestamp is the time at which the records arrive at the operator, so rows arriving at different times are never reordered by ts.
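To illustrate why a tie-breaking comparator is not enough, here is a simplified model in plain Java 16+ (not Flink code; the class, record and method names are hypothetical). It assumes, as described above, that records are bucketed by the processing time at which they arrive and that the ts comparator is applied only inside each bucket. Rows read from the JDBC source arrive in table order, so a row with a smaller ts that arrives later is emitted after rows with larger ts:
{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of secondary sorting within processing-time buckets (not Flink code). */
public class SecondarySortDemo {

    /** Hypothetical event: event-time ts and the processing-time millisecond it arrived in. */
    record Event(long ts, long arrivalMillis) {}

    /** Order produced when the comparator only sorts inside same-arrival-time buckets. */
    static List<Long> bucketedOrder(List<Event> arrivals) {
        // TreeMap iterates buckets in ascending processing time, like timers firing in order.
        Map<Long, List<Event>> buckets = new TreeMap<>();
        for (Event e : arrivals) {
            buckets.computeIfAbsent(e.arrivalMillis(), k -> new ArrayList<>()).add(e);
        }
        List<Long> out = new ArrayList<>();
        for (List<Event> bucket : buckets.values()) {
            bucket.sort(Comparator.comparingLong(Event::ts)); // secondary sort only
            for (Event e : bucket) {
                out.add(e.ts());
            }
        }
        return out;
    }

    /** Order that ORDER BY ts ASC requires: one global sort over the partition. */
    static List<Long> globalOrder(List<Event> arrivals) {
        List<Event> copy = new ArrayList<>(arrivals);
        copy.sort(Comparator.comparingLong(Event::ts));
        List<Long> out = new ArrayList<>();
        for (Event e : copy) {
            out.add(e.ts());
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> arrivals = List.of(
            new Event(300, 1), new Event(100, 1), // same arrival ms: sorted with each other
            new Event(200, 2));                   // later arrival ms: emitted last
        System.out.println("bucketed: " + bucketedOrder(arrivals)); // prints "bucketed: [100, 300, 200]"
        System.out.println("global:   " + globalOrder(arrivals));   // prints "global:   [100, 200, 300]"
    }
}
{code}
In this model, a pattern such as A{2} B can match 100, 300 as the A rows and 200 as the B row, which is the same kind of out-of-order result as the reported {{_finish}} value.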



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
