lsyldliu commented on code in PR #19329: URL: https://github.com/apache/flink/pull/19329#discussion_r848005643
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java: ########## @@ -144,6 +147,150 @@ public static Operation convertChangeColumn( // TODO: handle watermark and constraints } + public static Operation convertRenameColumn( + ObjectIdentifier tableIdentifier, + String originColumnName, + String newColumnName, + CatalogTable catalogTable, + ResolvedSchema originResolveSchema) { + Schema originSchema = catalogTable.getUnresolvedSchema(); + List<String> tableColumns = + originSchema.getColumns().stream() + .map(Schema.UnresolvedColumn::getName) + .collect(Collectors.toList()); + // validate old column is exists or new column is duplicated or old column is + // referenced by computed column + validateColumnName(originColumnName, newColumnName, tableColumns, originResolveSchema); + + // validate old column is referenced by watermark + List<org.apache.flink.table.catalog.WatermarkSpec> watermarkSpecs = + originResolveSchema.getWatermarkSpecs(); + watermarkSpecs.forEach( + watermarkSpec -> { + String rowtimeAttribute = watermarkSpec.getRowtimeAttribute(); + Set<String> referencedColumns = + ColumnReferenceFinder.findReferencedColumn( + watermarkSpec.getWatermarkExpression(), tableColumns); + if (originColumnName.equals(rowtimeAttribute) + || referencedColumns.contains(originColumnName)) { + throw new ValidationException( + String.format( + "Old column %s is referenced by watermark expression %s, " + + "currently doesn't allow to rename column which is " + + "referenced by watermark expression.", + originColumnName, watermarkSpec.asSummaryString())); + } + }); + + Schema.Builder builder = Schema.newBuilder(); + // build column + originSchema + .getColumns() + .forEach( + column -> { + if (originColumnName.equals(column.getName())) { + buildNewColumnFromOriginColumn(builder, column, newColumnName); + } else { + buildNewColumnFromOriginColumn(builder, column, column.getName()); + } + }); + // build primary 
key + Optional<Schema.UnresolvedPrimaryKey> originPrimaryKey = originSchema.getPrimaryKey(); + if (originPrimaryKey.isPresent()) { + List<String> originPrimaryKeyNames = originPrimaryKey.get().getColumnNames(); + String constrainName = originPrimaryKey.get().getConstraintName(); + List<String> newPrimaryKeyNames = + originPrimaryKeyNames.stream() + .map(pkName -> pkName.equals(originColumnName) ? newColumnName : pkName) + .collect(Collectors.toList()); + builder.primaryKeyNamed(constrainName, newPrimaryKeyNames); + } + + // build watermark + originSchema + .getWatermarkSpecs() + .forEach( + watermarkSpec -> + builder.watermark( + watermarkSpec.getColumnName(), + watermarkSpec.getWatermarkExpression())); + + // build partition key + List<String> newPartitionKeys = + catalogTable.getPartitionKeys().stream() + .map(name -> name.equals(originColumnName) ? newColumnName : name) + .collect(Collectors.toList()); + + // generate new schema + return new AlterTableSchemaOperation( + tableIdentifier, + CatalogTable.of( + builder.build(), + catalogTable.getComment(), + newPartitionKeys, + catalogTable.getOptions())); + } + + private static void validateColumnName( + String originColumnName, + String newColumnName, + List<String> tableColumns, + ResolvedSchema originResolvedSchema) { + // validate old column + if (!tableColumns.contains(originColumnName)) { + throw new ValidationException( + String.format( + "Old column %s not found in table schema for RENAME COLUMN", + originColumnName)); + } + + // validate new column + if (tableColumns.contains(newColumnName)) { + throw new ValidationException( + String.format( + "New column %s already existed in table schema for RENAME COLUMN", + newColumnName)); + } + + // validate old column name is referenced by computed column case + originResolvedSchema.getColumns().stream() + .filter(column -> column instanceof Column.ComputedColumn) + .forEach( + column -> { + Column.ComputedColumn computedColumn = (Column.ComputedColumn) column; + 
Set<String> referencedColumn = + ColumnReferenceFinder.findReferencedColumn( + computedColumn.getExpression(), tableColumns); + if (referencedColumn.contains(originColumnName)) { + throw new ValidationException( + String.format( + "Old column %s is referenced by computed column %s, currently doesn't " + + "allow to rename column which is referenced by computed column.", + originColumnName, + computedColumn.asSummaryString())); + } + }); + } + + private static void buildNewColumnFromOriginColumn( + Schema.Builder builder, Schema.UnresolvedColumn originColumn, String columnName) { + if (originColumn instanceof Schema.UnresolvedComputedColumn) { + builder.columnByExpression( + columnName, ((Schema.UnresolvedComputedColumn) originColumn).getExpression()); + } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) { + builder.column( + columnName, ((Schema.UnresolvedPhysicalColumn) originColumn).getDataType()); + } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) { + Schema.UnresolvedMetadataColumn metadataColumn = + (Schema.UnresolvedMetadataColumn) originColumn; + builder.columnByMetadata( + columnName, + metadataColumn.getDataType(), + metadataColumn.getMetadataKey(), + metadataColumn.isVirtual()); + } Review Comment: Your suggestion is good, but I think it is not necessary to validate it here, because the exception will be thrown when the table is created. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org