This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e6ac376f362 [FLINK-39279][table] Make `CREATE OR ALTER MATERIALIZED TABLE` respect distribution
e6ac376f362 is described below
commit e6ac376f362d37015585f5d7f00b4c754c28d071
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 20 21:03:14 2026 +0100
[FLINK-39279][table] Make `CREATE OR ALTER MATERIALIZED TABLE` respect distribution
---
...SqlCreateOrAlterMaterializedTableConverter.java | 14 ++++++
...erializedTableNodeToOperationConverterTest.java | 52 +++++++++++++++++++++-
2 files changed, 64 insertions(+), 2 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index a9eba27c5b2..554e89f2ffc 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
@@ -156,6 +157,19 @@ public class SqlCreateOrAlterMaterializedTableConverter
throw new ValidationException("Changing of REFRESH MODE is
unsupported");
}
+ final TableDistribution oldDistribution =
oldTable.getDistribution().orElse(null);
+ final TableDistribution newDistribution =
+ mergeContext.getMergedTableDistribution().orElse(null);
+ if (!Objects.equals(oldDistribution, newDistribution)) {
+ if (oldDistribution == null) {
+ changes.add(TableChange.add(newDistribution));
+ } else if (newDistribution == null) {
+ changes.add(TableChange.dropDistribution());
+ } else {
+ changes.add(TableChange.modify(newDistribution));
+ }
+ }
+
return changes;
};
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 00246e79719..6f246c2fbb1 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.TableDistribution.Kind;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -732,6 +734,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+ ")\n"
+ "COMMENT 'materialized table comment'\n"
+ + "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n"
+ "PARTITIONED BY (a, d)\n"
+ "WITH (\n"
+ " 'format' = 'json2'\n"
@@ -753,7 +756,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
"SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`,
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+ "FROM `builtin`.`default`.`t3` AS
`t3`"),
TableChange.set("format", "json2"),
- TableChange.reset("connector"));
+ TableChange.reset("connector"),
+ TableChange.add(TableDistribution.of(Kind.HASH, 7,
List.of("b"))));
assertThat(operation.asSummaryString())
.isEqualTo(
"CREATE OR ALTER MATERIALIZED TABLE
builtin.default.base_mtbl\n"
@@ -762,7 +766,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ " MODIFY DEFINITION QUERY TO 'SELECT
`t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING)
AS `f`\n"
+ "FROM `builtin`.`default`.`t3` AS `t3`',\n"
+ " SET 'format' = 'json2',\n"
- + " RESET 'connector'");
+ + " RESET 'connector',\n"
+ + " ADD DISTRIBUTED BY HASH(`b`) INTO 7
BUCKETS");
// new table only difference schema & definition query with old table.
CatalogMaterializedTable oldTable =
@@ -800,6 +805,49 @@ class SqlMaterializedTableNodeToOperationConverterTest
new UnresolvedPhysicalColumn("f",
DataTypes.VARCHAR(Integer.MAX_VALUE)));
}
+ @Test
+ void testCreateOrAlterMaterializedTableWithDistributionForExistingTable() {
+ final String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_metadata
(\n"
+ + " t AS current_timestamp,"
+ + " m STRING METADATA VIRTUAL,"
+ + " m_p STRING METADATA,"
+ + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED,"
+ + " WATERMARK FOR t as current_timestamp - INTERVAL
'5' SECOND"
+ + ")\n"
+ + "COMMENT 'materialized table comment'\n"
+ + "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem', \n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT t1.* FROM t1";
+ Operation operation = parse(sql);
+
+
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+ FullAlterMaterializedTableOperation op =
(FullAlterMaterializedTableOperation) operation;
+ assertThat(op.getTableChanges())
+ .containsExactly(
+ TableChange.modifyDefinitionQuery(
+ "SELECT `t1`.*\n" + "FROM `t1`",
+ "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`\n"
+ + "FROM `builtin`.`default`.`t1` AS
`t1`"),
+ TableChange.set("connector", "filesystem"),
+ TableChange.set("format", "json"),
+ TableChange.modify(TableDistribution.of(Kind.HASH, 5,
List.of("a"))));
+ assertThat(operation.asSummaryString())
+ .isEqualTo(
+ "CREATE OR ALTER MATERIALIZED TABLE
builtin.default.base_mtbl_with_metadata\n"
+ + " MODIFY DEFINITION QUERY TO 'SELECT
`t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+ + "FROM `builtin`.`default`.`t1` AS `t1`',\n"
+ + " SET 'connector' = 'filesystem',\n"
+ + " SET 'format' = 'json',\n"
+ + " MODIFY DISTRIBUTED BY HASH(`a`) INTO 5
BUCKETS");
+ }
+
private static Collection<TestSpec>
testDataForCreateAlterMaterializedTableFailedCase() {
final Collection<TestSpec> list = new ArrayList<>();
list.addAll(createWithInvalidSchema());