LadyForest commented on code in PR #21571:
URL: https://github.com/apache/flink/pull/21571#discussion_r1059214774


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -115,77 +120,135 @@ public Schema applySchemaChange(SqlAlterTableSchema 
alterTableSchema, Schema ori
     }
 
     public Schema applySchemaChange(
-            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable 
originalTable) {
-        String oldColumnName = 
getColumnName(renameColumn.getOriginColumnIdentifier());
+            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable 
originTable) {
+        String originColumnName = 
getColumnName(renameColumn.getOriginColumnIdentifier());
         String newColumnName = 
getColumnName(renameColumn.getNewColumnIdentifier());
         List<String> tableColumns =
-                originalTable.getResolvedSchema().getColumns().stream()
+                originTable.getResolvedSchema().getColumns().stream()
                         .map(Column::getName)
                         .collect(Collectors.toList());
-        // validate old column is exists or new column isn't duplicated or old 
column isn't
-        // referenced by computed column
+        // validate origin column is exists, new column name does not collide 
with existed column
+        // names, and origin column isn't referenced by computed column
         validateColumnName(
-                oldColumnName,
+                originColumnName,
                 newColumnName,
                 tableColumns,
-                originalTable.getResolvedSchema(),
-                ((CatalogTable) 
originalTable.getResolvedTable()).getPartitionKeys());
+                originTable.getResolvedSchema(),
+                ((CatalogTable) 
originTable.getResolvedTable()).getPartitionKeys());
+        validateWatermark(originTable, originColumnName, tableColumns);
 
-        // validate old column isn't referenced by watermark
-        List<WatermarkSpec> watermarkSpecs = 
originalTable.getResolvedSchema().getWatermarkSpecs();
-        watermarkSpecs.forEach(
-                watermarkSpec -> {
-                    String rowtimeAttribute = 
watermarkSpec.getRowtimeAttribute();
-                    Set<String> referencedColumns =
-                            ColumnReferenceFinder.findReferencedColumn(
-                                    watermarkSpec.getWatermarkExpression(), 
tableColumns);
-                    if (oldColumnName.equals(rowtimeAttribute)
-                            || referencedColumns.contains(oldColumnName)) {
-                        throw new ValidationException(
-                                String.format(
-                                        "Old column %s is referred by 
watermark expression %s, "
-                                                + "currently doesn't allow to 
rename column which is "
-                                                + "referred by watermark 
expression.",
-                                        oldColumnName, 
watermarkSpec.asSummaryString()));
+        // generate new schema
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (column.getName().equals(originColumnName)) {
+                        buildNewColumnFromOriginColumn(builder, column, 
newColumnName);
+                    } else {
+                        builder.fromColumns(Collections.singletonList(column));
                     }
                 });
+        buildUpdatedPrimaryKey(
+                schemaBuilder,
+                originTable,
+                (pk) -> pk.equals(originColumnName) ? newColumnName : pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
 
-        Schema.Builder builder = Schema.newBuilder();
-        // build column
-        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
-        originSchema
-                .getColumns()
+    public Schema applySchemaChange(
+            SqlAlterTableDropColumn dropColumn, ContextResolvedTable 
originTable) {
+        List<String> tableColumns =
+                originTable.getResolvedSchema().getColumns().stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        Set<String> primaryKeys = new HashSet<>();
+        originTable
+                .getResolvedSchema()
+                .getPrimaryKey()
+                .ifPresent(pk -> primaryKeys.addAll(pk.getColumns()));
+        Set<String> columnsToDrop = new HashSet<>();
+        dropColumn
+                .getColumnList()
                 .forEach(
-                        column -> {
-                            if (oldColumnName.equals(column.getName())) {
-                                buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
-                            } else {
-                                
builder.fromColumns(Collections.singletonList(column));
+                        identifier -> {
+                            String name = getColumnName((SqlIdentifier) 
identifier);
+                            if (!columnsToDrop.add(name)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "%sDuplicate column `%s`.", 
EX_MSG_PREFIX, name));
                             }
                         });
-        // build primary key
-        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = 
originSchema.getPrimaryKey();
-        if (originPrimaryKey.isPresent()) {
-            List<String> originPrimaryKeyNames = 
originPrimaryKey.get().getColumnNames();
-            String constrainName = originPrimaryKey.get().getConstraintName();
-            List<String> newPrimaryKeyNames =
-                    originPrimaryKeyNames.stream()
-                            .map(pkName -> pkName.equals(oldColumnName) ? 
newColumnName : pkName)
-                            .collect(Collectors.toList());
-            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
-        }
-
-        // build watermark
-        originSchema
-                .getWatermarkSpecs()
-                .forEach(
-                        watermarkSpec ->
-                                builder.watermark(
-                                        watermarkSpec.getColumnName(),
-                                        
watermarkSpec.getWatermarkExpression()));
 
-        // generate new schema
-        return builder.build();
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (SqlNode columnIdentifier : dropColumn.getColumnList()) {
+            String columnToDrop = getColumnName((SqlIdentifier) 
columnIdentifier);
+            // validate the column to drop exists in the table schema, is not 
a primary key and
+            // does not derive any computed column
+            validateColumnName(
+                    columnToDrop,
+                    tableColumns,
+                    originTable.getResolvedSchema(),
+                    ((CatalogTable) 
originTable.getResolvedTable()).getPartitionKeys(),
+                    primaryKeys,
+                    columnsToDrop);
+            validateWatermark(originTable, columnToDrop, tableColumns);
+        }
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> {
+                    if (!columnsToDrop.contains(column.getName())) {
+                        builder.fromColumns(Collections.singletonList(column));
+                    }
+                });
+        buildUpdatedPrimaryKey(schemaBuilder, originTable, (pk) -> pk);
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    public Schema applySchemaChange(
+            SqlAlterTableDropConstraint dropConstraint, ContextResolvedTable 
originTable) {
+        Optional<UniqueConstraint> pkConstraint = 
originTable.getResolvedSchema().getPrimaryKey();
+        if (!pkConstraint.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define any primary 
key.", EX_MSG_PREFIX));
+        }
+        SqlIdentifier constraintIdentifier = 
dropConstraint.getConstraintName();
+        String constraintName = pkConstraint.get().getName();
+        if (constraintIdentifier != null
+                && !constraintIdentifier.getSimple().equals(constraintName)) {
+            throw new ValidationException(
+                    String.format(
+                            "%sThe base table does not define a primary key 
constraint named '%s'. "
+                                    + "Available constraint name: ['%s'].",
+                            EX_MSG_PREFIX, constraintIdentifier.getSimple(), 
constraintName));
+        }
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        buildUpdatedColumn(
+                schemaBuilder,
+                originTable,
+                (builder, column) -> 
builder.fromColumns(Collections.singletonList(column)));
+        buildUpdatedWatermark(schemaBuilder, originTable);
+        return schemaBuilder.build();
+    }
+
+    public Schema applySchemaChange(ContextResolvedTable originTable) {

Review Comment:
   > It's hard for other to understand what's the usage of this method from the 
signature. How about
   > 
   > ```
   > applySchemaChange(SqlAlterWatermark, ContextResolvedTable)
   > ```
   
   I agree that the method is not straightforward, but `SqlAlterWatermark` is 
not used. How about adding the method description?
   ```java
   /** Convert ALTER TABLE DROP WATERMARK to generate an updated {@link 
Schema}. */
   applySchemaChange(ContextResolvedTable)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to