This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c0a4768e141 [FLINK-39304][table] Support `START_MODE` clause in the 
parser
c0a4768e141 is described below

commit c0a4768e141ab30ba44c14e637e63858729db4a6
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 27 22:22:04 2026 +0100

    [FLINK-39304][table] Support `START_MODE` clause in the parser
---
 .../src/main/codegen/data/Parser.tdd               |  17 +++
 .../src/main/codegen/includes/parserImpls.ftl      | 109 ++++++++++++++++---
 .../org/apache/flink/sql/parser/SqlParseUtils.java |  32 ++++++
 .../apache/flink/sql/parser/SqlUnparseUtils.java   |  11 ++
 .../SqlCreateMaterializedTable.java                |  10 ++
 .../SqlCreateOrAlterMaterializedTable.java         |   2 +
 .../parser/ddl/materializedtable/SqlStartMode.java | 115 +++++++++++++++++++++
 .../flink/sql/parser/utils/ParserResource.java     |   4 +
 .../MaterializedTableStatementParserTest.java      |  76 ++++++++++++++
 9 files changed, 364 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 92487c2ab36..74339361d79 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -69,6 +69,8 @@
     
"org.apache.flink.sql.parser.ddl.materializedtable.SqlAlterMaterializedTableSchema.SqlAlterMaterializedTableModifySchema"
     
"org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMaterializedTable"
     
"org.apache.flink.sql.parser.ddl.materializedtable.SqlDropMaterializedTable"
+    "org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode"
+    
"org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode.SqlStartModeKind"
     "org.apache.flink.sql.parser.ddl.model.SqlAlterModel"
     "org.apache.flink.sql.parser.ddl.model.SqlAlterModelRename"
     "org.apache.flink.sql.parser.ddl.model.SqlAlterModelReset"
@@ -183,6 +185,7 @@
     "org.apache.calcite.sql.SqlDrop"
     "org.apache.calcite.sql.SqlExplicitModelOperator"
     "org.apache.calcite.sql.SqlIntervalLiteral"
+    "org.apache.calcite.runtime.Resources"
     "java.util.ArrayList"
     "java.util.Collections"
     "java.util.HashSet"
@@ -217,6 +220,9 @@
     "ENFORCED"
     "ESTIMATED_COST"
     "EXTENDED"
+    "FROM_BEGINNING"
+    "FROM_NOW"
+    "FROM_TIMESTAMP"
     "FUNCTIONS"
     "FRESHNESS"
     "HASH"
@@ -245,7 +251,11 @@
     "REFRESH_MODE"
     "REMOVE"
     "RENAME"
+    "RESUME_OR_FROM_BEGINNING"
+    "RESUME_OR_FROM_NOW"
+    "RESUME_OR_FROM_TIMESTAMP"
     "SCALA"
+    "START_MODE"
     "STATISTICS"
     "STOP"
     "STRING"
@@ -360,6 +370,9 @@
     "FORTRAN"
     "FOUND"
     "FRAC_SECOND"
+    "FROM_BEGINNING"
+    "FROM_NOW"
+    "FROM_TIMESTAMP"
     "G"
     "GENERAL"
     "GENERATED"
@@ -469,6 +482,9 @@
     "RETURNED_OCTET_LENGTH"
     "RETURNED_SQLSTATE"
     "RETURNING"
+    "RESUME_OR_FROM_BEGINNING"
+    "RESUME_OR_FROM_NOW"
+    "RESUME_OR_FROM_TIMESTAMP"
     "ROLE"
     "ROUTINE"
     "ROUTINE_CATALOG"
@@ -546,6 +562,7 @@
     "SQL_TSI_YEAR"
     "SQL_VARBINARY"
     "SQL_VARCHAR"
+    "START_MODE"
     "STATE"
     "STOP"
     "STRUCTURE"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 7b081a38f31..5944f533616 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1912,6 +1912,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, 
boolean replace, boolean isT
     SqlParserPos pos = startPos;
     boolean isColumnsIdentifiersOnly = false;
     boolean isOrAlter = false;
+    SqlStartMode startMode = null;
 }
 {
     [
@@ -1964,15 +1965,73 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, 
boolean replace, boolean isT
         propertyList = Properties()
     ]
     [
-        <FRESHNESS> <EQ>
-        freshness = Expression(ExprContext.ACCEPT_NON_QUERY)
+        <START_MODE> <EQ>
         {
-            if (!(freshness instanceof SqlIntervalLiteral))
+            SqlLiteral startModeKindLiteral = null;
+            SqlLiteral startModeLiteral = null;
+        }
+        (
+            <FROM_BEGINNING>
             {
-                throw SqlUtil.newContextException(
-                getPos(),
-                ParserResource.RESOURCE.unsupportedFreshnessType());
+                startModeKindLiteral = 
SqlStartModeKind.FROM_BEGINNING.symbol(getPos());
+            }
+            |
+            <FROM_NOW>
+            {
+                startModeKindLiteral = 
SqlStartModeKind.FROM_NOW.symbol(getPos());
+            }
+            [
+                <LPAREN>
+                {
+                    startModeLiteral = 
IntervalLiteralOrError(Expression(ExprContext.ACCEPT_NON_QUERY),
+                        ParserResource.RESOURCE.unsupportedStartModeType());
+                }
+                <RPAREN>
+            ]
+            |
+            <FROM_TIMESTAMP>
+            {
+                startModeKindLiteral = 
SqlStartModeKind.FROM_TIMESTAMP.symbol(getPos());
+            }
+            <LPAREN>
+                startModeLiteral = TimestampLiteral()
+            <RPAREN>
+            |
+            <RESUME_OR_FROM_BEGINNING>
+            {
+                startModeKindLiteral = 
SqlStartModeKind.RESUME_OR_FROM_BEGINNING.symbol(getPos());
+            }
+            |
+            <RESUME_OR_FROM_NOW>
+            {
+                startModeKindLiteral = 
SqlStartModeKind.RESUME_OR_FROM_NOW.symbol(getPos());
             }
+            [
+                <LPAREN>
+                {
+                    startModeLiteral = 
IntervalLiteralOrError(Expression(ExprContext.ACCEPT_NON_QUERY),
+                        ParserResource.RESOURCE.unsupportedStartModeType());
+                }
+                <RPAREN>
+            ]
+            |
+            <RESUME_OR_FROM_TIMESTAMP>
+            {
+                startModeKindLiteral = 
SqlStartModeKind.RESUME_OR_FROM_TIMESTAMP.symbol(getPos());
+            }
+            <LPAREN>
+                startModeLiteral = TimestampLiteral()
+            <RPAREN>
+        )
+        {
+            startMode = new SqlStartMode(startModeKindLiteral, 
startModeLiteral, getPos());
+        }
+    ]
+    [
+        <FRESHNESS> <EQ>
+        {
+            freshness = 
IntervalLiteralOrError(Expression(ExprContext.ACCEPT_NON_QUERY),
+            ParserResource.RESOURCE.unsupportedFreshnessType());
         }
     ]
     [
@@ -2004,6 +2063,7 @@ SqlCreate SqlCreateOrAlterMaterializedTable(Span s, 
boolean replace, boolean isT
             propertyList,
             (SqlIntervalLiteral) freshness,
             refreshMode,
+            startMode,
             asQuery,
             isOrAlter);
     }
@@ -2085,12 +2145,9 @@ SqlAlterMaterializedTable SqlAlterMaterializedTable() :
         |
         <SET>
         (
-            <FRESHNESS> <EQ> freshness = 
Expression(ExprContext.ACCEPT_NON_QUERY) {
-                if (!(freshness instanceof SqlIntervalLiteral)) {
-                    throw SqlUtil.newContextException(
-                        getPos(),
-                        ParserResource.RESOURCE.unsupportedFreshnessType());
-                }
+            <FRESHNESS> <EQ>  {
+                freshness = 
IntervalLiteralOrError(Expression(ExprContext.ACCEPT_NON_QUERY),
+                    ParserResource.RESOURCE.unsupportedFreshnessType());
                 return new SqlAlterMaterializedTableFreshness(
                     startPos.plus(getPos()),
                     tableIdentifier,
@@ -3847,3 +3904,31 @@ SqlCharStringLiteral Comment() :
         return SqlLiteral.createCharString(p, getPos());
     }
 }
+
+SqlIntervalLiteral IntervalLiteralOrError(SqlNode expression, Resources.ExInst 
error) :
+{
+}
+{
+    {
+        if (expression instanceof SqlIntervalLiteral) {
+           return (SqlIntervalLiteral) expression;
+        }
+        throw SqlUtil.newContextException(getPos(), error);
+    }
+}
+
+SqlLiteral TimestampLiteral() :
+{
+        final String p;
+        final Span s;
+}
+{
+        LOOKAHEAD(2)
+        <TIMESTAMP> { s = span(); } p = SimpleStringLiteral() {
+            return SqlLiteral.createUnknown("TIMESTAMP", p, s.end(this));
+        }
+        |
+        <TIMESTAMP> { s = span(); } <WITH> <LOCAL> <TIME> <ZONE> p = 
SimpleStringLiteral() {
+            return SqlLiteral.createUnknown("TIMESTAMP WITH LOCAL TIME ZONE", 
p, s.end(this));
+        }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
index e115a0bc6c9..f710410e3cd 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -115,4 +116,35 @@ public class SqlParseUtils {
         }
         return 
sqlNodeList.getList().stream().map(mapper).collect(Collectors.toSet());
     }
+
+    @Nullable
+    public static <T extends Enum<T>> T extractEnum(
+            @Nullable SqlLiteral literal, Class<T> enumClass) {
+        if (literal == null) {
+            return null;
+        }
+
+        final String value = literal.toValue();
+        if (value == null) {
+            return null;
+        }
+
+        try {
+            return Enum.valueOf(enumClass, value);
+        } catch (IllegalArgumentException e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Invalid value '%s' for enum %s. Valid values are: 
%s",
+                            value,
+                            enumClass.getSimpleName(),
+                            String.join(", ", getEnumValues(enumClass))),
+                    e);
+        }
+    }
+
+    private static <T extends Enum<T>> List<String> getEnumValues(Class<T> 
enumClass) {
+        return Arrays.stream(enumClass.getEnumConstants())
+                .map(Enum::name)
+                .collect(Collectors.toList());
+    }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
index c99f836786f..619f7a5e932 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.sql.parser.ddl.SqlDistribution;
 import org.apache.flink.sql.parser.ddl.SqlRefreshMode;
 import org.apache.flink.sql.parser.ddl.SqlWatermark;
 import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode;
 
 import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlIntervalLiteral;
@@ -116,6 +117,16 @@ public class SqlUnparseUtils {
         freshness.unparse(writer, leftPrec, rightPrec);
     }
 
+    public static void unparseStartMode(SqlStartMode sqlStartMode, SqlWriter 
writer) {
+        if (sqlStartMode == null) {
+            return;
+        }
+        writer.newlineAndIndent();
+        writer.keyword("START_MODE");
+        writer.keyword("=");
+        writer.keyword(sqlStartMode.toString());
+    }
+
     public static void unparseRefreshMode(SqlRefreshMode refreshMode, 
SqlWriter writer) {
         if (refreshMode == null) {
             return;
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
index 2c36b861d19..33bb12c06c2 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
@@ -67,6 +67,8 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
 
     private final @Nullable SqlRefreshMode refreshMode;
 
+    private final @Nullable SqlStartMode startMode;
+
     private final SqlNode asQuery;
 
     public SqlCreateMaterializedTable(
@@ -82,6 +84,7 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
             SqlNodeList propertyList,
             @Nullable SqlIntervalLiteral freshness,
             @Nullable SqlRefreshMode refreshMode,
+            @Nullable SqlStartMode startMode,
             SqlNode asQuery) {
         super(operator, pos, tableName, false, false, false, propertyList, 
comment);
         this.columnList = columnList;
@@ -93,6 +96,7 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
         requireNonNull(propertyList, "propertyList should not be null");
         this.freshness = freshness;
         this.refreshMode = refreshMode;
+        this.startMode = startMode;
         this.asQuery = requireNonNull(asQuery, "asQuery should not be null");
     }
 
@@ -136,6 +140,11 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
         return refreshMode;
     }
 
+    @Nullable
+    public SqlStartMode getStartMode() {
+        return startMode;
+    }
+
     public SqlNode getAsQuery() {
         return asQuery;
     }
@@ -173,6 +182,7 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
         SqlUnparseUtils.unparseDistribution(distribution, writer, leftPrec, 
rightPrec);
         SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer, 
leftPrec, rightPrec);
         SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, 
rightPrec);
+        SqlUnparseUtils.unparseStartMode(startMode, writer);
         SqlUnparseUtils.unparseFreshness(freshness, true, writer, leftPrec, 
rightPrec);
         SqlUnparseUtils.unparseRefreshMode(refreshMode, writer);
         SqlUnparseUtils.unparseAsQuery(asQuery, writer, leftPrec, rightPrec);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
index e41961b2687..b1859ba6114 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateOrAlterMaterializedTable.java
@@ -55,6 +55,7 @@ public class SqlCreateOrAlterMaterializedTable extends 
SqlCreateMaterializedTabl
             SqlNodeList propertyList,
             @Nullable SqlIntervalLiteral freshness,
             @Nullable SqlRefreshMode refreshMode,
+            @Nullable SqlStartMode startMode,
             SqlNode asQuery,
             boolean isOrAlter) {
         super(
@@ -70,6 +71,7 @@ public class SqlCreateOrAlterMaterializedTable extends 
SqlCreateMaterializedTabl
                 propertyList,
                 freshness,
                 refreshMode,
+                startMode,
                 asQuery);
     }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlStartMode.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlStartMode.java
new file mode 100644
index 00000000000..a8507b940b9
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlStartMode.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.sql.parser.SqlParseUtils;
+
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlTimestampLiteral;
+import org.apache.calcite.sql.SqlUnknownLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import javax.annotation.Nullable;
+
+/** Start mode configuration for materialized tables. */
+@Internal
+public class SqlStartMode {
+    private final SqlStartModeKind kind;
+    private final @Nullable SqlTimestampLiteral timestampLiteral;
+    private final @Nullable SqlIntervalLiteral intervalLiteral;
+
+    public SqlStartModeKind getKind() {
+        return kind;
+    }
+
+    @Nullable
+    public SqlTimestampLiteral getTimestampLiteral() {
+        return timestampLiteral;
+    }
+
+    @Nullable
+    public SqlIntervalLiteral getIntervalLiteral() {
+        return intervalLiteral;
+    }
+
+    public enum SqlStartModeKind {
+        FROM_BEGINNING,
+        FROM_NOW,
+        FROM_TIMESTAMP,
+        RESUME_OR_FROM_BEGINNING,
+        RESUME_OR_FROM_NOW,
+        RESUME_OR_FROM_TIMESTAMP;
+
+        /**
+         * Creates a parse-tree node representing an occurrence of this 
keyword at a particular
+         * position in the parsed text.
+         */
+        public SqlLiteral symbol(SqlParserPos pos) {
+            return SqlLiteral.createSymbol(this, pos);
+        }
+
+        public boolean isFromNow() {
+            return this == FROM_NOW || this == RESUME_OR_FROM_NOW;
+        }
+
+        public boolean isFromTimestamp() {
+            return this == FROM_TIMESTAMP || this == RESUME_OR_FROM_TIMESTAMP;
+        }
+
+        public boolean isCreateOrAlterOption() {
+            return this == RESUME_OR_FROM_BEGINNING
+                    || this == RESUME_OR_FROM_NOW
+                    || this == RESUME_OR_FROM_TIMESTAMP;
+        }
+    }
+
+    public SqlStartMode(SqlLiteral kind, @Nullable SqlLiteral literal, 
SqlParserPos pos) {
+        this.kind = SqlParseUtils.extractEnum(kind, SqlStartModeKind.class);
+        if (literal == null) {
+            this.intervalLiteral = null;
+            this.timestampLiteral = null;
+        } else if (literal instanceof SqlIntervalLiteral) {
+            this.intervalLiteral = (SqlIntervalLiteral) literal;
+            this.timestampLiteral = null;
+        } else {
+            this.intervalLiteral = null;
+
+            final SqlUnknownLiteral unknownLiteral = (SqlUnknownLiteral) 
literal;
+            this.timestampLiteral =
+                    (SqlTimestampLiteral)
+                            
unknownLiteral.resolve(SqlTypeName.lookup(unknownLiteral.tag));
+        }
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(kind.name());
+
+        if (kind.isFromNow() && intervalLiteral != null) {
+            sb.append("(").append(intervalLiteral).append(")");
+        } else if (kind.isFromTimestamp() && timestampLiteral != null) {
+            sb.append("(").append(timestampLiteral.toString().replace("_", " 
")).append(")");
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 323fe5da0aa..5e1672865a4 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -71,6 +71,10 @@ public interface ParserResource {
             "MATERIALIZED TABLE only supports define interval type FRESHNESS, 
please refer to the materialized table document.")
     Resources.ExInst<ParseException> unsupportedFreshnessType();
 
+    @Resources.BaseMessage(
+            "START_MODE literal must be an interval for FROM_NOW and 
RESUME_OR_FROM_NOW modes.")
+    Resources.ExInst<ParseException> unsupportedStartModeType();
+
     @Resources.BaseMessage("CREATE TEMPORARY MATERIALIZED TABLE is not 
supported.")
     Resources.ExInst<ParseException> 
createTemporaryMaterializedTableUnsupported();
 
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
index 1ea1855e223..559d4c320d5 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -665,6 +665,82 @@ class MaterializedTableStatementParserTest {
         sql(sql).fails("DROP TEMPORARY MATERIALIZED TABLE is not supported.");
     }
 
+    @Test
+    void testStartMode() {
+        final String sql1 =
+                "create materialized table tbl1 start_mode = from_beginning as 
select * from t";
+        sql(sql1)
+                .ok(
+                        "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE = 
FROM_BEGINNING\nAS\nSELECT *\nFROM `T`");
+
+        final String sql2 =
+                "create materialized table tbl1 start_mode = from_now as 
select * from t";
+        sql(sql2)
+                .ok(
+                        "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE = 
FROM_NOW\nAS\nSELECT *\nFROM `T`");
+
+        final String sql3 =
+                "create materialized table tbl1 start_mode = from_now(INTERVAL 
'1' HOUR) as select * from t";
+        sql(sql3)
+                .ok(
+                        "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE = 
FROM_NOW(INTERVAL '1' HOUR)\nAS\nSELECT *\nFROM `T`");
+
+        final String sql4 =
+                "create materialized table tbl1 start_mode = 
from_timestamp(TIMESTAMP '2023-04-22 21:37:58') as select * from t";
+        sql(sql4)
+                .ok(
+                        "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE = 
FROM_TIMESTAMP(TIMESTAMP '2023-04-22 21:37:58')\nAS\nSELECT *\nFROM `T`");
+
+        final String sql5 =
+                // Accepted at the parser level; restricting this mode to CREATE OR ALTER is a
+                // validation performed later, in the operation.
+                "create materialized table tbl1 start_mode = 
resume_or_from_beginning as select * from t";
+        sql(sql5)
+                .ok(
+                        "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE = 
RESUME_OR_FROM_BEGINNING\nAS\nSELECT *\nFROM `T`");
+
+        final String sql6 =
+                // Accepted at the parser level; restricting this mode to CREATE OR ALTER is a
+                // validation performed later, in the operation.
+                "create materialized table tbl1 start_mode = 
resume_or_from_now as select * from t";
+        sql(sql6)
+                .ok(
+                        "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE = 
RESUME_OR_FROM_NOW\nAS\nSELECT *\nFROM `T`");
+
+        final String sql7 =
+                // Accepted at the parser level; restricting this mode to CREATE OR ALTER is a
+                // validation performed later, in the operation.
+                "create materialized table tbl1 start_mode = 
resume_or_from_now(interval '2' minutes) as select * from t";
+        sql(sql7)
+                .ok(
+                        "CREATE MATERIALIZED TABLE `TBL1`\nSTART_MODE = 
RESUME_OR_FROM_NOW(INTERVAL '2' MINUTE)\nAS\nSELECT *\nFROM `T`");
+
+        final String sql8 =
+                // Accepted at the parser level; restricting this mode to CREATE OR ALTER is a
+                // validation performed later, in the operation.
+                "create materialized table tbl1 with ('format' = 'json') 
start_mode = resume_or_from_timestamp(TIMESTAMP WITH LOCAL TIME ZONE 
'2023-04-22 21:37:58') as select * from t";
+        sql(sql8)
+                .ok(
+                        "CREATE MATERIALIZED TABLE `TBL1`\nWITH (\n"
+                                + "  'format' = 'json'\n"
+                                + ")\nSTART_MODE = 
RESUME_OR_FROM_TIMESTAMP(TIMESTAMP WITH LOCAL TIME ZONE '2023-04-22 
21:37:58')\nAS\nSELECT *\nFROM `T`");
+
+        final String sql9 =
+                "create materialized table tbl1 start_mode = 
from_now(timestamp ^'2023-04-22 21:37:58'^) as select * from t";
+        sql(sql9)
+                .fails(
+                        "START_MODE literal must be an interval for FROM_NOW 
and RESUME_OR_FROM_NOW modes.");
+
+        final String sql10 =
+                "create materialized table tbl1 start_mode = 
resume_or_from_timestamp(^interval^ '2' minutes) as select * from t";
+        sql(sql10)
+                .fails(
+                        "Encountered \"interval\" at line 1, column 70.\n"
+                                + "Was expecting:\n"
+                                + "    \"TIMESTAMP\" ...\n"
+                                + "    ");
+    }
+
     public SqlParserFixture fixture() {
         return SqlParserFixture.DEFAULT.withConfig(
                 c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));

Reply via email to