lsyldliu commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r848005643


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java:
##########
@@ -144,6 +147,150 @@ public static Operation convertChangeColumn(
         // TODO: handle watermark and constraints
     }
 
+    public static Operation convertRenameColumn(
+            ObjectIdentifier tableIdentifier,
+            String originColumnName,
+            String newColumnName,
+            CatalogTable catalogTable,
+            ResolvedSchema originResolveSchema) {
+        Schema originSchema = catalogTable.getUnresolvedSchema();
+        List<String> tableColumns =
+                originSchema.getColumns().stream()
+                        .map(Schema.UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column is duplicated or old 
column is
+        // referenced by computed column
+        validateColumnName(originColumnName, newColumnName, tableColumns, 
originResolveSchema);
+
+        // validate old column is referenced by watermark
+        List<org.apache.flink.table.catalog.WatermarkSpec> watermarkSpecs =
+                originResolveSchema.getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = 
watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), 
tableColumns);
+                    if (originColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(originColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referenced by 
watermark expression %s, "
+                                                + "currently doesn't allow to 
rename column which is "
+                                                + "referenced by watermark 
expression.",
+                                        originColumnName, 
watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        originSchema
+                .getColumns()
+                .forEach(
+                        column -> {
+                            if (originColumnName.equals(column.getName())) {
+                                buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
+                            } else {
+                                buildNewColumnFromOriginColumn(builder, 
column, column.getName());
+                            }
+                        });
+        // build primary key
+        Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = 
originSchema.getPrimaryKey();
+        if (originPrimaryKey.isPresent()) {
+            List<String> originPrimaryKeyNames = 
originPrimaryKey.get().getColumnNames();
+            String constrainName = originPrimaryKey.get().getConstraintName();
+            List<String> newPrimaryKeyNames =
+                    originPrimaryKeyNames.stream()
+                            .map(pkName -> pkName.equals(originColumnName) ? 
newColumnName : pkName)
+                            .collect(Collectors.toList());
+            builder.primaryKeyNamed(constrainName, newPrimaryKeyNames);
+        }
+
+        // build watermark
+        originSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        watermarkSpec ->
+                                builder.watermark(
+                                        watermarkSpec.getColumnName(),
+                                        
watermarkSpec.getWatermarkExpression()));
+
+        // build partition key
+        List<String> newPartitionKeys =
+                catalogTable.getPartitionKeys().stream()
+                        .map(name -> name.equals(originColumnName) ? 
newColumnName : name)
+                        .collect(Collectors.toList());
+
+        // generate new schema
+        return new AlterTableSchemaOperation(
+                tableIdentifier,
+                CatalogTable.of(
+                        builder.build(),
+                        catalogTable.getComment(),
+                        newPartitionKeys,
+                        catalogTable.getOptions()));
+    }
+
+    private static void validateColumnName(
+            String originColumnName,
+            String newColumnName,
+            List<String> tableColumns,
+            ResolvedSchema originResolvedSchema) {
+        // validate old column
+        if (!tableColumns.contains(originColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Old column %s not found in table schema for 
RENAME COLUMN",
+                            originColumnName));
+        }
+
+        // validate new column
+        if (tableColumns.contains(newColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "New column %s already existed in table schema for 
RENAME COLUMN",
+                            newColumnName));
+        }
+
+        // validate old column name is referenced by computed column case
+        originResolvedSchema.getColumns().stream()
+                .filter(column -> column instanceof Column.ComputedColumn)
+                .forEach(
+                        column -> {
+                            Column.ComputedColumn computedColumn = 
(Column.ComputedColumn) column;
+                            Set<String> referencedColumn =
+                                    ColumnReferenceFinder.findReferencedColumn(
+                                            computedColumn.getExpression(), 
tableColumns);
+                            if (referencedColumn.contains(originColumnName)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Old column %s is referenced 
by computed column %s, currently doesn't "
+                                                        + "allow to rename 
column which is referenced by computed column.",
+                                                originColumnName,
+                                                
computedColumn.asSummaryString()));
+                            }
+                        });
+    }
+
+    private static void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, 
String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) 
originColumn).getExpression());
+        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(
+                    columnName, ((Schema.UnresolvedPhysicalColumn) 
originColumn).getDataType());
+        } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+            Schema.UnresolvedMetadataColumn metadataColumn =
+                    (Schema.UnresolvedMetadataColumn) originColumn;
+            builder.columnByMetadata(
+                    columnName,
+                    metadataColumn.getDataType(),
+                    metadataColumn.getMetadataKey(),
+                    metadataColumn.isVirtual());
+        }

Review Comment:
   Your suggestion is good, but I think here is not needed to validate it 
because of the the exception will be throw in create table



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to