This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fb9354d8e1fc7bc17bc59b874089a457badacbe8 Author: Jane Chan <qingyue....@gmail.com> AuthorDate: Wed Dec 28 20:10:29 2022 +0800 [FLINK-22137][sql-parser] Support ALTER TABLE DROP syntax --- .../src/main/codegen/data/Parser.tdd | 3 ++ .../src/main/codegen/includes/parserImpls.ftl | 46 ++++++++++++++--- .../apache/flink/sql/parser/SqlUnparseUtils.java | 4 +- ...onstraint.java => SqlAlterTableDropColumn.java} | 58 +++++++++++++--------- .../parser/ddl/SqlAlterTableDropConstraint.java | 9 ++-- ...raint.java => SqlAlterTableDropPrimaryKey.java} | 31 +++--------- ...traint.java => SqlAlterTableDropWatermark.java} | 40 +++++++-------- .../flink/sql/parser/FlinkSqlParserImplTest.java | 40 +++++++++++++++ .../planner/operations/AlterSchemaConverter.java | 3 +- 9 files changed, 152 insertions(+), 82 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 378c3c6f0a1..a117fdc7da0 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -38,7 +38,10 @@ "org.apache.flink.sql.parser.ddl.SqlAlterTableAdd" "org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint" "org.apache.flink.sql.parser.ddl.SqlAlterTableCompact" + "org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn" "org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint" + "org.apache.flink.sql.parser.ddl.SqlAlterTableDropPrimaryKey" + "org.apache.flink.sql.parser.ddl.SqlAlterTableDropWatermark" "org.apache.flink.sql.parser.ddl.SqlAlterTableModify" "org.apache.flink.sql.parser.ddl.SqlAlterTableOptions" "org.apache.flink.sql.parser.ddl.SqlAlterTableRename" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 57424abc495..e24b39e544d 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -684,13 +684,45 @@ SqlAlterTable SqlAlterTable() : } | - <DROP> <CONSTRAINT> - constraintName = SimpleIdentifier() { - return new SqlAlterTableDropConstraint( - tableIdentifier, - constraintName, - startPos.plus(getPos())); - } + <DROP> + ( + { SqlIdentifier columnName = null; } + columnName = CompoundIdentifier() { + return new SqlAlterTableDropColumn( + startPos.plus(getPos()), + tableIdentifier, + new SqlNodeList( + Collections.singletonList(columnName), + getPos())); + } + | + { Pair<SqlNodeList, SqlNodeList> columnWithTypePair = null; } + columnWithTypePair = ParenthesizedCompoundIdentifierList() { + return new SqlAlterTableDropColumn( + startPos.plus(getPos()), + tableIdentifier, + columnWithTypePair.getKey()); + } + | + <PRIMARY> <KEY> { + return new SqlAlterTableDropPrimaryKey( + startPos.plus(getPos()), + tableIdentifier); + } + | + <CONSTRAINT> constraintName = SimpleIdentifier() { + return new SqlAlterTableDropConstraint( + startPos.plus(getPos()), + tableIdentifier, + constraintName); + } + | + <WATERMARK> { + return new SqlAlterTableDropWatermark( + startPos.plus(getPos()), + tableIdentifier); + } + ) | [ <PARTITION> diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java index 9f0299ec147..4d9fd47a90d 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java @@ -25,6 +25,8 @@ import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlWriter; +import javax.annotation.Nullable; + import java.util.List; /** Utils to unparse DDLs. */ @@ -38,7 +40,7 @@ public class SqlUnparseUtils { int rightPrec, SqlNodeList columnList, List<SqlTableConstraint> constraints, - SqlWatermark watermark) { + @Nullable SqlWatermark watermark) { SqlWriter.Frame frame = writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")"); for (SqlNode column : columnList) { printIndent(writer); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java similarity index 52% copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java index 5bfd6e39216..534a7a7a593 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,44 +18,56 @@ package org.apache.flink.sql.parser.ddl; +import org.apache.flink.sql.parser.SqlUnparseUtils; + import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.util.ImmutableNullableList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; -/** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT constraint_name. */ -public class SqlAlterTableDropConstraint extends SqlAlterTable { - private final SqlIdentifier constraintName; - - /** - * Creates an alter table drop constraint node. - * - * @param tableID Table ID - * @param constraintName Constraint name - * @param pos Parser position - */ - public SqlAlterTableDropConstraint( - SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos pos) { - super(pos, tableID); - this.constraintName = constraintName; - } +/** + * SqlNode to describe ALTER TABLE table_name DROP column clause. + * + * <p>Example: DDL like the below for drop column. + * + * <pre>{@code + * -- drop single column + * ALTER TABLE prod.db.sample DROP col1; + * + * -- drop multiple columns + * ALTER TABLE prod.db.sample DROP (col1, col2, col3); + * }</pre> + */ +public class SqlAlterTableDropColumn extends SqlAlterTable { + + private final SqlNodeList columnList; - public SqlIdentifier getConstraintName() { - return constraintName; + public SqlAlterTableDropColumn( + SqlParserPos pos, SqlIdentifier tableName, SqlNodeList columnList) { + super(pos, tableName); + this.columnList = columnList; } @Override public List<SqlNode> getOperandList() { - return ImmutableNullableList.of(getTableName(), this.constraintName); + return Arrays.asList(tableIdentifier, columnList); + } + + public SqlNodeList getColumnList() { + return columnList; } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { super.unparse(writer, leftPrec, rightPrec); - writer.keyword("DROP CONSTRAINT"); - this.constraintName.unparse(writer, leftPrec, rightPrec); + writer.keyword("DROP"); + // unparse table column + SqlUnparseUtils.unparseTableSchema( + writer, leftPrec, rightPrec, columnList, Collections.emptyList(), null); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java index 5bfd6e39216..59b6b85c13e 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java @@ -28,18 +28,19 @@ import java.util.List; /** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT constraint_name. */ public class SqlAlterTableDropConstraint extends SqlAlterTable { + private final SqlIdentifier constraintName; /** * Creates an alter table drop constraint node. * - * @param tableID Table ID - * @param constraintName Constraint name * @param pos Parser position + * @param tableName Table name + * @param constraintName Constraint name */ public SqlAlterTableDropConstraint( - SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos pos) { - super(pos, tableID); + SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier constraintName) { + super(pos, tableName); this.constraintName = constraintName; } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java similarity index 56% copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java index 5bfd6e39216..fa7d48efd0d 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,40 +22,25 @@ import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.util.ImmutableNullableList; +import java.util.Collections; import java.util.List; -/** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT constraint_name. */ -public class SqlAlterTableDropConstraint extends SqlAlterTable { - private final SqlIdentifier constraintName; +/** ALTER TABLE [catalog_name.][db_name.]table_name DROP PRIMARY KEY. */ +public class SqlAlterTableDropPrimaryKey extends SqlAlterTable { - /** - * Creates an alter table drop constraint node. - * - * @param tableID Table ID - * @param constraintName Constraint name - * @param pos Parser position - */ - public SqlAlterTableDropConstraint( - SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos pos) { - super(pos, tableID); - this.constraintName = constraintName; - } - - public SqlIdentifier getConstraintName() { - return constraintName; + public SqlAlterTableDropPrimaryKey(SqlParserPos pos, SqlIdentifier tableName) { + super(pos, tableName); } @Override public List<SqlNode> getOperandList() { - return ImmutableNullableList.of(getTableName(), this.constraintName); + return Collections.singletonList(tableIdentifier); } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { super.unparse(writer, leftPrec, rightPrec); - writer.keyword("DROP CONSTRAINT"); - this.constraintName.unparse(writer, leftPrec, rightPrec); + writer.keyword("DROP PRIMARY KEY"); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java similarity index 56% copy from flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java copy to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java index 5bfd6e39216..593a77634cd 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,40 +22,34 @@ import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.util.ImmutableNullableList; +import java.util.Collections; import java.util.List; -/** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT constraint_name. */ -public class SqlAlterTableDropConstraint extends SqlAlterTable { - private final SqlIdentifier constraintName; - - /** - * Creates an alter table drop constraint node. - * - * @param tableID Table ID - * @param constraintName Constraint name - * @param pos Parser position - */ - public SqlAlterTableDropConstraint( - SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos pos) { - super(pos, tableID); - this.constraintName = constraintName; - } +/** + * SqlNode to describe ALTER TABLE table_name DROP watermark clause. + * + * <p>Example: DDL like the below for drop watermark. + * + * <pre>{@code + * -- drop watermark + * ALTER TABLE prod.db.sample DROP WATERMARK; + * }</pre> + */ +public class SqlAlterTableDropWatermark extends SqlAlterTable { - public SqlIdentifier getConstraintName() { - return constraintName; + public SqlAlterTableDropWatermark(SqlParserPos pos, SqlIdentifier tableName) { + super(pos, tableName); } @Override public List<SqlNode> getOperandList() { - return ImmutableNullableList.of(getTableName(), this.constraintName); + return Collections.emptyList(); } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { super.unparse(writer, leftPrec, rightPrec); - writer.keyword("DROP CONSTRAINT"); - this.constraintName.unparse(writer, leftPrec, rightPrec); + writer.keyword("DROP WATERMARK"); } } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index fea81a9fe2d..5a3498e6ab7 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -563,6 +563,46 @@ class FlinkSqlParserImplTest extends SqlParserTest { sql(sql1).ok(expected1); } + @Test + public void testAlterTableDropSingleColumn() { + sql("alter table t1 drop id").ok("ALTER TABLE `T1` DROP (\n" + " `ID`\n" + ")"); + + sql("alter table t1 drop (id)").ok("ALTER TABLE `T1` DROP (\n" + " `ID`\n" + ")"); + + sql("alter table t1 drop tuple.id") + .ok("ALTER TABLE `T1` DROP (\n" + " `TUPLE`.`ID`\n" + ")"); + } + + @Test + public void testAlterTableDropMultipleColumn() { + sql("alter table t1 drop (id, ts, tuple.f0, tuple.f1)") + .ok( + "ALTER TABLE `T1` DROP (\n" + + " `ID`,\n" + + " `TS`,\n" + + " `TUPLE`.`F0`,\n" + + " `TUPLE`.`F1`\n" + + ")"); + } + + @Test + public void testAlterTableDropPrimaryKey() { + sql("alter table t1 drop primary key").ok("ALTER TABLE `T1` DROP PRIMARY KEY"); + } + + @Test + public void testAlterTableDropConstraint() { + sql("alter table t1 drop constraint ct").ok("ALTER TABLE `T1` DROP CONSTRAINT `CT`"); + + sql("alter table t1 drop constrain^t^") + .fails("(?s).*Encountered \"<EOF>\" at line 1, column 30.\n.*"); + } + + @Test + public void testAlterTableDropWatermark() { + sql("alter table t1 drop watermark").ok("ALTER TABLE `T1` DROP WATERMARK"); + } + @Test void testAlterTableReset() { sql("alter table t1 reset ('key1')").ok("ALTER TABLE `T1` RESET (\n 'key1'\n)"); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java index 9376e2c2527..b00038ce20f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java @@ -35,6 +35,7 @@ import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.SchemaResolver; import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.expressions.SqlCallExpression; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; @@ -94,7 +95,7 @@ public class AlterSchemaConverter { public Schema applySchemaChange( SqlAlterTableSchema alterTableSchema, - Schema originalSchema, + Schema originSchema, List<TableChange> tableChangeCollector) { AlterSchemaStrategy strategy = computeAlterSchemaStrategy(alterTableSchema); SchemaConverter converter =