This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 330f524d185 [FLINK-35191][table-api] Support alter materialized table 
related syntax: suspend, resume, refresh, set and reset
330f524d185 is described below

commit 330f524d185d575ceb679a6c587e9c39612e844c
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Mon Apr 29 10:21:12 2024 +0800

    [FLINK-35191][table-api] Support alter materialized table related syntax: 
suspend, resume, refresh, set and reset
    
    This closes #24737
---
 .../src/main/codegen/data/Parser.tdd               |  12 ++
 .../src/main/codegen/includes/parserImpls.ftl      |  94 +++++++++++
 .../sql/parser/ddl/SqlAlterMaterializedTable.java  |  61 +++++++
 .../ddl/SqlAlterMaterializedTableFreshness.java    |  60 +++++++
 .../ddl/SqlAlterMaterializedTableOptions.java      |  67 ++++++++
 .../ddl/SqlAlterMaterializedTableRefresh.java      |  61 +++++++
 .../ddl/SqlAlterMaterializedTableRefreshMode.java  |  62 +++++++
 .../parser/ddl/SqlAlterMaterializedTableReset.java |  67 ++++++++
 .../ddl/SqlAlterMaterializedTableResume.java       |  74 ++++++++
 .../ddl/SqlAlterMaterializedTableSuspend.java      |  47 ++++++
 .../flink/sql/parser/utils/ParserResource.java     |   2 +-
 .../MaterializedTableStatementParserTest.java      | 187 ++++++++++++++++++++-
 12 files changed, 792 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index dfb43353a4b..100e9edd2fb 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -35,6 +35,14 @@
     
"org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext"
     "org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
     "org.apache.flink.sql.parser.ddl.SqlAlterFunction"
+    "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable"
+    "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableFreshness"
+    "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableOptions"
+    "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefreshMode"
+    "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableReset"
+    "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh"
+    "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume"
+    "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend"
     "org.apache.flink.sql.parser.ddl.SqlAlterTable"
     "org.apache.flink.sql.parser.ddl.SqlAlterTable.AlterTableContext"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableAdd"
@@ -191,6 +199,9 @@
     "STATISTICS"
     "STOP"
     "STRING"
+    "SUSPEND"
+    "REFRESH"
+    "RESUME"
     "TABLES"
     "TIMESTAMP_LTZ"
     "TRY_CAST"
@@ -581,6 +592,7 @@
     "SqlShowCreate()"
     "SqlReplaceTable()"
     "SqlRichDescribeTable()"
+    "SqlAlterMaterializedTable()"
     "SqlAlterTable()"
     "SqlAlterView()"
     "SqlShowModules()"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index b52f41aa951..95509e7b8da 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1779,6 +1779,100 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean 
replace, boolean isTemporar
     }
 }
 
+/**
+* Parses an ALTER MATERIALIZED TABLE statement.
+*
+* After the table identifier exactly one of the following clauses is accepted:
+*   SUSPEND
+*   RESUME [WITH table properties]
+*   REFRESH [PARTITION partition spec]
+*   SET FRESHNESS = interval literal
+*   SET REFRESH_MODE = FULL | CONTINUOUS
+*   SET table properties
+*   RESET table property keys
+*
+* Each branch returns the matching SqlAlterMaterializedTable subclass.
+*/
+SqlAlterMaterializedTable SqlAlterMaterializedTable() :
+{
+    SqlParserPos startPos;
+    SqlIdentifier tableIdentifier;
+    // The lists default to EMPTY so that an omitted WITH / PARTITION clause is
+    // passed on to the node constructors as an empty list.
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+    SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
+    SqlNodeList partSpec = SqlNodeList.EMPTY;
+    SqlNode freshness = null;
+}
+{
+    <ALTER> <MATERIALIZED> <TABLE> { startPos = getPos();}
+    tableIdentifier = CompoundIdentifier()
+    (
+        // SUSPEND and RESUME pass startPos unchanged; every other branch passes
+        // startPos.plus(getPos()).
+        <SUSPEND>
+            {
+                return new SqlAlterMaterializedTableSuspend(startPos, 
tableIdentifier);
+            }
+        |
+        <RESUME>
+        [ <WITH> propertyList = TableProperties() ]
+            {
+                return new SqlAlterMaterializedTableResume(
+                    startPos,
+                    tableIdentifier,
+                    propertyList);
+            }
+        |
+        <REFRESH>
+        [ <PARTITION> {
+                partSpec = new SqlNodeList(getPos());
+                PartitionSpecCommaList(partSpec);
+            }
+        ]
+            {
+                return new SqlAlterMaterializedTableRefresh(
+                    startPos.plus(getPos()),
+                    tableIdentifier,
+                    partSpec);
+            }
+        |
+        <SET>
+        (
+            <FRESHNESS> <EQ> freshness = 
Expression(ExprContext.ACCEPT_NON_QUERY) {
+                // Any non-query expression is parsed here, but only an interval
+                // literal is accepted as a FRESHNESS value.
+                if (!(freshness instanceof SqlIntervalLiteral)) {
+                    throw SqlUtil.newContextException(
+                        getPos(),
+                        ParserResource.RESOURCE.unsupportedFreshnessType());
+                }
+                return new SqlAlterMaterializedTableFreshness(
+                    startPos.plus(getPos()),
+                    tableIdentifier,
+                    (SqlIntervalLiteral) freshness);
+            }
+            |
+            <REFRESH_MODE> <EQ>
+            (
+                <FULL> {
+                    return new SqlAlterMaterializedTableRefreshMode(
+                        startPos.plus(getPos()),
+                        tableIdentifier,
+                        SqlRefreshMode.FULL.symbol(getPos()));
+                }
+                |
+                <CONTINUOUS> {
+                    return new SqlAlterMaterializedTableRefreshMode(
+                        startPos.plus(getPos()),
+                        tableIdentifier,
+                        SqlRefreshMode.CONTINUOUS.symbol(getPos()));
+                }
+            )
+            |
+            // Plain SET followed by a property list: update table options.
+            propertyList = TableProperties()
+            {
+                return new SqlAlterMaterializedTableOptions(
+                    startPos.plus(getPos()),
+                    tableIdentifier,
+                    propertyList);
+            }
+        )
+        |
+        <RESET>
+            propertyKeyList = TablePropertyKeys()
+            {
+                return new SqlAlterMaterializedTableReset(
+                    startPos.plus(getPos()),
+                    tableIdentifier,
+                    propertyKeyList);
+            }
+    )
+}
+
 /**
 * Parses an INSERT statement.
 */
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
new file mode 100644
index 00000000000..f8037b05e4c
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Common base for all {@code ALTER MATERIALIZED TABLE [catalogName.][databaseName.]tableName ...}
+ * statement nodes.
+ *
+ * <p>It stores the table identifier and unparses the shared statement prefix; subclasses append
+ * their own clause.
+ */
+public abstract class SqlAlterMaterializedTable extends SqlCall {
+
+    /** Operator returned by {@link #getOperator()} for every node of this hierarchy. */
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("ALTER MATERIALIZED TABLE", SqlKind.ALTER_TABLE);
+
+    /** Identifier of the materialized table being altered; checked non-null on construction. */
+    protected final SqlIdentifier tableIdentifier;
+
+    /**
+     * @param pos parser position of the statement
+     * @param tableName identifier of the materialized table, must not be null
+     */
+    public SqlAlterMaterializedTable(SqlParserPos pos, SqlIdentifier tableName) {
+        super(pos);
+        requireNonNull(tableName, "tableName should not be null");
+        this.tableIdentifier = tableName;
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    /** Returns the identifier of the materialized table being altered. */
+    public SqlIdentifier getTableName() {
+        return this.tableIdentifier;
+    }
+
+    /** Writes {@code ALTER MATERIALIZED TABLE} followed by the table identifier. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("ALTER MATERIALIZED TABLE");
+        this.tableIdentifier.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableFreshness.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableFreshness.java
new file mode 100644
index 00000000000..45f068a995f
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableFreshness.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * SqlNode for the {@code ALTER MATERIALIZED TABLE [catalogName.][databaseName.]tableName SET
+ * FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }} statement.
+ */
+public class SqlAlterMaterializedTableFreshness extends SqlAlterMaterializedTable {
+
+    /** Interval literal given after {@code SET FRESHNESS =}. */
+    private final SqlIntervalLiteral freshness;
+
+    /**
+     * @param pos parser position of the statement
+     * @param tableName identifier of the materialized table
+     * @param freshness the new freshness interval
+     */
+    public SqlAlterMaterializedTableFreshness(
+            SqlParserPos pos, SqlIdentifier tableName, SqlIntervalLiteral freshness) {
+        super(pos, tableName);
+        this.freshness = freshness;
+    }
+
+    /** Returns the freshness interval literal. */
+    public SqlIntervalLiteral getFreshness() {
+        return this.freshness;
+    }
+
+    /** Operands are the table identifier followed by the freshness literal. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        final SqlIdentifier table = getTableName();
+        return ImmutableNullableList.of(table, this.freshness);
+    }
+
+    /** Appends {@code SET FRESHNESS = <interval>} to the common statement prefix. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("SET FRESHNESS");
+        writer.keyword("=");
+        this.freshness.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableOptions.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableOptions.java
new file mode 100644
index 00000000000..74c10e4de7e
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableOptions.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * SqlNode for the {@code ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SET ('key' =
+ * 'val')} statement.
+ */
+public class SqlAlterMaterializedTableOptions extends SqlAlterMaterializedTable {
+
+    /** Properties listed in the {@code SET (...)} clause. */
+    private final SqlNodeList propertyList;
+
+    /**
+     * @param pos parser position of the statement
+     * @param tableName identifier of the materialized table
+     * @param propertyList properties to set
+     */
+    public SqlAlterMaterializedTableOptions(
+            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
+        super(pos, tableName);
+        this.propertyList = propertyList;
+    }
+
+    /** Operands are the table identifier followed by the property list. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        final SqlIdentifier table = getTableName();
+        return ImmutableNullableList.of(table, this.propertyList);
+    }
+
+    /** Returns the properties listed in the {@code SET (...)} clause. */
+    public SqlNodeList getPropertyList() {
+        return this.propertyList;
+    }
+
+    /** Appends {@code SET (...)} with one indented property per entry. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("SET");
+        final SqlWriter.Frame optionsFrame = writer.startList("(", ")");
+        this.propertyList.forEach(
+                option -> {
+                    SqlUnparseUtils.printIndent(writer);
+                    option.unparse(writer, leftPrec, rightPrec);
+                });
+        writer.newlineAndIndent();
+        writer.endList(optionsFrame);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java
new file mode 100644
index 00000000000..26ec1bcd7c7
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * SqlNode for the {@code ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH
+ * [PARTITION (key1=val1, key2=val2, ...)]} statement.
+ */
+public class SqlAlterMaterializedTableRefresh extends SqlAlterMaterializedTable {
+
+    /** Partition spec of the optional PARTITION clause; may be null or empty. */
+    private final SqlNodeList partitionSpec;
+
+    /**
+     * @param pos parser position of the statement
+     * @param tableName identifier of the materialized table
+     * @param partitionSpec partition spec to refresh, may be null
+     */
+    public SqlAlterMaterializedTableRefresh(
+            SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) {
+        super(pos, tableName);
+        this.partitionSpec = partitionSpec;
+    }
+
+    /** Operands are the table identifier followed by the (possibly null) partition spec. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        final SqlIdentifier table = getTableName();
+        return ImmutableNullableList.of(table, this.partitionSpec);
+    }
+
+    /** Appends {@code REFRESH}, plus the PARTITION clause when a non-empty spec is present. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("REFRESH");
+
+        // Nothing more to print when no partition was specified.
+        if (this.partitionSpec == null || this.partitionSpec.size() == 0) {
+            return;
+        }
+
+        writer.keyword("PARTITION");
+        this.partitionSpec.unparse(
+                writer, getOperator().getLeftPrec(), getOperator().getRightPrec());
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefreshMode.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefreshMode.java
new file mode 100644
index 00000000000..a0bf98951f1
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefreshMode.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * SqlNode to describe ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SET REFRESH_MODE
+ * = { FULL | CONTINUOUS } clause.
+ */
+public class SqlAlterMaterializedTableRefreshMode extends SqlAlterMaterializedTable {
+
+    /** Literal carrying the requested refresh mode; never null. */
+    private final SqlLiteral sqlRefreshMode;
+
+    /**
+     * @param pos parser position of the statement
+     * @param tableName identifier of the materialized table
+     * @param sqlRefreshMode literal for the new refresh mode, must not be null
+     */
+    public SqlAlterMaterializedTableRefreshMode(
+            SqlParserPos pos, SqlIdentifier tableName, SqlLiteral sqlRefreshMode) {
+        super(pos, tableName);
+        this.sqlRefreshMode = requireNonNull(sqlRefreshMode, "refreshMode should not be null");
+    }
+
+    /** Returns the literal carrying the requested refresh mode. */
+    public SqlLiteral getSqlRefreshMode() {
+        return sqlRefreshMode;
+    }
+
+    /**
+     * Operands are the table identifier followed by the refresh mode literal.
+     *
+     * <p>The literal must be part of the operand list: it is a child node that {@link #unparse}
+     * emits, and the sibling ALTER MATERIALIZED TABLE nodes expose their clause argument the same
+     * way.
+     */
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(getTableName(), sqlRefreshMode);
+    }
+
+    /** Appends {@code SET REFRESH_MODE = <mode>} to the common statement prefix. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("SET REFRESH_MODE");
+        writer.keyword("=");
+        sqlRefreshMode.unparse(writer, leftPrec, rightPrec);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableReset.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableReset.java
new file mode 100644
index 00000000000..20f99eab91d
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableReset.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * SqlNode for the {@code ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESET
+ * ('key')} statement.
+ */
+public class SqlAlterMaterializedTableReset extends SqlAlterMaterializedTable {
+
+    /** Property keys listed in the {@code RESET (...)} clause. */
+    private final SqlNodeList propertyKeyList;
+
+    /**
+     * @param pos parser position of the statement
+     * @param tableName identifier of the materialized table
+     * @param propertyKeyList keys of the properties to reset
+     */
+    public SqlAlterMaterializedTableReset(
+            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyKeyList) {
+        super(pos, tableName);
+        this.propertyKeyList = propertyKeyList;
+    }
+
+    /** Operands are the table identifier followed by the key list. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        final SqlIdentifier table = getTableName();
+        return ImmutableNullableList.of(table, this.propertyKeyList);
+    }
+
+    /** Returns the property keys listed in the {@code RESET (...)} clause. */
+    public SqlNodeList getPropertyKeyList() {
+        return this.propertyKeyList;
+    }
+
+    /** Appends {@code RESET (...)} with one indented key per entry. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("RESET");
+        final SqlWriter.Frame keysFrame = writer.startList("(", ")");
+        this.propertyKeyList.forEach(
+                key -> {
+                    SqlUnparseUtils.printIndent(writer);
+                    key.unparse(writer, leftPrec, rightPrec);
+                });
+        writer.newlineAndIndent();
+        writer.endList(keysFrame);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableResume.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableResume.java
new file mode 100644
index 00000000000..657f4cd6610
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableResume.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * SqlNode for the {@code ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME}
+ * statement, optionally followed by a {@code WITH (...)} property list.
+ */
+public class SqlAlterMaterializedTableResume extends SqlAlterMaterializedTable {
+
+    /** Properties of the optional WITH clause; never null, possibly empty. */
+    private final SqlNodeList propertyList;
+
+    /**
+     * @param pos parser position of the statement
+     * @param tableName identifier of the materialized table
+     * @param propertyList properties of the WITH clause, must not be null
+     */
+    public SqlAlterMaterializedTableResume(
+            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) {
+        super(pos, tableName);
+        requireNonNull(propertyList, "propertyList should not be null");
+        this.propertyList = propertyList;
+    }
+
+    /** Operands are the table identifier followed by the property list. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        final SqlIdentifier table = getTableName();
+        return ImmutableNullableList.of(table, this.propertyList);
+    }
+
+    /** Returns the properties of the WITH clause. */
+    public SqlNodeList getPropertyList() {
+        return this.propertyList;
+    }
+
+    /** Appends {@code RESUME}, plus a WITH clause on a new line when properties are present. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("RESUME");
+
+        // Without properties the statement ends right after RESUME.
+        if (this.propertyList.size() == 0) {
+            return;
+        }
+
+        writer.newlineAndIndent();
+        writer.keyword("WITH");
+        final SqlWriter.Frame propertiesFrame = writer.startList("(", ")");
+        this.propertyList.forEach(
+                option -> {
+                    SqlUnparseUtils.printIndent(writer);
+                    option.unparse(writer, leftPrec, rightPrec);
+                });
+        writer.newlineAndIndent();
+        writer.endList(propertiesFrame);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSuspend.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSuspend.java
new file mode 100644
index 00000000000..56907fdb545
--- /dev/null
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSuspend.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+/**
+ * SqlNode for the {@code ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND}
+ * statement.
+ */
+public class SqlAlterMaterializedTableSuspend extends SqlAlterMaterializedTable {
+
+    /**
+     * @param pos parser position of the statement
+     * @param tableName identifier of the materialized table
+     */
+    public SqlAlterMaterializedTableSuspend(SqlParserPos pos, SqlIdentifier tableName) {
+        super(pos, tableName);
+    }
+
+    /** Appends {@code SUSPEND} to the common statement prefix. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        super.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("SUSPEND");
+    }
+
+    /** The table identifier is the only operand. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        final SqlIdentifier table = getTableName();
+        return ImmutableNullableList.of(table);
+    }
+}
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 4f6a5fcf739..d2ae7a1c911 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -51,7 +51,7 @@ public interface ParserResource {
     Resources.ExInst<ParseException> bucketCountMustBePositiveInteger();
 
     @Resources.BaseMessage(
-            "CREATE MATERIALIZED TABLE only supports interval type FRESHNESS, 
please refer to the materialized table document.")
+            "MATERIALIZED TABLE only supports define interval type FRESHNESS, 
please refer to the materialized table document.")
     Resources.ExInst<ParseException> unsupportedFreshnessType();
 
     @Resources.BaseMessage("CREATE TEMPORARY MATERIALIZED TABLE is not 
supported.")
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
index 8818e97c835..0d6ab7ca45f 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java
@@ -118,7 +118,7 @@ public class MaterializedTableStatementParserTest {
                         + "FRESHNESS = ^123^\n"
                         + "AS SELECT a, b, h, t m FROM source";
         sql(sql).fails(
-                        "CREATE MATERIALIZED TABLE only supports interval type 
FRESHNESS, please refer to the materialized table document.");
+                        "MATERIALIZED TABLE only supports define interval type 
FRESHNESS, please refer to the materialized table document.");
 
         final String sql2 =
                 "CREATE MATERIALIZED TABLE tbl1\n"
@@ -199,6 +199,191 @@ public class MaterializedTableStatementParserTest {
         sql(sql).fails("REPLACE MATERIALIZED TABLE is not supported.");
     }
 
+    @Test
+    void testAlterMaterializedTableSuspend() {
+        final String sql = "ALTER MATERIALIZED TABLE tb1 SUSPEND";
+        final String expect = "ALTER MATERIALIZED TABLE `TB1` SUSPEND";
+        sql(sql).ok(expect);
+
+        final String sql2 = "ALTER MATERIALIZED TABLE tb1 SUSPEND ^PARTITION^";
+        sql(sql2)
+                .fails(
+                        "Encountered \"PARTITION\" at line 1, column 38.\n"
+                                + "Was expecting:\n"
+                                + "    <EOF> \n"
+                                + "    ");
+
+        final String sql3 = "ALTER MATERIALIZED TABLE tb^1^";
+        sql(sql3)
+                .fails(
+                        "Encountered \"<EOF>\" at line 1, column 28.\n"
+                                + "Was expecting one of:\n"
+                                + "    \"RESET\" ...\n"
+                                + "    \"SET\" ...\n"
+                                + "    \"SUSPEND\" ...\n"
+                                + "    \"REFRESH\" ...\n"
+                                + "    \"RESUME\" ...\n"
+                                + "    \".\" ...\n"
+                                + "    ");
+    }
+
+    @Test
+    void testAlterMaterializedTableResume() {
+        final String sql1 =
+                "ALTER MATERIALIZED TABLE tb1 RESUME\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'testGroup',\n"
+                        + "  'topic' = 'test'\n"
+                        + ")";
+        final String expect1 =
+                "ALTER MATERIALIZED TABLE `TB1` RESUME\n"
+                        + "WITH (\n"
+                        + "  'group.id' = 'testGroup',\n"
+                        + "  'topic' = 'test'\n"
+                        + ")";
+        sql(sql1).ok(expect1);
+
+        final String sql2 = "ALTER MATERIALIZED TABLE tb1 RESUME";
+        final String expect2 = "ALTER MATERIALIZED TABLE `TB1` RESUME";
+        sql(sql2).ok(expect2);
+
+        final String sql3 = "ALTER MATERIALIZED TABLE tb1 RESUME ^PARTITION^";
+        sql(sql3)
+                .fails(
+                        "Encountered \"PARTITION\" at line 1, column 37.\n"
+                                + "Was expecting one of:\n"
+                                + "    <EOF> \n"
+                                + "    \"WITH\" ...\n"
+                                + "    ");
+    }
+
+    @Test
+    void testAlterMaterializedTableRefresh() {
+        final String sql1 = "ALTER MATERIALIZED TABLE tbl1 REFRESH";
+        final String expected1 = "ALTER MATERIALIZED TABLE `TBL1` REFRESH";
+        sql(sql1).ok(expected1);
+
+        final String sql2 =
+                "ALTER MATERIALIZED TABLE tbl1 REFRESH \n"
+                        + " PARTITION (part1 = 2023, part2 = 2024)";
+        final String expected2 =
+                "ALTER MATERIALIZED TABLE `TBL1` REFRESH "
+                        + "PARTITION (`PART1` = 2023, `PART2` = 2024)";
+        sql(sql2).ok(expected2);
+
+        final String sql3 = "ALTER MATERIALIZED TABLE tbl1 REFRESH PARTITION(^)^";
+        sql(sql3)
+                .fails(
+                        "Encountered \"\\)\" at line 1, column 49.\n"
+                                + "Was expecting one of:\n"
+                                + "    <BRACKET_QUOTED_IDENTIFIER> ...\n"
+                                + "    <QUOTED_IDENTIFIER> ...\n"
+                                + "    <BACK_QUOTED_IDENTIFIER> ...\n"
+                                + "    <BIG_QUERY_BACK_QUOTED_IDENTIFIER> ...\n"
+                                + "    <HYPHENATED_IDENTIFIER> ...\n"
+                                + "    <IDENTIFIER> ...\n"
+                                + "    <UNICODE_QUOTED_IDENTIFIER> ...\n"
+                                + "    ");
+    }
+
+    @Test
+    void testAlterMaterializedTableRefreshMode() {
+        final String sql1 = "ALTER MATERIALIZED TABLE tbl1 SET REFRESH_MODE = FULL";
+        final String expect1 = "ALTER MATERIALIZED TABLE `TBL1` SET REFRESH_MODE = FULL";
+        sql(sql1).ok(expect1);
+
+        final String sql2 = "ALTER MATERIALIZED TABLE tbl1 SET REFRESH_MODE = CONTINUOUS";
+        final String expect2 = "ALTER MATERIALIZED TABLE `TBL1` SET REFRESH_MODE = CONTINUOUS";
+        sql(sql2).ok(expect2);
+
+        final String sql3 = "ALTER MATERIALIZED TABLE tbl1 SET REFRESH_MOD^E^";
+        sql(sql3)
+                .fails(
+                        "Encountered \"<EOF>\" at line 1, column 46.\n"
+                                + "Was expecting:\n"
+                                + "    \"=\" ...\n"
+                                + "    ");
+
+        final String sql4 = "ALTER MATERIALIZED TABLE tbl1 SET REFRESH_MODE = ^NONE^";
+        sql(sql4)
+                .fails(
+                        "Encountered \"NONE\" at line 1, column 50.\n"
+                                + "Was expecting one of:\n"
+                                + "    \"FULL\" ...\n"
+                                + "    \"CONTINUOUS\" ...\n"
+                                + "    ");
+    }
+
+    @Test
+    void testAlterMaterializedTableFreshness() {
+        final String sql1 = "ALTER MATERIALIZED TABLE tbl1 SET FRESHNESS = INTERVAL '1' DAY";
+        final String expect1 = "ALTER MATERIALIZED TABLE `TBL1` SET FRESHNESS = INTERVAL '1' DAY";
+        sql(sql1).ok(expect1);
+
+        final String sql2 = "ALTER MATERIALIZED TABLE tbl1 SET FRESHNESS = INTERVAL 1 ^DAY^";
+        sql(sql2)
+                .fails(
+                        "MATERIALIZED TABLE only supports define interval type FRESHNESS, please refer to the materialized table document.");
+
+        final String sql3 = "ALTER MATERIALIZED TABLE tbl1 SET FRESHNES^S^";
+        sql(sql3)
+                .fails(
+                        "Encountered \"<EOF>\" at line 1, column 43.\n"
+                                + "Was expecting:\n"
+                                + "    \"=\" ...\n"
+                                + "    ");
+    }
+
+    @Test
+    void testAlterMaterializedTableSet() {
+        final String sql1 =
+                "ALTER MATERIALIZED TABLE tbl1 SET (\n"
+                        + "  'key1' = 'val1',\n"
+                        + "  'key2' = 'val2'\n"
+                        + ")";
+        final String expect1 =
+                "ALTER MATERIALIZED TABLE `TBL1` SET (\n"
+                        + "  'key1' = 'val1',\n"
+                        + "  'key2' = 'val2'\n"
+                        + ")";
+        sql(sql1).ok(expect1);
+
+        final String sql2 = "ALTER MATERIALIZED TABLE tbl1 SET ()";
+        final String expect2 = "ALTER MATERIALIZED TABLE `TBL1` SET (\n" + ")";
+
+        sql(sql2).ok(expect2);
+
+        final String sql3 = "ALTER MATERIALIZED TABLE tbl1 SE^T^";
+        sql(sql3)
+                .fails(
+                        "Encountered \"<EOF>\" at line 1, column 33.\n"
+                                + "Was expecting one of:\n"
+                                + "    \"FRESHNESS\" ...\n"
+                                + "    \"REFRESH_MODE\" ...\n"
+                                + "    \"\\(\" ...\n"
+                                + "    ");
+    }
+
+    @Test
+    void testAlterMaterializedTableReset() {
+        final String sql1 = "ALTER MATERIALIZED TABLE tbl1 RESET ('key1', 'key2')";
+        final String expect1 =
+                "ALTER MATERIALIZED TABLE `TBL1` RESET (\n" + "  'key1',\n" + "  'key2'\n" + ")";
+        sql(sql1).ok(expect1);
+
+        final String sql2 = "ALTER MATERIALIZED TABLE tbl1 RESET ()";
+        final String expect2 = "ALTER MATERIALIZED TABLE `TBL1` RESET (\n" + ")";
+        sql(sql2).ok(expect2);
+
+        final String sql3 = "ALTER MATERIALIZED TABLE tbl1 RESE^T^";
+        sql(sql3)
+                .fails(
+                        "Encountered \"<EOF>\" at line 1, column 35.\n"
+                                + "Was expecting:\n"
+                                + "    \"\\(\" ...\n"
+                                + "    ");
+    }
+
     public SqlParserFixture fixture() {
         return SqlParserFixture.DEFAULT.withConfig(
                 c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));


Reply via email to