twalthr commented on code in PR #27381:
URL: https://github.com/apache/flink/pull/27381#discussion_r2661753195


##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlAlterTableDrop.java:
##########
@@ -0,0 +1,184 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.table;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name MODIFY 
column/constraint/watermark clause.
+ *
+ * <p>Example: DDL like the below for modify column/constraint/watermark.
+ *
+ * <pre>{@code
+ * -- modify single column
+ * ALTER TABLE mytable MODIFY new_column STRING COMMENT 'new_column docs';
+ *
+ * -- modify multiple columns, constraint, and watermark
+ * ALTER TABLE mytable MODIFY (
+ *     log_ts STRING COMMENT 'log timestamp string' FIRST,
+ *     ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
+ *     col_meta int metadata from 'mk1' virtual AFTER col_b,
+ *     PRIMARY KEY (id) NOT ENFORCED,
+ *     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
+ * );
+ * }</pre>
+ */
+public abstract class SqlAlterTableDrop extends SqlAlterTableSchema {
+
+    public SqlAlterTableDrop(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList modifiedColumns,
+            List<SqlTableConstraint> constraints,
+            @Nullable SqlWatermark watermark,
+            boolean ifTableExists) {
+        super(pos, tableName, modifiedColumns, constraints, watermark, 
ifTableExists);
+    }
+
+    public SqlAlterTableDrop(SqlParserPos pos, SqlIdentifier tableName, 
boolean ifTableExists) {
+        super(pos, tableName, SqlNodeList.EMPTY, List.of(), null, 
ifTableExists);
+    }
+
+    protected abstract void unparseDropOperation(SqlWriter writer, int 
leftPrec, int rightPrec);
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        return List.of(tableIdentifier);
+    }
+
+    @Override
+    protected String getAlterOperation() {
+        return "DROP";
+    }
+
+    @Override
+    public void unparseAlterOperation(SqlWriter writer, int leftPrec, int 
rightPrec) {
+        super.unparseAlterOperation(writer, leftPrec, rightPrec);
+        unparseDropOperation(writer, leftPrec, rightPrec);
+    }
+
+    /** ALTER TABLE [IF EXISTS ][catalog_name.][db_name.]table_name DROP 
PRIMARY KEY. */
+    public static class SqlAlterTableDropPrimaryKey extends SqlAlterTableDrop {
+
+        public SqlAlterTableDropPrimaryKey(
+                SqlParserPos pos, SqlIdentifier tableName, boolean 
ifTableExists) {
+            super(pos, tableName, ifTableExists);
+        }
+
+        @Override
+        protected void unparseDropOperation(SqlWriter writer, int leftPrec, 
int rightPrec) {
+            writer.keyword("PRIMARY KEY");
+        }
+    }
+
+    /**
+     * ALTER TABLE [IF EXISTS ][catalog_name.][db_name.]table_name DROP 
CONSTRAINT constraint_name.
+     */
+    public static class SqlAlterTableDropConstraint extends SqlAlterTableDrop {
+        private final SqlIdentifier constraintName;
+
+        public SqlAlterTableDropConstraint(
+                SqlParserPos pos,
+                SqlIdentifier tableName,
+                SqlIdentifier constraintName,
+                boolean ifTableExists) {
+            super(pos, tableName, ifTableExists);
+            this.constraintName = constraintName;
+        }
+
+        public SqlIdentifier getConstraintName() {
+            return constraintName;
+        }
+
+        @Override
+        protected void unparseDropOperation(SqlWriter writer, int leftPrec, 
int rightPrec) {
+            writer.keyword("CONSTRAINT");
+            constraintName.unparse(writer, leftPrec, rightPrec);
+        }
+    }
+
+    /** ALTER TABLE [IF EXISTS ][catalog_name.][db_name.]table_name DROP 
PRIMARY KEY. */

Review Comment:
   This seems to be the Javadoc of DROP PRIMARY KEY, not of DropWatermark.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropFunctionConverter.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlDropFunction;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
+
+/** Convert DROP FUNCTION statement. */
+public class SqlDropFunctionConverter implements 
SqlNodeConverter<SqlDropFunction> {
+    @Override
+    public Operation convertSqlNode(SqlDropFunction sqlDropFunction, 
ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlDropFunction.getFullName());
+        if (sqlDropFunction.isSystemFunction()) {
+            return new DropTempSystemFunctionOperation(

Review Comment:
   nit: Interesting that we don't check for isTemporary here. I hope the parser 
catches this.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java:
##########
@@ -992,10 +994,63 @@ private static List<TestSpec> 
createWithInvalidFreshness() {
                         "Materialized table freshness only support SECOND, 
MINUTE, HOUR, DAY as the time unit."));
     }
 
+    private static Collection<TestSpec> alterDrop() {
+        return List.of(
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl DROP WATERMARK",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The current materialized table does not 
define any watermark strategy."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl_without_constraint 
DROP PRIMARY KEY",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The current materialized table does not 
define any primary key."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl DROP CONSTRAINT 
invalid_constraint_name",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The current materialized table does not 
define a primary key constraint named 'invalid_constraint_name'. Available 
constraint name: ['ct1']."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl DROP 
invalid_column_name",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The column `invalid_column_name` does not 
exist in the base table."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl DROP 
invalid_column_name",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The column `invalid_column_name` does not 
exist in the base table."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl DROP (a, b, a)",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "Duplicate column `a`."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl DROP a",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The column `a` is used as the partition 
keys."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_metadata DROP 
a",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The column `a` is used as the primary 
key."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_metadata DROP 
b",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The column `b` is used as a distribution 
key."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_metadata DROP 
t",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The column `t` is referenced by watermark 
expression."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_metadata DROP 
d",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "Column(s) ('d') are used in query."),
+                TestSpec.of(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_metadata DROP 
m_p",
+                        "Failed to execute ALTER MATERIALIZED TABLE 
statement.\n"
+                                + "The column m_p is persisted column. 
Dropping of persisted columns is not supported."));

Review Comment:
   ```suggestion
                                   + "The column `m_p` is persisted column. 
Dropping of persisted columns is not supported."));
   ```



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlAlterTableDrop.java:
##########
@@ -0,0 +1,184 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl.table;
+
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.ddl.SqlWatermark;
+import org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
+
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * SqlNode to describe ALTER TABLE [IF EXISTS] table_name MODIFY 
column/constraint/watermark clause.

Review Comment:
   This seems to be the Javadoc of MODIFY, not of AlterTableDrop.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -1650,7 +1650,7 @@ void testFailedToAlterTableAddColumn() throws Exception {
         // refer to a nested inner field
         assertThatThrownBy(() -> parse("alter table tb1 add (x string after 
e.f2)"))
                 .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Alter nested row type is not supported 
yet.");
+                .hasMessageContaining("Alter nested row type e.f2 is not 
supported yet.");

Review Comment:
   nit:
   ```suggestion
                   .hasMessageContaining("Altering the nested row type `e`.`f2` 
is not supported yet.");
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/MergeTableLikeUtil.java:
##########
@@ -56,7 +55,7 @@
 import java.util.function.Function;
 
 /** A utility class with logic for handling the {@code CREATE TABLE ... LIKE} 
clause. */
-class MergeTableLikeUtil {
+public class MergeTableLikeUtil {

Review Comment:
   nit: Add `@Internal` annotation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to