This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f22c50aab1f [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in 
batch mode (#24699)
f22c50aab1f is described below

commit f22c50aab1f0efe59c3c7a1d944fce79b4edd54d
Author: Grzegorz Kołakowski <[email protected]>
AuthorDate: Mon Nov 4 09:14:31 2024 +0100

    [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode (#24699)
---
 .../plan/nodes/exec/batch/BatchExecMatch.java      |  72 ++-
 .../plan/nodes/exec/common/CommonExecMatch.java    |  24 +-
 .../plan/nodes/exec/stream/StreamExecMatch.java    |  16 +-
 .../runtime/batch/sql/MatchRecognizeITCase.java    | 495 +++++++++++++++++----
 4 files changed, 513 insertions(+), 94 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
index 4f02be5f91c..ec48153d02f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
@@ -18,22 +18,36 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import 
org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import 
org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Collections;
 
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+
 /** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
 public class BatchExecMatch extends CommonExecMatch
         implements BatchExecNode<RowData>, 
MultipleTransformationTranslator<RowData> {
 
+    public static final String TIMESTAMP_INSERTER_TRANSFORMATION = 
"timestamp-inserter";
+
     public BatchExecMatch(
             ReadableConfig tableConfig,
             MatchSpec matchSpec,
@@ -51,7 +65,61 @@ public class BatchExecMatch extends CommonExecMatch
     }
 
     @Override
-    public boolean isProcTime(RowType inputRowType) {
-        return true;
+    public void checkOrderKeys(RowType inputRowType) {
+        SortSpec orderKeys = matchSpec.getOrderKeys();
+        if (orderKeys.getFieldSize() == 0) {
+            throw new TableException("You must specify non-empty order by.");
+        }
+
+        SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+        LogicalType timeOrderFieldType = 
inputRowType.getTypeAt(timeOrderFieldIdx);
+
+        if (!TypeCheckUtils.isTimePoint(timeOrderFieldType)) {
+            throw new TableException("You must specify time point for order by 
as the first one.");
+        }
+
+        // time ordering needs to be ascending
+        if (!orderKeys.getAscendingOrders()[0]) {
+            throw new TableException("Primary sort order of a table must be 
ascending on time.");
+        }
+    }
+
+    @Override
+    protected Transformation<RowData> translateOrder(
+            PlannerBase planner,
+            Transformation<RowData> inputTransform,
+            RowType inputRowType,
+            ExecEdge inputEdge,
+            ExecNodeConfig config) {
+        if (isProcTime(inputRowType)) {
+            // In proctime process records in the order they come.
+            return inputTransform;
+        }
+
+        // copy the timestamp field from order by clause into the StreamRecord 
timestamp field
+        SortSpec.SortFieldSpec timeOrderField = 
matchSpec.getOrderKeys().getFieldSpec(0);
+        int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+        LogicalType timeOrderFieldType = 
inputRowType.getTypeAt(timeOrderFieldIdx);
+        int precision = getPrecision(timeOrderFieldType);
+        Transformation<RowData> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        inputTransform,
+                        createTransformationMeta(
+                                TIMESTAMP_INSERTER_TRANSFORMATION,
+                                String.format(
+                                        
"BatchRecordTimestampInserter(timestamp field: %s)",
+                                        timeOrderFieldIdx),
+                                "BatchRecordTimestampInserter",
+                                config),
+                        new StreamRecordTimestampInserter(timeOrderFieldIdx, 
precision),
+                        inputTransform.getOutputType(),
+                        inputTransform.getParallelism(),
+                        false);
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+        return transform;
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
index ad94053ebc0..c2ab5c84539 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -58,6 +58,7 @@ import 
org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
 import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.MathUtils;
@@ -86,7 +87,7 @@ public abstract class CommonExecMatch extends 
ExecNodeBase<RowData>
 
     public static final String MATCH_TRANSFORMATION = "match";
 
-    private final MatchSpec matchSpec;
+    protected final MatchSpec matchSpec;
 
     public CommonExecMatch(
             int id,
@@ -115,7 +116,7 @@ public abstract class CommonExecMatch extends 
ExecNodeBase<RowData>
                 createEventComparator(
                         config, planner.getFlinkContext().getClassLoader(), 
inputRowType);
         final Transformation<RowData> timestampedInputTransform =
-                translateOrder(inputTransform, inputRowType, config);
+                translateOrder(planner, inputTransform, inputRowType, 
inputEdge, config);
 
         final Tuple2<Pattern<RowData, RowData>, List<String>> 
cepPatternAndNames =
                 translatePattern(
@@ -200,7 +201,7 @@ public abstract class CommonExecMatch extends 
ExecNodeBase<RowData>
         return transform;
     }
 
-    protected void checkOrderKeys(RowType inputRowType) {}
+    protected abstract void checkOrderKeys(RowType inputRowType);
 
     private EventComparator<RowData> createEventComparator(
             ExecNodeConfig config, ClassLoader classLoader, RowType 
inputRowType) {
@@ -215,10 +216,12 @@ public abstract class CommonExecMatch extends 
ExecNodeBase<RowData>
         }
     }
 
-    protected Transformation<RowData> translateOrder(
-            Transformation<RowData> inputTransform, RowType inputRowType, 
ExecNodeConfig config) {
-        return inputTransform;
-    }
+    protected abstract Transformation<RowData> translateOrder(
+            PlannerBase planner,
+            Transformation<RowData> inputTransform,
+            RowType inputRowType,
+            ExecEdge inputEdge,
+            ExecNodeConfig config);
 
     @VisibleForTesting
     public static Tuple2<Pattern<RowData, RowData>, List<String>> 
translatePattern(
@@ -251,7 +254,12 @@ public abstract class CommonExecMatch extends 
ExecNodeBase<RowData>
                 "Only constant intervals with millisecond resolution are 
supported as time constraints of patterns.");
     }
 
-    public abstract boolean isProcTime(RowType inputRowType);
+    public boolean isProcTime(RowType inputRowType) {
+        final SortSpec.SortFieldSpec timeOrderField = 
matchSpec.getOrderKeys().getFieldSpec(0);
+        final LogicalType timeOrderFieldType =
+                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
+        return TypeCheckUtils.isProcTime(timeOrderFieldType);
+    }
 
     /** The visitor to traverse the pattern RexNode. */
     private static class PatternVisitor extends 
RexDefaultVisitor<Pattern<RowData, RowData>> {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 2a204e41ff3..464a36dbb29 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -121,7 +123,11 @@ public class StreamExecMatch extends CommonExecMatch
 
     @Override
     public Transformation<RowData> translateOrder(
-            Transformation<RowData> inputTransform, RowType inputRowType, 
ExecNodeConfig config) {
+            PlannerBase planner,
+            Transformation<RowData> inputTransform,
+            RowType inputRowType,
+            ExecEdge inputEdge,
+            ExecNodeConfig config) {
         SortSpec.SortFieldSpec timeOrderField = 
matchSpec.getOrderKeys().getFieldSpec(0);
         int timeOrderFieldIdx = timeOrderField.getFieldIndex();
         LogicalType timeOrderFieldType = 
inputRowType.getTypeAt(timeOrderFieldIdx);
@@ -152,12 +158,4 @@ public class StreamExecMatch extends CommonExecMatch
             return inputTransform;
         }
     }
-
-    @Override
-    public boolean isProcTime(RowType inputRowType) {
-        final SortSpec.SortFieldSpec timeOrderField = 
matchSpec.getOrderKeys().getFieldSpec(0);
-        final LogicalType timeOrderFieldType =
-                inputRowType.getTypeAt(timeOrderField.getFieldIndex());
-        return TypeCheckUtils.isProcTime(timeOrderFieldType);
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
index 8efc072adfa..5aae60f5279 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
@@ -38,11 +38,16 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.util.Comparator;
 import java.util.List;
 
 import static org.apache.flink.api.common.typeinfo.Types.DOUBLE;
+import static org.apache.flink.api.common.typeinfo.Types.INSTANT;
 import static org.apache.flink.api.common.typeinfo.Types.INT;
+import static org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME;
 import static org.apache.flink.api.common.typeinfo.Types.LONG;
 import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED;
 import static org.apache.flink.api.common.typeinfo.Types.STRING;
@@ -65,7 +70,7 @@ class MatchRecognizeITCase {
     }
 
     @Test
-    void testSimplePattern() {
+    void testSimplePatternInProcTime() {
         tEnv.createTemporaryView(
                 "MyTable",
                 tEnv.fromDataStream(
@@ -105,39 +110,137 @@ class MatchRecognizeITCase {
                 .containsExactly(Row.of(6, 7, 8));
     }
 
+    @Test
+    void testSimplePatternInEventTime() {
+        Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromData(
+                                        Row.of(1, "a", now.plusSeconds(1)),
+                                        Row.of(2, "z", now.plusSeconds(2)),
+                                        Row.of(3, "b", now.plusSeconds(3)),
+                                        Row.of(4, "c", now.plusSeconds(4)),
+                                        Row.of(5, "d", now.plusSeconds(5)),
+                                        Row.of(6, "a", now.plusSeconds(6)),
+                                        Row.of(7, "b", now.plusSeconds(7)),
+                                        Row.of(8, "c", now.plusSeconds(8)),
+                                        Row.of(9, "h", now.plusSeconds(9)))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"id", "name", 
"ts"},
+                                                INT,
+                                                STRING,
+                                                INSTANT)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .column("ts", DataTypes.TIMESTAMP_LTZ(3))
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT T.aid, T.bid, T.cid\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY ts\n"
+                                + "  MEASURES\n"
+                                + "    `A\"`.id AS aid,\n"
+                                + "    \u006C.id AS bid,\n"
+                                + "    C.id AS cid\n"
+                                + "  PATTERN (`A\"` \u006C C)\n"
+                                + "  DEFINE\n"
+                                + "    `A\"` AS name = 'a',\n"
+                                + "    \u006C AS name = 'b',\n"
+                                + "    C AS name = 'c'\n"
+                                + ") AS T");
+        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+                .containsExactly(Row.of(6, 7, 8));
+    }
+
+    @Test
+    void testTimeConstraint() {
+        LocalDateTime now = LocalDateTime.parse("2023-12-01T12:00:00.000");
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromData(
+                                        Row.of(1, "z", now.plusSeconds(1)),
+                                        // records 2, 3, 4 arrive within 58 
seconds --> matched
+                                        Row.of(2, "a", now.plusSeconds(2)),
+                                        Row.of(3, "b", now.plusSeconds(30)),
+                                        Row.of(4, "c", now.plusSeconds(60)),
+                                        Row.of(5, "x", now.plusSeconds(100)),
+                                        // records 6, 7, 8 arrive within 61 -> 
not matched
+                                        Row.of(6, "a", now.plusSeconds(101)),
+                                        Row.of(7, "b", now.plusSeconds(131)),
+                                        Row.of(8, "c", now.plusSeconds(162)),
+                                        Row.of(9, "z", now.plusSeconds(200)))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"id", "name", 
"ts"},
+                                                INT,
+                                                STRING,
+                                                LOCAL_DATE_TIME)),
+                        Schema.newBuilder()
+                                .column("id", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT T.aid, T.bid, T.cid\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY ts\n"
+                                + "  MEASURES\n"
+                                + "    A.id AS aid,\n"
+                                + "    B.id AS bid,\n"
+                                + "    C.id AS cid\n"
+                                + "  PATTERN (A B C) WITHIN INTERVAL '1' 
MINUTE\n"
+                                + "  DEFINE\n"
+                                + "    A AS name = 'a',\n"
+                                + "    B AS name = 'b',\n"
+                                + "    C AS name = 'c'\n"
+                                + ") AS T");
+        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+                .containsExactly(Row.of(2, 3, 4));
+    }
+
     @Test
     void testSimplePatternWithNulls() {
+        Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
         tEnv.createTemporaryView(
                 "MyTable",
                 tEnv.fromDataStream(
                         env.fromData(
-                                        Row.of(1, "a", null),
-                                        Row.of(2, "b", null),
-                                        Row.of(3, "c", null),
-                                        Row.of(4, "d", null),
-                                        Row.of(5, null, null),
-                                        Row.of(6, "a", null),
-                                        Row.of(7, "b", null),
-                                        Row.of(8, "c", null),
-                                        Row.of(9, null, null))
+                                        Row.of(1, "a", null, 
now.plusSeconds(1)),
+                                        Row.of(2, "b", null, 
now.plusSeconds(2)),
+                                        Row.of(3, "c", null, 
now.plusSeconds(3)),
+                                        Row.of(4, "d", null, 
now.plusSeconds(4)),
+                                        Row.of(5, null, null, 
now.plusSeconds(5)),
+                                        Row.of(6, "a", null, 
now.plusSeconds(6)),
+                                        Row.of(7, "b", null, 
now.plusSeconds(7)),
+                                        Row.of(8, "c", null, 
now.plusSeconds(8)),
+                                        Row.of(9, null, null, 
now.plusSeconds(9)))
                                 .returns(
                                         ROW_NAMED(
-                                                new String[] {"id", "name", 
"nullField"},
+                                                new String[] {"id", "name", 
"nullField", "ts"},
                                                 INT,
                                                 STRING,
-                                                STRING)),
+                                                STRING,
+                                                INSTANT)),
                         Schema.newBuilder()
                                 .column("id", DataTypes.INT())
                                 .column("name", DataTypes.STRING())
                                 .column("nullField", DataTypes.STRING())
-                                .columnByExpression("proctime", "PROCTIME()")
+                                .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
         TableResult tableResult =
                 tEnv.executeSql(
                         "SELECT T.aid, T.bNull, T.cid, T.aNull\n"
                                 + "FROM MyTable\n"
                                 + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY proctime\n"
+                                + "  ORDER BY ts\n"
                                 + "  MEASURES\n"
                                 + "    A.id AS aid,\n"
                                 + "    A.nullField AS aNull,\n"
@@ -155,33 +258,35 @@ class MatchRecognizeITCase {
 
     @Test
     void testCodeSplitsAreProperlyGenerated() {
+        Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
         tEnv.getConfig().setMaxGeneratedCodeLength(1);
         tEnv.createTemporaryView(
                 "MyTable",
                 tEnv.fromDataStream(
                         env.fromData(
-                                        Row.of(1, "a", "key1", "second_key3"),
-                                        Row.of(2, "b", "key1", "second_key3"),
-                                        Row.of(3, "c", "key1", "second_key3"),
-                                        Row.of(4, "d", "key", "second_key"),
-                                        Row.of(5, "e", "key", "second_key"),
-                                        Row.of(6, "a", "key2", "second_key4"),
-                                        Row.of(7, "b", "key2", "second_key4"),
-                                        Row.of(8, "c", "key2", "second_key4"),
-                                        Row.of(9, "f", "key", "second_key"))
+                                        Row.of(1, "a", "key1", "second_key3", 
now.plusSeconds(1)),
+                                        Row.of(2, "b", "key1", "second_key3", 
now.plusSeconds(2)),
+                                        Row.of(3, "c", "key1", "second_key3", 
now.plusSeconds(3)),
+                                        Row.of(4, "d", "key", "second_key", 
now.plusSeconds(4)),
+                                        Row.of(5, "e", "key", "second_key", 
now.plusSeconds(5)),
+                                        Row.of(6, "a", "key2", "second_key4", 
now.plusSeconds(6)),
+                                        Row.of(7, "b", "key2", "second_key4", 
now.plusSeconds(7)),
+                                        Row.of(8, "c", "key2", "second_key4", 
now.plusSeconds(8)),
+                                        Row.of(9, "f", "key", "second_key", 
now.plusSeconds(9)))
                                 .returns(
                                         ROW_NAMED(
-                                                new String[] {"id", "name", 
"key1", "key2"},
+                                                new String[] {"id", "name", 
"key1", "key2", "ts"},
                                                 INT,
                                                 STRING,
                                                 STRING,
-                                                STRING)),
+                                                STRING,
+                                                INSTANT)),
                         Schema.newBuilder()
                                 .column("id", DataTypes.INT())
                                 .column("name", DataTypes.STRING())
                                 .column("key1", DataTypes.STRING())
                                 .column("key2", DataTypes.STRING())
-                                .columnByExpression("proctime", "PROCTIME()")
+                                .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
         TableResult tableResult =
                 tEnv.executeSql(
@@ -189,7 +294,7 @@ class MatchRecognizeITCase {
                                 + "FROM MyTable\n"
                                 + "MATCH_RECOGNIZE (\n"
                                 + "  PARTITION BY key1, key2\n"
-                                + "  ORDER BY proctime\n"
+                                + "  ORDER BY ts\n"
                                 + "  MEASURES\n"
                                 + "    A.id AS aid,\n"
                                 + "    A.key1 AS akey1,\n"
@@ -210,6 +315,193 @@ class MatchRecognizeITCase {
                         Row.of("key2", "second_key4", 6, "key2", 7, 8, 
"second_key4"));
     }
 
+    @Test
+    void testEventsAreProperlyOrdered() {
+        LocalDateTime epoch = LocalDateTime.ofEpochSecond(2L, 0, 
ZoneOffset.UTC);
+        tEnv.createTemporaryView(
+                "MyTable",
+                tEnv.fromDataStream(
+                        env.fromData(
+                                        // event time order breaks this match
+                                        Row.of(epoch.plusSeconds(2), 12, 1, 
"a", 1),
+                                        Row.of(epoch.plusSeconds(1L), 11, 2, 
"b", 2),
+                                        Row.of(epoch.plusSeconds(3L), 10, 3, 
"c", 3),
+                                        // secondary order breaks this match
+                                        Row.of(epoch.plusSeconds(4L), 8, 4, 
"a", 4),
+                                        Row.of(epoch.plusSeconds(4L), 9, 5, 
"b", 5),
+                                        Row.of(epoch.plusSeconds(5L), 7, 6, 
"c", 6),
+                                        // ternary order breaks this match
+                                        Row.of(epoch.plusSeconds(6L), 6, 8, 
"a", 7),
+                                        Row.of(epoch.plusSeconds(6L), 6, 7, 
"b", 8),
+                                        Row.of(epoch.plusSeconds(8L), 4, 9, 
"c", 9),
+                                        // match
+                                        Row.of(epoch.plusSeconds(9L), 3, 10, 
"a", 10),
+                                        Row.of(epoch.plusSeconds(10L), 2, 11, 
"b", 11),
+                                        Row.of(epoch.plusSeconds(11L), 1, 12, 
"c", 12))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {
+                                                    "ts",
+                                                    "secondaryOrder",
+                                                    "ternaryOrder",
+                                                    "name",
+                                                    "id"
+                                                },
+                                                LOCAL_DATE_TIME,
+                                                INT,
+                                                INT,
+                                                STRING,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .column("secondaryOrder", DataTypes.INT())
+                                .column("ternaryOrder", DataTypes.INT())
+                                .column("name", DataTypes.STRING())
+                                .column("id", DataTypes.INT())
+                                .build()));
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT T.aid, T.bid, T.cid\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY ts, secondaryOrder DESC, 
ternaryOrder ASC\n"
+                                + "  MEASURES\n"
+                                + "    A.id AS aid,\n"
+                                + "    B.id AS bid,\n"
+                                + "    C.id AS cid\n"
+                                + "  PATTERN (A B C)\n"
+                                + "  DEFINE\n"
+                                + "    A AS name = 'a',\n"
+                                + "    B AS name = 'b',\n"
+                                + "    C AS name = 'c'\n"
+                                + ") AS T");
+        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+                .containsExactly(Row.of(10, 11, 12));
+    }
+
+    /**
+     * Runs MATCH_RECOGNIZE on top of a windowed aggregation: the inner query sums {@code price}
+     * per symbol over 3-second tumbling windows on {@code ts}, and the outer pattern {@code (A B)}
+     * looks for a window whose summed price is lower than that of the preceding window.
+     */
+    @Test
+    void testMatchRecognizeAppliedToWindowedGrouping() {
+        // Despite its name, "now" is a fixed timestamp: epoch second 0 in UTC.
+        LocalDateTime now = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
+        tEnv.createTemporaryView(
+                "Ticker",
+                tEnv.fromDataStream(
+                        env.fromData(
+                                        // first window
+                                        Row.of("ACME", now.plusSeconds(1), 1, 
1),
+                                        Row.of("ACME", now.plusSeconds(2), 2, 
2),
+                                        // second window
+                                        Row.of("ACME", now.plusSeconds(4), 1, 
4),
+                                        Row.of("ACME", now.plusSeconds(5), 1, 
3),
+                                        // third window
+                                        Row.of("ACME", now.plusSeconds(7), 2, 
3),
+                                        Row.of("ACME", now.plusSeconds(8), 2, 
3),
+                                        // NOTE(review): the rows below carry ts = now + 1s, so they
+                                        // are listed out of timestamp order and presumably fall into
+                                        // the first window rather than the third -- confirm.
+                                        Row.of("ACME", now.plusSeconds(1), 20, 
4),
+                                        Row.of("ACME", now.plusSeconds(1), 24, 
4),
+                                        Row.of("ACME", now.plusSeconds(1), 25, 
3),
+                                        Row.of("ACME", now.plusSeconds(1), 19, 
8))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"symbol", "ts", 
"price", "tax"},
+                                                STRING,
+                                                LOCAL_DATE_TIME,
+                                                INT,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("symbol", DataTypes.STRING())
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .column("price", DataTypes.INT())
+                                .column("tax", DataTypes.INT())
+                                .build()));
+
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT *\n"
+                                + "FROM (\n"
+                                + "   SELECT\n"
+                                + "      symbol,\n"
+                                + "      SUM(price) as price,\n"
+                                + "      TUMBLE_ROWTIME(ts, interval '3' 
second) as rowTime,\n"
+                                + "      TUMBLE_START(ts, interval '3' second) 
as startTime\n"
+                                + "   FROM Ticker\n"
+                                + "   GROUP BY symbol, TUMBLE(ts, interval '3' 
second)\n"
+                                + ")\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  PARTITION BY symbol\n"
+                                + "  ORDER BY rowTime\n"
+                                + "  MEASURES\n"
+                                + "    B.price as dPrice,\n"
+                                + "    B.startTime as dTime\n"
+                                + "  ONE ROW PER MATCH\n"
+                                + "  PATTERN (A B)\n"
+                                + "  DEFINE\n"
+                                + "    B AS B.price < A.price\n"
+                                + ")");
+        // Exactly one match is expected: B is the window starting at now + 3s, whose summed
+        // price is 2 (the two rows at +4s and +5s, each with price 1).
+        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+                .containsExactly(Row.of("ACME", 2, now.plusSeconds(3)));
+    }
+
+    /**
+     * Runs a windowed aggregation on top of MATCH_RECOGNIZE: the pattern {@code (A)} with
+     * {@code A.price > 0} exposes {@code MATCH_ROWTIME()} as {@code matchRowtime}, and the outer
+     * query sums {@code price} per symbol over 3-second tumbling windows on that column.
+     */
+    @Test
+    void testWindowedGroupingAppliedToMatchRecognize() {
+        // Fixed base timestamp: epoch second 0 in UTC.
+        LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, 
ZoneOffset.UTC);
+        tEnv.createTemporaryView(
+                "Ticker",
+                tEnv.fromDataStream(
+                        env.fromData(
+                                        // first window
+                                        Row.of("ACME", epoch.plusSeconds(1), 
1, 1),
+                                        Row.of("ACME", epoch.plusSeconds(2), 
2, 2),
+                                        // second window
+                                        Row.of("ACME", epoch.plusSeconds(4), 
1, 4),
+                                        Row.of("ACME", epoch.plusSeconds(5), 
1, 3))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"symbol", "ts", 
"price", "tax"},
+                                                STRING,
+                                                LOCAL_DATE_TIME,
+                                                INT,
+                                                INT)),
+                        Schema.newBuilder()
+                                .column("symbol", DataTypes.STRING())
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .column("price", DataTypes.INT())
+                                .column("tax", DataTypes.INT())
+                                .build()));
+
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT\n"
+                                + "  symbol,\n"
+                                + "  SUM(price) as price,\n"
+                                + "  TUMBLE_ROWTIME(matchRowtime, interval '3' 
second) as rowTime,\n"
+                                + "  TUMBLE_START(matchRowtime, interval '3' 
second) as startTime\n"
+                                + "FROM Ticker\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  PARTITION BY symbol\n"
+                                + "  ORDER BY ts\n"
+                                + "  MEASURES\n"
+                                + "    A.price as price,\n"
+                                + "    A.tax as tax,\n"
+                                + "    MATCH_ROWTIME() as matchRowtime\n"
+                                + "  ONE ROW PER MATCH\n"
+                                + "  PATTERN (A)\n"
+                                + "  DEFINE\n"
+                                + "    A AS A.price > 0\n"
+                                + ") AS T\n"
+                                + "GROUP BY symbol, TUMBLE(matchRowtime, 
interval '3' second)");
+        // Two windows are expected, in this order: price sums 3 (1 + 2) and 2 (1 + 1).
+        // The expected rowTime of each window is 1 ms before the next 3-second boundary.
+        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+                .containsExactly(
+                        Row.of(
+                                "ACME",
+                                3,
+                                LocalDateTime.parse("1970-01-01T00:00:02.999"),
+                                LocalDateTime.parse("1970-01-01T00:00")),
+                        Row.of(
+                                "ACME",
+                                2,
+                                LocalDateTime.parse("1970-01-01T00:00:05.999"),
+                                LocalDateTime.parse("1970-01-01T00:00:03")));
+    }
+
     @Test
     void testLogicalOffsets() {
         tEnv.createTemporaryView(
@@ -236,14 +528,14 @@ class MatchRecognizeITCase {
                                 .column("tstamp", DataTypes.BIGINT())
                                 .column("price", DataTypes.INT())
                                 .column("tax", DataTypes.INT())
-                                .columnByExpression("proctime", "PROCTIME()")
+                                .columnByExpression("ts", 
"TO_TIMESTAMP_LTZ(tstamp, 3)")
                                 .build()));
         TableResult tableResult =
                 tEnv.executeSql(
                         "SELECT *\n"
                                 + "FROM Ticker\n"
                                 + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY proctime\n"
+                                + "  ORDER BY ts\n"
                                 + "  MEASURES\n"
                                 + "    FIRST(DOWN.tstamp) AS start_tstamp,\n"
                                 + "    LAST(DOWN.tstamp) AS bottom_tstamp,\n"
@@ -261,6 +553,53 @@ class MatchRecognizeITCase {
                 .containsExactly(Row.of(6L, 7L, 8L, 33, 33));
     }
 
+    /**
+     * Runs a partitioned MATCH_RECOGNIZE over a source whose parallelism is explicitly set to the
+     * environment's parallelism. The pattern {@code (DOWN UP)} matches a row with price 13
+     * followed by a row with price 20, ordered by {@code ts}.
+     */
+    @Test
+    void testPartitionByWithParallelSource() {
+        // Fixed base timestamp: epoch second 0 in UTC.
+        LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, 
ZoneOffset.UTC);
+        tEnv.createTemporaryView(
+                "Ticker",
+                tEnv.fromDataStream(
+                        env.fromData(
+                                        Row.of("ACME", epoch.plusSeconds(1), 
19, 1),
+                                        Row.of("ACME", epoch.plusSeconds(2), 
17, 2),
+                                        Row.of("ACME", epoch.plusSeconds(3), 
13, 3),
+                                        Row.of("ACME", epoch.plusSeconds(4), 
20, 4))
+                                .returns(
+                                        ROW_NAMED(
+                                                new String[] {"symbol", "ts", 
"price", "tax"},
+                                                STRING,
+                                                LOCAL_DATE_TIME,
+                                                INT,
+                                                INT))
+                                // NOTE(review): presumably set so that rows can reach the match
+                                // operator from several source subtasks -- confirm.
+                                .setParallelism(env.getParallelism()),
+                        Schema.newBuilder()
+                                .column("symbol", DataTypes.STRING())
+                                .column("ts", DataTypes.TIMESTAMP(3))
+                                .column("price", DataTypes.INT())
+                                .column("tax", DataTypes.INT())
+                                .build()));
+
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "SELECT *\n"
+                                + "FROM Ticker\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  PARTITION BY symbol\n"
+                                + "  ORDER BY ts\n"
+                                + "  MEASURES\n"
+                                + "    DOWN.tax AS bottom_tax,\n"
+                                + "    UP.tax AS end_tax\n"
+                                + "  ONE ROW PER MATCH\n"
+                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                                + "  PATTERN (DOWN UP)\n"
+                                + "  DEFINE\n"
+                                + "    DOWN AS DOWN.price = 13,\n"
+                                + "    UP AS UP.price = 20\n"
+                                + ") AS T");
+        // The only match is the row with price 13 (tax 3) followed by the row with price 20
+        // (tax 4), which are adjacent only when rows are ordered by ts.
+        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+                .containsExactly(Row.of("ACME", 3, 4));
+    }
+
     @Test
     void testLogicalOffsetsWithStarVariable() {
         tEnv.createTemporaryView(
@@ -287,14 +626,14 @@ class MatchRecognizeITCase {
                                 .column("symbol", DataTypes.STRING())
                                 .column("tstamp", DataTypes.BIGINT())
                                 .column("price", DataTypes.INT())
-                                .columnByExpression("proctime", "PROCTIME()")
+                                .columnByExpression("ts", 
"TO_TIMESTAMP_LTZ(tstamp, 3)")
                                 .build()));
         TableResult tableResult =
                 tEnv.executeSql(
                         "SELECT *\n"
                                 + "FROM Ticker\n"
                                 + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY proctime\n"
+                                + "  ORDER BY ts\n"
                                 + "  MEASURES\n"
                                 + "    FIRST(id, 0) as id0,\n"
                                 + "    FIRST(id, 1) as id1,\n"
@@ -345,14 +684,14 @@ class MatchRecognizeITCase {
                                 .column("tstamp", DataTypes.BIGINT())
                                 .column("price", DataTypes.INT())
                                 .column("tax", DataTypes.INT())
-                                .columnByExpression("proctime", "PROCTIME()")
+                                .columnByExpression("ts", 
"TO_TIMESTAMP_LTZ(tstamp, 3)")
                                 .build()));
         TableResult tableResult =
                 tEnv.executeSql(
                         "SELECT *\n"
                                 + "FROM Ticker\n"
                                 + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY proctime\n"
+                                + "  ORDER BY ts\n"
                                 + "  MEASURES\n"
                                 + "    FIRST(DOWN.price) as first,\n"
                                 + "    LAST(DOWN.price) as last,\n"
@@ -377,40 +716,42 @@ class MatchRecognizeITCase {
      */
     @Test
     void testAggregates() {
+        Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
         tEnv.getConfig().setMaxGeneratedCodeLength(1);
         tEnv.createTemporaryView(
                 "MyTable",
                 tEnv.fromDataStream(
                         env.fromData(
-                                        Row.of(1, "a", 1, 0.8, 1),
-                                        Row.of(2, "z", 2, 0.8, 3),
-                                        Row.of(3, "b", 1, 0.8, 2),
-                                        Row.of(4, "c", 1, 0.8, 5),
-                                        Row.of(5, "d", 4, 0.1, 5),
-                                        Row.of(6, "a", 2, 1.5, 2),
-                                        Row.of(7, "b", 2, 0.8, 3),
-                                        Row.of(8, "c", 1, 0.8, 2),
-                                        Row.of(9, "h", 4, 0.8, 3),
-                                        Row.of(10, "h", 4, 0.8, 3),
-                                        Row.of(11, "h", 2, 0.8, 3),
-                                        Row.of(12, "h", 2, 0.8, 3))
+                                        Row.of(1, "a", 1, 0.8, 1, 
now.plusSeconds(1)),
+                                        Row.of(2, "z", 2, 0.8, 3, 
now.plusSeconds(2)),
+                                        Row.of(3, "b", 1, 0.8, 2, 
now.plusSeconds(3)),
+                                        Row.of(4, "c", 1, 0.8, 5, 
now.plusSeconds(4)),
+                                        Row.of(5, "d", 4, 0.1, 5, 
now.plusSeconds(5)),
+                                        Row.of(6, "a", 2, 1.5, 2, 
now.plusSeconds(6)),
+                                        Row.of(7, "b", 2, 0.8, 3, 
now.plusSeconds(7)),
+                                        Row.of(8, "c", 1, 0.8, 2, 
now.plusSeconds(8)),
+                                        Row.of(9, "h", 4, 0.8, 3, 
now.plusSeconds(9)),
+                                        Row.of(10, "h", 4, 0.8, 3, 
now.plusSeconds(10)),
+                                        Row.of(11, "h", 2, 0.8, 3, 
now.plusSeconds(11)),
+                                        Row.of(12, "h", 2, 0.8, 3, 
now.plusSeconds(12)))
                                 .returns(
                                         ROW_NAMED(
                                                 new String[] {
-                                                    "id", "name", "price", 
"rate", "weight"
+                                                    "id", "name", "price", 
"rate", "weight", "ts"
                                                 },
                                                 INT,
                                                 STRING,
                                                 INT,
                                                 DOUBLE,
-                                                INT)),
+                                                INT,
+                                                INSTANT)),
                         Schema.newBuilder()
                                 .column("id", DataTypes.INT())
                                 .column("name", DataTypes.STRING())
                                 .column("price", DataTypes.INT())
                                 .column("rate", DataTypes.DOUBLE())
                                 .column("weight", DataTypes.INT())
-                                .columnByExpression("proctime", "PROCTIME()")
+                                .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
         tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
         TableResult tableResult =
@@ -418,7 +759,7 @@ class MatchRecognizeITCase {
                         "SELECT *\n"
                                 + "FROM MyTable\n"
                                 + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY proctime\n"
+                                + "  ORDER BY ts\n"
                                 + "  MEASURES\n"
                                 + "    FIRST(id) as startId,\n"
                                 + "    SUM(A.price) AS sumA,\n"
@@ -446,31 +787,33 @@ class MatchRecognizeITCase {
 
     @Test
     void testAggregatesWithNullInputs() {
+        Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
         tEnv.getConfig().setMaxGeneratedCodeLength(1);
         tEnv.createTemporaryView(
                 "MyTable",
                 tEnv.fromDataStream(
                         env.fromData(
-                                        Row.of(1, "a", 10),
-                                        Row.of(2, "z", 10),
-                                        Row.of(3, "b", null),
-                                        Row.of(4, "c", null),
-                                        Row.of(5, "d", 3),
-                                        Row.of(6, "c", 3),
-                                        Row.of(7, "c", 3),
-                                        Row.of(8, "c", 3),
-                                        Row.of(9, "c", 2))
+                                        Row.of(1, "a", 10, now.plusSeconds(1)),
+                                        Row.of(2, "z", 10, now.plusSeconds(2)),
+                                        Row.of(3, "b", null, 
now.plusSeconds(3)),
+                                        Row.of(4, "c", null, 
now.plusSeconds(4)),
+                                        Row.of(5, "d", 3, now.plusSeconds(5)),
+                                        Row.of(6, "c", 3, now.plusSeconds(6)),
+                                        Row.of(7, "c", 3, now.plusSeconds(7)),
+                                        Row.of(8, "c", 3, now.plusSeconds(8)),
+                                        Row.of(9, "c", 2, now.plusSeconds(9)))
                                 .returns(
                                         ROW_NAMED(
-                                                new String[] {"id", "name", 
"price"},
+                                                new String[] {"id", "name", 
"price", "ts"},
                                                 INT,
                                                 STRING,
-                                                INT)),
+                                                INT,
+                                                INSTANT)),
                         Schema.newBuilder()
                                 .column("id", DataTypes.INT())
                                 .column("name", DataTypes.STRING())
                                 .column("price", DataTypes.INT())
-                                .columnByExpression("proctime", "PROCTIME()")
+                                .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
         tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
         TableResult tableResult =
@@ -478,7 +821,7 @@ class MatchRecognizeITCase {
                         "SELECT *\n"
                                 + "FROM MyTable\n"
                                 + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY proctime\n"
+                                + "  ORDER BY ts\n"
                                 + "  MEASURES\n"
                                 + "    SUM(A.price) as sumA,\n"
                                 + "    COUNT(A.id) as countAId,\n"
@@ -527,31 +870,33 @@ class MatchRecognizeITCase {
 
     @Test
     void testUserDefinedFunctions() {
+        Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
         tEnv.getConfig().setMaxGeneratedCodeLength(1);
         tEnv.createTemporaryView(
                 "MyTable",
                 tEnv.fromDataStream(
                         env.fromData(
-                                        Row.of(1, "a", 1),
-                                        Row.of(2, "a", 1),
-                                        Row.of(3, "a", 1),
-                                        Row.of(4, "a", 1),
-                                        Row.of(5, "a", 1),
-                                        Row.of(6, "b", 1),
-                                        Row.of(7, "a", 1),
-                                        Row.of(8, "a", 1),
-                                        Row.of(9, "f", 1))
+                                        Row.of(1, "a", 1, now.plusSeconds(1)),
+                                        Row.of(2, "a", 1, now.plusSeconds(2)),
+                                        Row.of(3, "a", 1, now.plusSeconds(3)),
+                                        Row.of(4, "a", 1, now.plusSeconds(4)),
+                                        Row.of(5, "a", 1, now.plusSeconds(5)),
+                                        Row.of(6, "b", 1, now.plusSeconds(6)),
+                                        Row.of(7, "a", 1, now.plusSeconds(7)),
+                                        Row.of(8, "a", 1, now.plusSeconds(8)),
+                                        Row.of(9, "f", 1, now.plusSeconds(9)))
                                 .returns(
                                         ROW_NAMED(
-                                                new String[] {"id", "name", 
"price"},
+                                                new String[] {"id", "name", 
"price", "ts"},
                                                 INT,
                                                 STRING,
-                                                INT)),
+                                                INT,
+                                                INSTANT)),
                         Schema.newBuilder()
                                 .column("id", DataTypes.INT())
                                 .column("name", DataTypes.STRING())
                                 .column("price", DataTypes.INT())
-                                .columnByExpression("proctime", "PROCTIME()")
+                                .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
         tEnv.createTemporarySystemFunction("prefix", new 
PrefixingScalarFunc());
         tEnv.createTemporarySystemFunction("countFrom", new RichAggFunc());
@@ -567,7 +912,7 @@ class MatchRecognizeITCase {
                                 "SELECT *\n"
                                         + "FROM MyTable\n"
                                         + "MATCH_RECOGNIZE (\n"
-                                        + "  ORDER BY proctime\n"
+                                        + "  ORDER BY ts\n"
                                         + "  MEASURES\n"
                                         + "    FIRST(id) as firstId,\n"
                                         + "    prefix(A.name) as 
prefixedNameA,\n"

Reply via email to