This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 512f3fbaac8 [FLINK-35190][table-api] Support create materialized table syntax 512f3fbaac8 is described below commit 512f3fbaac8b538f7edda152af0846dc2b213557 Author: Feng Jin <jinfeng1...@gmail.com> AuthorDate: Wed Apr 24 16:17:40 2024 +0800 [FLINK-35190][table-api] Support create materialized table syntax This closes #24707 --- .../src/main/codegen/data/Parser.tdd | 21 +- .../src/main/codegen/includes/parserImpls.ftl | 93 +++++++++ .../sql/parser/ddl/SqlCreateMaterializedTable.java | 203 +++++++++++++++++++ .../flink/sql/parser/ddl/SqlRefreshMode.java | 47 +++++ .../flink/sql/parser/utils/ParserResource.java | 10 + .../MaterializedTableStatementParserTest.java | 214 +++++++++++++++++++++ .../MaterializedTableStatementUnParserTest.java | 46 +++++ 7 files changed, 627 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index d88a9c0f454..dfb43353a4b 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -56,6 +56,7 @@ "org.apache.flink.sql.parser.ddl.SqlCreateCatalog" "org.apache.flink.sql.parser.ddl.SqlCreateDatabase" "org.apache.flink.sql.parser.ddl.SqlCreateFunction" + "org.apache.flink.sql.parser.ddl.SqlCreateMaterializedTable" "org.apache.flink.sql.parser.ddl.SqlCreateTable" "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext" "org.apache.flink.sql.parser.ddl.SqlCreateTableAs" @@ -69,6 +70,7 @@ "org.apache.flink.sql.parser.ddl.SqlDropPartitions.AlterTableDropPartitionsContext" "org.apache.flink.sql.parser.ddl.SqlDropTable" "org.apache.flink.sql.parser.ddl.SqlDropView" + "org.apache.flink.sql.parser.ddl.SqlRefreshMode" "org.apache.flink.sql.parser.ddl.SqlRemoveJar" "org.apache.flink.sql.parser.ddl.SqlReplaceTableAs" "org.apache.flink.sql.parser.ddl.SqlReset" @@ -130,6 +132,7 @@ "org.apache.calcite.sql.SqlAlienSystemTypeNameSpec" "org.apache.calcite.sql.SqlCreate" "org.apache.calcite.sql.SqlDrop" + "org.apache.calcite.sql.SqlIntervalLiteral" "java.util.ArrayList" "java.util.Collections" "java.util.HashSet" @@ -141,14 +144,17 @@ # keyword, please also add it to 'nonReservedKeywords' section. # Please keep the keyword in alphabetical order if new keyword is added. keywords: [ + "ANALYZE" "BUCKETS" "BYTES" "CATALOGS" "CHANGELOG_MODE" + "COLUMNS" "COMMENT" "COMPACT" "COMPILE" - "COLUMNS" + "COMPUTE" + "CONTINUOUS" "DATABASES" "DISTRIBUTED" "DISTRIBUTION" @@ -157,42 +163,43 @@ "ESTIMATED_COST" "EXTENDED" "FUNCTIONS" + "FRESHNESS" "HASH" "IF" "JSON_EXECUTION_PLAN" - "PLAN_ADVICE" "JAR" "JARS" "JOB" "JOBS" "LOAD" "METADATA" + "MATERIALIZED" "MODIFY" "MODULES" "OVERWRITE" "OVERWRITING" "PARTITIONED" "PARTITIONS" + "PLAN_ADVICE" "PROCEDURES" "PYTHON" "RAW" + "REFRESH_MODE" "REMOVE" "RENAME" "SCALA" + "STATISTICS" "STOP" "STRING" "TABLES" + "TIMESTAMP_LTZ" + "TRY_CAST" "UNLOAD" "USE" "VIEWS" "VIRTUAL" "WATERMARK" "WATERMARKS" - "TIMESTAMP_LTZ" - "TRY_CAST" - "ANALYZE" - "COMPUTE" - "STATISTICS" ] # List of keywords from "keywords" section that are not reserved. diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 9ac769e18a2..368230d9819 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -1682,6 +1682,97 @@ SqlNode SqlReplaceTable() : } } +/** + * Parses a CREATE MATERIALIZED TABLE statement. +*/ +SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporary) : +{ + final SqlParserPos startPos = s.pos(); + SqlIdentifier tableName; + SqlCharStringLiteral comment = null; + SqlTableConstraint constraint = null; + SqlNodeList partitionColumns = SqlNodeList.EMPTY; + SqlNodeList propertyList = SqlNodeList.EMPTY; + SqlNode freshness = null; + SqlLiteral refreshMode = null; + SqlNode asQuery = null; +} +{ + <MATERIALIZED> + { + if (isTemporary) { + throw SqlUtil.newContextException( + getPos(), + ParserResource.RESOURCE.createTemporaryMaterializedTableUnsupported()); + } + if (replace) { + throw SqlUtil.newContextException( + getPos(), + ParserResource.RESOURCE.replaceMaterializedTableUnsupported()); + } + } + <TABLE> + tableName = CompoundIdentifier() + [ + <LPAREN> + constraint = TableConstraint() + <RPAREN> + ] + [ + <COMMENT> <QUOTED_STRING> + { + String p = SqlParserUtil.parseString(token.image); + comment = SqlLiteral.createCharString(p, getPos()); + } + ] + [ + <PARTITIONED> <BY> + partitionColumns = ParenthesizedSimpleIdentifierList() + ] + [ + <WITH> + propertyList = TableProperties() + ] + <FRESHNESS> <EQ> + freshness = Expression(ExprContext.ACCEPT_NON_QUERY) + { + if (!(freshness instanceof SqlIntervalLiteral)) + { + throw SqlUtil.newContextException( + getPos(), + ParserResource.RESOURCE.unsupportedFreshnessType()); + } + } + [ + <REFRESH_MODE> <EQ> + ( + <FULL> + { + refreshMode = SqlRefreshMode.FULL.symbol(getPos()); + } + | + <CONTINUOUS> + { + refreshMode = SqlRefreshMode.CONTINUOUS.symbol(getPos()); + } + ) + ] + <AS> + asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) + { + return new SqlCreateMaterializedTable( + startPos.plus(getPos()), + tableName, + comment, + constraint, + partitionColumns, + propertyList, + (SqlIntervalLiteral) freshness, + refreshMode, + asQuery); + } +} + /** * Parses an INSERT statement. */ @@ -2187,6 +2278,8 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) : ( create = SqlCreateCatalog(s, replace) | + create = SqlCreateMaterializedTable(s, replace, isTemporary) + | create = SqlCreateTable(s, replace, isTemporary) | create = SqlCreateView(s, replace, isTemporary) diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java new file mode 100644 index 00000000000..1630a0f0117 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.SqlUnparseUtils; +import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlCreate; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlIntervalLiteral; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** CREATE MATERIALIZED TABLE DDL sql call. */ +public class SqlCreateMaterializedTable extends SqlCreate { + + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("CREATE MATERIALIZED TABLE", SqlKind.CREATE_TABLE); + + private final SqlIdentifier tableName; + + private final SqlCharStringLiteral comment; + + private final SqlTableConstraint tableConstraint; + + private final SqlNodeList partitionKeyList; + + private final SqlNodeList propertyList; + + private final SqlIntervalLiteral freshness; + + @Nullable private final SqlLiteral refreshMode; + + private final SqlNode asQuery; + + public SqlCreateMaterializedTable( + SqlParserPos pos, + SqlIdentifier tableName, + @Nullable SqlCharStringLiteral comment, + @Nullable SqlTableConstraint tableConstraint, + SqlNodeList partitionKeyList, + SqlNodeList propertyList, + SqlIntervalLiteral freshness, + @Nullable SqlLiteral refreshMode, + SqlNode asQuery) { + super(OPERATOR, pos, false, false); + this.tableName = requireNonNull(tableName, "tableName should not be null"); + this.comment = comment; + this.tableConstraint = tableConstraint; + this.partitionKeyList = + requireNonNull(partitionKeyList, "partitionKeyList should not be null"); + this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); + this.freshness = requireNonNull(freshness, "freshness should not be null"); + this.refreshMode = refreshMode; + this.asQuery = requireNonNull(asQuery, "asQuery should not be null"); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + return ImmutableNullableList.of( + tableName, + comment, + tableConstraint, + partitionKeyList, + propertyList, + freshness, + asQuery); + } + + public SqlIdentifier getTableName() { + return tableName; + } + + public String[] fullTableName() { + return tableName.names.toArray(new String[0]); + } + + public Optional<SqlCharStringLiteral> getComment() { + return Optional.ofNullable(comment); + } + + public Optional<SqlTableConstraint> getTableConstraint() { + return Optional.ofNullable(tableConstraint); + } + + public SqlNodeList getPartitionKeyList() { + return partitionKeyList; + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + public SqlIntervalLiteral getFreshness() { + return freshness; + } + + @Nullable + public Optional<SqlLiteral> getRefreshMode() { + return Optional.ofNullable(refreshMode); + } + + public SqlNode getAsQuery() { + return asQuery; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE MATERIALIZED TABLE"); + tableName.unparse(writer, leftPrec, rightPrec); + + if (tableConstraint != null) { + writer.newlineAndIndent(); + SqlUnparseUtils.unparseTableSchema( + writer, + leftPrec, + rightPrec, + SqlNodeList.EMPTY, + Collections.singletonList(tableConstraint), + null); + } + + if (comment != null) { + writer.newlineAndIndent(); + writer.keyword("COMMENT"); + comment.unparse(writer, leftPrec, rightPrec); + } + + if (partitionKeyList.size() > 0) { + writer.newlineAndIndent(); + writer.keyword("PARTITIONED BY"); + SqlWriter.Frame partitionedByFrame = writer.startList("(", ")"); + partitionKeyList.unparse(writer, leftPrec, rightPrec); + writer.endList(partitionedByFrame); + writer.newlineAndIndent(); + } + + if (propertyList.size() > 0) { + writer.keyword("WITH"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + + writer.newlineAndIndent(); + writer.keyword("FRESHNESS"); + writer.keyword("="); + freshness.unparse(writer, leftPrec, rightPrec); + + if (refreshMode != null) { + writer.newlineAndIndent(); + writer.keyword("REFRESH_MODE"); + writer.keyword("="); + refreshMode.unparse(writer, leftPrec, rightPrec); + } + + writer.newlineAndIndent(); + writer.keyword("AS"); + writer.newlineAndIndent(); + asQuery.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRefreshMode.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRefreshMode.java new file mode 100644 index 00000000000..7646b8b5e09 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlRefreshMode.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.parser.SqlParserPos; + +/** Enumeration of materialized table refresh mode. */ +public enum SqlRefreshMode { + FULL("FULL"), + CONTINUOUS("CONTINUOUS"); + + private final String digest; + + SqlRefreshMode(String digest) { + this.digest = digest; + } + + @Override + public String toString() { + return this.digest; + } + + /** + * Creates a parse-tree node representing an occurrence of this keyword at a particular position + * in the parsed text. + */ + public SqlLiteral symbol(SqlParserPos pos) { + return SqlLiteral.createSymbol(this, pos); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java index 0758b0d5887..4f6a5fcf739 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java @@ -49,4 +49,14 @@ public interface ParserResource { @Resources.BaseMessage("Bucket count must be a positive integer.") Resources.ExInst<ParseException> bucketCountMustBePositiveInteger(); + + @Resources.BaseMessage( + "CREATE MATERIALIZED TABLE only supports interval type FRESHNESS, please refer to the materialized table document.") + Resources.ExInst<ParseException> unsupportedFreshnessType(); + + @Resources.BaseMessage("CREATE TEMPORARY MATERIALIZED TABLE is not supported.") + Resources.ExInst<ParseException> createTemporaryMaterializedTableUnsupported(); + + @Resources.BaseMessage("REPLACE MATERIALIZED TABLE is not supported.") + Resources.ExInst<ParseException> replaceMaterializedTableUnsupported(); } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java new file mode 100644 index 00000000000..8818e97c835 --- /dev/null +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementParserTest.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser; + +import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; + +import org.apache.calcite.sql.parser.SqlParserFixture; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; + +import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; + +/** Sql parser test for materialized related syntax. * */ +@Execution(CONCURRENT) +public class MaterializedTableStatementParserTest { + + @Test + void testCreateMaterializedTable() { + final String sql = + "CREATE MATERIALIZED TABLE tbl1\n" + + "(\n" + + " PRIMARY KEY (a, b)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS SELECT a, b, h, t m FROM source"; + final String expected = + "CREATE MATERIALIZED TABLE `TBL1`\n" + + "(\n" + + " PRIMARY KEY (`A`, `B`)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (`A`, `H`)\n" + + "WITH (\n" + + " 'group.id' = 'latest',\n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS\n" + + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + + "FROM `SOURCE`"; + sql(sql).ok(expected); + + final String sql2 = + "CREATE MATERIALIZED TABLE tbl1\n" + + "(\n" + + " PRIMARY KEY (a, b)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT a, b, h, t m FROM source"; + final String expected2 = + "CREATE MATERIALIZED TABLE `TBL1`\n" + + "(\n" + + " PRIMARY KEY (`A`, `B`)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "REFRESH_MODE = FULL\n" + + "AS\n" + + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + + "FROM `SOURCE`"; + + sql(sql2).ok(expected2); + + final String sql3 = + "CREATE MATERIALIZED TABLE tbl1\n" + + "COMMENT 'table comment'\n" + + "FRESHNESS = INTERVAL '3' DAY\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT a, b, h, t m FROM source"; + final String expected3 = + "CREATE MATERIALIZED TABLE `TBL1`\n" + + "COMMENT 'table comment'\n" + + "FRESHNESS = INTERVAL '3' DAY\n" + + "REFRESH_MODE = FULL\n" + + "AS\n" + + "SELECT `A`, `B`, `H`, `T` AS `M`\n" + + "FROM `SOURCE`"; + sql(sql3).ok(expected3); + } + + @Test + void testCreateMaterializedTableWithUnsupportedFreshnessInterval() { + final String sql = + "CREATE MATERIALIZED TABLE tbl1\n" + + "(\n" + + " PRIMARY KEY (a, b)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = ^123^\n" + + "AS SELECT a, b, h, t m FROM source"; + sql(sql).fails( + "CREATE MATERIALIZED TABLE only supports interval type FRESHNESS, please refer to the materialized table document."); + + final String sql2 = + "CREATE MATERIALIZED TABLE tbl1\n" + + "(\n" + + " PRIMARY KEY (a, b)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "^AS^ SELECT a, b, h, t m FROM source"; + + sql(sql2) + .fails( + "Encountered \"AS\" at line 11, column 1.\n" + + "Was expecting:\n" + + " \"FRESHNESS\" ...\n" + + " "); + } + + @Test + void testCreateMaterializedTableWithoutAsQuery() { + final String sql = + "CREATE MATERIALIZED TABLE tbl1\n" + + "(\n" + + " PRIMARY KEY (a, b)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "REFRESH_MODE = FULL^\n^"; + sql(sql).fails( + "Encountered \"<EOF>\" at line 12, column 20.\n" + + "Was expecting:\n" + + " \"AS\" ...\n" + + " "); + } + + @Test + void testCreateTemporaryMaterializedTable() { + final String sql = + "CREATE TEMPORARY ^MATERIALIZED^ TABLE tbl1\n" + + "(\n" + + " PRIMARY KEY (a, b)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS SELECT a, b, h, t m FROM source"; + sql(sql).fails("CREATE TEMPORARY MATERIALIZED TABLE is not supported."); + } + + @Test + void testReplaceMaterializedTable() { + final String sql = + "CREATE OR REPLACE ^MATERIALIZED^ TABLE tbl1\n" + + "(\n" + + " PRIMARY KEY (a, b)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a, h)\n" + + "WITH (\n" + + " 'group.id' = 'latest', \n" + + " 'kafka.topic' = 'log.test'\n" + + ")\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "AS SELECT a, b, h, t m FROM source"; + sql(sql).fails("REPLACE MATERIALIZED TABLE is not supported."); + } + + public SqlParserFixture fixture() { + return SqlParserFixture.DEFAULT.withConfig( + c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY)); + } + + protected SqlParserFixture sql(String sql) { + return this.fixture().sql(sql); + } + + protected SqlParserFixture expr(String sql) { + return this.sql(sql).expression(true); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementUnParserTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementUnParserTest.java new file mode 100644 index 00000000000..5121b969167 --- /dev/null +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/MaterializedTableStatementUnParserTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser; + +import org.apache.flink.sql.parser.impl.FlinkSqlParserImpl; + +import org.apache.calcite.sql.parser.SqlParserFixture; +import org.apache.calcite.sql.parser.SqlParserTest; +import org.junit.jupiter.api.parallel.Execution; + +import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT; + +/** + * Extension to {@link MaterializedTableStatementParserTest} that ensures that every expression can + * un-parse successfully. + */ +@Execution(CONCURRENT) +public class MaterializedTableStatementUnParserTest extends MaterializedTableStatementParserTest { + // ~ Constructors ----------------------------------------------------------- + + public MaterializedTableStatementUnParserTest() {} + + // ~ Methods ---------------------------------------------------------------- + + public SqlParserFixture fixture() { + return super.fixture() + .withTester(new SqlParserTest.UnparsingTesterImpl()) + .withConfig(c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY)); + } +}