This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 330f524d185 [FLINK-35191][table-api] Support alter materialized table related syntax: suspend, resume, refresh, set and reset 330f524d185 is described below commit 330f524d185d575ceb679a6c587e9c39612e844c Author: Feng Jin <jinfeng1...@gmail.com> AuthorDate: Mon Apr 29 10:21:12 2024 +0800 [FLINK-35191][table-api] Support alter materialized table related syntax: suspend, resume, refresh, set and reset This closes #24737 --- .../src/main/codegen/data/Parser.tdd | 12 ++ .../src/main/codegen/includes/parserImpls.ftl | 94 +++++++++++ .../sql/parser/ddl/SqlAlterMaterializedTable.java | 61 +++++++ .../ddl/SqlAlterMaterializedTableFreshness.java | 60 +++++++ .../ddl/SqlAlterMaterializedTableOptions.java | 67 ++++++++ .../ddl/SqlAlterMaterializedTableRefresh.java | 61 +++++++ .../ddl/SqlAlterMaterializedTableRefreshMode.java | 62 +++++++ .../parser/ddl/SqlAlterMaterializedTableReset.java | 67 ++++++++ .../ddl/SqlAlterMaterializedTableResume.java | 74 ++++++++ .../ddl/SqlAlterMaterializedTableSuspend.java | 47 ++++++ .../flink/sql/parser/utils/ParserResource.java | 2 +- .../MaterializedTableStatementParserTest.java | 187 ++++++++++++++++++++- 12 files changed, 792 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index dfb43353a4b..100e9edd2fb 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -35,6 +35,14 @@ "org.apache.flink.sql.parser.ddl.SqlAddPartitions.AlterTableAddPartitionContext" "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" "org.apache.flink.sql.parser.ddl.SqlAlterFunction" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableFreshness" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableOptions" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefreshMode" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableReset" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume" + "org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend" "org.apache.flink.sql.parser.ddl.SqlAlterTable" "org.apache.flink.sql.parser.ddl.SqlAlterTable.AlterTableContext" "org.apache.flink.sql.parser.ddl.SqlAlterTableAdd" @@ -191,6 +199,9 @@ "STATISTICS" "STOP" "STRING" + "SUSPEND" + "REFRESH" + "RESUME" "TABLES" "TIMESTAMP_LTZ" "TRY_CAST" @@ -581,6 +592,7 @@ "SqlShowCreate()" "SqlReplaceTable()" "SqlRichDescribeTable()" + "SqlAlterMaterializedTable()" "SqlAlterTable()" "SqlAlterView()" "SqlShowModules()" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index b52f41aa951..95509e7b8da 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -1779,6 +1779,100 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar } } +/** +* Parses alter materialized table. +*/ +SqlAlterMaterializedTable SqlAlterMaterializedTable() : +{ + SqlParserPos startPos; + SqlIdentifier tableIdentifier; + SqlNodeList propertyList = SqlNodeList.EMPTY; + SqlNodeList propertyKeyList = SqlNodeList.EMPTY; + SqlNodeList partSpec = SqlNodeList.EMPTY; + SqlNode freshness = null; +} +{ + <ALTER> <MATERIALIZED> <TABLE> { startPos = getPos();} + tableIdentifier = CompoundIdentifier() + ( + <SUSPEND> + { + return new SqlAlterMaterializedTableSuspend(startPos, tableIdentifier); + } + | + <RESUME> + [ <WITH> propertyList = TableProperties() ] + { + return new SqlAlterMaterializedTableResume( + startPos, + tableIdentifier, + propertyList); + } + | + <REFRESH> + [ <PARTITION> { + partSpec = new SqlNodeList(getPos()); + PartitionSpecCommaList(partSpec); + } + ] + { + return new SqlAlterMaterializedTableRefresh( + startPos.plus(getPos()), + tableIdentifier, + partSpec); + } + | + <SET> + ( + <FRESHNESS> <EQ> freshness = Expression(ExprContext.ACCEPT_NON_QUERY) { + if (!(freshness instanceof SqlIntervalLiteral)) { + throw SqlUtil.newContextException( + getPos(), + ParserResource.RESOURCE.unsupportedFreshnessType()); + } + return new SqlAlterMaterializedTableFreshness( + startPos.plus(getPos()), + tableIdentifier, + (SqlIntervalLiteral) freshness); + } + | + <REFRESH_MODE> <EQ> + ( + <FULL> { + return new SqlAlterMaterializedTableRefreshMode( + startPos.plus(getPos()), + tableIdentifier, + SqlRefreshMode.FULL.symbol(getPos())); + } + | + <CONTINUOUS> { + return new SqlAlterMaterializedTableRefreshMode( + startPos.plus(getPos()), + tableIdentifier, + SqlRefreshMode.CONTINUOUS.symbol(getPos())); + } + ) + | + propertyList = TableProperties() + { + return new SqlAlterMaterializedTableOptions( + startPos.plus(getPos()), + tableIdentifier, + propertyList); + } + ) + | + <RESET> + propertyKeyList = TablePropertyKeys() + { + return new SqlAlterMaterializedTableReset( + startPos.plus(getPos()), + tableIdentifier, + propertyKeyList); + } + ) +} + /** * Parses an INSERT statement. */ diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java new file mode 100644 index 00000000000..f8037b05e4c --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTable.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import static java.util.Objects.requireNonNull; + +/** + * Abstract class to describe statements like ALTER MATERIALIZED TABLE [catalogName.] + * [dataBasesName.]tableName ... + */ +public abstract class SqlAlterMaterializedTable extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("ALTER MATERIALIZED TABLE", SqlKind.ALTER_TABLE); + + protected final SqlIdentifier tableIdentifier; + + public SqlAlterMaterializedTable(SqlParserPos pos, SqlIdentifier tableName) { + super(pos); + this.tableIdentifier = requireNonNull(tableName, "tableName should not be null"); + } + + public SqlIdentifier getTableName() { + return tableIdentifier; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("ALTER MATERIALIZED TABLE"); + tableIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableFreshness.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableFreshness.java new file mode 100644 index 00000000000..45f068a995f --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableFreshness.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlIntervalLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +/** + * SqlNode to describe ALTER MATERIALIZED TABLE [catalogName.] [dataBasesName.]tableName SET + * FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } clause. + */ +public class SqlAlterMaterializedTableFreshness extends SqlAlterMaterializedTable { + + private final SqlIntervalLiteral freshness; + + public SqlAlterMaterializedTableFreshness( + SqlParserPos pos, SqlIdentifier tableName, SqlIntervalLiteral freshness) { + super(pos, tableName); + this.freshness = freshness; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(getTableName(), freshness); + } + + public SqlIntervalLiteral getFreshness() { + return freshness; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("SET FRESHNESS"); + writer.keyword("="); + freshness.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableOptions.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableOptions.java new file mode 100644 index 00000000000..74c10e4de7e --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableOptions.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +/** + * SqlNode to describe ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SET ('key' = + * 'val') clause. + */ +public class SqlAlterMaterializedTableOptions extends SqlAlterMaterializedTable { + + private final SqlNodeList propertyList; + + public SqlAlterMaterializedTableOptions( + SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) { + super(pos, tableName); + this.propertyList = propertyList; + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(getTableName(), propertyList); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("SET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java new file mode 100644 index 00000000000..26ec1bcd7c7 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * SqlNode to describe ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH + * [PARTITION (key1=val1, key2=val2, ...)] clause. + */ +public class SqlAlterMaterializedTableRefresh extends SqlAlterMaterializedTable { + + private final SqlNodeList partitionSpec; + + public SqlAlterMaterializedTableRefresh( + SqlParserPos pos, SqlIdentifier tableName, @Nullable SqlNodeList partitionSpec) { + super(pos, tableName); + this.partitionSpec = partitionSpec; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(getTableName(), partitionSpec); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("REFRESH"); + if (partitionSpec != null && partitionSpec.size() > 0) { + writer.keyword("PARTITION"); + partitionSpec.unparse( + writer, getOperator().getLeftPrec(), getOperator().getRightPrec()); + } + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefreshMode.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefreshMode.java new file mode 100644 index 00000000000..a0bf98951f1 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefreshMode.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * SqlNode to describe ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SET REFRESH_MODE + * = { FULL | CONTINUOUS } clause. + */ +public class SqlAlterMaterializedTableRefreshMode extends SqlAlterMaterializedTable { + + private final SqlLiteral sqlRefreshMode; + + public SqlAlterMaterializedTableRefreshMode( + SqlParserPos pos, SqlIdentifier tableName, SqlLiteral sqlRefreshMode) { + super(pos, tableName); + this.sqlRefreshMode = requireNonNull(sqlRefreshMode, "refreshMode should not be null"); + } + + public SqlLiteral getSqlRefreshMode() { + return sqlRefreshMode; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(getTableName()); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("SET REFRESH_MODE"); + writer.keyword("="); + sqlRefreshMode.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableReset.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableReset.java new file mode 100644 index 00000000000..20f99eab91d --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableReset.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +/** + * SqlNode to describe ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESET ('key') + * clause. + */ +public class SqlAlterMaterializedTableReset extends SqlAlterMaterializedTable { + + private final SqlNodeList propertyKeyList; + + public SqlAlterMaterializedTableReset( + SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyKeyList) { + super(pos, tableName); + this.propertyKeyList = propertyKeyList; + } + + public SqlNodeList getPropertyKeyList() { + return propertyKeyList; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(getTableName(), propertyKeyList); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("RESET"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode propertyKey : propertyKeyList) { + SqlUnparseUtils.printIndent(writer); + propertyKey.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableResume.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableResume.java new file mode 100644 index 00000000000..657f4cd6610 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableResume.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * SqlNode to describe ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME clause. + */ +public class SqlAlterMaterializedTableResume extends SqlAlterMaterializedTable { + + private final SqlNodeList propertyList; + + public SqlAlterMaterializedTableResume( + SqlParserPos pos, SqlIdentifier tableName, SqlNodeList propertyList) { + super(pos, tableName); + + this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(getTableName(), propertyList); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("RESUME"); + + if (propertyList.size() > 0) { + writer.newlineAndIndent(); + writer.keyword("WITH"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSuspend.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSuspend.java new file mode 100644 index 00000000000..56907fdb545 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableSuspend.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import java.util.List; + +/** + * SqlNode to describe ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND clause. + */ +public class SqlAlterMaterializedTableSuspend extends SqlAlterMaterializedTable { + public SqlAlterMaterializedTableSuspend(SqlParserPos pos, SqlIdentifier tableName) { + super(pos, tableName); + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of(getTableName()); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + super.unparse(writer, leftPrec, rightPrec); + writer.keyword("SUSPEND"); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index 4f6a5fcf739..d2ae7a1c911 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -51,7 +51,7 @@ public interface ParserResource { Resources.ExInst<ParseException> bucketCountMustBePositiveInteger(); @Resources.BaseMessage( - "CREATE MATERIALIZED TABLE only supports interval type FRESHNESS, please refer to the materialized table document.") + "MATERIALIZED TABLE only supports define interval type FRESHNESS, please refer to the materialized table document.") Resources.ExInst<ParseException> unsupportedFreshnessType(); @Resources.BaseMessage("CREATE TEMPORARY MATERIALIZED TABLE is not supported.") diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java index 8818e97c835..0d6ab7ca45f 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java @@ -118,7 +118,7 @@ public class MaterializedTableStatementParserTest { + "FRESHNESS = ^123^\n" + "AS SELECT a, b, h, t m FROM source"; sql(sql).fails( - "CREATE MATERIALIZED TABLE only supports interval type FRESHNESS, please refer to the materialized table document."); + "MATERIALIZED TABLE only supports define interval type FRESHNESS, please refer to the materialized table document."); final String sql2 = "CREATE MATERIALIZED TABLE tbl1\n" @@ -199,6 +199,191 @@ public class MaterializedTableStatementParserTest { sql(sql).fails("REPLACE MATERIALIZED TABLE is not supported."); } + @Test + void testAlterMaterializedTableSuspend() { + final String sql = "ALTER MATERIALIZED TABLE tb1 SUSPEND"; + final String expect = "ALTER MATERIALIZED TABLE `TB1` SUSPEND"; + sql(sql).ok(expect); + + final String sql2 = "ALTER MATERIALIZED TABLE tb1 SUSPEND ^PARTITION^"; + sql(sql2) + .fails( + "Encountered \"PARTITION\" at line 1, column 38.\n" + + "Was expecting:\n" + + " <EOF> \n" + + " "); + + final String sql3 = "ALTER MATERIALIZED TABLE tb^1^"; + sql(sql3) + .fails( + "Encountered \"<EOF>\" at line 1, column 28.\n" + + "Was expecting one of:\n" + + " \"RESET\" ...\n" + + " \"SET\" ...\n" + + " \"SUSPEND\" ...\n" + + " \"REFRESH\" ...\n" + + " \"RESUME\" ...\n" + + " \".\" ...\n" + + " "); + } + + @Test + void testAlterMaterializedTableResume() { + final String sql1 = + "ALTER MATERIALIZED TABLE tb1 RESUME\n" + + "WITH (\n" + + " 'group.id' = 'testGroup',\n" + + " 'topic' = 'test'\n" + + ")"; + final String expect1 = + "ALTER MATERIALIZED TABLE `TB1` RESUME\n" + + "WITH (\n" + + " 'group.id' = 'testGroup',\n" + + " 'topic' = 'test'\n" + + ")"; + sql(sql1).ok(expect1); + + final String sql2 = "ALTER MATERIALIZED TABLE tb1 RESUME"; + final String expect2 = "ALTER MATERIALIZED TABLE `TB1` RESUME"; + sql(sql2).ok(expect2); + + final String sql3 = "ALTER MATERIALIZED TABLE tb1 RESUME ^PARTITION^"; + sql(sql3) + .fails( + "Encountered \"PARTITION\" at line 1, column 37.\n" + + "Was expecting one of:\n" + + " <EOF> \n" + + " \"WITH\" ...\n" + + " "); + } + + @Test + void testAlterMaterializedTableRefresh() { + final String sql1 = "ALTER MATERIALIZED TABLE tbl1 REFRESH"; + final String expected1 = "ALTER MATERIALIZED TABLE `TBL1` REFRESH"; + sql(sql1).ok(expected1); + + final String sql2 = + "ALTER MATERIALIZED TABLE tbl1 REFRESH \n" + + " PARTITION (part1 = 2023, part2 = 2024)"; + final String expected2 = + "ALTER MATERIALIZED TABLE `TBL1` REFRESH " + + "PARTITION (`PART1` = 2023, `PART2` = 2024)"; + sql(sql2).ok(expected2); + + final String sql3 = "ALTER MATERIALIZED TABLE tbl1 REFRESH PARTITION(^)^"; + sql(sql3) + .fails( + "Encountered \"\\)\" at line 1, column 49.\n" + + "Was expecting one of:\n" + + " <BRACKET_QUOTED_IDENTIFIER> ...\n" + + " <QUOTED_IDENTIFIER> ...\n" + + " <BACK_QUOTED_IDENTIFIER> ...\n" + + " <BIG_QUERY_BACK_QUOTED_IDENTIFIER> ...\n" + + " <HYPHENATED_IDENTIFIER> ...\n" + + " <IDENTIFIER> ...\n" + + " <UNICODE_QUOTED_IDENTIFIER> ...\n" + + " "); + } + + @Test + void testAlterMaterializedTableRefreshMode() { + final String sql1 = "ALTER MATERIALIZED TABLE tbl1 SET REFRESH_MODE = FULL"; + final String expect1 = "ALTER MATERIALIZED TABLE `TBL1` SET REFRESH_MODE = FULL"; + sql(sql1).ok(expect1); + + final String sql2 = "ALTER MATERIALIZED TABLE tbl1 SET REFRESH_MODE = CONTINUOUS"; + final String expect2 = "ALTER MATERIALIZED TABLE `TBL1` SET REFRESH_MODE = CONTINUOUS"; + sql(sql2).ok(expect2); + + final String sql3 = "ALTER MATERIALIZED TABLE tbl1 SET REFRESH_MOD^E^"; + sql(sql3) + .fails( + "Encountered \"<EOF>\" at line 1, column 46.\n" + + "Was expecting:\n" + + " \"=\" ...\n" + + " "); + + final String sql4 = "ALTER MATERIALIZED TABLE tbl1 SET REFRESH_MODE = ^NONE^"; + sql(sql4) + .fails( + "Encountered \"NONE\" at line 1, column 50.\n" + + "Was expecting one of:\n" + + " \"FULL\" ...\n" + + " \"CONTINUOUS\" ...\n" + + " "); + } + + @Test + void testAlterMaterializedTableFreshness() { + final String sql1 = "ALTER MATERIALIZED TABLE tbl1 SET FRESHNESS = INTERVAL '1' DAY"; + final String expect1 = "ALTER MATERIALIZED TABLE `TBL1` SET FRESHNESS = INTERVAL '1' DAY"; + sql(sql1).ok(expect1); + + final String sql2 = "ALTER MATERIALIZED TABLE tbl1 SET FRESHNESS = INTERVAL 1 ^DAY^"; + sql(sql2) + .fails( + "MATERIALIZED TABLE only supports define interval type FRESHNESS, please refer to the materialized table document."); + + final String sql3 = "ALTER MATERIALIZED TABLE tbl1 SET FRESHNES^S^"; + sql(sql3) + .fails( + "Encountered \"<EOF>\" at line 1, column 43.\n" + + "Was expecting:\n" + + " \"=\" ...\n" + + " "); + } + + @Test + void testAlterMaterializedTableSet() { + final String sql1 = + "ALTER MATERIALIZED TABLE tbl1 SET (\n" + + " 'key1' = 'val1',\n" + + " 'key2' = 'val2'\n" + + ")"; + final String expect1 = + "ALTER MATERIALIZED TABLE `TBL1` SET (\n" + + " 'key1' = 'val1',\n" + + " 'key2' = 'val2'\n" + + ")"; + sql(sql1).ok(expect1); + + final String sql2 = "ALTER MATERIALIZED TABLE tbl1 SET ()"; + final String expect2 = "ALTER MATERIALIZED TABLE `TBL1` SET (\n" + ")"; + + sql(sql2).ok(expect2); + + final String sql3 = "ALTER MATERIALIZED TABLE tbl1 SE^T^"; + sql(sql3) + .fails( + "Encountered \"<EOF>\" at line 1, column 33.\n" + + "Was expecting one of:\n" + + " \"FRESHNESS\" ...\n" + + " \"REFRESH_MODE\" ...\n" + + " \"\\(\" ...\n" + + " "); + } + + @Test + void testAlterMaterializedTableReset() { + final String sql1 = "ALTER MATERIALIZED TABLE tbl1 RESET ('key1', 'key2')"; + final String expect1 = + "ALTER MATERIALIZED TABLE `TBL1` RESET (\n" + " 'key1',\n" + " 'key2'\n" + ")"; + sql(sql1).ok(expect1); + + final String sql2 = "ALTER MATERIALIZED TABLE tbl1 RESET ()"; + final String expect2 = "ALTER MATERIALIZED TABLE `TBL1` RESET (\n" + ")"; + sql(sql2).ok(expect2); + + final String sql3 = "ALTER MATERIALIZED TABLE tbl1 RESE^T^"; + sql(sql3) + .fails( + "Encountered \"<EOF>\" at line 1, column 35.\n" + + "Was expecting:\n" + + " \"\\(\" ...\n" + + " "); + } + public SqlParserFixture fixture() { return SqlParserFixture.DEFAULT.withConfig( c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));