This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.3 by this push:
new 4d0480952b1 [FLINK-39293][table] `MATCH_RECOGNIZE` fails with
`SqlParserException` in views
4d0480952b1 is described below
commit 4d0480952b16b3a519066d0b3e74b4ff02c255f6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Apr 24 16:52:02 2026 +0200
[FLINK-39293][table] `MATCH_RECOGNIZE` fails with `SqlParserException` in
views
---
.../src/main/codegen/templates/Parser.jj | 23 +-
.../calcite/sql/validate/SqlValidatorImpl.java | 81 ++-
.../runtime/batch/sql/MatchRecognizeITCase.java | 673 +++++++++++----------
3 files changed, 414 insertions(+), 363 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
index 1bb939a46fa..a9802fd631b 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
+++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
@@ -2202,7 +2202,10 @@ SqlNode TableRef3(ExprContext exprContext, boolean
lateral) :
[ tableRef = ExtendTable(tableRef) ]
tableRef = Over(tableRef)
[ tableRef = Snapshot(tableRef) ]
- [ tableRef = MatchRecognize(tableRef) ]
+ [
+ LOOKAHEAD(3)
+ tableRef = MatchRecognize(tableRef)
+ ]
)
|
LOOKAHEAD(2)
@@ -2210,7 +2213,10 @@ SqlNode TableRef3(ExprContext exprContext, boolean
lateral) :
tableRef = ParenthesizedExpression(exprContext)
tableRef = Over(tableRef)
tableRef = addLateral(tableRef, lateral)
- [ tableRef = MatchRecognize(tableRef) ]
+ [
+ LOOKAHEAD(3)
+ tableRef = MatchRecognize(tableRef)
+ ]
|
LOOKAHEAD(2)
[ <LATERAL> ] // "LATERAL" is implicit with "UNNEST", so ignore
@@ -3059,6 +3065,7 @@ void AddUnpivotValue(List<SqlNode> list) :
SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
{
final Span s, s0, s1, s2;
+ final SqlIdentifier aliasBeforeMatch;
final SqlNodeList measureList;
final SqlNodeList partitionList;
final SqlNodeList orderList;
@@ -3073,6 +3080,12 @@ SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
final SqlLiteral isStrictEnds;
}
{
+ [
+ <AS> aliasBeforeMatch = SimpleIdentifier() {
+ tableRef = SqlStdOperatorTable.AS.createCall(
+ Span.of(tableRef).end(this), tableRef, aliasBeforeMatch);
+ }
+ ]
<MATCH_RECOGNIZE> { s = span(); checkNotJoin(tableRef); } <LPAREN>
(
<PARTITION> { s2 = span(); } <BY>
@@ -7210,7 +7223,7 @@ SqlCall MatchRecognizeCallWithModifier() :
{
final Span s;
final SqlOperator runningOp;
- final SqlNode func;
+ final SqlNode e;
}
{
(
@@ -7219,8 +7232,8 @@ SqlCall MatchRecognizeCallWithModifier() :
<FINAL> { runningOp = SqlStdOperatorTable.FINAL; }
)
{ s = span(); }
- func = NamedFunctionCall() {
- return runningOp.createCall(s.end(func), func);
+ e = Expression3(ExprContext.ACCEPT_NON_QUERY) {
+ return runningOp.createCall(s.end(e), e);
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 6635637888a..6ef58067ced 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -169,23 +169,35 @@ import static org.apache.calcite.util.Util.first;
* Default implementation of {@link SqlValidator}, the class was copied over
because of
* CALCITE-4554.
*
- * <p>Lines 207 ~ 210, Flink improves error message for functions without
appropriate arguments in
+ * <p>Lines 219 ~ 222, Flink improves error message for functions without
appropriate arguments in
* handleUnresolvedFunction.
*
- * <p>Lines 1275 ~ 1277, CALCITE-7217, should be removed after upgrading
Calcite to 1.41.0.
+ * <p>Lines 1287 ~ 1289, CALCITE-7217, should be removed after upgrading
Calcite to 1.41.0.
*
- * <p>Lines 2036 ~ 2050, Flink improves error message for functions without
appropriate arguments in
+ * <p>Lines 2048 ~ 2062, Flink improves error message for functions without
appropriate arguments in
* handleUnresolvedFunction at {@link
SqlValidatorImpl#handleUnresolvedFunction}.
*
- * <p>Lines 2576 ~ 2595, CALCITE-7217, CALCITE-7312 should be removed after
upgrading Calcite to
+ * <p>Lines 2475 ~ 2477, CALCITE-7471 should be removed after upgrading
Calcite to 1.42.0.
+ *
+ * <p>Lines 2590 ~ 2609, CALCITE-7217, CALCITE-7312 should be removed after
upgrading Calcite to
* 1.42.0.
*
- * <p>Line 2626 ~2644, set the correct scope for VECTOR_SEARCH.
+ * <p>Lines 2640 ~ 2658, set the correct scope for VECTOR_SEARCH.
*
- * <p>Lines 3923 ~ 3927, 6602 ~ 6608 Flink improves Optimize the retrieval of
sub-operands in
+ * <p>Lines 3937 ~ 3941, 6612 ~ 6618, Flink optimizes the retrieval of
sub-operands in
* SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
*
- * <p>Lines 5343 ~ 5349, FLINK-24352 Add null check for temporal table check
on SqlSnapshot.
+ * <p>Lines 5357 ~ 5363, FLINK-24352 Add null check for temporal table check
on SqlSnapshot.
+ *
+ * <p>Lines 5784-5786, CALCITE-7466 should be removed after upgrading Calcite
to 1.42.0.
+ *
+ * <p>Lines 5840-5842, CALCITE-7470 should be removed after upgrading Calcite
to 1.42.0.
+ *
+ * <p>Lines 7267-7290, CALCITE-7486 should be removed after upgrading Calcite
to 1.42.0.
+ *
+ * <p>Lines 7337-7354, CALCITE-7486 should be removed after upgrading Calcite
to 1.42.0.
+ *
+ * <p>Lines 7399-7407, CALCITE-7486 should be removed after upgrading Calcite
to 1.42.0.
*/
public class SqlValidatorImpl implements SqlValidatorWithHints {
// ~ Static fields/initializers
---------------------------------------------
@@ -2460,7 +2472,9 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
enclosingNode,
alias,
forceNullable);
- return node;
+ // ----- FLINK MODIFICATION BEGIN -----
+ return newNode;
+ // ----- FLINK MODIFICATION END -----
case PIVOT:
registerPivot(
@@ -5767,11 +5781,9 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
setValidatedNodeType(measure, type);
fields.add(alias, type);
- sqlNodes.add(
- SqlStdOperatorTable.AS.createCall(
- SqlParserPos.ZERO,
- expand,
- new SqlIdentifier(alias, SqlParserPos.ZERO)));
+ // ----- FLINK MODIFICATION BEGIN -----
+ sqlNodes.add(expand);
+ // ----- FLINK MODIFICATION END -----
}
SqlNodeList list = new SqlNodeList(sqlNodes,
measures.getParserPosition());
@@ -5825,11 +5837,9 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
// Some extra work need required here.
// In PREV, NEXT, FINAL and LAST, only one pattern variable is
allowed.
- sqlNodes.add(
- SqlStdOperatorTable.AS.createCall(
- SqlParserPos.ZERO,
- expand,
- new SqlIdentifier(alias, SqlParserPos.ZERO)));
+ // ----- FLINK MODIFICATION BEGIN -----
+ sqlNodes.add(expand);
+ // ----- FLINK MODIFICATION END -----
final RelDataType type = deriveType(scope, expand);
if (!SqlTypeUtil.inBooleanFamily(type)) {
@@ -7254,19 +7264,31 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
int firstLastCount;
int prevNextCount;
int aggregateCount;
+ // ----- FLINK MODIFICATION BEGIN -----
+ int index;
+ int argCount;
PatternValidator(boolean isMeasure) {
- this(isMeasure, 0, 0, 0);
+ this(isMeasure, 0, 0, 0, 0, 0);
}
PatternValidator(
- boolean isMeasure, int firstLastCount, int prevNextCount, int
aggregateCount) {
+ boolean isMeasure,
+ int firstLastCount,
+ int prevNextCount,
+ int aggregateCount,
+ int index,
+ int argCount) {
this.isMeasure = isMeasure;
this.firstLastCount = firstLastCount;
this.prevNextCount = prevNextCount;
this.aggregateCount = aggregateCount;
+ this.index = index;
+ this.argCount = argCount;
}
+ // ----- FLINK MODIFICATION END -----
+
@Override
public Set<String> visit(SqlCall call) {
boolean isSingle = false;
@@ -7312,7 +7334,9 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
call,
Static.RESOURCE.patternRunningFunctionInDefine(call.toString()));
}
- for (SqlNode node : operands) {
+ // ----- FLINK MODIFICATION BEGIN -----
+ for (int i = 0; i < operands.size(); i++) {
+ SqlNode node = operands.get(i);
if (node != null) {
vars.addAll(
requireNonNull(
@@ -7321,10 +7345,13 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
isMeasure,
firstLastCount,
prevNextCount,
- aggregateCount)),
+ aggregateCount,
+ i,
+ operands.size())),
() -> "node.accept(PatternValidator) for
node " + node));
}
}
+ // ----- FLINK MODIFICATION END -----
if (isSingle) {
switch (kind) {
@@ -7369,7 +7396,15 @@ public class SqlValidatorImpl implements
SqlValidatorWithHints {
@Override
public Set<String> visit(SqlLiteral literal) {
- return ImmutableSet.of();
+ // ----- FLINK MODIFICATION BEGIN -----
+ if ((this.argCount == 1 || this.index < this.argCount - 1)
+ && (this.firstLastCount > 0 || this.prevNextCount > 0)
+ && !SqlUtil.isNull(literal)) {
+ return ImmutableSet.of(literal.toValue());
+ } else {
+ return ImmutableSet.of();
+ }
+ // ----- FLINK MODIFICATION END -----
}
@Override
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
index 5aae60f5279..db4d7a3705e 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
@@ -41,8 +41,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
-import java.util.Comparator;
-import java.util.List;
import static org.apache.flink.api.common.typeinfo.Types.DOUBLE;
import static org.apache.flink.api.common.typeinfo.Types.INSTANT;
@@ -90,24 +88,23 @@ class MatchRecognizeITCase {
.column("name", DataTypes.STRING())
.columnByExpression("proctime", "PROCTIME()")
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT T.aid, T.bid, T.cid\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
- + " MEASURES\n"
- + " `A\"`.id AS aid,\n"
- + " \u006C.id AS bid,\n"
- + " C.id AS cid\n"
- + " PATTERN (`A\"` \u006C C)\n"
- + " DEFINE\n"
- + " `A\"` AS name = 'a',\n"
- + " \u006C AS name = 'b',\n"
- + " C AS name = 'c'\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(6, 7, 8));
+ final String sql =
+ "SELECT T.aid, T.bid, T.cid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " `A\"`.id AS aid,\n"
+ + " \u006C.id AS bid,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (`A\"` \u006C C)\n"
+ + " DEFINE\n"
+ + " `A\"` AS name = 'a',\n"
+ + " \u006C AS name = 'b',\n"
+ + " C AS name = 'c'\n"
+ + ") AS T";
+
+ assertTableResult(sql, Row.of(6, 7, 8));
}
@Test
@@ -137,24 +134,23 @@ class MatchRecognizeITCase {
.column("name", DataTypes.STRING())
.column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT T.aid, T.bid, T.cid\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " `A\"`.id AS aid,\n"
- + " \u006C.id AS bid,\n"
- + " C.id AS cid\n"
- + " PATTERN (`A\"` \u006C C)\n"
- + " DEFINE\n"
- + " `A\"` AS name = 'a',\n"
- + " \u006C AS name = 'b',\n"
- + " C AS name = 'c'\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(6, 7, 8));
+ final String sql =
+ "SELECT T.aid, T.bid, T.cid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " `A\"`.id AS aid,\n"
+ + " \u006C.id AS bid,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (`A\"` \u006C C)\n"
+ + " DEFINE\n"
+ + " `A\"` AS name = 'a',\n"
+ + " \u006C AS name = 'b',\n"
+ + " C AS name = 'c'\n"
+ + ") AS T";
+
+ assertTableResult(sql, Row.of(6, 7, 8));
}
@Test
@@ -186,24 +182,23 @@ class MatchRecognizeITCase {
.column("name", DataTypes.STRING())
.column("ts", DataTypes.TIMESTAMP(3))
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT T.aid, T.bid, T.cid\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " A.id AS aid,\n"
- + " B.id AS bid,\n"
- + " C.id AS cid\n"
- + " PATTERN (A B C) WITHIN INTERVAL '1'
MINUTE\n"
- + " DEFINE\n"
- + " A AS name = 'a',\n"
- + " B AS name = 'b',\n"
- + " C AS name = 'c'\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(2, 3, 4));
+
+ final String sql =
+ "SELECT T.aid, T.bid, T.cid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " B.id AS bid,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (A B C) WITHIN INTERVAL '1' MINUTE\n"
+ + " DEFINE\n"
+ + " A AS name = 'a',\n"
+ + " B AS name = 'b',\n"
+ + " C AS name = 'c'\n"
+ + ") AS T";
+ assertTableResult(sql, Row.of(2, 3, 4));
}
@Test
@@ -235,25 +230,25 @@ class MatchRecognizeITCase {
.column("nullField", DataTypes.STRING())
.column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT T.aid, T.bNull, T.cid, T.aNull\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " A.id AS aid,\n"
- + " A.nullField AS aNull,\n"
- + " LAST(B.nullField) AS bNull,\n"
- + " C.id AS cid\n"
- + " PATTERN (A B C)\n"
- + " DEFINE\n"
- + " A AS name = 'a' AND nullField IS
NULL,\n"
- + " B AS name = 'b' AND LAST(A.nullField)
IS NULL,\n"
- + " C AS name = 'c'\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(1, null, 3, null), Row.of(6, null, 8,
null));
+
+ final String sql =
+ "SELECT T.aid, T.bNull, T.cid, T.aNull\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " A.nullField AS aNull,\n"
+ + " LAST(B.nullField) AS bNull,\n"
+ + " C.id AS cid\n"
+ + " PATTERN (A B C)\n"
+ + " DEFINE\n"
+ + " A AS name = 'a' AND nullField IS NULL,\n"
+ + " B AS name = 'b' AND LAST(A.nullField) IS
NULL,\n"
+ + " C AS name = 'c'\n"
+ + ") AS T";
+
+ assertTableResult(sql, Row.of(1, null, 3, null), Row.of(6, null, 8,
null));
}
@Test
@@ -288,31 +283,29 @@ class MatchRecognizeITCase {
.column("key2", DataTypes.STRING())
.column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT *\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " PARTITION BY key1, key2\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " A.id AS aid,\n"
- + " A.key1 AS akey1,\n"
- + " LAST(B.id) AS bid,\n"
- + " C.id AS cid,\n"
- + " C.key2 AS ckey2\n"
- + " PATTERN (A B C)\n"
- + " DEFINE\n"
- + " A AS name = 'a' AND key1 LIKE '%key%'
AND id > 0,\n"
- + " B AS name = 'b' AND LAST(A.name, 2) IS
NULL,\n"
- + " C AS name = 'c' AND LAST(A.name) =
'a'\n"
- + ") AS T");
- List<Row> actual =
CollectionUtil.iteratorToList(tableResult.collect());
- actual.sort(Comparator.comparing(o -> String.valueOf(o.getField(0))));
- assertThat(actual)
- .containsExactly(
- Row.of("key1", "second_key3", 1, "key1", 2, 3,
"second_key3"),
- Row.of("key2", "second_key4", 6, "key2", 7, 8,
"second_key4"));
+ final String sql =
+ "SELECT *\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY key1, key2\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " A.key1 AS akey1,\n"
+ + " LAST(B.id) AS bid,\n"
+ + " C.id AS cid,\n"
+ + " C.key2 AS ckey2\n"
+ + " PATTERN (A B C)\n"
+ + " DEFINE\n"
+ + " A AS name = 'a' AND key1 LIKE '%key%' AND id >
0,\n"
+ + " B AS name = 'b' AND LAST(A.name, 2) IS NULL,\n"
+ + " C AS name = 'c' AND LAST(A.name) = 'a'\n"
+ + ") AS T";
+
+ assertTableResult(
+ sql,
+ Row.of("key1", "second_key3", 1, "key1", 2, 3, "second_key3"),
+ Row.of("key2", "second_key4", 6, "key2", 7, 8, "second_key4"));
}
@Test
@@ -413,31 +406,30 @@ class MatchRecognizeITCase {
.column("tax", DataTypes.INT())
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT *\n"
- + "FROM (\n"
- + " SELECT\n"
- + " symbol,\n"
- + " SUM(price) as price,\n"
- + " TUMBLE_ROWTIME(ts, interval '3'
second) as rowTime,\n"
- + " TUMBLE_START(ts, interval '3' second)
as startTime\n"
- + " FROM Ticker\n"
- + " GROUP BY symbol, TUMBLE(ts, interval '3'
second)\n"
- + ")\n"
- + "MATCH_RECOGNIZE (\n"
- + " PARTITION BY symbol\n"
- + " ORDER BY rowTime\n"
- + " MEASURES\n"
- + " B.price as dPrice,\n"
- + " B.startTime as dTime\n"
- + " ONE ROW PER MATCH\n"
- + " PATTERN (A B)\n"
- + " DEFINE\n"
- + " B AS B.price < A.price\n"
- + ")");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of("ACME", 2, now.plusSeconds(3)));
+ final String sql =
+ "SELECT *\n"
+ + "FROM (\n"
+ + " SELECT\n"
+ + " symbol,\n"
+ + " SUM(price) as price,\n"
+ + " TUMBLE_ROWTIME(ts, interval '3' second) as
rowTime,\n"
+ + " TUMBLE_START(ts, interval '3' second) as
startTime\n"
+ + " FROM Ticker\n"
+ + " GROUP BY symbol, TUMBLE(ts, interval '3'
second)\n"
+ + ")\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol\n"
+ + " ORDER BY rowTime\n"
+ + " MEASURES\n"
+ + " B.price as dPrice,\n"
+ + " B.startTime as dTime\n"
+ + " ONE ROW PER MATCH\n"
+ + " PATTERN (A B)\n"
+ + " DEFINE\n"
+ + " B AS B.price < A.price\n"
+ + ")";
+
+ assertTableResult(sql, Row.of("ACME", 2, now.plusSeconds(3)));
}
@Test
@@ -467,39 +459,39 @@ class MatchRecognizeITCase {
.column("tax", DataTypes.INT())
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT\n"
- + " symbol,\n"
- + " SUM(price) as price,\n"
- + " TUMBLE_ROWTIME(matchRowtime, interval '3'
second) as rowTime,\n"
- + " TUMBLE_START(matchRowtime, interval '3'
second) as startTime\n"
- + "FROM Ticker\n"
- + "MATCH_RECOGNIZE (\n"
- + " PARTITION BY symbol\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " A.price as price,\n"
- + " A.tax as tax,\n"
- + " MATCH_ROWTIME() as matchRowtime\n"
- + " ONE ROW PER MATCH\n"
- + " PATTERN (A)\n"
- + " DEFINE\n"
- + " A AS A.price > 0\n"
- + ") AS T\n"
- + "GROUP BY symbol, TUMBLE(matchRowtime,
interval '3' second)");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(
- Row.of(
- "ACME",
- 3,
- LocalDateTime.parse("1970-01-01T00:00:02.999"),
- LocalDateTime.parse("1970-01-01T00:00")),
- Row.of(
- "ACME",
- 2,
- LocalDateTime.parse("1970-01-01T00:00:05.999"),
- LocalDateTime.parse("1970-01-01T00:00:03")));
+ final String sql =
+ "SELECT\n"
+ + " symbol,\n"
+ + " SUM(price) as price,\n"
+ + " TUMBLE_ROWTIME(matchRowtime, interval '3' second)
as rowTime,\n"
+ + " TUMBLE_START(matchRowtime, interval '3' second)
as startTime\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " A.price as price,\n"
+ + " A.tax as tax,\n"
+ + " MATCH_ROWTIME() as matchRowtime\n"
+ + " ONE ROW PER MATCH\n"
+ + " PATTERN (A)\n"
+ + " DEFINE\n"
+ + " A AS A.price > 0\n"
+ + ") AS T\n"
+ + "GROUP BY symbol, TUMBLE(matchRowtime, interval '3'
second)";
+
+ assertTableResult(
+ sql,
+ Row.of(
+ "ACME",
+ 3,
+ LocalDateTime.parse("1970-01-01T00:00:02.999"),
+ LocalDateTime.parse("1970-01-01T00:00")),
+ Row.of(
+ "ACME",
+ 2,
+ LocalDateTime.parse("1970-01-01T00:00:05.999"),
+ LocalDateTime.parse("1970-01-01T00:00:03")));
}
@Test
@@ -530,27 +522,27 @@ class MatchRecognizeITCase {
.column("tax", DataTypes.INT())
.columnByExpression("ts",
"TO_TIMESTAMP_LTZ(tstamp, 3)")
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT *\n"
- + "FROM Ticker\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " FIRST(DOWN.tstamp) AS start_tstamp,\n"
- + " LAST(DOWN.tstamp) AS bottom_tstamp,\n"
- + " UP.tstamp AS end_tstamp,\n"
- + " FIRST(DOWN.price + DOWN.tax + 1) AS
bottom_total,\n"
- + " UP.price + UP.tax AS end_total\n"
- + " ONE ROW PER MATCH\n"
- + " AFTER MATCH SKIP PAST LAST ROW\n"
- + " PATTERN (DOWN{2,} UP)\n"
- + " DEFINE\n"
- + " DOWN AS price < LAST(DOWN.price, 1) OR
LAST(DOWN.price, 1) IS NULL,\n"
- + " UP AS price < FIRST(DOWN.price)\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(6L, 7L, 8L, 33, 33));
+
+ final String sql =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " FIRST(DOWN.tstamp) AS start_tstamp,\n"
+ + " LAST(DOWN.tstamp) AS bottom_tstamp,\n"
+ + " UP.tstamp AS end_tstamp,\n"
+ + " FIRST(DOWN.price + DOWN.tax + 1) AS
bottom_total,\n"
+ + " UP.price + UP.tax AS end_total\n"
+ + " ONE ROW PER MATCH\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (DOWN{2,} UP)\n"
+ + " DEFINE\n"
+ + " DOWN AS price < LAST(DOWN.price, 1) OR
LAST(DOWN.price, 1) IS NULL,\n"
+ + " UP AS price < FIRST(DOWN.price)\n"
+ + ") AS T";
+
+ assertTableResult(sql, Row.of(6L, 7L, 8L, 33, 33));
}
@Test
@@ -579,25 +571,24 @@ class MatchRecognizeITCase {
.column("tax", DataTypes.INT())
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT *\n"
- + "FROM Ticker\n"
- + "MATCH_RECOGNIZE (\n"
- + " PARTITION BY symbol\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " DOWN.tax AS bottom_tax,\n"
- + " UP.tax AS end_tax\n"
- + " ONE ROW PER MATCH\n"
- + " AFTER MATCH SKIP PAST LAST ROW\n"
- + " PATTERN (DOWN UP)\n"
- + " DEFINE\n"
- + " DOWN AS DOWN.price = 13,\n"
- + " UP AS UP.price = 20\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of("ACME", 3, 4));
+ final String sql =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " PARTITION BY symbol\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " DOWN.tax AS bottom_tax,\n"
+ + " UP.tax AS end_tax\n"
+ + " ONE ROW PER MATCH\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (DOWN UP)\n"
+ + " DEFINE\n"
+ + " DOWN AS DOWN.price = 13,\n"
+ + " UP AS UP.price = 20\n"
+ + ") AS T";
+
+ assertTableResult(sql, Row.of("ACME", 3, 4));
}
@Test
@@ -628,38 +619,37 @@ class MatchRecognizeITCase {
.column("price", DataTypes.INT())
.columnByExpression("ts",
"TO_TIMESTAMP_LTZ(tstamp, 3)")
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT *\n"
- + "FROM Ticker\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " FIRST(id, 0) as id0,\n"
- + " FIRST(id, 1) as id1,\n"
- + " FIRST(id, 2) as id2,\n"
- + " FIRST(id, 3) as id3,\n"
- + " FIRST(id, 4) as id4,\n"
- + " FIRST(id, 5) as id5,\n"
- + " FIRST(id, 6) as id6,\n"
- + " FIRST(id, 7) as id7,\n"
- + " LAST(id, 0) as id8,\n"
- + " LAST(id, 1) as id9,\n"
- + " LAST(id, 2) as id10,\n"
- + " LAST(id, 3) as id11,\n"
- + " LAST(id, 4) as id12,\n"
- + " LAST(id, 5) as id13,\n"
- + " LAST(id, 6) as id14,\n"
- + " LAST(id, 7) as id15\n"
- + " ONE ROW PER MATCH\n"
- + " AFTER MATCH SKIP PAST LAST ROW\n"
- + " PATTERN (`DOWN\"`{2,} UP)\n"
- + " DEFINE\n"
- + " `DOWN\"` AS price < LAST(price, 1) OR
LAST(price, 1) IS NULL,\n"
- + " UP AS price = FIRST(price) AND price >
FIRST(price, 3) AND price = LAST(price, 7)\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4,
3, 2, 1));
+ final String sql =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " FIRST(id, 0) as id0,\n"
+ + " FIRST(id, 1) as id1,\n"
+ + " FIRST(id, 2) as id2,\n"
+ + " FIRST(id, 3) as id3,\n"
+ + " FIRST(id, 4) as id4,\n"
+ + " FIRST(id, 5) as id5,\n"
+ + " FIRST(id, 6) as id6,\n"
+ + " FIRST(id, 7) as id7,\n"
+ + " LAST(id, 0) as id8,\n"
+ + " LAST(id, 1) as id9,\n"
+ + " LAST(id, 2) as id10,\n"
+ + " LAST(id, 3) as id11,\n"
+ + " LAST(id, 4) as id12,\n"
+ + " LAST(id, 5) as id13,\n"
+ + " LAST(id, 6) as id14,\n"
+ + " LAST(id, 7) as id15\n"
+ + " ONE ROW PER MATCH\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (`DOWN\"`{2,} UP)\n"
+ + " DEFINE\n"
+ + " `DOWN\"` AS price < LAST(price, 1) OR
LAST(price, 1) IS NULL,\n"
+ + " UP AS price = FIRST(price) AND price >
FIRST(price, 3) AND price = LAST(price, 7)\n"
+ + ") AS T";
+
+ assertTableResult(sql, Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4,
3, 2, 1));
}
@Test
@@ -686,25 +676,25 @@ class MatchRecognizeITCase {
.column("tax", DataTypes.INT())
.columnByExpression("ts",
"TO_TIMESTAMP_LTZ(tstamp, 3)")
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT *\n"
- + "FROM Ticker\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " FIRST(DOWN.price) as first,\n"
- + " LAST(DOWN.price) as last,\n"
- + " FIRST(DOWN.price, 5) as nullPrice\n"
- + " ONE ROW PER MATCH\n"
- + " AFTER MATCH SKIP PAST LAST ROW\n"
- + " PATTERN (DOWN{2,} UP)\n"
- + " DEFINE\n"
- + " DOWN AS price < LAST(DOWN.price, 1) OR
LAST(DOWN.price, 1) IS NULL,\n"
- + " UP AS price > LAST(DOWN.price)\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(19, 13, null));
+
+ final String sql =
+ "SELECT *\n"
+ + "FROM Ticker\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " FIRST(DOWN.price) as first,\n"
+ + " LAST(DOWN.price) as last,\n"
+ + " FIRST(DOWN.price, 5) as nullPrice\n"
+ + " ONE ROW PER MATCH\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (DOWN{2,} UP)\n"
+ + " DEFINE\n"
+ + " DOWN AS price < LAST(DOWN.price, 1) OR
LAST(DOWN.price, 1) IS NULL,\n"
+ + " UP AS price > LAST(DOWN.price)\n"
+ + ") AS T";
+
+ assertTableResult(sql, Row.of(19, 13, null));
}
/**
@@ -754,35 +744,36 @@ class MatchRecognizeITCase {
.column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT *\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " FIRST(id) as startId,\n"
- + " SUM(A.price) AS sumA,\n"
- + " COUNT(D.price) AS countD,\n"
- + " SUM(D.price) as sumD,\n"
- + " weightedAvg(price, weight) as wAvg,\n"
- + " AVG(B.price) AS avgB,\n"
- + " SUM(B.price * B.rate) as sumExprB,\n"
- + " LAST(id) as endId\n"
- + " AFTER MATCH SKIP PAST LAST ROW\n"
- + " PATTERN (A+ B+ C D? E)\n"
- + " DEFINE\n"
- + " A AS SUM(A.price) < 6,\n"
- + " B AS SUM(B.price * B.rate) <
SUM(A.price) AND\n"
- + " SUM(B.price * B.rate) > 0.2 AND\n"
- + " SUM(B.price) >= 1 AND\n"
- + " AVG(B.price) >= 1 AND\n"
- + " weightedAvg(price, weight) > 1\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(
- Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8),
- Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12));
+
+ final String sql =
+ "SELECT *\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " FIRST(id) as startId,\n"
+ + " SUM(A.price) AS sumA,\n"
+ + " COUNT(D.price) AS countD,\n"
+ + " SUM(D.price) as sumD,\n"
+ + " weightedAvg(price, weight) as wAvg,\n"
+ + " AVG(B.price) AS avgB,\n"
+ + " SUM(B.price * B.rate) as sumExprB,\n"
+ + " LAST(id) as endId\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (A+ B+ C D? E)\n"
+ + " DEFINE\n"
+ + " A AS SUM(A.price) < 6,\n"
+ + " B AS SUM(B.price * B.rate) < SUM(A.price) AND\n"
+ + " SUM(B.price * B.rate) > 0.2 AND\n"
+ + " SUM(B.price) >= 1 AND\n"
+ + " AVG(B.price) >= 1 AND\n"
+ + " weightedAvg(price, weight) > 1\n"
+ + ") AS T";
+
+ assertTableResult(
+ sql,
+ Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8),
+ Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12));
}
@Test
@@ -816,27 +807,27 @@ class MatchRecognizeITCase {
.column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build()));
tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT *\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " SUM(A.price) as sumA,\n"
- + " COUNT(A.id) as countAId,\n"
- + " COUNT(A.price) as countAPrice,\n"
- + " COUNT(*) as countAll,\n"
- + " COUNT(price) as countAllPrice,\n"
- + " LAST(id) as endId\n"
- + " AFTER MATCH SKIP PAST LAST ROW\n"
- + " PATTERN (A+ C)\n"
- + " DEFINE\n"
- + " A AS SUM(A.price) < 30,\n"
- + " C AS C.name = 'c'\n"
- + ") AS T");
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(29, 7L, 5L, 8L, 6L, 8));
+
+ final String sql =
+ "SELECT *\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " SUM(A.price) as sumA,\n"
+ + " COUNT(A.id) as countAId,\n"
+ + " COUNT(A.price) as countAPrice,\n"
+ + " COUNT(*) as countAll,\n"
+ + " COUNT(price) as countAllPrice,\n"
+ + " LAST(id) as endId\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (A+ C)\n"
+ + " DEFINE\n"
+ + " A AS SUM(A.price) < 30,\n"
+ + " C AS C.name = 'c'\n"
+ + ") AS T";
+
+ assertTableResult(sql, Row.of(29, 7L, 5L, 8L, 6L, 8));
}
@Test
@@ -851,21 +842,21 @@ class MatchRecognizeITCase {
.column("name", DataTypes.STRING())
.columnByExpression("proctime", "PROCTIME()")
.build()));
- TableResult tableResult =
- tEnv.executeSql(
- "SELECT T.aid\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY proctime\n"
- + " MEASURES\n"
- + " A.id AS aid,\n"
- + " A.proctime AS aProctime,\n"
- + " LAST(A.proctime + INTERVAL '1' second)
as calculatedField\n"
- + " PATTERN (A)\n"
- + " DEFINE\n"
- + " A AS proctime >= (CURRENT_TIMESTAMP -
INTERVAL '1' day)\n"
- + ") AS T");
-
assertThat(CollectionUtil.iteratorToList(tableResult.collect())).containsExactly(Row.of(1));
+
+ final String sql =
+ "SELECT T.aid\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY proctime\n"
+ + " MEASURES\n"
+ + " A.id AS aid,\n"
+ + " A.proctime AS aProctime,\n"
+ + " LAST(A.proctime + INTERVAL '1' second) as
calculatedField\n"
+ + " PATTERN (A)\n"
+ + " DEFINE\n"
+ + " A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL
'1' day)\n"
+ + ") AS T";
+ assertTableResult(sql, Row.of(1));
}
@Test
@@ -906,29 +897,28 @@ class MatchRecognizeITCase {
jobParameters.setString("prefix", prefix);
jobParameters.setString("start", Integer.toString(startFrom));
env.getConfig().setGlobalJobParameters(jobParameters);
- TableResult tableResult =
- tEnv.executeSql(
- String.format(
- "SELECT *\n"
- + "FROM MyTable\n"
- + "MATCH_RECOGNIZE (\n"
- + " ORDER BY ts\n"
- + " MEASURES\n"
- + " FIRST(id) as firstId,\n"
- + " prefix(A.name) as
prefixedNameA,\n"
- + " countFrom(A.price) as
countFromA,\n"
- + " LAST(id) as lastId\n"
- + " AFTER MATCH SKIP PAST LAST ROW\n"
- + " PATTERN (A+ C)\n"
- + " DEFINE\n"
- + " A AS prefix(A.name) = '%s:a'
AND countFrom(A.price) <= %d\n"
- + ") AS T",
- prefix, 4 + 4));
- assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
- .containsExactly(Row.of(1, "PREF:a", 8, 5), Row.of(7,
"PREF:a", 6, 9));
+ String sql =
+ String.format(
+ "SELECT *\n"
+ + "FROM MyTable\n"
+ + "MATCH_RECOGNIZE (\n"
+ + " ORDER BY ts\n"
+ + " MEASURES\n"
+ + " FIRST(id) as firstId,\n"
+ + " prefix(A.name) as prefixedNameA,\n"
+ + " countFrom(A.price) as countFromA,\n"
+ + " LAST(id) as lastId\n"
+ + " AFTER MATCH SKIP PAST LAST ROW\n"
+ + " PATTERN (A+ C)\n"
+ + " DEFINE\n"
+ + " A AS prefix(A.name) = '%s:a' AND
countFrom(A.price) <= %d\n"
+ + ") AS T",
+ prefix, 4 + 4);
+
+ assertTableResult(sql, Row.of(1, "PREF:a", 8, 5), Row.of(7, "PREF:a",
6, 9));
}
- /** Test prefixing function.. */
+ /** Test prefixing function. */
public static class PrefixingScalarFunc extends ScalarFunction {
private String prefix = "ERROR_VALUE";
@@ -981,4 +971,17 @@ class MatchRecognizeITCase {
countAcc.count += value;
}
}
+
+ private void assertTableResult(String sql, Row... expected) {
+ TableResult tableResult = tEnv.executeSql(sql);
+
assertThat(CollectionUtil.iteratorToList(tableResult.collect())).containsExactly(expected);
+
+ // Also check that the same query compiles and returns the same
result when it is used in
+ // a view
+ // (regression test for FLINK-39293).
+ tEnv.executeSql("CREATE VIEW test_view AS \n" + sql);
+ TableResult tableResultWithView = tEnv.executeSql("SELECT * FROM
test_view");
+
assertThat(CollectionUtil.iteratorToList(tableResultWithView.collect()))
+ .containsExactly(expected);
+ }
}