This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new 4d0480952b1 [FLINK-39293][table] `MATCH_RECOGNIZE` fails with `SqlParserException` in views
4d0480952b1 is described below

commit 4d0480952b16b3a519066d0b3e74b4ff02c255f6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Apr 24 16:52:02 2026 +0200

    [FLINK-39293][table] `MATCH_RECOGNIZE` fails with `SqlParserException` in views
---
 .../src/main/codegen/templates/Parser.jj           |  23 +-
 .../calcite/sql/validate/SqlValidatorImpl.java     |  81 ++-
 .../runtime/batch/sql/MatchRecognizeITCase.java    | 673 +++++++++++----------
 3 files changed, 414 insertions(+), 363 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
index 1bb939a46fa..a9802fd631b 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
+++ b/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj
@@ -2202,7 +2202,10 @@ SqlNode TableRef3(ExprContext exprContext, boolean 
lateral) :
             [ tableRef = ExtendTable(tableRef) ]
             tableRef = Over(tableRef)
             [ tableRef = Snapshot(tableRef) ]
-            [ tableRef = MatchRecognize(tableRef) ]
+            [
+              LOOKAHEAD(3)
+              tableRef = MatchRecognize(tableRef)
+            ]
         )
     |
         LOOKAHEAD(2)
@@ -2210,7 +2213,10 @@ SqlNode TableRef3(ExprContext exprContext, boolean 
lateral) :
         tableRef = ParenthesizedExpression(exprContext)
         tableRef = Over(tableRef)
         tableRef = addLateral(tableRef, lateral)
-        [ tableRef = MatchRecognize(tableRef) ]
+        [
+          LOOKAHEAD(3)
+          tableRef = MatchRecognize(tableRef)
+        ]
     |
         LOOKAHEAD(2)
         [ <LATERAL> ] // "LATERAL" is implicit with "UNNEST", so ignore
@@ -3059,6 +3065,7 @@ void AddUnpivotValue(List<SqlNode> list) :
 SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
 {
     final Span s, s0, s1, s2;
+    final SqlIdentifier aliasBeforeMatch;
     final SqlNodeList measureList;
     final SqlNodeList partitionList;
     final SqlNodeList orderList;
@@ -3073,6 +3080,12 @@ SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
     final SqlLiteral isStrictEnds;
 }
 {
+    [
+      <AS> aliasBeforeMatch = SimpleIdentifier() {
+          tableRef = SqlStdOperatorTable.AS.createCall(
+              Span.of(tableRef).end(this), tableRef, aliasBeforeMatch);
+      }
+    ]
     <MATCH_RECOGNIZE> { s = span(); checkNotJoin(tableRef); } <LPAREN>
     (
         <PARTITION> { s2 = span(); } <BY>
@@ -7210,7 +7223,7 @@ SqlCall MatchRecognizeCallWithModifier() :
 {
     final Span s;
     final SqlOperator runningOp;
-    final SqlNode func;
+    final SqlNode e;
 }
 {
     (
@@ -7219,8 +7232,8 @@ SqlCall MatchRecognizeCallWithModifier() :
         <FINAL> { runningOp = SqlStdOperatorTable.FINAL; }
     )
     { s = span(); }
-    func = NamedFunctionCall() {
-        return runningOp.createCall(s.end(func), func);
+    e = Expression3(ExprContext.ACCEPT_NON_QUERY) {
+        return runningOp.createCall(s.end(e), e);
     }
 }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 6635637888a..6ef58067ced 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -169,23 +169,35 @@ import static org.apache.calcite.util.Util.first;
  * Default implementation of {@link SqlValidator}, the class was copied over 
because of
  * CALCITE-4554.
  *
- * <p>Lines 207 ~ 210, Flink improves error message for functions without 
appropriate arguments in
+ * <p>Lines 219 ~ 222, Flink improves error message for functions without 
appropriate arguments in
  * handleUnresolvedFunction.
  *
- * <p>Lines 1275 ~ 1277, CALCITE-7217, should be removed after upgrading 
Calcite to 1.41.0.
+ * <p>Lines 1287 ~ 1289, CALCITE-7217, should be removed after upgrading 
Calcite to 1.41.0.
  *
- * <p>Lines 2036 ~ 2050, Flink improves error message for functions without 
appropriate arguments in
+ * <p>Lines 2048 ~ 2062, Flink improves error message for functions without 
appropriate arguments in
  * handleUnresolvedFunction at {@link 
SqlValidatorImpl#handleUnresolvedFunction}.
  *
- * <p>Lines 2576 ~ 2595, CALCITE-7217, CALCITE-7312 should be removed after 
upgrading Calcite to
+ * <p>Lines 2475 ~ 2477, CALCITE-7471 should be removed after upgrading 
Calcite to 1.42.0.
+ *
+ * <p>Lines 2590 ~ 2609, CALCITE-7217, CALCITE-7312 should be removed after 
upgrading Calcite to
  * 1.42.0.
  *
- * <p>Line 2626 ~2644, set the correct scope for VECTOR_SEARCH.
+ * <p>Line 2640 ~2658, set the correct scope for VECTOR_SEARCH.
  *
- * <p>Lines 3923 ~ 3927, 6602 ~ 6608 Flink improves Optimize the retrieval of 
sub-operands in
+ * <p>Lines 3937 ~ 3941, 6612 ~ 6618 Flink improves Optimize the retrieval of 
sub-operands in
  * SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
  *
- * <p>Lines 5343 ~ 5349, FLINK-24352 Add null check for temporal table check 
on SqlSnapshot.
+ * <p>Lines 5357 ~ 5363, FLINK-24352 Add null check for temporal table check 
on SqlSnapshot.
+ *
+ * <p>Lines 5784-5786, CALCITE-7466 should be removed after upgrading Calcite 
to 1.42.0.
+ *
+ * <p>Lines 5840-5842, CALCITE-7470 should be removed after upgrading Calcite 
to 1.42.0.
+ *
+ * <p>Lines 7267-7290, CALCITE-7486 should be removed after upgrading Calcite 
to 1.42.0.
+ *
+ * <p>Lines 7337-7354, CALCITE-7486 should be removed after upgrading Calcite 
to 1.42.0.
+ *
+ * <p>Lines 7399-7407, CALCITE-7486 should be removed after upgrading Calcite 
to 1.42.0.
  */
 public class SqlValidatorImpl implements SqlValidatorWithHints {
     // ~ Static fields/initializers 
---------------------------------------------
@@ -2460,7 +2472,9 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                         enclosingNode,
                         alias,
                         forceNullable);
-                return node;
+                // ----- FLINK MODIFICATION BEGIN -----
+                return newNode;
+            // ----- FLINK MODIFICATION END -----
 
             case PIVOT:
                 registerPivot(
@@ -5767,11 +5781,9 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
             setValidatedNodeType(measure, type);
 
             fields.add(alias, type);
-            sqlNodes.add(
-                    SqlStdOperatorTable.AS.createCall(
-                            SqlParserPos.ZERO,
-                            expand,
-                            new SqlIdentifier(alias, SqlParserPos.ZERO)));
+            // ----- FLINK MODIFICATION BEGIN -----
+            sqlNodes.add(expand);
+            // ----- FLINK MODIFICATION END -----
         }
 
         SqlNodeList list = new SqlNodeList(sqlNodes, 
measures.getParserPosition());
@@ -5825,11 +5837,9 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
 
             // Some extra work need required here.
             // In PREV, NEXT, FINAL and LAST, only one pattern variable is 
allowed.
-            sqlNodes.add(
-                    SqlStdOperatorTable.AS.createCall(
-                            SqlParserPos.ZERO,
-                            expand,
-                            new SqlIdentifier(alias, SqlParserPos.ZERO)));
+            // ----- FLINK MODIFICATION BEGIN -----
+            sqlNodes.add(expand);
+            // ----- FLINK MODIFICATION END -----
 
             final RelDataType type = deriveType(scope, expand);
             if (!SqlTypeUtil.inBooleanFamily(type)) {
@@ -7254,19 +7264,31 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
         int firstLastCount;
         int prevNextCount;
         int aggregateCount;
+        // ----- FLINK MODIFICATION BEGIN -----
+        int index;
+        int argCount;
 
         PatternValidator(boolean isMeasure) {
-            this(isMeasure, 0, 0, 0);
+            this(isMeasure, 0, 0, 0, 0, 0);
         }
 
         PatternValidator(
-                boolean isMeasure, int firstLastCount, int prevNextCount, int 
aggregateCount) {
+                boolean isMeasure,
+                int firstLastCount,
+                int prevNextCount,
+                int aggregateCount,
+                int index,
+                int argCount) {
             this.isMeasure = isMeasure;
             this.firstLastCount = firstLastCount;
             this.prevNextCount = prevNextCount;
             this.aggregateCount = aggregateCount;
+            this.index = index;
+            this.argCount = argCount;
         }
 
+        // ----- FLINK MODIFICATION END -----
+
         @Override
         public Set<String> visit(SqlCall call) {
             boolean isSingle = false;
@@ -7312,7 +7334,9 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                         call, 
Static.RESOURCE.patternRunningFunctionInDefine(call.toString()));
             }
 
-            for (SqlNode node : operands) {
+            // ----- FLINK MODIFICATION BEGIN -----
+            for (int i = 0; i < operands.size(); i++) {
+                SqlNode node = operands.get(i);
                 if (node != null) {
                     vars.addAll(
                             requireNonNull(
@@ -7321,10 +7345,13 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
                                                     isMeasure,
                                                     firstLastCount,
                                                     prevNextCount,
-                                                    aggregateCount)),
+                                                    aggregateCount,
+                                                    i,
+                                                    operands.size())),
                                     () -> "node.accept(PatternValidator) for 
node " + node));
                 }
             }
+            // ----- FLINK MODIFICATION END -----
 
             if (isSingle) {
                 switch (kind) {
@@ -7369,7 +7396,15 @@ public class SqlValidatorImpl implements 
SqlValidatorWithHints {
 
         @Override
         public Set<String> visit(SqlLiteral literal) {
-            return ImmutableSet.of();
+            // ----- FLINK MODIFICATION BEGIN -----
+            if ((this.argCount == 1 || this.index < this.argCount - 1)
+                    && (this.firstLastCount > 0 || this.prevNextCount > 0)
+                    && !SqlUtil.isNull(literal)) {
+                return ImmutableSet.of(literal.toValue());
+            } else {
+                return ImmutableSet.of();
+            }
+            // ----- FLINK MODIFICATION END -----
         }
 
         @Override
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
index 5aae60f5279..db4d7a3705e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/MatchRecognizeITCase.java
@@ -41,8 +41,6 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneOffset;
-import java.util.Comparator;
-import java.util.List;
 
 import static org.apache.flink.api.common.typeinfo.Types.DOUBLE;
 import static org.apache.flink.api.common.typeinfo.Types.INSTANT;
@@ -90,24 +88,23 @@ class MatchRecognizeITCase {
                                 .column("name", DataTypes.STRING())
                                 .columnByExpression("proctime", "PROCTIME()")
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT T.aid, T.bid, T.cid\n"
-                                + "FROM MyTable\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY proctime\n"
-                                + "  MEASURES\n"
-                                + "    `A\"`.id AS aid,\n"
-                                + "    \u006C.id AS bid,\n"
-                                + "    C.id AS cid\n"
-                                + "  PATTERN (`A\"` \u006C C)\n"
-                                + "  DEFINE\n"
-                                + "    `A\"` AS name = 'a',\n"
-                                + "    \u006C AS name = 'b',\n"
-                                + "    C AS name = 'c'\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(6, 7, 8));
+        final String sql =
+                "SELECT T.aid, T.bid, T.cid\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    `A\"`.id AS aid,\n"
+                        + "    \u006C.id AS bid,\n"
+                        + "    C.id AS cid\n"
+                        + "  PATTERN (`A\"` \u006C C)\n"
+                        + "  DEFINE\n"
+                        + "    `A\"` AS name = 'a',\n"
+                        + "    \u006C AS name = 'b',\n"
+                        + "    C AS name = 'c'\n"
+                        + ") AS T";
+
+        assertTableResult(sql, Row.of(6, 7, 8));
     }
 
     @Test
@@ -137,24 +134,23 @@ class MatchRecognizeITCase {
                                 .column("name", DataTypes.STRING())
                                 .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT T.aid, T.bid, T.cid\n"
-                                + "FROM MyTable\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    `A\"`.id AS aid,\n"
-                                + "    \u006C.id AS bid,\n"
-                                + "    C.id AS cid\n"
-                                + "  PATTERN (`A\"` \u006C C)\n"
-                                + "  DEFINE\n"
-                                + "    `A\"` AS name = 'a',\n"
-                                + "    \u006C AS name = 'b',\n"
-                                + "    C AS name = 'c'\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(6, 7, 8));
+        final String sql =
+                "SELECT T.aid, T.bid, T.cid\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    `A\"`.id AS aid,\n"
+                        + "    \u006C.id AS bid,\n"
+                        + "    C.id AS cid\n"
+                        + "  PATTERN (`A\"` \u006C C)\n"
+                        + "  DEFINE\n"
+                        + "    `A\"` AS name = 'a',\n"
+                        + "    \u006C AS name = 'b',\n"
+                        + "    C AS name = 'c'\n"
+                        + ") AS T";
+
+        assertTableResult(sql, Row.of(6, 7, 8));
     }
 
     @Test
@@ -186,24 +182,23 @@ class MatchRecognizeITCase {
                                 .column("name", DataTypes.STRING())
                                 .column("ts", DataTypes.TIMESTAMP(3))
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT T.aid, T.bid, T.cid\n"
-                                + "FROM MyTable\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    A.id AS aid,\n"
-                                + "    B.id AS bid,\n"
-                                + "    C.id AS cid\n"
-                                + "  PATTERN (A B C) WITHIN INTERVAL '1' 
MINUTE\n"
-                                + "  DEFINE\n"
-                                + "    A AS name = 'a',\n"
-                                + "    B AS name = 'b',\n"
-                                + "    C AS name = 'c'\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(2, 3, 4));
+
+        final String sql =
+                "SELECT T.aid, T.bid, T.cid\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    A.id AS aid,\n"
+                        + "    B.id AS bid,\n"
+                        + "    C.id AS cid\n"
+                        + "  PATTERN (A B C) WITHIN INTERVAL '1' MINUTE\n"
+                        + "  DEFINE\n"
+                        + "    A AS name = 'a',\n"
+                        + "    B AS name = 'b',\n"
+                        + "    C AS name = 'c'\n"
+                        + ") AS T";
+        assertTableResult(sql, Row.of(2, 3, 4));
     }
 
     @Test
@@ -235,25 +230,25 @@ class MatchRecognizeITCase {
                                 .column("nullField", DataTypes.STRING())
                                 .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT T.aid, T.bNull, T.cid, T.aNull\n"
-                                + "FROM MyTable\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    A.id AS aid,\n"
-                                + "    A.nullField AS aNull,\n"
-                                + "    LAST(B.nullField) AS bNull,\n"
-                                + "    C.id AS cid\n"
-                                + "  PATTERN (A B C)\n"
-                                + "  DEFINE\n"
-                                + "    A AS name = 'a' AND nullField IS 
NULL,\n"
-                                + "    B AS name = 'b' AND LAST(A.nullField) 
IS NULL,\n"
-                                + "    C AS name = 'c'\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(1, null, 3, null), Row.of(6, null, 8, 
null));
+
+        final String sql =
+                "SELECT T.aid, T.bNull, T.cid, T.aNull\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    A.id AS aid,\n"
+                        + "    A.nullField AS aNull,\n"
+                        + "    LAST(B.nullField) AS bNull,\n"
+                        + "    C.id AS cid\n"
+                        + "  PATTERN (A B C)\n"
+                        + "  DEFINE\n"
+                        + "    A AS name = 'a' AND nullField IS NULL,\n"
+                        + "    B AS name = 'b' AND LAST(A.nullField) IS 
NULL,\n"
+                        + "    C AS name = 'c'\n"
+                        + ") AS T";
+
+        assertTableResult(sql, Row.of(1, null, 3, null), Row.of(6, null, 8, 
null));
     }
 
     @Test
@@ -288,31 +283,29 @@ class MatchRecognizeITCase {
                                 .column("key2", DataTypes.STRING())
                                 .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT *\n"
-                                + "FROM MyTable\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  PARTITION BY key1, key2\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    A.id AS aid,\n"
-                                + "    A.key1 AS akey1,\n"
-                                + "    LAST(B.id) AS bid,\n"
-                                + "    C.id AS cid,\n"
-                                + "    C.key2 AS ckey2\n"
-                                + "  PATTERN (A B C)\n"
-                                + "  DEFINE\n"
-                                + "    A AS name = 'a' AND key1 LIKE '%key%' 
AND id > 0,\n"
-                                + "    B AS name = 'b' AND LAST(A.name, 2) IS 
NULL,\n"
-                                + "    C AS name = 'c' AND LAST(A.name) = 
'a'\n"
-                                + ") AS T");
-        List<Row> actual = 
CollectionUtil.iteratorToList(tableResult.collect());
-        actual.sort(Comparator.comparing(o -> String.valueOf(o.getField(0))));
-        assertThat(actual)
-                .containsExactly(
-                        Row.of("key1", "second_key3", 1, "key1", 2, 3, 
"second_key3"),
-                        Row.of("key2", "second_key4", 6, "key2", 7, 8, 
"second_key4"));
+        final String sql =
+                "SELECT *\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  PARTITION BY key1, key2\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    A.id AS aid,\n"
+                        + "    A.key1 AS akey1,\n"
+                        + "    LAST(B.id) AS bid,\n"
+                        + "    C.id AS cid,\n"
+                        + "    C.key2 AS ckey2\n"
+                        + "  PATTERN (A B C)\n"
+                        + "  DEFINE\n"
+                        + "    A AS name = 'a' AND key1 LIKE '%key%' AND id > 
0,\n"
+                        + "    B AS name = 'b' AND LAST(A.name, 2) IS NULL,\n"
+                        + "    C AS name = 'c' AND LAST(A.name) = 'a'\n"
+                        + ") AS T";
+
+        assertTableResult(
+                sql,
+                Row.of("key1", "second_key3", 1, "key1", 2, 3, "second_key3"),
+                Row.of("key2", "second_key4", 6, "key2", 7, 8, "second_key4"));
     }
 
     @Test
@@ -413,31 +406,30 @@ class MatchRecognizeITCase {
                                 .column("tax", DataTypes.INT())
                                 .build()));
 
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT *\n"
-                                + "FROM (\n"
-                                + "   SELECT\n"
-                                + "      symbol,\n"
-                                + "      SUM(price) as price,\n"
-                                + "      TUMBLE_ROWTIME(ts, interval '3' 
second) as rowTime,\n"
-                                + "      TUMBLE_START(ts, interval '3' second) 
as startTime\n"
-                                + "   FROM Ticker\n"
-                                + "   GROUP BY symbol, TUMBLE(ts, interval '3' 
second)\n"
-                                + ")\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  PARTITION BY symbol\n"
-                                + "  ORDER BY rowTime\n"
-                                + "  MEASURES\n"
-                                + "    B.price as dPrice,\n"
-                                + "    B.startTime as dTime\n"
-                                + "  ONE ROW PER MATCH\n"
-                                + "  PATTERN (A B)\n"
-                                + "  DEFINE\n"
-                                + "    B AS B.price < A.price\n"
-                                + ")");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of("ACME", 2, now.plusSeconds(3)));
+        final String sql =
+                "SELECT *\n"
+                        + "FROM (\n"
+                        + "   SELECT\n"
+                        + "      symbol,\n"
+                        + "      SUM(price) as price,\n"
+                        + "      TUMBLE_ROWTIME(ts, interval '3' second) as 
rowTime,\n"
+                        + "      TUMBLE_START(ts, interval '3' second) as 
startTime\n"
+                        + "   FROM Ticker\n"
+                        + "   GROUP BY symbol, TUMBLE(ts, interval '3' 
second)\n"
+                        + ")\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  PARTITION BY symbol\n"
+                        + "  ORDER BY rowTime\n"
+                        + "  MEASURES\n"
+                        + "    B.price as dPrice,\n"
+                        + "    B.startTime as dTime\n"
+                        + "  ONE ROW PER MATCH\n"
+                        + "  PATTERN (A B)\n"
+                        + "  DEFINE\n"
+                        + "    B AS B.price < A.price\n"
+                        + ")";
+
+        assertTableResult(sql, Row.of("ACME", 2, now.plusSeconds(3)));
     }
 
     @Test
@@ -467,39 +459,39 @@ class MatchRecognizeITCase {
                                 .column("tax", DataTypes.INT())
                                 .build()));
 
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT\n"
-                                + "  symbol,\n"
-                                + "  SUM(price) as price,\n"
-                                + "  TUMBLE_ROWTIME(matchRowtime, interval '3' 
second) as rowTime,\n"
-                                + "  TUMBLE_START(matchRowtime, interval '3' 
second) as startTime\n"
-                                + "FROM Ticker\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  PARTITION BY symbol\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    A.price as price,\n"
-                                + "    A.tax as tax,\n"
-                                + "    MATCH_ROWTIME() as matchRowtime\n"
-                                + "  ONE ROW PER MATCH\n"
-                                + "  PATTERN (A)\n"
-                                + "  DEFINE\n"
-                                + "    A AS A.price > 0\n"
-                                + ") AS T\n"
-                                + "GROUP BY symbol, TUMBLE(matchRowtime, 
interval '3' second)");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(
-                        Row.of(
-                                "ACME",
-                                3,
-                                LocalDateTime.parse("1970-01-01T00:00:02.999"),
-                                LocalDateTime.parse("1970-01-01T00:00")),
-                        Row.of(
-                                "ACME",
-                                2,
-                                LocalDateTime.parse("1970-01-01T00:00:05.999"),
-                                LocalDateTime.parse("1970-01-01T00:00:03")));
+        final String sql =
+                "SELECT\n"
+                        + "  symbol,\n"
+                        + "  SUM(price) as price,\n"
+                        + "  TUMBLE_ROWTIME(matchRowtime, interval '3' second) 
as rowTime,\n"
+                        + "  TUMBLE_START(matchRowtime, interval '3' second) 
as startTime\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  PARTITION BY symbol\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    A.price as price,\n"
+                        + "    A.tax as tax,\n"
+                        + "    MATCH_ROWTIME() as matchRowtime\n"
+                        + "  ONE ROW PER MATCH\n"
+                        + "  PATTERN (A)\n"
+                        + "  DEFINE\n"
+                        + "    A AS A.price > 0\n"
+                        + ") AS T\n"
+                        + "GROUP BY symbol, TUMBLE(matchRowtime, interval '3' 
second)";
+
+        assertTableResult(
+                sql,
+                Row.of(
+                        "ACME",
+                        3,
+                        LocalDateTime.parse("1970-01-01T00:00:02.999"),
+                        LocalDateTime.parse("1970-01-01T00:00")),
+                Row.of(
+                        "ACME",
+                        2,
+                        LocalDateTime.parse("1970-01-01T00:00:05.999"),
+                        LocalDateTime.parse("1970-01-01T00:00:03")));
     }
 
     @Test
@@ -530,27 +522,27 @@ class MatchRecognizeITCase {
                                 .column("tax", DataTypes.INT())
                                 .columnByExpression("ts", 
"TO_TIMESTAMP_LTZ(tstamp, 3)")
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT *\n"
-                                + "FROM Ticker\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    FIRST(DOWN.tstamp) AS start_tstamp,\n"
-                                + "    LAST(DOWN.tstamp) AS bottom_tstamp,\n"
-                                + "    UP.tstamp AS end_tstamp,\n"
-                                + "    FIRST(DOWN.price + DOWN.tax + 1) AS 
bottom_total,\n"
-                                + "    UP.price + UP.tax AS end_total\n"
-                                + "  ONE ROW PER MATCH\n"
-                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
-                                + "  PATTERN (DOWN{2,} UP)\n"
-                                + "  DEFINE\n"
-                                + "    DOWN AS price < LAST(DOWN.price, 1) OR 
LAST(DOWN.price, 1) IS NULL,\n"
-                                + "    UP AS price < FIRST(DOWN.price)\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(6L, 7L, 8L, 33, 33));
+
+        final String sql =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    FIRST(DOWN.tstamp) AS start_tstamp,\n"
+                        + "    LAST(DOWN.tstamp) AS bottom_tstamp,\n"
+                        + "    UP.tstamp AS end_tstamp,\n"
+                        + "    FIRST(DOWN.price + DOWN.tax + 1) AS 
bottom_total,\n"
+                        + "    UP.price + UP.tax AS end_total\n"
+                        + "  ONE ROW PER MATCH\n"
+                        + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                        + "  PATTERN (DOWN{2,} UP)\n"
+                        + "  DEFINE\n"
+                        + "    DOWN AS price < LAST(DOWN.price, 1) OR 
LAST(DOWN.price, 1) IS NULL,\n"
+                        + "    UP AS price < FIRST(DOWN.price)\n"
+                        + ") AS T";
+
+        assertTableResult(sql, Row.of(6L, 7L, 8L, 33, 33));
     }
 
     @Test
@@ -579,25 +571,24 @@ class MatchRecognizeITCase {
                                 .column("tax", DataTypes.INT())
                                 .build()));
 
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT *\n"
-                                + "FROM Ticker\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  PARTITION BY symbol\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    DOWN.tax AS bottom_tax,\n"
-                                + "    UP.tax AS end_tax\n"
-                                + "  ONE ROW PER MATCH\n"
-                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
-                                + "  PATTERN (DOWN UP)\n"
-                                + "  DEFINE\n"
-                                + "    DOWN AS DOWN.price = 13,\n"
-                                + "    UP AS UP.price = 20\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of("ACME", 3, 4));
+        final String sql =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  PARTITION BY symbol\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    DOWN.tax AS bottom_tax,\n"
+                        + "    UP.tax AS end_tax\n"
+                        + "  ONE ROW PER MATCH\n"
+                        + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                        + "  PATTERN (DOWN UP)\n"
+                        + "  DEFINE\n"
+                        + "    DOWN AS DOWN.price = 13,\n"
+                        + "    UP AS UP.price = 20\n"
+                        + ") AS T";
+
+        assertTableResult(sql, Row.of("ACME", 3, 4));
     }
 
     @Test
@@ -628,38 +619,37 @@ class MatchRecognizeITCase {
                                 .column("price", DataTypes.INT())
                                 .columnByExpression("ts", "TO_TIMESTAMP_LTZ(tstamp, 3)")
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT *\n"
-                                + "FROM Ticker\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    FIRST(id, 0) as id0,\n"
-                                + "    FIRST(id, 1) as id1,\n"
-                                + "    FIRST(id, 2) as id2,\n"
-                                + "    FIRST(id, 3) as id3,\n"
-                                + "    FIRST(id, 4) as id4,\n"
-                                + "    FIRST(id, 5) as id5,\n"
-                                + "    FIRST(id, 6) as id6,\n"
-                                + "    FIRST(id, 7) as id7,\n"
-                                + "    LAST(id, 0) as id8,\n"
-                                + "    LAST(id, 1) as id9,\n"
-                                + "    LAST(id, 2) as id10,\n"
-                                + "    LAST(id, 3) as id11,\n"
-                                + "    LAST(id, 4) as id12,\n"
-                                + "    LAST(id, 5) as id13,\n"
-                                + "    LAST(id, 6) as id14,\n"
-                                + "    LAST(id, 7) as id15\n"
-                                + "  ONE ROW PER MATCH\n"
-                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
-                                + "  PATTERN (`DOWN\"`{2,} UP)\n"
-                                + "  DEFINE\n"
-                                + "    `DOWN\"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,\n"
-                                + "    UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1));
+        final String sql =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    FIRST(id, 0) as id0,\n"
+                        + "    FIRST(id, 1) as id1,\n"
+                        + "    FIRST(id, 2) as id2,\n"
+                        + "    FIRST(id, 3) as id3,\n"
+                        + "    FIRST(id, 4) as id4,\n"
+                        + "    FIRST(id, 5) as id5,\n"
+                        + "    FIRST(id, 6) as id6,\n"
+                        + "    FIRST(id, 7) as id7,\n"
+                        + "    LAST(id, 0) as id8,\n"
+                        + "    LAST(id, 1) as id9,\n"
+                        + "    LAST(id, 2) as id10,\n"
+                        + "    LAST(id, 3) as id11,\n"
+                        + "    LAST(id, 4) as id12,\n"
+                        + "    LAST(id, 5) as id13,\n"
+                        + "    LAST(id, 6) as id14,\n"
+                        + "    LAST(id, 7) as id15\n"
+                        + "  ONE ROW PER MATCH\n"
+                        + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                        + "  PATTERN (`DOWN\"`{2,} UP)\n"
+                        + "  DEFINE\n"
+                        + "    `DOWN\"` AS price < LAST(price, 1) OR LAST(price, 1) IS NULL,\n"
+                        + "    UP AS price = FIRST(price) AND price > FIRST(price, 3) AND price = LAST(price, 7)\n"
+                        + ") AS T";
+
+        assertTableResult(sql, Row.of(1, 2, 3, 4, 5, 6, 7, 8, 8, 7, 6, 5, 4, 3, 2, 1));
     }
 
     @Test
@@ -686,25 +676,25 @@ class MatchRecognizeITCase {
                                 .column("tax", DataTypes.INT())
                                 .columnByExpression("ts", "TO_TIMESTAMP_LTZ(tstamp, 3)")
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT *\n"
-                                + "FROM Ticker\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    FIRST(DOWN.price) as first,\n"
-                                + "    LAST(DOWN.price) as last,\n"
-                                + "    FIRST(DOWN.price, 5) as nullPrice\n"
-                                + "  ONE ROW PER MATCH\n"
-                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
-                                + "  PATTERN (DOWN{2,} UP)\n"
-                                + "  DEFINE\n"
-                                + "    DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n"
-                                + "    UP AS price > LAST(DOWN.price)\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(19, 13, null));
+
+        final String sql =
+                "SELECT *\n"
+                        + "FROM Ticker\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    FIRST(DOWN.price) as first,\n"
+                        + "    LAST(DOWN.price) as last,\n"
+                        + "    FIRST(DOWN.price, 5) as nullPrice\n"
+                        + "  ONE ROW PER MATCH\n"
+                        + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                        + "  PATTERN (DOWN{2,} UP)\n"
+                        + "  DEFINE\n"
+                        + "    DOWN AS price < LAST(DOWN.price, 1) OR LAST(DOWN.price, 1) IS NULL,\n"
+                        + "    UP AS price > LAST(DOWN.price)\n"
+                        + ") AS T";
+
+        assertTableResult(sql, Row.of(19, 13, null));
     }
 
     /**
@@ -754,35 +744,36 @@ class MatchRecognizeITCase {
                                 .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
         tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT *\n"
-                                + "FROM MyTable\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    FIRST(id) as startId,\n"
-                                + "    SUM(A.price) AS sumA,\n"
-                                + "    COUNT(D.price) AS countD,\n"
-                                + "    SUM(D.price) as sumD,\n"
-                                + "    weightedAvg(price, weight) as wAvg,\n"
-                                + "    AVG(B.price) AS avgB,\n"
-                                + "    SUM(B.price * B.rate) as sumExprB,\n"
-                                + "    LAST(id) as endId\n"
-                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
-                                + "  PATTERN (A+ B+ C D? E)\n"
-                                + "  DEFINE\n"
-                                + "    A AS SUM(A.price) < 6,\n"
-                                + "    B AS SUM(B.price * B.rate) < SUM(A.price) AND\n"
-                                + "      SUM(B.price * B.rate) > 0.2 AND\n"
-                                + "      SUM(B.price) >= 1 AND\n"
-                                + "      AVG(B.price) >= 1 AND\n"
-                                + "      weightedAvg(price, weight) > 1\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(
-                        Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8),
-                        Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12));
+
+        final String sql =
+                "SELECT *\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    FIRST(id) as startId,\n"
+                        + "    SUM(A.price) AS sumA,\n"
+                        + "    COUNT(D.price) AS countD,\n"
+                        + "    SUM(D.price) as sumD,\n"
+                        + "    weightedAvg(price, weight) as wAvg,\n"
+                        + "    AVG(B.price) AS avgB,\n"
+                        + "    SUM(B.price * B.rate) as sumExprB,\n"
+                        + "    LAST(id) as endId\n"
+                        + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                        + "  PATTERN (A+ B+ C D? E)\n"
+                        + "  DEFINE\n"
+                        + "    A AS SUM(A.price) < 6,\n"
+                        + "    B AS SUM(B.price * B.rate) < SUM(A.price) AND\n"
+                        + "      SUM(B.price * B.rate) > 0.2 AND\n"
+                        + "      SUM(B.price) >= 1 AND\n"
+                        + "      AVG(B.price) >= 1 AND\n"
+                        + "      weightedAvg(price, weight) > 1\n"
+                        + ") AS T";
+
+        assertTableResult(
+                sql,
+                Row.of(1, 5, 0L, null, 2L, 3, 3.4D, 8),
+                Row.of(9, 4, 0L, null, 3L, 4, 3.2D, 12));
     }
 
     @Test
@@ -816,27 +807,27 @@ class MatchRecognizeITCase {
                                 .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                 .build()));
         tEnv.createTemporarySystemFunction("weightedAvg", new WeightedAvg());
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT *\n"
-                                + "FROM MyTable\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY ts\n"
-                                + "  MEASURES\n"
-                                + "    SUM(A.price) as sumA,\n"
-                                + "    COUNT(A.id) as countAId,\n"
-                                + "    COUNT(A.price) as countAPrice,\n"
-                                + "    COUNT(*) as countAll,\n"
-                                + "    COUNT(price) as countAllPrice,\n"
-                                + "    LAST(id) as endId\n"
-                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
-                                + "  PATTERN (A+ C)\n"
-                                + "  DEFINE\n"
-                                + "    A AS SUM(A.price) < 30,\n"
-                                + "    C AS C.name = 'c'\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(29, 7L, 5L, 8L, 6L, 8));
+
+        final String sql =
+                "SELECT *\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY ts\n"
+                        + "  MEASURES\n"
+                        + "    SUM(A.price) as sumA,\n"
+                        + "    COUNT(A.id) as countAId,\n"
+                        + "    COUNT(A.price) as countAPrice,\n"
+                        + "    COUNT(*) as countAll,\n"
+                        + "    COUNT(price) as countAllPrice,\n"
+                        + "    LAST(id) as endId\n"
+                        + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                        + "  PATTERN (A+ C)\n"
+                        + "  DEFINE\n"
+                        + "    A AS SUM(A.price) < 30,\n"
+                        + "    C AS C.name = 'c'\n"
+                        + ") AS T";
+
+        assertTableResult(sql, Row.of(29, 7L, 5L, 8L, 6L, 8));
     }
 
     @Test
@@ -851,21 +842,21 @@ class MatchRecognizeITCase {
                                 .column("name", DataTypes.STRING())
                                 .columnByExpression("proctime", "PROCTIME()")
                                 .build()));
-        TableResult tableResult =
-                tEnv.executeSql(
-                        "SELECT T.aid\n"
-                                + "FROM MyTable\n"
-                                + "MATCH_RECOGNIZE (\n"
-                                + "  ORDER BY proctime\n"
-                                + "  MEASURES\n"
-                                + "    A.id AS aid,\n"
-                                + "    A.proctime AS aProctime,\n"
-                                + "    LAST(A.proctime + INTERVAL '1' second) as calculatedField\n"
-                                + "  PATTERN (A)\n"
-                                + "  DEFINE\n"
-                                + "    A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)\n"
-                                + ") AS T");
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect())).containsExactly(Row.of(1));
+
+        final String sql =
+                "SELECT T.aid\n"
+                        + "FROM MyTable\n"
+                        + "MATCH_RECOGNIZE (\n"
+                        + "  ORDER BY proctime\n"
+                        + "  MEASURES\n"
+                        + "    A.id AS aid,\n"
+                        + "    A.proctime AS aProctime,\n"
+                        + "    LAST(A.proctime + INTERVAL '1' second) as calculatedField\n"
+                        + "  PATTERN (A)\n"
+                        + "  DEFINE\n"
+                        + "    A AS proctime >= (CURRENT_TIMESTAMP - INTERVAL '1' day)\n"
+                        + ") AS T";
+        assertTableResult(sql, Row.of(1));
     }
 
     @Test
@@ -906,29 +897,28 @@ class MatchRecognizeITCase {
         jobParameters.setString("prefix", prefix);
         jobParameters.setString("start", Integer.toString(startFrom));
         env.getConfig().setGlobalJobParameters(jobParameters);
-        TableResult tableResult =
-                tEnv.executeSql(
-                        String.format(
-                                "SELECT *\n"
-                                        + "FROM MyTable\n"
-                                        + "MATCH_RECOGNIZE (\n"
-                                        + "  ORDER BY ts\n"
-                                        + "  MEASURES\n"
-                                        + "    FIRST(id) as firstId,\n"
-                                        + "    prefix(A.name) as prefixedNameA,\n"
-                                        + "    countFrom(A.price) as countFromA,\n"
-                                        + "    LAST(id) as lastId\n"
-                                        + "  AFTER MATCH SKIP PAST LAST ROW\n"
-                                        + "  PATTERN (A+ C)\n"
-                                        + "  DEFINE\n"
-                                        + "    A AS prefix(A.name) = '%s:a' AND countFrom(A.price) <= %d\n"
-                                        + ") AS T",
-                                prefix, 4 + 4));
-        assertThat(CollectionUtil.iteratorToList(tableResult.collect()))
-                .containsExactly(Row.of(1, "PREF:a", 8, 5), Row.of(7, "PREF:a", 6, 9));
+        String sql =
+                String.format(
+                        "SELECT *\n"
+                                + "FROM MyTable\n"
+                                + "MATCH_RECOGNIZE (\n"
+                                + "  ORDER BY ts\n"
+                                + "  MEASURES\n"
+                                + "    FIRST(id) as firstId,\n"
+                                + "    prefix(A.name) as prefixedNameA,\n"
+                                + "    countFrom(A.price) as countFromA,\n"
+                                + "    LAST(id) as lastId\n"
+                                + "  AFTER MATCH SKIP PAST LAST ROW\n"
+                                + "  PATTERN (A+ C)\n"
+                                + "  DEFINE\n"
+                                + "    A AS prefix(A.name) = '%s:a' AND countFrom(A.price) <= %d\n"
+                                + ") AS T",
+                        prefix, 4 + 4);
+
+        assertTableResult(sql, Row.of(1, "PREF:a", 8, 5), Row.of(7, "PREF:a", 6, 9));
     }
 
-    /** Test prefixing function.. */
+    /** Test prefixing function. */
     public static class PrefixingScalarFunc extends ScalarFunction {
 
         private String prefix = "ERROR_VALUE";
@@ -981,4 +971,17 @@ class MatchRecognizeITCase {
             countAcc.count += value;
         }
     }
+
+    private void assertTableResult(String sql, Row... expected) {
+        TableResult tableResult = tEnv.executeSql(sql);
+        assertThat(CollectionUtil.iteratorToList(tableResult.collect())).containsExactly(expected);
+
+        // Also verify that the same query compiles and returns the same result when it is
+        // wrapped in a view.
+        // Regression check for FLINK-39293.
+        tEnv.executeSql("CREATE VIEW test_view AS \n" + sql);
+        TableResult tableResultWithView = tEnv.executeSql("SELECT * FROM test_view");
+        assertThat(CollectionUtil.iteratorToList(tableResultWithView.collect()))
+                .containsExactly(expected);
+    }
 }

Reply via email to