LadyForest commented on code in PR #19329:
URL: https://github.com/apache/flink/pull/19329#discussion_r1058243819


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -1258,6 +1259,94 @@ public void testAlterTable() throws Exception {
                 .hasMessageContaining("ALTER TABLE RESET does not support 
empty key");
     }
 
+    @Test
+    public void testAlterTableRenameColumn() throws Exception {
+        prepareTable("tb1", false, false, true, 3);
+        // rename pk column c
+        Operation operation = parse("alter table tb1 rename c to c1");
+        assert operation instanceof AlterTableSchemaOperation;

Review Comment:
   `assert` can be turned off.
   ```suggestion
           assertThat(operation).isInstanceOf(AlterTableSchemaOperation.class);
   ```



##########
flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java:
##########
@@ -330,6 +330,10 @@ void testAlterTable() {
         final String sql3 = "alter table t1 drop constraint ct1";
         final String expected3 = "ALTER TABLE `T1` DROP CONSTRAINT `CT1`";
         sql(sql3).ok(expected3);
+
+        final String sql4 = "alter table t1 rename a to b";
+        final String expected4 = "ALTER TABLE `T1` RENAME `A` TO `B`";
+        sql(sql4).ok(expected4);

Review Comment:
   Add a case to cover the compound identifier.
   ```java
   sql("alter table t1 rename a.x to a.y").ok("ALTER TABLE `T1` RENAME `A`.`X` 
TO `A`.`Y`");
   ```
   



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -99,6 +107,82 @@ public Schema applySchemaChange(SqlAlterTableSchema 
alterTableSchema, Schema ori
         return converter.convert();
     }
 
+    public Schema applySchemaChange(
+            SqlAlterTableRenameColumn renameColumn, ContextResolvedTable 
originalTable) {
+        String oldColumnName = 
getColumnName(renameColumn.getOriginColumnIdentifier());
+        String newColumnName = 
getColumnName(renameColumn.getNewColumnIdentifier());
+        List<String> tableColumns =
+                originalTable.getResolvedSchema().getColumns().stream()
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        // validate old column is exists or new column isn't duplicated or old 
column isn't
+        // referenced by computed column
+        validateColumnName(
+                oldColumnName,
+                newColumnName,
+                tableColumns,
+                originalTable.getResolvedSchema(),
+                ((CatalogTable) 
originalTable.getResolvedTable()).getPartitionKeys());
+
+        // validate old column isn't referenced by watermark
+        List<WatermarkSpec> watermarkSpecs = 
originalTable.getResolvedSchema().getWatermarkSpecs();
+        watermarkSpecs.forEach(
+                watermarkSpec -> {
+                    String rowtimeAttribute = 
watermarkSpec.getRowtimeAttribute();
+                    Set<String> referencedColumns =
+                            ColumnReferenceFinder.findReferencedColumn(
+                                    watermarkSpec.getWatermarkExpression(), 
tableColumns);
+                    if (oldColumnName.equals(rowtimeAttribute)
+                            || referencedColumns.contains(oldColumnName)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "Old column %s is referred by 
watermark expression %s, "
+                                                + "currently doesn't allow to 
rename column which is "
+                                                + "referred by watermark 
expression.",
+                                        oldColumnName, 
watermarkSpec.asSummaryString()));
+                    }
+                });
+
+        Schema.Builder builder = Schema.newBuilder();
+        // build column
+        Schema originSchema = originalTable.getTable().getUnresolvedSchema();
+        originSchema
+                .getColumns()
+                .forEach(
+                        column -> {
+                            if (oldColumnName.equals(column.getName())) {
+                                buildNewColumnFromOriginColumn(builder, 
column, newColumnName);

Review Comment:
   It seems the column comment is omitted.
   ```suggestion
                                   buildNewColumnFromOriginColumn(builder, 
column, newColumnName);
                                   builder.withComment(column.getName());
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java:
##########
@@ -2670,7 +2776,7 @@ private void prepareTable(
                 Schema.newBuilder()
                         .column("a", DataTypes.INT().notNull())
                         .column("b", DataTypes.BIGINT().notNull())
-                        .column("c", DataTypes.STRING())
+                        .column("c", DataTypes.STRING().notNull())

Review Comment:
   Why change `c`'s nullability?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/AlterSchemaConverter.java:
##########
@@ -417,6 +495,84 @@ void checkColumnExists(String columnName) {
         }
     }
 
+    // 
--------------------------------------------------------------------------------------------
+
+    private void validateColumnName(
+            String originColumnName,
+            String newColumnName,
+            List<String> tableColumns,
+            ResolvedSchema originResolvedSchema,
+            List<String> partitionKeys) {
+        // validate old column
+        if (!tableColumns.contains(originColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Old column %s not found in table schema for 
RENAME COLUMN",
+                            originColumnName));
+        }
+
+        // validate new column
+        if (tableColumns.contains(newColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "New column %s already existed in table schema for 
RENAME COLUMN",
+                            newColumnName));
+        }
+
+        // validate old column name isn't referred by computed column case
+        originResolvedSchema.getColumns().stream()
+                .filter(column -> column instanceof Column.ComputedColumn)
+                .forEach(
+                        column -> {
+                            Column.ComputedColumn computedColumn = 
(Column.ComputedColumn) column;
+                            Set<String> referencedColumn =
+                                    ColumnReferenceFinder.findReferencedColumn(
+                                            computedColumn.getExpression(), 
tableColumns);
+                            if (referencedColumn.contains(originColumnName)) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Old column %s is referred by 
computed column %s, currently doesn't "
+                                                        + "allow to rename 
column which is referred by computed column.",
+                                                originColumnName,
+                                                
computedColumn.asSummaryString()));
+                            }
+                        });
+        // validate partition keys doesn't contain the old column
+        if (partitionKeys.contains(originColumnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Can not rename column %s because it is used as 
the partition keys.",
+                            originColumnName));
+        }
+    }
+
+    private void buildNewColumnFromOriginColumn(
+            Schema.Builder builder, Schema.UnresolvedColumn originColumn, 
String columnName) {
+        if (originColumn instanceof Schema.UnresolvedComputedColumn) {
+            builder.columnByExpression(
+                    columnName, ((Schema.UnresolvedComputedColumn) 
originColumn).getExpression());
+        } else if (originColumn instanceof Schema.UnresolvedPhysicalColumn) {
+            builder.column(
+                    columnName, ((Schema.UnresolvedPhysicalColumn) 
originColumn).getDataType());
+        } else if (originColumn instanceof Schema.UnresolvedMetadataColumn) {
+            Schema.UnresolvedMetadataColumn metadataColumn =
+                    (Schema.UnresolvedMetadataColumn) originColumn;
+            builder.columnByMetadata(
+                    columnName,
+                    metadataColumn.getDataType(),
+                    metadataColumn.getMetadataKey(),
+                    metadataColumn.isVirtual());
+        }
+    }
+
+    private static String getColumnName(SqlIdentifier identifier) {
+        if (!identifier.isSimple()) {
+            throw new UnsupportedOperationException(
+                    String.format("Alter nested row type %s is not supported 
yet.", identifier));

Review Comment:
   Nit: add `EX_MSG_PREFIX` to align exception message type?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to