This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c0a4768e141 [FLINK-39304][table] Support `START_MODE` clause in the
parser
c0a4768e141 is described below
commit c0a4768e141ab30ba44c14e637e63858729db4a6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 27 22:22:04 2026 +0100
[FLINK-39304][table] Support `START_MODE` clause in the parser
---
.../src/main/codegen/data/Parser.tdd | 17 +++
.../src/main/codegen/includes/parserImpls.ftl | 109 ++++++++++++++++---
.../org/apache/flink/sql/parser/SqlParseUtils.java | 32 ++++++
.../apache/flink/sql/parser/SqlUnparseUtils.java | 11 ++
.../SqlCreateMaterializedTable.java | 10 ++
.../SqlCreateOrAlterMaterializedTable.java | 2 +
.../parser/ddl/materializedtable/SqlStartMode.java | 115 +++++++++++++++++++++
.../flink/sql/parser/utils/ParserResource.java | 4 +
.../MaterializedTableStatementParserTest.java | 76 ++++++++++++++
9 files changed, 364 insertions(+), 12 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 92487c2ab36..74339361d79 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -69,6 +69,8 @@
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableModifySchema"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMaterializedTable"
"org.apache.flink.sql.parser.ddl.materializedtable.SqlDropMaterializedTable"
+ "org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode"
+
"org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode.SqlStartModeKind"
"org.apache.flink.sql.parser.ddl.model.SqlAlterModel"
"org.apache.flink.sql.parser.ddl.model.SqlAlterModelRename"
"org.apache.flink.sql.parser.ddl.model.SqlAlterModelReset"
@@ -183,6 +185,7 @@
"org.apache.calcite.sql.SqlDrop"
"org.apache.calcite.sql.SqlExplicitModelOperator"
"org.apache.calcite.sql.SqlIntervalLiteral"
+ "org.apache.calcite.runtime.Resources"
"java.util.ArrayList"
"java.util.Collections"
"java.util.HashSet"
@@ -217,6 +220,9 @@
"ENFORCED"
"ESTIMATED_COST"
"EXTENDED"
+ "FROM_BEGINNING"
+ "FROM_NOW"
+ "FROM_TIMESTAMP"
"FUNCTIONS"
"FRESHNESS"
"HASH"
@@ -245,7 +251,11 @@
"REFRESH_MODE"
"REMOVE"
"RENAME"
+ "RESUME_OR_FROM_BEGINNING"
+ "RESUME_OR_FROM_NOW"
+ "RESUME_OR_FROM_TIMESTAMP"
"SCALA"
+ "START_MODE"
"STATISTICS"
"STOP"
"STRING"
@@ -360,6 +370,9 @@
"FORTRAN"
"FOUND"
"FRAC_SECOND"
+ "FROM_BEGINNING"
+ "FROM_NOW"
+ "FROM_TIMESTAMP"
"G"
"GENERAL"
"GENERATED"
@@ -469,6 +482,9 @@
"RETURNED_OCTET_LENGTH"
"RETURNED_SQLSTATE"
"RETURNING"
+ "RESUME_OR_FROM_BEGINNING"
+ "RESUME_OR_FROM_NOW"
+ "RESUME_OR_FROM_TIMESTAMP"
"ROLE"
"ROUTINE"
"ROUTINE_CATALOG"
@@ -546,6 +562,7 @@
"SQL_TSI_YEAR"
"SQL_VARBINARY"
"SQL_VARCHAR"
+ "START_MODE"
"STATE"
"STOP"
"STRUCTURE"
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 7b081a38f31..5944f533616 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1912,6 +1912,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s,
boolean replace, boolean isT
SqlParserPos pos = startPos;
boolean isColumnsIdentifiersOnly = false;
boolean isOrAlter = false;
+ SqlStartMode startMode = null;
}
{
[
@@ -1964,15 +1965,73 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s,
boolean replace, boolean isT
propertyList = Properties()
]
[
- <FRESHNESS> <EQ>
- freshness = Expression(ExprContext.ACCEPT_NON_QUERY)
+ <START_MODE> <EQ>
{
- if (!(freshness instanceof SqlIntervalLiteral))
+ SqlLiteral startModeKindLiteral = null;
+ SqlLiteral startModeLiteral = null;
+ }
+ (
+ <FROM_BEGINNING>
{
- throw SqlUtil.newContextException(
- getPos(),
- ParserResource.RESOURCE.unsupportedFreshnessType());
+ startModeKindLiteral =
SqlStartModeKind.FROM_BEGINNING.symbol(getPos());
+ }
+ |
+ <FROM_NOW>
+ {
+ startModeKindLiteral =
SqlStartModeKind.FROM_NOW.symbol(getPos());
+ }
+ [
+ <LPAREN>
+ {
+ startModeLiteral =
IntervalLiteralOrError(Expression(ExprContext.ACCEPT_NON_QUERY),
+ ParserResource.RESOURCE.unsupportedStartModeType());
+ }
+ <RPAREN>
+ ]
+ |
+ <FROM_TIMESTAMP>
+ {
+ startModeKindLiteral =
SqlStartModeKind.FROM_TIMESTAMP.symbol(getPos());
+ }
+ <LPAREN>
+ startModeLiteral = TimestampLiteral()
+ <RPAREN>
+ |
+ <RESUME_OR_FROM_BEGINNING>
+ {
+ startModeKindLiteral =
SqlStartModeKind.RESUME_OR_FROM_BEGINNING.symbol(getPos());
+ }
+ |
+ <RESUME_OR_FROM_NOW>
+ {
+ startModeKindLiteral =
SqlStartModeKind.RESUME_OR_FROM_NOW.symbol(getPos());
}
+ [
+ <LPAREN>
+ {
+ startModeLiteral =
IntervalLiteralOrError(Expression(ExprContext.ACCEPT_NON_QUERY),
+ ParserResource.RESOURCE.unsupportedStartModeType());
+ }
+ <RPAREN>
+ ]
+ |
+ <RESUME_OR_FROM_TIMESTAMP>
+ {
+ startModeKindLiteral =
SqlStartModeKind.RESUME_OR_FROM_TIMESTAMP.symbol(getPos());
+ }
+ <LPAREN>
+ startModeLiteral = TimestampLiteral()
+ <RPAREN>
+ )
+ {
+ startMode = new SqlStartMode(startModeKindLiteral,
startModeLiteral, getPos());
+ }
+ ]
+ [
+ <FRESHNESS> <EQ>
+ {
+ freshness =
IntervalLiteralOrError(Expression(ExprContext.ACCEPT_NON_QUERY),
+ ParserResource.RESOURCE.unsupportedFreshnessType());
}
]
[
@@ -2004,6 +2063,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s,
boolean replace, boolean isT
propertyList,
(SqlIntervalLiteral) freshness,
refreshMode,
+ startMode,
asQuery,
isOrAlter);
}
@@ -2085,12 +2145,9 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
|
<SET>
(
- <FRESHNESS> <EQ> freshness =
Expression(ExprContext.ACCEPT_NON_QUERY) {
- if (!(freshness instanceof SqlIntervalLiteral)) {
- throw SqlUtil.newContextException(
- getPos(),
- ParserResource.RESOURCE.unsupportedFreshnessType());
- }
+ <FRESHNESS> <EQ> {
+ freshness =
IntervalLiteralOrError(Expression(ExprContext.ACCEPT_NON_QUERY),
+ ParserResource.RESOURCE.unsupportedFreshnessType());
return new SqlAlterMaterializedTableFreshness(
startPos.plus(getPos()),
tableIdentifier,
@@ -3847,3 +3904,31 @@ SqlCharStringLiteral Comment() :
return SqlLiteral.createCharString(p, getPos());
}
}
+
+/**
+ * Narrows an already parsed expression to an interval literal. Any other kind of
+ * node is rejected with the supplied error, reported at the current parser position.
+ */
+SqlIntervalLiteral IntervalLiteralOrError(SqlNode expression, Resources.ExInst error) :
+{
+}
+{
+    {
+        if (!(expression instanceof SqlIntervalLiteral)) {
+            throw SqlUtil.newContextException(getPos(), error);
+        }
+        return (SqlIntervalLiteral) expression;
+    }
+}
+
+/**
+ * Parses a timestamp literal written either as TIMESTAMP followed by a string or as
+ * TIMESTAMP WITH LOCAL TIME ZONE followed by a string. The result is built with
+ * SqlLiteral.createUnknown, tagged "TIMESTAMP" or "TIMESTAMP WITH LOCAL TIME ZONE"
+ * respectively, and positioned from the TIMESTAMP keyword to the end of the string.
+ */
+SqlLiteral TimestampLiteral() :
+{
+ final String p;
+ final Span s;
+}
+{
+ // Both alternatives begin with <TIMESTAMP>, so two tokens of lookahead are used to
+ // pick the plain form when the keyword is directly followed by a string literal.
+ LOOKAHEAD(2)
+ <TIMESTAMP> { s = span(); } p = SimpleStringLiteral() {
+ return SqlLiteral.createUnknown("TIMESTAMP", p, s.end(this));
+ }
+ |
+ <TIMESTAMP> { s = span(); } <WITH> <LOCAL> <TIME> <ZONE> p =
SimpleStringLiteral() {
+ return SqlLiteral.createUnknown("TIMESTAMP WITH LOCAL TIME ZONE",
p, s.end(this));
+ }
+}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
index e115a0bc6c9..f710410e3cd 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -115,4 +116,35 @@ public class SqlParseUtils {
}
return
sqlNodeList.getList().stream().map(mapper).collect(Collectors.toSet());
}
+
+    /**
+     * Converts the textual value of a literal into the matching constant of an enum type.
+     *
+     * @param literal the literal to read, may be {@code null}
+     * @param enumClass the enum type whose constant is looked up by exact name
+     * @return the matching constant, or {@code null} if the literal is {@code null} or has no
+     *     value
+     * @throws RuntimeException if the value does not name a constant of {@code enumClass}; the
+     *     message lists the valid constant names
+     */
+    @Nullable
+    public static <T extends Enum<T>> T extractEnum(
+            @Nullable SqlLiteral literal, Class<T> enumClass) {
+        final String name = literal == null ? null : literal.toValue();
+        if (name == null) {
+            return null;
+        }
+
+        try {
+            return Enum.valueOf(enumClass, name);
+        } catch (IllegalArgumentException cause) {
+            final String message =
+                    String.format(
+                            "Invalid value '%s' for enum %s. Valid values are: %s",
+                            name,
+                            enumClass.getSimpleName(),
+                            String.join(", ", getEnumValues(enumClass)));
+            throw new RuntimeException(message, cause);
+        }
+    }
+
+    /** Lists the names of all constants of the given enum type in declaration order. */
+    private static <T extends Enum<T>> List<String> getEnumValues(Class<T> enumClass) {
+        final T[] constants = enumClass.getEnumConstants();
+        return Arrays.stream(constants)
+                .map(constant -> constant.name())
+                .collect(Collectors.toList());
+    }
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
index c99f836786f..619f7a5e932 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.ddl.SqlDistribution;
import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
import org.apache.flink.sql.parser.ddl.SqlWatermark;
import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlIntervalLiteral;
@@ -116,6 +117,16 @@ public class SqlUnparseUtils {
freshness.unparse(writer, leftPrec, rightPrec);
}
+    /**
+     * Writes the {@code START_MODE = ...} clause on a new line, using the string form of the
+     * given start mode. Nothing is written when {@code sqlStartMode} is {@code null}.
+     */
+    public static void unparseStartMode(SqlStartMode sqlStartMode, SqlWriter writer) {
+        if (sqlStartMode != null) {
+            writer.newlineAndIndent();
+            writer.keyword("START_MODE");
+            writer.keyword("=");
+            writer.keyword(sqlStartMode.toString());
+        }
+    }
+
public static void unparseRefreshMode(SqlRefreshMode refreshMode,
SqlWriter writer) {
if (refreshMode == null) {
return;
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
index 2c36b861d19..33bb12c06c2 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
@@ -67,6 +67,8 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
private final @Nullable SqlRefreshMode refreshMode;
+ private final @Nullable SqlStartMode startMode;
+
private final SqlNode asQuery;
public SqlCreateMaterializedTable(
@@ -82,6 +84,7 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
SqlNodeList propertyList,
@Nullable SqlIntervalLiteral freshness,
@Nullable SqlRefreshMode refreshMode,
+ @Nullable SqlStartMode startMode,
SqlNode asQuery) {
super(operator, pos, tableName, false, false, false, propertyList,
comment);
this.columnList = columnList;
@@ -93,6 +96,7 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
requireNonNull(propertyList, "propertyList should not be null");
this.freshness = freshness;
this.refreshMode = refreshMode;
+ this.startMode = startMode;
this.asQuery = requireNonNull(asQuery, "asQuery should not be null");
}
@@ -136,6 +140,11 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
return refreshMode;
}
+ /** Returns the start mode passed to the constructor, or {@code null} if none was given. */
+ @Nullable
+ public SqlStartMode getStartMode() {
+ return startMode;
+ }
+
public SqlNode getAsQuery() {
return asQuery;
}
@@ -173,6 +182,7 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
SqlUnparseUtils.unparseDistribution(distribution, writer, leftPrec,
rightPrec);
SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer,
leftPrec, rightPrec);
SqlUnparseUtils.unparseProperties(properties, writer, leftPrec,
rightPrec);
+ SqlUnparseUtils.unparseStartMode(startMode, writer);
SqlUnparseUtils.unparseFreshness(freshness, true, writer, leftPrec,
rightPrec);
SqlUnparseUtils.unparseRefreshMode(refreshMode, writer);
SqlUnparseUtils.unparseAsQuery(asQuery, writer, leftPrec, rightPrec);
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
index e41961b2687..b1859ba6114 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
@@ -55,6 +55,7 @@ public class SqlCreateOrAlterMaterializedTable extends
SqlCreateMaterializedTabl
SqlNodeList propertyList,
@Nullable SqlIntervalLiteral freshness,
@Nullable SqlRefreshMode refreshMode,
+ @Nullable SqlStartMode startMode,
SqlNode asQuery,
boolean isOrAlter) {
super(
@@ -70,6 +71,7 @@ public class SqlCreateOrAlterMaterializedTable extends
SqlCreateMaterializedTabl
propertyList,
freshness,
refreshMode,
+ startMode,
asQuery);
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlStartMode.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlStartMode.java
new file mode 100644
index 00000000000..a8507b940b9
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlStartMode.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.SqlParseUtils;
+
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlTimestampLiteral;
+import org.apache.calcite.sql.SqlUnknownLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import javax.annotation.Nullable;
+
+/**
+ * Start mode configuration for materialized tables.
+ *
+ * <p>Combines a {@link SqlStartModeKind} with at most one optional argument, which is either an
+ * interval literal or a timestamp literal.
+ */
+@Internal
+public class SqlStartMode {
+    private final SqlStartModeKind kind;
+    private final @Nullable SqlTimestampLiteral timestampLiteral;
+    private final @Nullable SqlIntervalLiteral intervalLiteral;
+
+    /**
+     * Builds a start mode from the literals produced by the parser.
+     *
+     * @param kind symbol literal naming one of the {@link SqlStartModeKind} constants
+     * @param literal optional argument; an interval literal is stored as is, any other non-null
+     *     literal is cast to an unknown literal and resolved into a timestamp literal
+     * @param pos parser position; not used by this constructor
+     */
+    public SqlStartMode(SqlLiteral kind, @Nullable SqlLiteral literal, SqlParserPos pos) {
+        this.kind = SqlParseUtils.extractEnum(kind, SqlStartModeKind.class);
+        if (literal instanceof SqlIntervalLiteral) {
+            this.intervalLiteral = (SqlIntervalLiteral) literal;
+            this.timestampLiteral = null;
+        } else if (literal != null) {
+            // The type to resolve to is looked up from the tag carried by the unknown literal.
+            final SqlUnknownLiteral unresolved = (SqlUnknownLiteral) literal;
+            final SqlTypeName typeName = SqlTypeName.lookup(unresolved.tag);
+            this.intervalLiteral = null;
+            this.timestampLiteral = (SqlTimestampLiteral) unresolved.resolve(typeName);
+        } else {
+            this.intervalLiteral = null;
+            this.timestampLiteral = null;
+        }
+    }
+
+    /** Returns the kind of this start mode. */
+    public SqlStartModeKind getKind() {
+        return kind;
+    }
+
+    /** Returns the timestamp argument, or {@code null} if none was supplied. */
+    @Nullable
+    public SqlTimestampLiteral getTimestampLiteral() {
+        return timestampLiteral;
+    }
+
+    /** Returns the interval argument, or {@code null} if none was supplied. */
+    @Nullable
+    public SqlIntervalLiteral getIntervalLiteral() {
+        return intervalLiteral;
+    }
+
+    /**
+     * Renders the kind name, followed by the parenthesized argument when the kind takes one and
+     * it is present.
+     */
+    @Override
+    public String toString() {
+        final String name = kind.name();
+        if (kind.isFromNow() && intervalLiteral != null) {
+            return name + "(" + intervalLiteral + ")";
+        }
+        if (kind.isFromTimestamp() && timestampLiteral != null) {
+            // NOTE(review): underscores are turned into spaces, presumably because the literal
+            // prints its type name as e.g. TIMESTAMP_WITH_LOCAL_TIME_ZONE - confirm.
+            return name + "(" + timestampLiteral.toString().replace("_", " ") + ")";
+        }
+        return name;
+    }
+
+    /** The start mode keywords accepted after {@code START_MODE =}. */
+    public enum SqlStartModeKind {
+        FROM_BEGINNING,
+        FROM_NOW,
+        FROM_TIMESTAMP,
+        RESUME_OR_FROM_BEGINNING,
+        RESUME_OR_FROM_NOW,
+        RESUME_OR_FROM_TIMESTAMP;
+
+        /**
+         * Creates a symbol literal for this keyword, located at the given position in the parsed
+         * text.
+         */
+        public SqlLiteral symbol(SqlParserPos pos) {
+            return SqlLiteral.createSymbol(this, pos);
+        }
+
+        /** Returns whether this is {@link #FROM_NOW} or {@link #RESUME_OR_FROM_NOW}. */
+        public boolean isFromNow() {
+            switch (this) {
+                case FROM_NOW:
+                case RESUME_OR_FROM_NOW:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        /**
+         * Returns whether this is {@link #FROM_TIMESTAMP} or {@link #RESUME_OR_FROM_TIMESTAMP}.
+         */
+        public boolean isFromTimestamp() {
+            switch (this) {
+                case FROM_TIMESTAMP:
+                case RESUME_OR_FROM_TIMESTAMP:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        /** Returns whether this is one of the {@code RESUME_OR_*} kinds. */
+        public boolean isCreateOrAlterOption() {
+            switch (this) {
+                case RESUME_OR_FROM_BEGINNING:
+                case RESUME_OR_FROM_NOW:
+                case RESUME_OR_FROM_TIMESTAMP:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+    }
+}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 323fe5da0aa..5e1672865a4 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -71,6 +71,10 @@ public interface ParserResource {
"MATERIALIZED TABLE only supports define interval type FRESHNESS,
please refer to the materialized table document.")
Resources.ExInst<ParseException> unsupportedFreshnessType();
+ /**
+ * Error used by the parser when the parenthesized argument of FROM_NOW or
+ * RESUME_OR_FROM_NOW is not an interval literal.
+ */
+ @Resources.BaseMessage(
+ "START_MODE literal must be an interval for FROM_NOW and
RESUME_OR_FROM_NOW modes.")
+ Resources.ExInst<ParseException> unsupportedStartModeType();
+
@Resources.BaseMessage("CREATE TEMPORARY MATERIALIZED TABLE is not
supported.")
Resources.ExInst<ParseException>
createTemporaryMaterializedTableUnsupported();
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
index 1ea1855e223..559d4c320d5 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -665,6 +665,82 @@ class MaterializedTableStatementParserTest {
sql(sql).fails("DROP TEMPORARY MATERIALIZED TABLE is not supported.");
}
+ /**
+ * Checks that the START_MODE clause of CREATE MATERIALIZED TABLE is parsed and unparsed for
+ * each of the six start mode kinds, with and without an argument where one is optional, and
+ * that an argument of the wrong literal type is rejected.
+ */
+ @Test
+ void testStartMode() {
+ final String sql1 =
+ "create materialized table tbl1 start_mode = from_beginning as
select * from t";
+ sql(sql1)
+ .ok(
+ "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE =
FROM_BEGINNING\nAS\nSELECT *\nFROM `T`");
+
+ final String sql2 =
+ "create materialized table tbl1 start_mode = from_now as
select * from t";
+ sql(sql2)
+ .ok(
+ "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE =
FROM_NOW\nAS\nSELECT *\nFROM `T`");
+
+ final String sql3 =
+ "create materialized table tbl1 start_mode = from_now(INTERVAL
'1' HOUR) as select * from t";
+ sql(sql3)
+ .ok(
+ "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE =
FROM_NOW(INTERVAL '1' HOUR)\nAS\nSELECT *\nFROM `T`");
+
+ final String sql4 =
+ "create materialized table tbl1 start_mode =
from_timestamp(TIMESTAMP '2023-04-22 21:37:58') as select * from t";
+ sql(sql4)
+ .ok(
+ "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE =
FROM_TIMESTAMP(TIMESTAMP '2023-04-22 21:37:58')\nAS\nSELECT *\nFROM `T`");
+
+ final String sql5 =
+ // allowed on parser level, in operation there is a validation
for CREATE OR ALTER
+ // support only
+ "create materialized table tbl1 start_mode =
resume_or_from_beginning as select * from t";
+ sql(sql5)
+ .ok(
+ "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE =
RESUME_OR_FROM_BEGINNING\nAS\nSELECT *\nFROM `T`");
+
+ final String sql6 =
+ // allowed on parser level, in operation there is a validation
for CREATE OR ALTER
+ // support only
+ "create materialized table tbl1 start_mode =
resume_or_from_now as select * from t";
+ sql(sql6)
+ .ok(
+ "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE =
RESUME_OR_FROM_NOW\nAS\nSELECT *\nFROM `T`");
+
+ final String sql7 =
+ // allowed on parser level, in operation there is a validation
for CREATE OR ALTER
+ // support only
+ "create materialized table tbl1 start_mode =
resume_or_from_now(interval '2' minutes) as select * from t";
+ sql(sql7)
+ .ok(
+ "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE =
RESUME_OR_FROM_NOW(INTERVAL '2' MINUTE)\nAS\nSELECT *\nFROM `T`");
+
+ final String sql8 =
+ // allowed on parser level, in operation there is a validation
for CREATE OR ALTER
+ // support only
+ "create materialized table tbl1 with ('format' = 'json')
start_mode = resume_or_from_timestamp(TIMESTAMP WITH LOCAL TIME ZONE
'2023-04-22 21:37:58') as select * from t";
+ sql(sql8)
+ .ok(
+ "CREATE MATERIALIZED TABLE `TBL1`\nWITH (\n"
+ + " 'format' = 'json'\n"
+ + ")\nSTART_MODE =
RESUME_OR_FROM_TIMESTAMP(TIMESTAMP WITH LOCAL TIME ZONE '2023-04-22
21:37:58')\nAS\nSELECT *\nFROM `T`");
+
+ final String sql9 =
+ "create materialized table tbl1 start_mode =
from_now(timestamp ^'2023-04-22 21:37:58'^) as select * from t";
+ sql(sql9)
+ .fails(
+ "START_MODE literal must be an interval for FROM_NOW
and RESUME_OR_FROM_NOW modes.");
+
+ final String sql10 =
+ "create materialized table tbl1 start_mode =
resume_or_from_timestamp(^interval^ '2' minutes) as select * from t";
+ sql(sql10)
+ .fails(
+ "Encountered \"interval\" at line 1, column 70.\n"
+ + "Was expecting:\n"
+ + " \"TIMESTAMP\" ...\n"
+ + " ");
+ }
+
public SqlParserFixture fixture() {
return SqlParserFixture.DEFAULT.withConfig(
c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));