This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 512f3fbaac8 [FLINK-35190][table-api] Support create materialized table syntax
512f3fbaac8 is described below

commit 512f3fbaac8b538f7edda152af0846dc2b213557
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Wed Apr 24 16:17:40 2024 +0800

    [FLINK-35190][table-api] Support create materialized table syntax
    
    This closes #24707
---
 .../src/main/codegen/data/Parser.tdd               |  21 +-
 .../src/main/codegen/includes/parserImpls.ftl      |  93 +++++++++
 .../sql/parser/ddl/SqlCreateMaterializedTable.java | 203 +++++++++++++++++++
 .../flink/sql/parser/ddl/SqlRefreshMode.java       |  47 +++++
 .../flink/sql/parser/utils/ParserResource.java     |  10 +
 .../MaterializedTableStatementParserTest.java      | 214 +++++++++++++++++++++
 .../MaterializedTableStatementUnParserTest.java    |  46 +++++
 7 files changed, 627 insertions(+), 7 deletions(-)
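
For reference, this is the statement shape the commit teaches the parser, pieced together from the parser tests further down (table, column, and option names are purely illustrative):

    CREATE MATERIALIZED TABLE tbl1
    (
       PRIMARY KEY (a, b)
    )
    COMMENT 'table comment'
    PARTITIONED BY (a, h)
    WITH (
      'group.id' = 'latest',
      'kafka.topic' = 'log.test'
    )
    FRESHNESS = INTERVAL '3' MINUTE
    REFRESH_MODE = FULL
    AS SELECT a, b, h, t AS m FROM source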

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index d88a9c0f454..dfb43353a4b 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -56,6 +56,7 @@
     "org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
     "org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
     "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
+    "org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable"
     "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
     "org.apache.flink.sql.parser.ddl.SqlCreateTableAs"
@@ -69,6 +70,7 @@
     "org.apache.flink.sql.parser.ddl.SqlDropPartitions.AlterTableDropPartitionsContext"
     "org.apache.flink.sql.parser.ddl.SqlDropTable"
     "org.apache.flink.sql.parser.ddl.SqlDropView"
+    "org.apache.flink.sql.parser.ddl.SqlRefreshMode"
     "org.apache.flink.sql.parser.ddl.SqlRemoveJar"
     "org.apache.flink.sql.parser.ddl.SqlReplaceTableAs"
     "org.apache.flink.sql.parser.ddl.SqlReset"
@@ -130,6 +132,7 @@
     "org.apache.calcite.sql.SqlAlienSystemTypeNameSpec"
     "org.apache.calcite.sql.SqlCreate"
     "org.apache.calcite.sql.SqlDrop"
+    "org.apache.calcite.sql.SqlIntervalLiteral"
     "java.util.ArrayList"
     "java.util.Collections"
     "java.util.HashSet"
@@ -141,14 +144,17 @@
   # keyword, please also add it to 'nonReservedKeywords' section.
   # Please keep the keyword in alphabetical order if new keyword is added.
   keywords: [
+    "ANALYZE"
     "BUCKETS"
     "BYTES"
     "CATALOGS"
     "CHANGELOG_MODE"
+    "COLUMNS"
     "COMMENT"
     "COMPACT"
     "COMPILE"
-    "COLUMNS"
+    "COMPUTE"
+    "CONTINUOUS"
     "DATABASES"
     "DISTRIBUTED"
     "DISTRIBUTION"
@@ -157,42 +163,43 @@
     "ESTIMATED_COST"
     "EXTENDED"
     "FUNCTIONS"
+    "FRESHNESS"
     "HASH"
     "IF"
     "JSON_EXECUTION_PLAN"
-    "PLAN_ADVICE"
     "JAR"
     "JARS"
     "JOB"
     "JOBS"
     "LOAD"
     "METADATA"
+    "MATERIALIZED"
     "MODIFY"
     "MODULES"
     "OVERWRITE"
     "OVERWRITING"
     "PARTITIONED"
     "PARTITIONS"
+    "PLAN_ADVICE"
     "PROCEDURES"
     "PYTHON"
     "RAW"
+    "REFRESH_MODE"
     "REMOVE"
     "RENAME"
     "SCALA"
+    "STATISTICS"
     "STOP"
     "STRING"
     "TABLES"
+    "TIMESTAMP_LTZ"
+    "TRY_CAST"
     "UNLOAD"
     "USE"
     "VIEWS"
     "VIRTUAL"
     "WATERMARK"
     "WATERMARKS"
-    "TIMESTAMP_LTZ"
-    "TRY_CAST"
-    "ANALYZE"
-    "COMPUTE"
-    "STATISTICS"
   ]
 
   # List of keywords from "keywords" section that are not reserved.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 9ac769e18a2..368230d9819 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1682,6 +1682,97 @@ SqlNode SqlReplaceTable() :
     }
 }
 
+/**
+  * Parses a CREATE MATERIALIZED TABLE statement.
+*/
+SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporary) :
+{
+    final SqlParserPos startPos = s.pos();
+    SqlIdentifier tableName;
+    SqlCharStringLiteral comment = null;
+    SqlTableConstraint constraint = null;
+    SqlNodeList partitionColumns = SqlNodeList.EMPTY;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+    SqlNode freshness = null;
+    SqlLiteral refreshMode = null;
+    SqlNode asQuery = null;
+}
+{
+    <MATERIALIZED>
+    {
+        if (isTemporary) {
+           throw SqlUtil.newContextException(
+                getPos(),
+                ParserResource.RESOURCE.createTemporaryMaterializedTableUnsupported());
+        }
+        if (replace) {
+           throw SqlUtil.newContextException(
+                getPos(),
+                ParserResource.RESOURCE.replaceMaterializedTableUnsupported());
+        }
+    }
+    <TABLE>
+    tableName = CompoundIdentifier()
+    [
+        <LPAREN>
+        constraint = TableConstraint()
+        <RPAREN>
+    ]
+    [
+        <COMMENT> <QUOTED_STRING>
+        {
+            String p = SqlParserUtil.parseString(token.image);
+            comment = SqlLiteral.createCharString(p, getPos());
+        }
+    ]
+    [
+        <PARTITIONED> <BY>
+        partitionColumns = ParenthesizedSimpleIdentifierList()
+    ]
+    [
+        <WITH>
+        propertyList = TableProperties()
+    ]
+    <FRESHNESS> <EQ>
+    freshness = Expression(ExprContext.ACCEPT_NON_QUERY)
+    {
+        if (!(freshness instanceof SqlIntervalLiteral))
+        {
+            throw SqlUtil.newContextException(
+            getPos(),
+            ParserResource.RESOURCE.unsupportedFreshnessType());
+        }
+    }
+    [
+        <REFRESH_MODE> <EQ>
+        (
+            <FULL>
+            {
+                refreshMode = SqlRefreshMode.FULL.symbol(getPos());
+            }
+            |
+            <CONTINUOUS>
+            {
+                refreshMode = SqlRefreshMode.CONTINUOUS.symbol(getPos());
+            }
+        )
+    ]
+    <AS>
+    asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
+    {
+        return new SqlCreateMaterializedTable(
+            startPos.plus(getPos()),
+            tableName,
+            comment,
+            constraint,
+            partitionColumns,
+            propertyList,
+            (SqlIntervalLiteral) freshness,
+            refreshMode,
+            asQuery);
+    }
+}
+
 /**
 * Parses an INSERT statement.
 */
@@ -2187,6 +2278,8 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) :
     (
         create = SqlCreateCatalog(s, replace)
         |
+        create = SqlCreateMaterializedTable(s, replace, isTemporary)
+        |
         create = SqlCreateTable(s, replace, isTemporary)
         |
         create = SqlCreateView(s, replace, isTemporary)
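
In the production above, only the FRESHNESS clause and the trailing AS query are mandatory; the constraint block, COMMENT, PARTITIONED BY, WITH, and REFRESH_MODE (FULL or CONTINUOUS) are all optional, and the statement is reached through the SqlCreateExtended dispatch shown in the second hunk. A minimal accepted statement would therefore look roughly like this sketch (hypothetical table and column names):

    CREATE MATERIALIZED TABLE daily_totals
    FRESHNESS = INTERVAL '1' HOUR
    AS SELECT order_date, SUM(amount) AS total FROM orders GROUP BY order_date
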
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
new file mode 100644
index 00000000000..1630a0f0117
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+/** CREATE MATERIALIZED TABLE DDL sql call. */
+public class SqlCreateMaterializedTable extends SqlCreate {
+
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("CREATE MATERIALIZED TABLE", SqlKind.CREATE_TABLE);
+
+    private final SqlIdentifier tableName;
+
+    private final SqlCharStringLiteral comment;
+
+    private final SqlTableConstraint tableConstraint;
+
+    private final SqlNodeList partitionKeyList;
+
+    private final SqlNodeList propertyList;
+
+    private final SqlIntervalLiteral freshness;
+
+    @Nullable private final SqlLiteral refreshMode;
+
+    private final SqlNode asQuery;
+
+    public SqlCreateMaterializedTable(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            @Nullable SqlCharStringLiteral comment,
+            @Nullable SqlTableConstraint tableConstraint,
+            SqlNodeList partitionKeyList,
+            SqlNodeList propertyList,
+            SqlIntervalLiteral freshness,
+            @Nullable SqlLiteral refreshMode,
+            SqlNode asQuery) {
+        super(OPERATOR, pos, false, false);
+        this.tableName = requireNonNull(tableName, "tableName should not be null");
+        this.comment = comment;
+        this.tableConstraint = tableConstraint;
+        this.partitionKeyList =
+                requireNonNull(partitionKeyList, "partitionKeyList should not be null");
+        this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
+        this.freshness = requireNonNull(freshness, "freshness should not be null");
+        this.refreshMode = refreshMode;
+        this.asQuery = requireNonNull(asQuery, "asQuery should not be null");
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(
+                tableName,
+                comment,
+                tableConstraint,
+                partitionKeyList,
+                propertyList,
+                freshness,
+                asQuery);
+    }
+
+    public SqlIdentifier getTableName() {
+        return tableName;
+    }
+
+    public String[] fullTableName() {
+        return tableName.names.toArray(new String[0]);
+    }
+
+    public Optional<SqlCharStringLiteral> getComment() {
+        return Optional.ofNullable(comment);
+    }
+
+    public Optional<SqlTableConstraint> getTableConstraint() {
+        return Optional.ofNullable(tableConstraint);
+    }
+
+    public SqlNodeList getPartitionKeyList() {
+        return partitionKeyList;
+    }
+
+    public SqlNodeList getPropertyList() {
+        return propertyList;
+    }
+
+    public SqlIntervalLiteral getFreshness() {
+        return freshness;
+    }
+
+    @Nullable
+    public Optional<SqlLiteral> getRefreshMode() {
+        return Optional.ofNullable(refreshMode);
+    }
+
+    public SqlNode getAsQuery() {
+        return asQuery;
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("CREATE MATERIALIZED TABLE");
+        tableName.unparse(writer, leftPrec, rightPrec);
+
+        if (tableConstraint != null) {
+            writer.newlineAndIndent();
+            SqlUnparseUtils.unparseTableSchema(
+                    writer,
+                    leftPrec,
+                    rightPrec,
+                    SqlNodeList.EMPTY,
+                    Collections.singletonList(tableConstraint),
+                    null);
+        }
+
+        if (comment != null) {
+            writer.newlineAndIndent();
+            writer.keyword("COMMENT");
+            comment.unparse(writer, leftPrec, rightPrec);
+        }
+
+        if (partitionKeyList.size() > 0) {
+            writer.newlineAndIndent();
+            writer.keyword("PARTITIONED BY");
+            SqlWriter.Frame partitionedByFrame = writer.startList("(", ")");
+            partitionKeyList.unparse(writer, leftPrec, rightPrec);
+            writer.endList(partitionedByFrame);
+            writer.newlineAndIndent();
+        }
+
+        if (propertyList.size() > 0) {
+            writer.keyword("WITH");
+            SqlWriter.Frame withFrame = writer.startList("(", ")");
+            for (SqlNode property : propertyList) {
+                SqlUnparseUtils.printIndent(writer);
+                property.unparse(writer, leftPrec, rightPrec);
+            }
+            writer.newlineAndIndent();
+            writer.endList(withFrame);
+        }
+
+        writer.newlineAndIndent();
+        writer.keyword("FRESHNESS");
+        writer.keyword("=");
+        freshness.unparse(writer, leftPrec, rightPrec);
+
+        if (refreshMode != null) {
+            writer.newlineAndIndent();
+            writer.keyword("REFRESH_MODE");
+            writer.keyword("=");
+            refreshMode.unparse(writer, leftPrec, rightPrec);
+        }
+
+        writer.newlineAndIndent();
+        writer.keyword("AS");
+        writer.newlineAndIndent();
+        asQuery.unparse(writer, leftPrec, rightPrec);
+    }
+}
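
As a rough sketch of what the unparse() method above emits, the un-parser test expectations further down render a parsed statement back along these lines (identifiers upper-cased and back-quoted by the default test conformance):

    CREATE MATERIALIZED TABLE `TBL1`
    (
      PRIMARY KEY (`A`, `B`)
    )
    COMMENT 'table comment'
    FRESHNESS = INTERVAL '3' MINUTE
    REFRESH_MODE = FULL
    AS
    SELECT `A`, `B`, `H`, `T` AS `M`
    FROM `SOURCE`
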
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRefreshMode.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRefreshMode.java
new file mode 100644
index 00000000000..7646b8b5e09
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRefreshMode.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/** Enumeration of materialized table refresh mode. */
+public enum SqlRefreshMode {
+    FULL("FULL"),
+    CONTINUOUS("CONTINUOUS");
+
+    private final String digest;
+
+    SqlRefreshMode(String digest) {
+        this.digest = digest;
+    }
+
+    @Override
+    public String toString() {
+        return this.digest;
+    }
+
+    /**
+     * Creates a parse-tree node representing an occurrence of this keyword at a particular position
+     * in the parsed text.
+     */
+    public SqlLiteral symbol(SqlParserPos pos) {
+        return SqlLiteral.createSymbol(this, pos);
+    }
+}
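
SqlRefreshMode backs the optional REFRESH_MODE clause in the grammar; a statement selecting the CONTINUOUS variant could look like this sketch (hypothetical names):

    CREATE MATERIALIZED TABLE clicks_agg
    FRESHNESS = INTERVAL '30' SECOND
    REFRESH_MODE = CONTINUOUS
    AS SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id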
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 0758b0d5887..4f6a5fcf739 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -49,4 +49,14 @@ public interface ParserResource {
 
     @Resources.BaseMessage("Bucket count must be a positive integer.")
     Resources.ExInst<ParseException> bucketCountMustBePositiveInteger();
+
+    @Resources.BaseMessage(
+            "CREATE MATERIALIZED TABLE only supports interval type FRESHNESS, please refer to the materialized table document.")
+    Resources.ExInst<ParseException> unsupportedFreshnessType();
+
+    @Resources.BaseMessage("CREATE TEMPORARY MATERIALIZED TABLE is not supported.")
+    Resources.ExInst<ParseException> createTemporaryMaterializedTableUnsupported();
+
+    @Resources.BaseMessage("REPLACE MATERIALIZED TABLE is not supported.")
+    Resources.ExInst<ParseException> replaceMaterializedTableUnsupported();
 }
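
The three new messages correspond to statements the parser now rejects; sketches of offending inputs, mirroring the tests below (names illustrative):

    -- fails with "CREATE TEMPORARY MATERIALIZED TABLE is not supported."
    CREATE TEMPORARY MATERIALIZED TABLE t FRESHNESS = INTERVAL '3' MINUTE AS SELECT * FROM src

    -- fails with "REPLACE MATERIALIZED TABLE is not supported."
    CREATE OR REPLACE MATERIALIZED TABLE t FRESHNESS = INTERVAL '3' MINUTE AS SELECT * FROM src

    -- fails with "CREATE MATERIALIZED TABLE only supports interval type FRESHNESS, ..."
    CREATE MATERIALIZED TABLE t FRESHNESS = 123 AS SELECT * FROM src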
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
new file mode 100644
index 00000000000..8818e97c835
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+
+import org.apache.calcite.sql.parser.SqlParserFixture;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/** Sql parser test for materialized table related syntax. */
+@Execution(CONCURRENT)
+public class MaterializedTableStatementParserTest {
+
+    @Test
+    void testCreateMaterializedTable() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(\n"
+                        + "   PRIMARY KEY (a, b)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS SELECT a, b, h, t m FROM source";
+        final String expected =
+                "CREATE MATERIALIZED TABLE `TBL1`\n"
+                        + "(\n"
+                        + "  PRIMARY KEY (`A`, `B`)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (`A`, `H`)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest',\n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS\n"
+                        + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+                        + "FROM `SOURCE`";
+        sql(sql).ok(expected);
+
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(\n"
+                        + "   PRIMARY KEY (a, b)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT a, b, h, t m FROM source";
+        final String expected2 =
+                "CREATE MATERIALIZED TABLE `TBL1`\n"
+                        + "(\n"
+                        + "  PRIMARY KEY (`A`, `B`)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS\n"
+                        + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+                        + "FROM `SOURCE`";
+
+        sql(sql2).ok(expected2);
+
+        final String sql3 =
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "COMMENT 'table comment'\n"
+                        + "FRESHNESS = INTERVAL '3' DAY\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT a, b, h, t m FROM source";
+        final String expected3 =
+                "CREATE MATERIALIZED TABLE `TBL1`\n"
+                        + "COMMENT 'table comment'\n"
+                        + "FRESHNESS = INTERVAL '3' DAY\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS\n"
+                        + "SELECT `A`, `B`, `H`, `T` AS `M`\n"
+                        + "FROM `SOURCE`";
+        sql(sql3).ok(expected3);
+    }
+
+    @Test
+    void testCreateMaterializedTableWithUnsupportedFreshnessInterval() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(\n"
+                        + "   PRIMARY KEY (a, b)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = ^123^\n"
+                        + "AS SELECT a, b, h, t m FROM source";
+        sql(sql).fails(
+                        "CREATE MATERIALIZED TABLE only supports interval type FRESHNESS, please refer to the materialized table document.");
+
+        final String sql2 =
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(\n"
+                        + "   PRIMARY KEY (a, b)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "^AS^ SELECT a, b, h, t m FROM source";
+
+        sql(sql2)
+                .fails(
+                        "Encountered \"AS\" at line 11, column 1.\n"
+                                + "Was expecting:\n"
+                                + "    \"FRESHNESS\" ...\n"
+                                + "    ");
+    }
+
+    @Test
+    void testCreateMaterializedTableWithoutAsQuery() {
+        final String sql =
+                "CREATE MATERIALIZED TABLE tbl1\n"
+                        + "(\n"
+                        + "   PRIMARY KEY (a, b)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "REFRESH_MODE = FULL^\n^";
+        sql(sql).fails(
+                        "Encountered \"<EOF>\" at line 12, column 20.\n"
+                                + "Was expecting:\n"
+                                + "    \"AS\" ...\n"
+                                + "    ");
+    }
+
+    @Test
+    void testCreateTemporaryMaterializedTable() {
+        final String sql =
+                "CREATE TEMPORARY ^MATERIALIZED^ TABLE tbl1\n"
+                        + "(\n"
+                        + "   PRIMARY KEY (a, b)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS SELECT a, b, h, t m FROM source";
+        sql(sql).fails("CREATE TEMPORARY MATERIALIZED TABLE is not supported.");
+    }
+
+    @Test
+    void testReplaceMaterializedTable() {
+        final String sql =
+                "CREATE OR REPLACE ^MATERIALIZED^ TABLE tbl1\n"
+                        + "(\n"
+                        + "   PRIMARY KEY (a, b)\n"
+                        + ")\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'latest', \n"
+                        + "  'kafka.topic' = 'log.test'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '3' MINUTE\n"
+                        + "AS SELECT a, b, h, t m FROM source";
+        sql(sql).fails("REPLACE MATERIALIZED TABLE is not supported.");
+    }
+
+    public SqlParserFixture fixture() {
+        return SqlParserFixture.DEFAULT.withConfig(
+                c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));
+    }
+
+    protected SqlParserFixture sql(String sql) {
+        return this.fixture().sql(sql);
+    }
+
+    protected SqlParserFixture expr(String sql) {
+        return this.sql(sql).expression(true);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementUnParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementUnParserTest.java
new file mode 100644
index 00000000000..5121b969167
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementUnParserTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl;
+
+import org.apache.calcite.sql.parser.SqlParserFixture;
+import org.apache.calcite.sql.parser.SqlParserTest;
+import org.junit.jupiter.api.parallel.Execution;
+
+import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
+
+/**
+ * Extension to {@link MaterializedTableStatementParserTest} that ensures that every expression can
+ * un-parse successfully.
+ */
+@Execution(CONCURRENT)
+public class MaterializedTableStatementUnParserTest extends MaterializedTableStatementParserTest {
+    // ~ Constructors -----------------------------------------------------------
+
+    public MaterializedTableStatementUnParserTest() {}
+
+    // ~ Methods ----------------------------------------------------------------
+
+    public SqlParserFixture fixture() {
+        return super.fixture()
+                .withTester(new SqlParserTest.UnparsingTesterImpl())
+                .withConfig(c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));
+    }
+}
