This is an automated email from the ASF dual-hosted git repository. yuxia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 76d2a8d0deb [FLINK-32516][table] Support to parse [CREATE OR ] REPLACE TABLE AS statement (#22934) 76d2a8d0deb is described below commit 76d2a8d0deb8a81a98ed82b6c0613f79bf2a800c Author: zhangmang <zhangma...@163.com> AuthorDate: Tue Jul 4 16:25:42 2023 +0800 [FLINK-32516][table] Support to parse [CREATE OR ] REPLACE TABLE AS statement (#22934) --- .../src/main/codegen/data/Parser.tdd | 2 + .../src/main/codegen/includes/parserImpls.ftl | 108 +++++++- .../flink/sql/parser/ddl/SqlReplaceTableAs.java | 273 +++++++++++++++++++++ .../flink/sql/parser/FlinkSqlParserImplTest.java | 111 +++++++++ 4 files changed, 483 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 49f94a64262..3e04377e47e 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -69,6 +69,7 @@ "org.apache.flink.sql.parser.ddl.SqlDropTable" "org.apache.flink.sql.parser.ddl.SqlDropView" "org.apache.flink.sql.parser.ddl.SqlRemoveJar" + "org.apache.flink.sql.parser.ddl.SqlReplaceTableAs" "org.apache.flink.sql.parser.ddl.SqlReset" "org.apache.flink.sql.parser.ddl.SqlSet" "org.apache.flink.sql.parser.ddl.SqlTableColumn" @@ -562,6 +563,7 @@ "SqlShowTables()" "SqlShowColumns()" "SqlShowCreate()" + "SqlReplaceTable()" "SqlRichDescribeTable()" "SqlAlterTable()" "SqlAlterView()" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 7f6fc56b2aa..27b0307172f 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -1336,17 +1336,32 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) : <AS> asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) { - return new SqlCreateTableAs(startPos.plus(getPos()), - tableName, - columnList, - constraints, - propertyList, - partitionColumns, - watermark, - comment, - asQuery, - isTemporary, - ifNotExists); + if (replace) { + return new SqlReplaceTableAs(startPos.plus(getPos()), + tableName, + columnList, + constraints, + propertyList, + partitionColumns, + watermark, + comment, + asQuery, + isTemporary, + ifNotExists, + true); + } else { + return new SqlCreateTableAs(startPos.plus(getPos()), + tableName, + columnList, + constraints, + propertyList, + partitionColumns, + watermark, + comment, + asQuery, + isTemporary, + ifNotExists); + } } ] { @@ -1441,6 +1456,77 @@ SqlDrop SqlDropTable(Span s, boolean replace, boolean isTemporary) : } } +/** +* Parser a REPLACE TABLE AS statement +*/ +SqlNode SqlReplaceTable() : +{ + SqlIdentifier tableName; + SqlCharStringLiteral comment = null; + SqlNode asQuery = null; + SqlNodeList propertyList = SqlNodeList.EMPTY; + SqlParserPos pos; + boolean isTemporary = false; + List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>(); + SqlWatermark watermark = null; + SqlNodeList columnList = SqlNodeList.EMPTY; + SqlNodeList partitionColumns = SqlNodeList.EMPTY; + boolean ifNotExists = false; +} +{ + <REPLACE> + [ + <TEMPORARY> { isTemporary = true; } + ] + <TABLE> + + ifNotExists = IfNotExistsOpt() + + tableName = CompoundIdentifier() + [ + <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();} + TableColumn(ctx) + ( + <COMMA> TableColumn(ctx) + )* + { + pos = getPos(); + columnList = new SqlNodeList(ctx.columnList, pos); + constraints = ctx.constraints; + watermark = ctx.watermark; + } + <RPAREN> + ] + [ <COMMENT> <QUOTED_STRING> { + String p = SqlParserUtil.parseString(token.image); + comment = SqlLiteral.createCharString(p, getPos()); + }] + [ + <PARTITIONED> <BY> + partitionColumns = ParenthesizedSimpleIdentifierList() + ] + [ + <WITH> + propertyList = TableProperties() + ] + <AS> + asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) + { + return new SqlReplaceTableAs(getPos(), + tableName, + columnList, + constraints, + propertyList, + partitionColumns, + watermark, + comment, + asQuery, + isTemporary, + ifNotExists, + false); + } +} + /** * Parses an INSERT statement. */ diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java new file mode 100644 index 00000000000..472dc28535b --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.ddl; + +import org.apache.flink.sql.parser.ExtendedSqlNode; +import org.apache.flink.sql.parser.SqlConstraintValidator; +import org.apache.flink.sql.parser.SqlUnparseUtils; +import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint; +import org.apache.flink.sql.parser.error.SqlValidateException; + +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlCreate; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.util.ImmutableNullableList; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * {@link SqlNode} to describe the [CREATE OR] REPLACE TABLE AS (RTAS) syntax. The RTAS would create + * a pipeline to compute the result of the given query and create or replace the derived table. + * + * <p>Notes: REPLACE TABLE AS: the derived table must exist. CREATE OR REPLACE TABLE AS: create the + * derived table if it does not exist, otherwise replace it. + * + * <p>Example: + * + * <pre>{@code + * CREATE TABLE base_table ( + * id BIGINT, + * name STRING, + * time TIMESTAMP, + * PRIMARY KEY(id) + * ) WITH ( + * ‘connector’ = ‘kafka’, + * ‘connector.starting-offset’: ‘12345’, + * ‘format’ = ‘json’ + * ) + * + * CREATE OR REPLACE TABLE derived_table + * WITH ( + * 'connector' = 'jdbc', + * 'url' = 'http://localhost:10000', + * 'table-name' = 'syncedTable' + * ) + * AS SELECT * FROM base_table; + * }</pre> + */ +public class SqlReplaceTableAs extends SqlCreate implements ExtendedSqlNode { + + public static final SqlSpecialOperator REPLACE_OPERATOR = + new SqlSpecialOperator("REPLACE TABLE AS", SqlKind.OTHER_DDL); + + public static final SqlSpecialOperator CREATE_OR_REPLACE_OPERATOR = + new SqlSpecialOperator("CREATE OR REPLACE TABLE AS", SqlKind.OTHER_DDL); + + private final SqlIdentifier tableName; + + private final SqlNodeList columnList; + + private final SqlNodeList propertyList; + + private final List<SqlTableConstraint> tableConstraints; + + private final SqlNodeList partitionKeyList; + + private final SqlWatermark watermark; + + private final SqlCharStringLiteral comment; + + private final boolean isTemporary; + + private final boolean isCreateOrReplace; + + private final SqlNode asQuery; + + public SqlReplaceTableAs( + SqlParserPos pos, + SqlIdentifier tableName, + SqlNodeList columnList, + List<SqlTableConstraint> tableConstraints, + SqlNodeList propertyList, + SqlNodeList partitionKeyList, + @Nullable SqlWatermark watermark, + @Nullable SqlCharStringLiteral comment, + SqlNode asQuery, + boolean isTemporary, + boolean ifNotExists, + boolean isCreateOrReplace) { + super( + isCreateOrReplace ? CREATE_OR_REPLACE_OPERATOR : REPLACE_OPERATOR, + pos, + true, + ifNotExists); + + this.tableName = requireNonNull(tableName, "tableName should not be null"); + this.columnList = requireNonNull(columnList, "columnList should not be null"); + this.tableConstraints = + requireNonNull(tableConstraints, "table constraints should not be null"); + this.propertyList = requireNonNull(propertyList, "propertyList should not be null"); + this.partitionKeyList = + requireNonNull(partitionKeyList, "partitionKeyList should not be null"); + this.watermark = watermark; + this.comment = comment; + this.isTemporary = isTemporary; + + this.asQuery = asQuery; + this.isCreateOrReplace = isCreateOrReplace; + } + + @Override + public @Nonnull List<SqlNode> getOperandList() { + return ImmutableNullableList.of( + tableName, + columnList, + new SqlNodeList(tableConstraints, SqlParserPos.ZERO), + propertyList, + partitionKeyList, + watermark, + comment, + asQuery); + } + + @Override + public void validate() throws SqlValidateException { + SqlConstraintValidator.validateAndChangeColumnNullability(tableConstraints, columnList); + // The following features are not currently supported by RTAS, but may be supported in the + // future + String errorMsg = + isCreateOrReplace ? "CREATE OR REPLACE TABLE AS SELECT" : "REPLACE TABLE AS SELECT"; + + if (isIfNotExists()) { + throw new SqlValidateException( + getParserPosition(), + errorMsg + " syntax does not support IF NOT EXISTS statements yet."); + } + + if (isTemporary()) { + throw new SqlValidateException( + getParserPosition(), + errorMsg + " syntax does not support temporary table yet."); + } + + if (getColumnList().size() > 0) { + throw new SqlValidateException( + getParserPosition(), + errorMsg + " syntax does not support to specify explicit columns yet."); + } + + if (getWatermark().isPresent()) { + throw new SqlValidateException( + getParserPosition(), + errorMsg + " syntax does not support to specify explicit watermark yet."); + } + if (getPartitionKeyList().size() > 0) { + throw new SqlValidateException( + getParserPosition(), + errorMsg + " syntax does not support to create partitioned table yet."); + } + if (getFullConstraints().stream().anyMatch(SqlTableConstraint::isPrimaryKey)) { + throw new SqlValidateException( + getParserPosition(), + errorMsg + " syntax does not support primary key constraints yet."); + } + } + + public SqlNode getAsQuery() { + return asQuery; + } + + public boolean isCreateOrReplace() { + return isCreateOrReplace; + } + + public SqlIdentifier getTableName() { + return tableName; + } + + public SqlNodeList getColumnList() { + return columnList; + } + + public SqlNodeList getPropertyList() { + return propertyList; + } + + public SqlNodeList getPartitionKeyList() { + return partitionKeyList; + } + + public List<SqlTableConstraint> getTableConstraints() { + return tableConstraints; + } + + public Optional<SqlWatermark> getWatermark() { + return Optional.ofNullable(watermark); + } + + public Optional<SqlCharStringLiteral> getComment() { + return Optional.ofNullable(comment); + } + + public boolean isIfNotExists() { + return ifNotExists; + } + + public boolean isTemporary() { + return isTemporary; + } + + /** Returns the column constraints plus the table constraints. */ + public List<SqlTableConstraint> getFullConstraints() { + return SqlConstraintValidator.getFullConstraints(tableConstraints, columnList); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + if (isCreateOrReplace) { + writer.keyword("CREATE OR"); + } + writer.keyword("REPLACE TABLE"); + tableName.unparse(writer, leftPrec, rightPrec); + + if (comment != null) { + writer.newlineAndIndent(); + writer.keyword("COMMENT"); + comment.unparse(writer, leftPrec, rightPrec); + } + + if (this.propertyList.size() > 0) { + writer.keyword("WITH"); + SqlWriter.Frame withFrame = writer.startList("(", ")"); + for (SqlNode property : propertyList) { + SqlUnparseUtils.printIndent(writer); + property.unparse(writer, leftPrec, rightPrec); + } + writer.newlineAndIndent(); + writer.endList(withFrame); + } + + writer.newlineAndIndent(); + writer.keyword("AS"); + writer.newlineAndIndent(); + this.asQuery.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 5e9842bd865..0958e6c58d5 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -2524,6 +2524,117 @@ class FlinkSqlParserImplTest extends SqlParserTest { "CREATE TABLE AS SELECT syntax does not support to create partitioned table yet.")); } + @Test + void testReplaceTableAsSelect() { + // test replace table as select without options + sql("REPLACE TABLE t AS SELECT * FROM b").ok("REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`"); + + // test replace table as select with options + sql("REPLACE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b") + .ok("REPLACE TABLE `T` WITH (\n 'test' = 'zm'\n)\nAS\nSELECT *\nFROM `B`"); + + // test replace table as select with tmp table + sql("REPLACE TEMPORARY TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "REPLACE TABLE AS SELECT syntax does not support temporary table yet.")); + + // test replace table as select with explicit columns + sql("REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "REPLACE TABLE AS SELECT syntax does not support to specify explicit columns yet.")); + + // test replace table as select with watermark + sql("REPLACE TABLE t (watermark FOR ts AS ts - interval '3' second) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "REPLACE TABLE AS SELECT syntax does not support to specify explicit watermark yet.")); + + // test replace table as select with constraints + sql("REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls " + + "if the constraint checks are performed on the incoming/outgoing data. " + + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode")); + + sql("REPLACE TABLE t (PRIMARY KEY (col1), PRIMARY KEY (col2) NOT ENFORCED) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node(new ValidationMatcher().fails("Duplicate primary key definition")); + + sql("REPLACE TABLE t (UNIQUE (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node(new ValidationMatcher().fails("UNIQUE constraint is not supported yet")); + + // test replace table as select with partition key + sql("REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "REPLACE TABLE AS SELECT syntax does not support to create partitioned table yet.")); + } + + @Test + void testCreateOrReplaceTableAsSelect() { + // test create or replace table as select without options + sql("CREATE OR REPLACE TABLE t AS SELECT * FROM b") + .ok("CREATE OR REPLACE TABLE `T`\nAS\nSELECT *\nFROM `B`"); + + // test create or replace table as select with options + sql("CREATE OR REPLACE TABLE t WITH ('test' = 'zm') AS SELECT * FROM b") + .ok( + "CREATE OR REPLACE TABLE `T` WITH (\n 'test' = 'zm'\n)\nAS\nSELECT *\nFROM `B`"); + + // test create or replace table as select with create table like + sql("CREATE OR REPLACE TABLE t (col1 string) WITH ('test' = 'zm') like b ^AS^ SELECT col1 FROM b") + .fails("(?s).*Encountered \"AS\" at line 1, column 69.*"); + + // test create or replace table as select with tmp table + sql("CREATE OR REPLACE TEMPORARY TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "CREATE OR REPLACE TABLE AS SELECT syntax does not support temporary table yet.")); + + // test create or replace table as select with explicit columns + sql("CREATE OR REPLACE TABLE t (col1 string) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "CREATE OR REPLACE TABLE AS SELECT syntax does not support to specify explicit columns yet.")); + + // test create or replace table as select with watermark + sql("CREATE OR REPLACE TABLE t (watermark FOR ts AS ts - interval '3' second) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "CREATE OR REPLACE TABLE AS SELECT syntax does not support to specify explicit watermark yet.")); + // test create or replace table as select with constraints + sql("CREATE OR REPLACE TABLE t (PRIMARY KEY (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "Flink doesn't support ENFORCED mode for PRIMARY KEY constraint. ENFORCED/NOT ENFORCED controls " + + "if the constraint checks are performed on the incoming/outgoing data. " + + "Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode")); + + sql("CREATE OR REPLACE TABLE t (PRIMARY KEY (col1), PRIMARY KEY (col2) NOT ENFORCED) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node(new ValidationMatcher().fails("Duplicate primary key definition")); + + sql("CREATE OR REPLACE TABLE t (UNIQUE (col1)) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node(new ValidationMatcher().fails("UNIQUE constraint is not supported yet")); + + // test create or replace table as select with partition key + sql("CREATE OR REPLACE TABLE t PARTITIONED BY(col1) WITH ('test' = 'zm') AS SELECT col1 FROM b") + .node( + new ValidationMatcher() + .fails( + "CREATE OR REPLACE TABLE AS SELECT syntax does not support to create partitioned table yet.")); + } + @Test void testShowJobs() { sql("show jobs").ok("SHOW JOBS");