This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb9354d8e1fc7bc17bc59b874089a457badacbe8
Author: Jane Chan <qingyue....@gmail.com>
AuthorDate: Wed Dec 28 20:10:29 2022 +0800

    [FLINK-22137][sql-parser] Support ALTER TABLE DROP syntax
---
 .../src/main/codegen/data/Parser.tdd               |  3 ++
 .../src/main/codegen/includes/parserImpls.ftl      | 46 ++++++++++++++---
 .../apache/flink/sql/parser/SqlUnparseUtils.java   |  4 +-
 ...onstraint.java => SqlAlterTableDropColumn.java} | 58 +++++++++++++---------
 .../parser/ddl/SqlAlterTableDropConstraint.java    |  9 ++--
 ...raint.java => SqlAlterTableDropPrimaryKey.java} | 31 +++---------
 ...traint.java => SqlAlterTableDropWatermark.java} | 40 +++++++--------
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 40 +++++++++++++++
 .../planner/operations/AlterSchemaConverter.java   |  3 +-
 9 files changed, 152 insertions(+), 82 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd 
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 378c3c6f0a1..a117fdc7da0 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -38,7 +38,10 @@
     "org.apache.flink.sql.parser.ddl.SqlAlterTableAdd"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableAddConstraint"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableCompact"
+    "org.apache.flink.sql.parser.ddl.SqlAlterTableDropColumn"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableDropConstraint"
+    "org.apache.flink.sql.parser.ddl.SqlAlterTableDropPrimaryKey"
+    "org.apache.flink.sql.parser.ddl.SqlAlterTableDropWatermark"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableModify"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableOptions"
     "org.apache.flink.sql.parser.ddl.SqlAlterTableRename"
diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 57424abc495..e24b39e544d 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -684,13 +684,45 @@ SqlAlterTable SqlAlterTable() :
         }
 
     |
-        <DROP> <CONSTRAINT>
-        constraintName = SimpleIdentifier() {
-            return new SqlAlterTableDropConstraint(
-                tableIdentifier,
-                constraintName,
-                startPos.plus(getPos()));
-        }
+     <DROP>
+        (
+            { SqlIdentifier columnName = null; }
+            columnName = CompoundIdentifier() {
+                return new SqlAlterTableDropColumn(
+                            startPos.plus(getPos()),
+                            tableIdentifier,
+                            new SqlNodeList(
+                                Collections.singletonList(columnName),
+                                getPos()));
+            }
+        |
+            { Pair<SqlNodeList, SqlNodeList> columnWithTypePair = null; }
+            columnWithTypePair = ParenthesizedCompoundIdentifierList() {
+                return new SqlAlterTableDropColumn(
+                            startPos.plus(getPos()),
+                            tableIdentifier,
+                            columnWithTypePair.getKey());
+            }
+        |
+            <PRIMARY> <KEY> {
+                return new SqlAlterTableDropPrimaryKey(
+                        startPos.plus(getPos()),
+                        tableIdentifier);
+            }
+        |
+            <CONSTRAINT> constraintName = SimpleIdentifier() {
+                return new SqlAlterTableDropConstraint(
+                            startPos.plus(getPos()),
+                            tableIdentifier,
+                            constraintName);
+            }
+        |
+            <WATERMARK> {
+                return new SqlAlterTableDropWatermark(
+                            startPos.plus(getPos()),
+                            tableIdentifier);
+            }
+        )
     |
         [
             <PARTITION>
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
index 9f0299ec147..4d9fd47a90d 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
@@ -25,6 +25,8 @@ import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlWriter;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /** Utils to unparse DDLs. */
@@ -38,7 +40,7 @@ public class SqlUnparseUtils {
             int rightPrec,
             SqlNodeList columnList,
             List<SqlTableConstraint> constraints,
-            SqlWatermark watermark) {
+            @Nullable SqlWatermark watermark) {
         SqlWriter.Frame frame = 
writer.startList(SqlWriter.FrameTypeEnum.create("sds"), "(", ")");
         for (SqlNode column : columnList) {
             printIndent(writer);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
similarity index 52%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
index 5bfd6e39216..534a7a7a593 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropColumn.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,44 +18,56 @@
 
 package org.apache.flink.sql.parser.ddl;
 
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
-/** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT 
constraint_name. */
-public class SqlAlterTableDropConstraint extends SqlAlterTable {
-    private final SqlIdentifier constraintName;
-
-    /**
-     * Creates an alter table drop constraint node.
-     *
-     * @param tableID Table ID
-     * @param constraintName Constraint name
-     * @param pos Parser position
-     */
-    public SqlAlterTableDropConstraint(
-            SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos 
pos) {
-        super(pos, tableID);
-        this.constraintName = constraintName;
-    }
+/**
+ * SqlNode to describe ALTER TABLE table_name DROP column clause.
+ *
+ * <p>Example: DDL like the below for drop column.
+ *
+ * <pre>{@code
+ * -- drop single column
+ * ALTER TABLE prod.db.sample DROP col1;
+ *
+ * -- drop multiple columns
+ * ALTER TABLE prod.db.sample DROP (col1, col2, col3);
+ * }</pre>
+ */
+public class SqlAlterTableDropColumn extends SqlAlterTable {
+
+    private final SqlNodeList columnList;
 
-    public SqlIdentifier getConstraintName() {
-        return constraintName;
+    public SqlAlterTableDropColumn(
+            SqlParserPos pos, SqlIdentifier tableName, SqlNodeList columnList) 
{
+        super(pos, tableName);
+        this.columnList = columnList;
     }
 
     @Override
     public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(getTableName(), this.constraintName);
+        return Arrays.asList(tableIdentifier, columnList);
+    }
+
+    public SqlNodeList getColumnList() {
+        return columnList;
     }
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
-        writer.keyword("DROP CONSTRAINT");
-        this.constraintName.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("DROP");
+        // unparse table column
+        SqlUnparseUtils.unparseTableSchema(
+                writer, leftPrec, rightPrec, columnList, 
Collections.emptyList(), null);
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
index 5bfd6e39216..59b6b85c13e 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
@@ -28,18 +28,19 @@ import java.util.List;
 
 /** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT 
constraint_name. */
 public class SqlAlterTableDropConstraint extends SqlAlterTable {
+
     private final SqlIdentifier constraintName;
 
     /**
      * Creates an alter table drop constraint node.
      *
-     * @param tableID Table ID
-     * @param constraintName Constraint name
      * @param pos Parser position
+     * @param tableName Table name
+     * @param constraintName Constraint name
      */
     public SqlAlterTableDropConstraint(
-            SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos 
pos) {
-        super(pos, tableID);
+            SqlParserPos pos, SqlIdentifier tableName, SqlIdentifier 
constraintName) {
+        super(pos, tableName);
         this.constraintName = constraintName;
     }
 
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
similarity index 56%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
index 5bfd6e39216..fa7d48efd0d 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropPrimaryKey.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,40 +22,25 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
 
+import java.util.Collections;
 import java.util.List;
 
-/** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT 
constraint_name. */
-public class SqlAlterTableDropConstraint extends SqlAlterTable {
-    private final SqlIdentifier constraintName;
+/** ALTER TABLE [catalog_name.][db_name.]table_name DROP PRIMARY KEY. */
+public class SqlAlterTableDropPrimaryKey extends SqlAlterTable {
 
-    /**
-     * Creates an alter table drop constraint node.
-     *
-     * @param tableID Table ID
-     * @param constraintName Constraint name
-     * @param pos Parser position
-     */
-    public SqlAlterTableDropConstraint(
-            SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos 
pos) {
-        super(pos, tableID);
-        this.constraintName = constraintName;
-    }
-
-    public SqlIdentifier getConstraintName() {
-        return constraintName;
+    public SqlAlterTableDropPrimaryKey(SqlParserPos pos, SqlIdentifier 
tableName) {
+        super(pos, tableName);
     }
 
     @Override
     public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(getTableName(), this.constraintName);
+        return Collections.singletonList(tableIdentifier);
     }
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
-        writer.keyword("DROP CONSTRAINT");
-        this.constraintName.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("DROP PRIMARY KEY");
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
similarity index 56%
copy from 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
copy to 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
index 5bfd6e39216..593a77634cd 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropConstraint.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableDropWatermark.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,40 +22,34 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.util.ImmutableNullableList;
 
+import java.util.Collections;
 import java.util.List;
 
-/** ALTER TABLE [catalog_name.][db_name.]table_name DROP CONSTRAINT 
constraint_name. */
-public class SqlAlterTableDropConstraint extends SqlAlterTable {
-    private final SqlIdentifier constraintName;
-
-    /**
-     * Creates an alter table drop constraint node.
-     *
-     * @param tableID Table ID
-     * @param constraintName Constraint name
-     * @param pos Parser position
-     */
-    public SqlAlterTableDropConstraint(
-            SqlIdentifier tableID, SqlIdentifier constraintName, SqlParserPos 
pos) {
-        super(pos, tableID);
-        this.constraintName = constraintName;
-    }
+/**
+ * SqlNode to describe ALTER TABLE table_name DROP watermark clause.
+ *
+ * <p>Example: DDL like the below for drop watermark.
+ *
+ * <pre>{@code
+ * -- drop watermark
+ *  ALTER TABLE prod.db.sample DROP WATERMARK;
+ * }</pre>
+ */
+public class SqlAlterTableDropWatermark extends SqlAlterTable {
 
-    public SqlIdentifier getConstraintName() {
-        return constraintName;
+    public SqlAlterTableDropWatermark(SqlParserPos pos, SqlIdentifier 
tableName) {
+        super(pos, tableName);
     }
 
     @Override
     public List<SqlNode> getOperandList() {
-        return ImmutableNullableList.of(getTableName(), this.constraintName);
+        return Collections.singletonList(tableIdentifier); // the table is the only operand
     }
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         super.unparse(writer, leftPrec, rightPrec);
-        writer.keyword("DROP CONSTRAINT");
-        this.constraintName.unparse(writer, leftPrec, rightPrec);
+        writer.keyword("DROP WATERMARK");
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index fea81a9fe2d..5a3498e6ab7 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -563,6 +563,46 @@ class FlinkSqlParserImplTest extends SqlParserTest {
         sql(sql1).ok(expected1);
     }
 
+    @Test
+    void testAlterTableDropSingleColumn() {
+        sql("alter table t1 drop id").ok("ALTER TABLE `T1` DROP (\n" + "  
`ID`\n" + ")");
+
+        sql("alter table t1 drop (id)").ok("ALTER TABLE `T1` DROP (\n" + "  
`ID`\n" + ")");
+
+        sql("alter table t1 drop tuple.id")
+                .ok("ALTER TABLE `T1` DROP (\n" + "  `TUPLE`.`ID`\n" + ")");
+    }
+
+    @Test
+    void testAlterTableDropMultipleColumn() {
+        sql("alter table t1 drop (id, ts, tuple.f0, tuple.f1)")
+                .ok(
+                        "ALTER TABLE `T1` DROP (\n"
+                                + "  `ID`,\n"
+                                + "  `TS`,\n"
+                                + "  `TUPLE`.`F0`,\n"
+                                + "  `TUPLE`.`F1`\n"
+                                + ")");
+    }
+
+    @Test
+    void testAlterTableDropPrimaryKey() {
+        sql("alter table t1 drop primary key").ok("ALTER TABLE `T1` DROP 
PRIMARY KEY");
+    }
+
+    @Test
+    void testAlterTableDropConstraint() {
+        sql("alter table t1 drop constraint ct").ok("ALTER TABLE `T1` DROP 
CONSTRAINT `CT`");
+
+        sql("alter table t1 drop constrain^t^")
+                .fails("(?s).*Encountered \"<EOF>\" at line 1, column 
30.\n.*");
+    }
+
+    @Test
+    void testAlterTableDropWatermark() {
+        sql("alter table t1 drop watermark").ok("ALTER TABLE `T1` DROP 
WATERMARK");
+    }
+
     @Test
     void testAlterTableReset() {
         sql("alter table t1 reset ('key1')").ok("ALTER TABLE `T1` RESET (\n  
'key1'\n)");
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
index 9376e2c2527..b00038ce20f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaResolver;
 import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.expressions.SqlCallExpression;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
@@ -94,7 +95,7 @@ public class AlterSchemaConverter {
 
     public Schema applySchemaChange(
             SqlAlterTableSchema alterTableSchema,
-            Schema originalSchema,
+            Schema originSchema,
             List<TableChange> tableChangeCollector) {
         AlterSchemaStrategy strategy = 
computeAlterSchemaStrategy(alterTableSchema);
         SchemaConverter converter =

Reply via email to