This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bbde1e2d68e3227d2f17969b36559e0e50620404 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Tue Mar 24 10:57:39 2026 +0100 [FLINK-39284][table] Make `CREATE OR ALTER MATERIALIZED TABLE` respect constraint changes --- ...SqlCreateOrAlterMaterializedTableConverter.java | 24 +++++- ...reateOrAlterMaterializedTableConverterTest.java | 95 ++++++++++++++++++++-- 2 files changed, 108 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java index 554e89f2ffc..16cb77b0ffd 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java @@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.SchemaResolver; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableDistribution; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; @@ -126,13 +127,25 @@ public class SqlCreateOrAlterMaterializedTableConverter final List<TableChange> changes = new ArrayList<>(); final ResolvedSchema oldSchema = oldTable.getResolvedSchema(); + final ResolvedSchema newSchema = schemaResolver.resolve(mergeContext.getMergedSchema()); final List<Column> newColumns = 
MaterializedTableUtils.validateAndExtractNewColumns( - oldSchema, - schemaResolver.resolve(mergeContext.getMergedSchema()), - mergeContext.hasSchemaDefinition()); + oldSchema, newSchema, mergeContext.hasSchemaDefinition()); newColumns.forEach(column -> changes.add(TableChange.add(column))); + + final UniqueConstraint oldConstraint = oldSchema.getPrimaryKey().orElse(null); + final UniqueConstraint newConstraint = newSchema.getPrimaryKey().orElse(null); + if (!Objects.equals(oldConstraint, newConstraint)) { + if (newConstraint == null) { + changes.add(TableChange.dropConstraint(oldConstraint.getName())); + } else if (oldConstraint == null) { + changes.add(TableChange.add(newConstraint)); + } else { + changes.add(TableChange.modify(newConstraint)); + } + } + changes.add( TableChange.modifyDefinitionQuery( mergeContext.getMergedOriginalQuery(), @@ -142,7 +155,10 @@ public class SqlCreateOrAlterMaterializedTableConverter final Map<String, String> newOptions = mergeContext.getMergedTableOptions(); for (Map.Entry<String, String> newOptionEntry : newOptions.entrySet()) { - changes.add(TableChange.set(newOptionEntry.getKey(), newOptionEntry.getValue())); + if (!newOptionEntry.getValue().equals(oldOptions.get(newOptionEntry.getKey()))) { + changes.add( + TableChange.set(newOptionEntry.getKey(), newOptionEntry.getValue())); + } } for (Map.Entry<String, String> oldOptionEntry : oldOptions.entrySet()) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java index 2d4940369f1..b9e569cd235 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java +++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableDistribution; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; import org.apache.flink.table.catalog.exceptions.TableNotExistException; @@ -46,7 +47,7 @@ import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -public class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest +class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest extends SqlNodeToOperationConversionTestBase { private static final String DEFAULT_MATERIALIZED_TABLE = "CREATE MATERIALIZED TABLE mt (\n" @@ -82,6 +83,7 @@ public class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest // If NOT NULL is defined in schema, it should stay TableChange.add(Column.physical("a1", DataTypes.BIGINT().notNull())), TableChange.add(Column.physical("f", DataTypes.INT())), + TableChange.dropConstraint("ct1"), TableChange.modifyDefinitionQuery( "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS `f`\nFROM `t1`", "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`a` AS `a1`, 3 AS `f`\n" @@ -102,6 +104,7 @@ public class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest .containsExactly( // No explicit schema, so nullable will be used TableChange.add(Column.physical("a1", DataTypes.BIGINT())), + TableChange.dropConstraint("ct1"), TableChange.modifyDefinitionQuery( "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM `t1`", "SELECT `t1`.`a`, `t1`.`b`, 
`t1`.`c`, `t1`.`d`, `t1`.`a` AS `a1`\n" @@ -243,21 +246,99 @@ public class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; assertThat(op.getTableChanges()) .containsExactly( - TableChange.modifyDefinitionQuery( - "SELECT `t1`.*\n" + "FROM `t1`", - "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" - + "FROM `builtin`.`default`.`t1` AS `t1`"), TableChange.modify( TableDistribution.of( TableDistribution.Kind.HASH, 4, List.of("b")))); assertThat(operation.asSummaryString()) .isEqualTo( "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt2\n" - + " MODIFY DEFINITION QUERY TO 'SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" - + "FROM `builtin`.`default`.`t1` AS `t1`',\n" + " MODIFY DISTRIBUTED BY HASH(`b`) INTO 4 BUCKETS"); } + @Test + void testCreateOrAlterMaterializedTableWithDroppedConstraint() { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt \n" + + "COMMENT 'New materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()).containsExactly(TableChange.dropConstraint("ct1")); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n" + + " DROP CONSTRAINT ct1"); + } + + @Test + void testCreateOrAlterMaterializedTableWithChangedConstraint() { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " CONSTRAINT new_constraint PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'New materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', 
\n" + + " 'format' = 'json'\n" + + ")\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.modify( + UniqueConstraint.primaryKey("new_constraint", List.of("a")))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n" + + " MODIFY CONSTRAINT `new_constraint` PRIMARY KEY (`a`) NOT ENFORCED"); + } + + @Test + void testCreateOrAlterMaterializedTableWithNewConstraint() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt1 (\n" + + " id INT NOT NULL, t TIMESTAMP_LTZ(3)\n" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + createMaterializedTableInCatalog(prepSql, "mt1"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt1 (\n" + + " id INT NOT NULL, t TIMESTAMP_LTZ(3),\n" + + " CONSTRAINT new_constraint PRIMARY KEY(id) NOT ENFORCED" + + ")\n" + + "AS SELECT 1 as id, current_timestamp as t"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.add( + UniqueConstraint.primaryKey("new_constraint", List.of("id")))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt1\n" + + " ADD CONSTRAINT `new_constraint` PRIMARY KEY (`id`) NOT ENFORCED"); + } + private void createMaterializedTableInCatalog(String sql, String materializedTableName) throws TableAlreadyExistException, DatabaseNotExistException { final ObjectPath objectPath =
