This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e6ac376f362 [FLINK-39279][table] Make `CREATE OR ALTER MATERIALIZED 
TABLE` respect distribution
e6ac376f362 is described below

commit e6ac376f362d37015585f5d7f00b4c754c28d071
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 20 21:03:14 2026 +0100

    [FLINK-39279][table] Make `CREATE OR ALTER MATERIALIZED TABLE` respect 
distribution
---
 ...SqlCreateOrAlterMaterializedTableConverter.java | 14 ++++++
 ...erializedTableNodeToOperationConverterTest.java | 52 +++++++++++++++++++++-
 2 files changed, 64 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index a9eba27c5b2..554e89f2ffc 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
@@ -156,6 +157,19 @@ public class SqlCreateOrAlterMaterializedTableConverter
                 throw new ValidationException("Changing of REFRESH MODE is 
unsupported");
             }
 
+            final TableDistribution oldDistribution = 
oldTable.getDistribution().orElse(null);
+            final TableDistribution newDistribution =
+                    mergeContext.getMergedTableDistribution().orElse(null);
+            if (!Objects.equals(oldDistribution, newDistribution)) {
+                if (oldDistribution == null) {
+                    changes.add(TableChange.add(newDistribution));
+                } else if (newDistribution == null) {
+                    changes.add(TableChange.dropDistribution());
+                } else {
+                    changes.add(TableChange.modify(newDistribution));
+                }
+            }
+
             return changes;
         };
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 00246e79719..6f246c2fbb1 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.TableDistribution.Kind;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -732,6 +734,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
                         + ")\n"
                         + "COMMENT 'materialized table comment'\n"
+                        + "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n"
                         + "PARTITIONED BY (a, d)\n"
                         + "WITH (\n"
                         + "  'format' = 'json2'\n"
@@ -753,7 +756,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
                                         + "FROM `builtin`.`default`.`t3` AS 
`t3`"),
                         TableChange.set("format", "json2"),
-                        TableChange.reset("connector"));
+                        TableChange.reset("connector"),
+                        TableChange.add(TableDistribution.of(Kind.HASH, 7, 
List.of("b"))));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
                         "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.base_mtbl\n"
@@ -762,7 +766,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 + " MODIFY DEFINITION QUERY TO 'SELECT 
`t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) 
AS `f`\n"
                                 + "FROM `builtin`.`default`.`t3` AS `t3`',\n"
                                 + "  SET 'format' = 'json2',\n"
-                                + "  RESET 'connector'");
+                                + "  RESET 'connector',\n"
+                                + "  ADD DISTRIBUTED BY HASH(`b`) INTO 7 
BUCKETS");
 
         // new table only difference schema & definition query with old table.
         CatalogMaterializedTable oldTable =
@@ -800,6 +805,49 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         new UnresolvedPhysicalColumn("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE)));
     }
 
+    @Test
+    void testCreateOrAlterMaterializedTableWithDistributionForExistingTable() {
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_metadata 
(\n"
+                        + "   t AS current_timestamp,"
+                        + "   m STRING METADATA VIRTUAL,"
+                        + "   m_p STRING METADATA,"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED,"
+                        + "   WATERMARK FOR t as current_timestamp - INTERVAL 
'5' SECOND"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT t1.* FROM t1";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges())
+                .containsExactly(
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `t1`.*\n" + "FROM `t1`",
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
+                        TableChange.set("connector", "filesystem"),
+                        TableChange.set("format", "json"),
+                        TableChange.modify(TableDistribution.of(Kind.HASH, 5, 
List.of("a"))));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.base_mtbl_with_metadata\n"
+                                + " MODIFY DEFINITION QUERY TO 'SELECT 
`t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+                                + "FROM `builtin`.`default`.`t1` AS `t1`',\n"
+                                + "  SET 'connector' = 'filesystem',\n"
+                                + "  SET 'format' = 'json',\n"
+                                + "  MODIFY DISTRIBUTED BY HASH(`a`) INTO 5 
BUCKETS");
+    }
+
     private static Collection<TestSpec> 
testDataForCreateAlterMaterializedTableFailedCase() {
         final Collection<TestSpec> list = new ArrayList<>();
         list.addAll(createWithInvalidSchema());

Reply via email to