This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f22c50aab1f [FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in
batch mode (#24699)
f22c50aab1f is described below
commit f22c50aab1f0efe59c3c7a1d944fce79b4edd54d
Author: Grzegorz Kołakowski <[email protected]>
AuthorDate: Mon Nov 4 09:14:31 2024 +0100
[FLINK-33722] Fix events ordering in MATCH_RECOGNIZE in batch mode (#24699)
---
.../plan/nodes/exec/batch/BatchExecMatch.java | 72 ++-
.../plan/nodes/exec/common/CommonExecMatch.java | 24 +-
.../plan/nodes/exec/stream/StreamExecMatch.java | 16 +-
.../runtime/batch/sql/MatchRecognizeITCase.java | 495 +++++++++++++++++----
4 files changed, 513 insertions(+), 94 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
index 4f02be5f91c..ec48153d02f 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMatch.java
@@ -18,22 +18,36 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import
org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecMatch;
import org.apache.flink.table.planner.plan.nodes.exec.spec.MatchSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import
org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.util.Collections;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+
/** Batch {@link ExecNode} which matches along with MATCH_RECOGNIZE. */
public class BatchExecMatch extends CommonExecMatch
implements BatchExecNode<RowData>,
MultipleTransformationTranslator<RowData> {
+ public static final String TIMESTAMP_INSERTER_TRANSFORMATION =
"timestamp-inserter";
+
public BatchExecMatch(
ReadableConfig tableConfig,
MatchSpec matchSpec,
@@ -51,7 +65,61 @@ public class BatchExecMatch extends CommonExecMatch
}
@Override
- public boolean isProcTime(RowType inputRowType) {
- return true;
+ public void checkOrderKeys(RowType inputRowType) {
+ SortSpec orderKeys = matchSpec.getOrderKeys();
+ if (orderKeys.getFieldSize() == 0) {
+ throw new TableException("You must specify non-empty order by.");
+ }
+
+ SortSpec.SortFieldSpec timeOrderField = orderKeys.getFieldSpec(0);
+ int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+ LogicalType timeOrderFieldType =
inputRowType.getTypeAt(timeOrderFieldIdx);
+
+ if (!TypeCheckUtils.isTimePoint(timeOrderFieldType)) {
+ throw new TableException("You must specify time point for order by
as the first one.");
+ }
+
+ // time ordering needs to be ascending
+ if (!orderKeys.getAscendingOrders()[0]) {
+ throw new TableException("Primary sort order of a table must be
ascending on time.");
+ }
+ }
+
+ @Override
+ protected Transformation<RowData> translateOrder(
+ PlannerBase planner,
+ Transformation<RowData> inputTransform,
+ RowType inputRowType,
+ ExecEdge inputEdge,
+ ExecNodeConfig config) {
+ if (isProcTime(inputRowType)) {
+ // In proctime process records in the order they come.
+ return inputTransform;
+ }
+
+ // copy the timestamp field from order by clause into the StreamRecord
timestamp field
+ SortSpec.SortFieldSpec timeOrderField =
matchSpec.getOrderKeys().getFieldSpec(0);
+ int timeOrderFieldIdx = timeOrderField.getFieldIndex();
+ LogicalType timeOrderFieldType =
inputRowType.getTypeAt(timeOrderFieldIdx);
+ int precision = getPrecision(timeOrderFieldType);
+ Transformation<RowData> transform =
+ ExecNodeUtil.createOneInputTransformation(
+ inputTransform,
+ createTransformationMeta(
+ TIMESTAMP_INSERTER_TRANSFORMATION,
+ String.format(
+
"BatchRecordTimestampInserter(timestamp field: %s)",
+ timeOrderFieldIdx),
+ "BatchRecordTimestampInserter",
+ config),
+ new StreamRecordTimestampInserter(timeOrderFieldIdx,
precision),
+ inputTransform.getOutputType(),
+ inputTransform.getParallelism(),
+ false);
+ if (inputsContainSingleton()) {
+ transform.setParallelism(1);
+ transform.setMaxParallelism(1);
+ }
+ return transform;
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
index ad94053ebc0..c2ab5c84539 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecMatch.java
@@ -58,6 +58,7 @@ import
org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import
org.apache.flink.table.runtime.operators.match.PatternProcessFunctionRunner;
import org.apache.flink.table.runtime.operators.match.RowDataEventComparator;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.MathUtils;
@@ -86,7 +87,7 @@ public abstract class CommonExecMatch extends
ExecNodeBase<RowData>
public static final String MATCH_TRANSFORMATION = "match";
- private final MatchSpec matchSpec;
+ protected final MatchSpec matchSpec;
public CommonExecMatch(
int id,
@@ -115,7 +116,7 @@ public abstract class CommonExecMatch extends
ExecNodeBase<RowData>
createEventComparator(
config, planner.getFlinkContext().getClassLoader(),
inputRowType);
final Transformation<RowData> timestampedInputTransform =
- translateOrder(inputTransform, inputRowType, config);
+ translateOrder(planner, inputTransform, inputRowType,
inputEdge, config);
final Tuple2<Pattern<RowData, RowData>, List<String>>
cepPatternAndNames =
translatePattern(
@@ -200,7 +201,7 @@ public abstract class CommonExecMatch extends
ExecNodeBase<RowData>
return transform;
}
- protected void checkOrderKeys(RowType inputRowType) {}
+ protected abstract void checkOrderKeys(RowType inputRowType);
private EventComparator<RowData> createEventComparator(
ExecNodeConfig config, ClassLoader classLoader, RowType
inputRowType) {
@@ -215,10 +216,12 @@ public abstract class CommonExecMatch extends
ExecNodeBase<RowData>
}
}
- protected Transformation<RowData> translateOrder(
- Transformation<RowData> inputTransform, RowType inputRowType,
ExecNodeConfig config) {
- return inputTransform;
- }
+ protected abstract Transformation<RowData> translateOrder(
+ PlannerBase planner,
+ Transformation<RowData> inputTransform,
+ RowType inputRowType,
+ ExecEdge inputEdge,
+ ExecNodeConfig config);
@VisibleForTesting
public static Tuple2<Pattern<RowData, RowData>, List<String>>
translatePattern(
@@ -251,7 +254,12 @@ public abstract class CommonExecMatch extends
ExecNodeBase<RowData>
"Only constant intervals with millisecond resolution are
supported as time constraints of patterns.");
}
- public abstract boolean isProcTime(RowType inputRowType);
+ public boolean isProcTime(RowType inputRowType) {
+ final SortSpec.SortFieldSpec timeOrderField =
matchSpec.getOrderKeys().getFieldSpec(0);
+ final LogicalType timeOrderFieldType =
+ inputRowType.getTypeAt(timeOrderField.getFieldIndex());
+ return TypeCheckUtils.isProcTime(timeOrderFieldType);
+ }
/** The visitor to traverse the pattern RexNode. */
private static class PatternVisitor extends
RexDefaultVisitor<Pattern<RowData, RowData>> {
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 2a204e41ff3..464a36dbb29 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -121,7 +123,11 @@ public class StreamExecMatch extends CommonExecMatch
@Override
public Transformation<RowData> translateOrder(
- Transformation<RowData> inputTransform, RowType inputRowType,
ExecNodeConfig config) {
+ PlannerBase planner,
+ Transformation<RowData> inputTransform,
+ RowType inputRowType,
+ ExecEdge inputEdge,
+ ExecNodeConfig config) {
SortSpec.SortFieldSpec timeOrderField =
matchSpec.getOrderKeys().getFieldSpec(0);
int timeOrderFieldIdx = timeOrderField.getFieldIndex();
LogicalType timeOrderFieldType =
inputRowType.getTypeAt(timeOrderFieldIdx);
@@ -152,12 +158,4 @@ public class StreamExecMatch extends CommonExecMatch
return inputTransform;
}
}
-
- @Override
- public boolean isProcTime(RowType inputRowType) {
- final SortSpec.SortFieldSpec timeOrderField =
matchSpec.getOrderKeys().getFieldSpec(0);
- final LogicalType timeOrderFieldType =
- inputRowType.getTypeAt(timeOrderField.getFieldIndex());
- return TypeCheckUtils.isProcTime(timeOrderFieldType);
- }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
index 8efc072adfa..5aae60f5279 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
@@ -38,11 +38,16 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.List;
import static org.apache.flink.api.common.typeinfo.Types.DOUBLE;
+import static org.apache.flink.api.common.typeinfo.Types.INSTANT;
import static org.apache.flink.api.common.typeinfo.Types.INT;
+import static org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME;
import static org.apache.flink.api.common.typeinfo.Types.LONG;
import static org.apache.flink.api.common.typeinfo.Types.ROW_NAMED;
import static org.apache.flink.api.common.typeinfo.Types.STRING;
@@ -65,7 +70,7 @@ class MatchRecognizeITCase {
}
@Test
- void testSimplePattern() {
+ void testSimplePatternInProcTime() {
tEnv.createTemporaryView(
"MyTable",
tEnv.fromDataStream(
@@ -105,39 +110,137 @@ class MatchRecognizeITCase {
.containsExactly(Row.of(6, 7, 8));
}
+ @Test
+ void testSimplePatternInEventTime() {
+ Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromData(
+ Row.of(1, "a", now.plusSeconds(1)),
+ Row.of(2, "z", now.plusSeconds(2)),
+ Row.of(3, "b", now.plusSeconds(3)),
+ Row.of(4, "c", now.plusSeconds(4)),
+ Row.of(5, "d", now.plusSeconds(5)),
+ Row.of(6, "a", now.plusSeconds(6)),
+ Row.of(7, "b", now.plusSeconds(7)),
+ Row.of(8, "c", now.plusSeconds(8)),
+ Row.of(9, "h", now.plusSeconds(9)))
+ .returns(
+ ROW_NAMED(
+ new String[] {"id", "name",
"ts"},
+ INT,
+ STRING,
+ INSTANT)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("ts", DataTypes.TIMESTAMP_LTZ(3))
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT T.aid, T.bid, T.cid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " `A\"`.id AS aid,\n"
+ + " \u006C.id AS bid,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (`A\"` \u006C C)\n"
+ + " DEFINE\n"
+ + " `A\"` AS name = 'a',\n"
+ + " \u006C AS name = 'b',\n"
+ + " C AS name = 'c'\n"
+ + ") AS T");
+ assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+ .containsExactly(Row.of(6, 7, 8));
+ }
+
+ @Test
+ void testTimeConstraint() {
+ LocalDateTime now = LocalDateTime.parse("2023-12-01T12:00:00.000");
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromData(
+ Row.of(1, "z", now.plusSeconds(1)),
+ // records 2, 3, 4 arrive within 58
seconds --> matched
+ Row.of(2, "a", now.plusSeconds(2)),
+ Row.of(3, "b", now.plusSeconds(30)),
+ Row.of(4, "c", now.plusSeconds(60)),
+ Row.of(5, "x", now.plusSeconds(100)),
+ // records 6, 7, 8 arrive within 61 ->
not matched
+ Row.of(6, "a", now.plusSeconds(101)),
+ Row.of(7, "b", now.plusSeconds(131)),
+ Row.of(8, "c", now.plusSeconds(162)),
+ Row.of(9, "z", now.plusSeconds(200)))
+ .returns(
+ ROW_NAMED(
+ new String[] {"id", "name",
"ts"},
+ INT,
+ STRING,
+ LOCAL_DATE_TIME)),
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT T.aid, T.bid, T.cid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " B.id AS bid,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (A B C) WITHIN INTERVAL '1'
MINUTE\n"
+ + " DEFINE\n"
+ + " A AS name = 'a',\n"
+ + " B AS name = 'b',\n"
+ + " C AS name = 'c'\n"
+ + ") AS T");
+ assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+ .containsExactly(Row.of(2, 3, 4));
+ }
+
@Test
void testSimplePatternWithNulls() {
+ Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
tEnv.createTemporaryView(
"MyTable",
tEnv.fromDataStream(
env.fromData(
- Row.of(1, "a", null),
- Row.of(2, "b", null),
- Row.of(3, "c", null),
- Row.of(4, "d", null),
- Row.of(5, null, null),
- Row.of(6, "a", null),
- Row.of(7, "b", null),
- Row.of(8, "c", null),
- Row.of(9, null, null))
+ Row.of(1, "a", null,
now.plusSeconds(1)),
+ Row.of(2, "b", null,
now.plusSeconds(2)),
+ Row.of(3, "c", null,
now.plusSeconds(3)),
+ Row.of(4, "d", null,
now.plusSeconds(4)),
+ Row.of(5, null, null,
now.plusSeconds(5)),
+ Row.of(6, "a", null,
now.plusSeconds(6)),
+ Row.of(7, "b", null,
now.plusSeconds(7)),
+ Row.of(8, "c", null,
now.plusSeconds(8)),
+ Row.of(9, null, null,
now.plusSeconds(9)))
.returns(
ROW_NAMED(
- new String[] {"id", "name",
"nullField"},
+ new String[] {"id", "name",
"nullField", "ts"},
INT,
STRING,
- STRING)),
+ STRING,
+ INSTANT)),
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("nullField", DataTypes.STRING())
- .columnByExpression("proctime", "PROCTIME()")
+ .column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
TableResult tableResult =
tEnv.executeSql(
"SELECT T.aid, T.bNull, T.cid, T.aNull\n"
+ "FROM MyTable\n"
+ "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
+ + " ORDER BY ts\n"
+ " MEASURES\n"
+ " A.id AS aid,\n"
+ " A.nullField AS aNull,\n"
@@ -155,33 +258,35 @@ class MatchRecognizeITCase {
@Test
void testCodeSplitsAreProperlyGenerated() {
+ Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
tEnv.getConfig().setMaxGeneratedCodeLength(1);
tEnv.createTemporaryView(
"MyTable",
tEnv.fromDataStream(
env.fromData(
- Row.of(1, "a", "key1", "second_key3"),
- Row.of(2, "b", "key1", "second_key3"),
- Row.of(3, "c", "key1", "second_key3"),
- Row.of(4, "d", "key", "second_key"),
- Row.of(5, "e", "key", "second_key"),
- Row.of(6, "a", "key2", "second_key4"),
- Row.of(7, "b", "key2", "second_key4"),
- Row.of(8, "c", "key2", "second_key4"),
- Row.of(9, "f", "key", "second_key"))
+ Row.of(1, "a", "key1", "second_key3",
now.plusSeconds(1)),
+ Row.of(2, "b", "key1", "second_key3",
now.plusSeconds(2)),
+ Row.of(3, "c", "key1", "second_key3",
now.plusSeconds(3)),
+ Row.of(4, "d", "key", "second_key",
now.plusSeconds(4)),
+ Row.of(5, "e", "key", "second_key",
now.plusSeconds(5)),
+ Row.of(6, "a", "key2", "second_key4",
now.plusSeconds(6)),
+ Row.of(7, "b", "key2", "second_key4",
now.plusSeconds(7)),
+ Row.of(8, "c", "key2", "second_key4",
now.plusSeconds(8)),
+ Row.of(9, "f", "key", "second_key",
now.plusSeconds(9)))
.returns(
ROW_NAMED(
- new String[] {"id", "name",
"key1", "key2"},
+ new String[] {"id", "name",
"key1", "key2", "ts"},
INT,
STRING,
STRING,
- STRING)),
+ STRING,
+ INSTANT)),
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("key1", DataTypes.STRING())
.column("key2", DataTypes.STRING())
- .columnByExpression("proctime", "PROCTIME()")
+ .column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
TableResult tableResult =
tEnv.executeSql(
@@ -189,7 +294,7 @@ class MatchRecognizeITCase {
+ "FROM MyTable\n"
+ "MATCH_RECOGNIZE (\n"
+ " PARTITION BY key1, key2\n"
- + " ORDER BY proctime\n"
+ + " ORDER BY ts\n"
+ " MEASURES\n"
+ " A.id AS aid,\n"
+ " A.key1 AS akey1,\n"
@@ -210,6 +315,193 @@ class MatchRecognizeITCase {
Row.of("key2", "second_key4", 6, "key2", 7, 8,
"second_key4"));
}
+ @Test
+ void testEventsAreProperlyOrdered() {
+ LocalDateTime epoch = LocalDateTime.ofEpochSecond(2L, 0,
ZoneOffset.UTC);
+ tEnv.createTemporaryView(
+ "MyTable",
+ tEnv.fromDataStream(
+ env.fromData(
+ // event time order breaks this match
+ Row.of(epoch.plusSeconds(2), 12, 1,
"a", 1),
+ Row.of(epoch.plusSeconds(1L), 11, 2,
"b", 2),
+ Row.of(epoch.plusSeconds(3L), 10, 3,
"c", 3),
+ // secondary order breaks this match
+ Row.of(epoch.plusSeconds(4L), 8, 4,
"a", 4),
+ Row.of(epoch.plusSeconds(4L), 9, 5,
"b", 5),
+ Row.of(epoch.plusSeconds(5L), 7, 6,
"c", 6),
+ // ternary order breaks this match
+ Row.of(epoch.plusSeconds(6L), 6, 8,
"a", 7),
+ Row.of(epoch.plusSeconds(6L), 6, 7,
"b", 8),
+ Row.of(epoch.plusSeconds(8L), 4, 9,
"c", 9),
+ // match
+ Row.of(epoch.plusSeconds(9L), 3, 10,
"a", 10),
+ Row.of(epoch.plusSeconds(10L), 2, 11,
"b", 11),
+ Row.of(epoch.plusSeconds(11L), 1, 12,
"c", 12))
+ .returns(
+ ROW_NAMED(
+ new String[] {
+ "ts",
+ "secondaryOrder",
+ "ternaryOrder",
+ "name",
+ "id"
+ },
+ LOCAL_DATE_TIME,
+ INT,
+ INT,
+ STRING,
+ INT)),
+ Schema.newBuilder()
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .column("secondaryOrder", DataTypes.INT())
+ .column("ternaryOrder", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("id", DataTypes.INT())
+ .build()));
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT T.aid, T.bid, T.cid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts, secondaryOrder DESC,
ternaryOrder ASC\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " B.id AS bid,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (A B C)\n"
+ + " DEFINE\n"
+ + " A AS name = 'a',\n"
+ + " B AS name = 'b',\n"
+ + " C AS name = 'c'\n"
+ + ") AS T");
+ assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+ .containsExactly(Row.of(10, 11, 12));
+ }
+
+ @Test
+ void testMatchRecognizeAppliedToWindowedGrouping() {
+ LocalDateTime now = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
+ tEnv.createTemporaryView(
+ "Ticker",
+ tEnv.fromDataStream(
+ env.fromData(
+ // first window
+ Row.of("ACME", now.plusSeconds(1), 1,
1),
+ Row.of("ACME", now.plusSeconds(2), 2,
2),
+ // second window
+ Row.of("ACME", now.plusSeconds(4), 1,
4),
+ Row.of("ACME", now.plusSeconds(5), 1,
3),
+ // third window
+ Row.of("ACME", now.plusSeconds(7), 2,
3),
+ Row.of("ACME", now.plusSeconds(8), 2,
3),
+ Row.of("ACME", now.plusSeconds(1), 20,
4),
+ Row.of("ACME", now.plusSeconds(1), 24,
4),
+ Row.of("ACME", now.plusSeconds(1), 25,
3),
+ Row.of("ACME", now.plusSeconds(1), 19,
8))
+ .returns(
+ ROW_NAMED(
+ new String[] {"symbol", "ts",
"price", "tax"},
+ STRING,
+ LOCAL_DATE_TIME,
+ INT,
+ INT)),
+ Schema.newBuilder()
+ .column("symbol", DataTypes.STRING())
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .column("price", DataTypes.INT())
+ .column("tax", DataTypes.INT())
+ .build()));
+
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT *\n"
+ + "FROM (\n"
+ + " SELECT\n"
+ + " symbol,\n"
+ + " SUM(price) as price,\n"
+ + " TUMBLE_ROWTIME(ts, interval '3'
second) as rowTime,\n"
+ + " TUMBLE_START(ts, interval '3' second)
as startTime\n"
+ + " FROM Ticker\n"
+ + " GROUP BY symbol, TUMBLE(ts, interval '3'
second)\n"
+ + ")\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol\n"
+ + " ORDER BY rowTime\n"
+ + " MEASURES\n"
+ + " B.price as dPrice,\n"
+ + " B.startTime as dTime\n"
+ + " ONE ROW PER MATCH\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " B AS B.price < A.price\n"
+ + ")");
+ assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+ .containsExactly(Row.of("ACME", 2, now.plusSeconds(3)));
+ }
+
+ @Test
+ void testWindowedGroupingAppliedToMatchRecognize() {
+ LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0,
ZoneOffset.UTC);
+ tEnv.createTemporaryView(
+ "Ticker",
+ tEnv.fromDataStream(
+ env.fromData(
+ // first window
+ Row.of("ACME", epoch.plusSeconds(1),
1, 1),
+ Row.of("ACME", epoch.plusSeconds(2),
2, 2),
+ // second window
+ Row.of("ACME", epoch.plusSeconds(4),
1, 4),
+ Row.of("ACME", epoch.plusSeconds(5),
1, 3))
+ .returns(
+ ROW_NAMED(
+ new String[] {"symbol", "ts",
"price", "tax"},
+ STRING,
+ LOCAL_DATE_TIME,
+ INT,
+ INT)),
+ Schema.newBuilder()
+ .column("symbol", DataTypes.STRING())
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .column("price", DataTypes.INT())
+ .column("tax", DataTypes.INT())
+ .build()));
+
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT\n"
+ + " symbol,\n"
+ + " SUM(price) as price,\n"
+ + " TUMBLE_ROWTIME(matchRowtime, interval '3'
second) as rowTime,\n"
+ + " TUMBLE_START(matchRowtime, interval '3'
second) as startTime\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " A.price as price,\n"
+ + " A.tax as tax,\n"
+ + " MATCH_ROWTIME() as matchRowtime\n"
+ + " ONE ROW PER MATCH\n"
+ + " PATTERN (A)\n"
+ + " DEFINE\n"
+ + " A AS A.price > 0\n"
+ + ") AS T\n"
+ + "GROUP BY symbol, TUMBLE(matchRowtime,
interval '3' second)");
+ assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+ .containsExactly(
+ Row.of(
+ "ACME",
+ 3,
+ LocalDateTime.parse("1970-01-01T00:00:02.999"),
+ LocalDateTime.parse("1970-01-01T00:00")),
+ Row.of(
+ "ACME",
+ 2,
+ LocalDateTime.parse("1970-01-01T00:00:05.999"),
+ LocalDateTime.parse("1970-01-01T00:00:03")));
+ }
+
@Test
void testLogicalOffsets() {
tEnv.createTemporaryView(
@@ -236,14 +528,14 @@ class MatchRecognizeITCase {
.column("tstamp", DataTypes.BIGINT())
.column("price", DataTypes.INT())
.column("tax", DataTypes.INT())
- .columnByExpression("proctime", "PROCTIME()")
+ .columnByExpression("ts",
"TO_TIMESTAMP_LTZ(tstamp, 3)")
.build()));
TableResult tableResult =
tEnv.executeSql(
"SELECT *\n"
+ "FROM Ticker\n"
+ "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
+ + " ORDER BY ts\n"
+ " MEASURES\n"
+ " FIRST(DOWN.tstamp) AS start_tstamp,\n"
+ " LAST(DOWN.tstamp) AS bottom_tstamp,\n"
@@ -261,6 +553,53 @@ class MatchRecognizeITCase {
.containsExactly(Row.of(6L, 7L, 8L, 33, 33));
}
+ @Test
+ void testPartitionByWithParallelSource() {
+ LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0,
ZoneOffset.UTC);
+ tEnv.createTemporaryView(
+ "Ticker",
+ tEnv.fromDataStream(
+ env.fromData(
+ Row.of("ACME", epoch.plusSeconds(1),
19, 1),
+ Row.of("ACME", epoch.plusSeconds(2),
17, 2),
+ Row.of("ACME", epoch.plusSeconds(3),
13, 3),
+ Row.of("ACME", epoch.plusSeconds(4),
20, 4))
+ .returns(
+ ROW_NAMED(
+ new String[] {"symbol", "ts",
"price", "tax"},
+ STRING,
+ LOCAL_DATE_TIME,
+ INT,
+ INT))
+ .setParallelism(env.getParallelism()),
+ Schema.newBuilder()
+ .column("symbol", DataTypes.STRING())
+ .column("ts", DataTypes.TIMESTAMP(3))
+ .column("price", DataTypes.INT())
+ .column("tax", DataTypes.INT())
+ .build()));
+
+ TableResult tableResult =
+ tEnv.executeSql(
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " DOWN.tax AS bottom_tax,\n"
+ + " UP.tax AS end_tax\n"
+ + " ONE ROW PER MATCH\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (DOWN UP)\n"
+ + " DEFINE\n"
+ + " DOWN AS DOWN.price = 13,\n"
+ + " UP AS UP.price = 20\n"
+ + ") AS T");
+ assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
+ .containsExactly(Row.of("ACME", 3, 4));
+ }
+
@Test
void testLogicalOffsetsWithStarVariable() {
tEnv.createTemporaryView(
@@ -287,14 +626,14 @@ class MatchRecognizeITCase {
.column("symbol", DataTypes.STRING())
.column("tstamp", DataTypes.BIGINT())
.column("price", DataTypes.INT())
- .columnByExpression("proctime", "PROCTIME()")
+ .columnByExpression("ts",
"TO_TIMESTAMP_LTZ(tstamp, 3)")
.build()));
TableResult tableResult =
tEnv.executeSql(
"SELECT *\n"
+ "FROM Ticker\n"
+ "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
+ + " ORDER BY ts\n"
+ " MEASURES\n"
+ " FIRST(id, 0) as id0,\n"
+ " FIRST(id, 1) as id1,\n"
@@ -345,14 +684,14 @@ class MatchRecognizeITCase {
.column("tstamp", DataTypes.BIGINT())
.column("price", DataTypes.INT())
.column("tax", DataTypes.INT())
- .columnByExpression("proctime", "PROCTIME()")
+ .columnByExpression("ts",
"TO_TIMESTAMP_LTZ(tstamp, 3)")
.build()));
TableResult tableResult =
tEnv.executeSql(
"SELECT *\n"
+ "FROM Ticker\n"
+ "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
+ + " ORDER BY ts\n"
+ " MEASURES\n"
+ " FIRST(DOWN.price) as first,\n"
+ " LAST(DOWN.price) as last,\n"
@@ -377,40 +716,42 @@ class MatchRecognizeITCase {
*/
@Test
void testAggregates() {
+ Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
tEnv.getConfig().setMaxGeneratedCodeLength(1);
tEnv.createTemporaryView(
"MyTable",
tEnv.fromDataStream(
env.fromData(
- Row.of(1, "a", 1, 0.8, 1),
- Row.of(2, "z", 2, 0.8, 3),
- Row.of(3, "b", 1, 0.8, 2),
- Row.of(4, "c", 1, 0.8, 5),
- Row.of(5, "d", 4, 0.1, 5),
- Row.of(6, "a", 2, 1.5, 2),
- Row.of(7, "b", 2, 0.8, 3),
- Row.of(8, "c", 1, 0.8, 2),
- Row.of(9, "h", 4, 0.8, 3),
- Row.of(10, "h", 4, 0.8, 3),
- Row.of(11, "h", 2, 0.8, 3),
- Row.of(12, "h", 2, 0.8, 3))
+ Row.of(1, "a", 1, 0.8, 1,
now.plusSeconds(1)),
+ Row.of(2, "z", 2, 0.8, 3,
now.plusSeconds(2)),
+ Row.of(3, "b", 1, 0.8, 2,
now.plusSeconds(3)),
+ Row.of(4, "c", 1, 0.8, 5,
now.plusSeconds(4)),
+ Row.of(5, "d", 4, 0.1, 5,
now.plusSeconds(5)),
+ Row.of(6, "a", 2, 1.5, 2,
now.plusSeconds(6)),
+ Row.of(7, "b", 2, 0.8, 3,
now.plusSeconds(7)),
+ Row.of(8, "c", 1, 0.8, 2,
now.plusSeconds(8)),
+ Row.of(9, "h", 4, 0.8, 3,
now.plusSeconds(9)),
+ Row.of(10, "h", 4, 0.8, 3,
now.plusSeconds(10)),
+ Row.of(11, "h", 2, 0.8, 3,
now.plusSeconds(11)),
+ Row.of(12, "h", 2, 0.8, 3,
now.plusSeconds(12)))
.returns(
ROW_NAMED(
new String[] {
- "id", "name", "price",
"rate", "weight"
+ "id", "name", "price",
"rate", "weight", "ts"
},
INT,
STRING,
INT,
DOUBLE,
- INT)),
+ INT,
+ INSTANT)),
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("price", DataTypes.INT())
.column("rate", DataTypes.DOUBLE())
.column("weight", DataTypes.INT())
- .columnByExpression("proctime", "PROCTIME()")
+ .column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
TableResult tableResult =
@@ -418,7 +759,7 @@ class MatchRecognizeITCase {
"SELECT *\n"
+ "FROM MyTable\n"
+ "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
+ + " ORDER BY ts\n"
+ " MEASURES\n"
+ " FIRST(id) as startId,\n"
+ " SUM(A.price) AS sumA,\n"
@@ -446,31 +787,33 @@ class MatchRecognizeITCase {
@Test
void testAggregatesWithNullInputs() {
+ Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
tEnv.getConfig().setMaxGeneratedCodeLength(1);
tEnv.createTemporaryView(
"MyTable",
tEnv.fromDataStream(
env.fromData(
- Row.of(1, "a", 10),
- Row.of(2, "z", 10),
- Row.of(3, "b", null),
- Row.of(4, "c", null),
- Row.of(5, "d", 3),
- Row.of(6, "c", 3),
- Row.of(7, "c", 3),
- Row.of(8, "c", 3),
- Row.of(9, "c", 2))
+ Row.of(1, "a", 10, now.plusSeconds(1)),
+ Row.of(2, "z", 10, now.plusSeconds(2)),
+ Row.of(3, "b", null,
now.plusSeconds(3)),
+ Row.of(4, "c", null,
now.plusSeconds(4)),
+ Row.of(5, "d", 3, now.plusSeconds(5)),
+ Row.of(6, "c", 3, now.plusSeconds(6)),
+ Row.of(7, "c", 3, now.plusSeconds(7)),
+ Row.of(8, "c", 3, now.plusSeconds(8)),
+ Row.of(9, "c", 2, now.plusSeconds(9)))
.returns(
ROW_NAMED(
- new String[] {"id", "name",
"price"},
+ new String[] {"id", "name",
"price", "ts"},
INT,
STRING,
- INT)),
+ INT,
+ INSTANT)),
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("price", DataTypes.INT())
- .columnByExpression("proctime", "PROCTIME()")
+ .column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
TableResult tableResult =
@@ -478,7 +821,7 @@ class MatchRecognizeITCase {
"SELECT *\n"
+ "FROM MyTable\n"
+ "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
+ + " ORDER BY ts\n"
+ " MEASURES\n"
+ " SUM(A.price) as sumA,\n"
+ " COUNT(A.id) as countAId,\n"
@@ -527,31 +870,33 @@ class MatchRecognizeITCase {
@Test
void testUserDefinedFunctions() {
+ Instant now = Instant.parse("2023-12-01T12:00:00.000Z");
tEnv.getConfig().setMaxGeneratedCodeLength(1);
tEnv.createTemporaryView(
"MyTable",
tEnv.fromDataStream(
env.fromData(
- Row.of(1, "a", 1),
- Row.of(2, "a", 1),
- Row.of(3, "a", 1),
- Row.of(4, "a", 1),
- Row.of(5, "a", 1),
- Row.of(6, "b", 1),
- Row.of(7, "a", 1),
- Row.of(8, "a", 1),
- Row.of(9, "f", 1))
+ Row.of(1, "a", 1, now.plusSeconds(1)),
+ Row.of(2, "a", 1, now.plusSeconds(2)),
+ Row.of(3, "a", 1, now.plusSeconds(3)),
+ Row.of(4, "a", 1, now.plusSeconds(4)),
+ Row.of(5, "a", 1, now.plusSeconds(5)),
+ Row.of(6, "b", 1, now.plusSeconds(6)),
+ Row.of(7, "a", 1, now.plusSeconds(7)),
+ Row.of(8, "a", 1, now.plusSeconds(8)),
+ Row.of(9, "f", 1, now.plusSeconds(9)))
.returns(
ROW_NAMED(
- new String[] {"id", "name",
"price"},
+ new String[] {"id", "name",
"price", "ts"},
INT,
STRING,
- INT)),
+ INT,
+ INSTANT)),
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("price", DataTypes.INT())
- .columnByExpression("proctime", "PROCTIME()")
+ .column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
tEnv.createTemporarySystemFunction("prefix", new
PrefixingScalarFunc());
tEnv.createTemporarySystemFunction("countFrom", new RichAggFunc());
@@ -567,7 +912,7 @@ class MatchRecognizeITCase {
"SELECT *\n"
+ "FROM MyTable\n"
+ "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
+ + " ORDER BY ts\n"
+ " MEASURES\n"
+ " FIRST(id) as firstId,\n"
+ " prefix(A.name) as
prefixedNameA,\n"