twalthr commented on code in PR #24886:
URL: https://github.com/apache/flink/pull/24886#discussion_r1636474722


##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDistribution.java:
##########
@@ -70,17 +70,28 @@ public List<SqlNode> getOperandList() {
 
     @Override
     public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
-        writer.newlineAndIndent();
+        unparse(writer, leftPrec, rightPrec, "DISTRIBUTED", true);
+    }
+
+    public void unparseAlter(SqlWriter writer, int leftPrec, int rightPrec) {
+        unparse(writer, leftPrec, rightPrec, "DISTRIBUTION", false);
+    }
+
+    public void unparse(

Review Comment:
   can be private



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -2208,6 +2241,59 @@ public void testAlterTableModifyPk() throws Exception {
                         .build());
     }
 
+    @Test
+    public void testAlterTableAddDistribution() throws Exception {
+        prepareNonManagedTable("tb1", false);
+
+        Operation operation = parse("alter table tb1 add distribution by 
hash(a) into 12 buckets");
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
+        assertAlterTableDistribution(
+                operation,
+                tableIdentifier,
+                TableDistribution.ofHash(Collections.singletonList("a"), 12),
+                "ALTER TABLE cat1.db1.tb1\n" + "  ADD DISTRIBUTED BY HASH(`a`) 
INTO 12 BUCKETS\n");
+    }
+
+    @Test
+    public void testFailedToAlterTableAddDistribution() throws Exception {
+        prepareNonManagedTableWithDistribution("tb1");
+
+        // modify watermark on a table without watermark

Review Comment:
   incorrect comment



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -2208,6 +2241,59 @@ public void testAlterTableModifyPk() throws Exception {
                         .build());
     }
 
+    @Test
+    public void testAlterTableAddDistribution() throws Exception {
+        prepareNonManagedTable("tb1", false);
+
+        Operation operation = parse("alter table tb1 add distribution by 
hash(a) into 12 buckets");
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("cat1", "db1", 
"tb1");
+        assertAlterTableDistribution(
+                operation,
+                tableIdentifier,
+                TableDistribution.ofHash(Collections.singletonList("a"), 12),
+                "ALTER TABLE cat1.db1.tb1\n" + "  ADD DISTRIBUTED BY HASH(`a`) 
INTO 12 BUCKETS\n");
+    }
+
+    @Test
+    public void testFailedToAlterTableAddDistribution() throws Exception {
+        prepareNonManagedTableWithDistribution("tb1");
+
+        // modify watermark on a table without watermark
+        assertThatThrownBy(
+                        () -> parse("alter table tb1 add distribution by 
hash(a) into 12 buckets"))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining("You can modify it or drop it before 
adding a new one.");
+    }
+
+    @Test
+    public void testFailedToAlterTableModifyDistribution() throws Exception {
+        prepareNonManagedTable("tb2", false);
+
+        // modify watermark on a table without watermark

Review Comment:
   incorrect comment



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -1361,16 +1374,36 @@ public void testAlterTableDropConstraint() throws 
Exception {
                                 .getUnresolvedSchema()
                                 .getPrimaryKey())
                 .isNotPresent();
+    }
+
+    @Test
+    public void testAlterTableDropDistribution() throws Exception {
+        prepareNonManagedTableWithDistribution("tb1");
+        String expectedSummaryString = "ALTER TABLE cat1.db1.tb1\n  DROP 
DISTRIBUTION";
 
-        operation = parse("alter table tb1 drop primary key");
+        Operation operation = parse("alter table tb1 drop distribution");
         assertThat(operation).isInstanceOf(AlterTableChangeOperation.class);
         
assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString);
-        assertThat(
-                        ((AlterTableChangeOperation) operation)
-                                .getNewTable()
-                                .getUnresolvedSchema()
-                                .getPrimaryKey())
+        assertThat(((AlterTableChangeOperation) 
operation).getNewTable().getDistribution())
                 .isNotPresent();
+
+        prepareNonManagedTableWithDistribution("tb3");
+        // rename column used as distribution key

Review Comment:
   comment seems to be wrong. Also, isn't this tested above already? At least 
`is used as a distribution key` occurs two times in this class



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java:
##########
@@ -2485,13 +2571,31 @@ private void prepareManagedTable(boolean hasPartition) 
throws Exception {
         prepareTable("tb1", true, hasPartition, false, 0);
     }
 
+    private void prepareNonManagedTableWithDistribution(String tableName) 
throws Exception {
+        TableDistribution distribution =
+                TableDistribution.of(
+                        TableDistribution.Kind.HASH, 6, 
Collections.singletonList("c"));
+        prepareTable(tableName, false, false, false, 0, distribution);
+    }
+
     private void prepareTable(
             String tableName,
             boolean managedTable,
             boolean hasPartition,
             boolean hasWatermark,
             int numOfPkFields)
             throws Exception {
+        prepareTable(tableName, managedTable, hasPartition, hasWatermark, 
numOfPkFields, null);
+    }
+
+    private void prepareTable(
+            String tableName,
+            boolean managedTable,
+            boolean hasPartition,
+            boolean hasWatermark,
+            int numOfPkFields,
+            TableDistribution tableDistribution)

Review Comment:
   add `@Nullable` if this arg can be null



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to