This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bbde1e2d68e3227d2f17969b36559e0e50620404
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 24 10:57:39 2026 +0100

    [FLINK-39284][table] Make `CREATE OR ALTER MATERIALIZED TABLE` respect constraint changes
---
 ...SqlCreateOrAlterMaterializedTableConverter.java | 24 +++++-
 ...reateOrAlterMaterializedTableConverterTest.java | 95 ++++++++++++++++++++--
 2 files changed, 108 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 554e89f2ffc..16cb77b0ffd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaResolver;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.operations.Operation;
 import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
 import 
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
@@ -126,13 +127,25 @@ public class SqlCreateOrAlterMaterializedTableConverter
             final List<TableChange> changes = new ArrayList<>();
 
             final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+            final ResolvedSchema newSchema = 
schemaResolver.resolve(mergeContext.getMergedSchema());
             final List<Column> newColumns =
                     MaterializedTableUtils.validateAndExtractNewColumns(
-                            oldSchema,
-                            
schemaResolver.resolve(mergeContext.getMergedSchema()),
-                            mergeContext.hasSchemaDefinition());
+                            oldSchema, newSchema, 
mergeContext.hasSchemaDefinition());
 
             newColumns.forEach(column -> changes.add(TableChange.add(column)));
+
+            final UniqueConstraint oldConstraint = 
oldSchema.getPrimaryKey().orElse(null);
+            final UniqueConstraint newConstraint = 
newSchema.getPrimaryKey().orElse(null);
+            if (!Objects.equals(oldConstraint, newConstraint)) {
+                if (newConstraint == null) {
+                    
changes.add(TableChange.dropConstraint(oldConstraint.getName()));
+                } else if (oldConstraint == null) {
+                    changes.add(TableChange.add(newConstraint));
+                } else {
+                    changes.add(TableChange.modify(newConstraint));
+                }
+            }
+
             changes.add(
                     TableChange.modifyDefinitionQuery(
                             mergeContext.getMergedOriginalQuery(),
@@ -142,7 +155,10 @@ public class SqlCreateOrAlterMaterializedTableConverter
             final Map<String, String> newOptions = 
mergeContext.getMergedTableOptions();
 
             for (Map.Entry<String, String> newOptionEntry : 
newOptions.entrySet()) {
-                changes.add(TableChange.set(newOptionEntry.getKey(), 
newOptionEntry.getValue()));
+                if 
(!newOptionEntry.getValue().equals(oldOptions.get(newOptionEntry.getKey()))) {
+                    changes.add(
+                            TableChange.set(newOptionEntry.getKey(), 
newOptionEntry.getValue()));
+                }
             }
 
             for (Map.Entry<String, String> oldOptionEntry : 
oldOptions.entrySet()) {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
index 2d4940369f1..b9e569cd235 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
@@ -46,7 +47,7 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-public class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
+class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
         extends SqlNodeToOperationConversionTestBase {
     private static final String DEFAULT_MATERIALIZED_TABLE =
             "CREATE MATERIALIZED TABLE mt (\n"
@@ -82,6 +83,7 @@ public class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                         // If NOT NULL is defined in schema, it should stay
                         TableChange.add(Column.physical("a1", 
DataTypes.BIGINT().notNull())),
                         TableChange.add(Column.physical("f", DataTypes.INT())),
+                        TableChange.dropConstraint("ct1"),
                         TableChange.modifyDefinitionQuery(
                                 "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS 
`f`\nFROM `t1`",
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`a` AS `a1`, 3 AS `f`\n"
@@ -102,6 +104,7 @@ public class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                 .containsExactly(
                         // No explicit schema, so nullable will be used
                         TableChange.add(Column.physical("a1", 
DataTypes.BIGINT())),
+                        TableChange.dropConstraint("ct1"),
                         TableChange.modifyDefinitionQuery(
                                 "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM 
`t1`",
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`a` AS `a1`\n"
@@ -243,21 +246,99 @@ public class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
         FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
         assertThat(op.getTableChanges())
                 .containsExactly(
-                        TableChange.modifyDefinitionQuery(
-                                "SELECT `t1`.*\n" + "FROM `t1`",
-                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
-                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
                         TableChange.modify(
                                 TableDistribution.of(
                                         TableDistribution.Kind.HASH, 4, 
List.of("b"))));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
                         "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt2\n"
-                                + " MODIFY DEFINITION QUERY TO 'SELECT 
`t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
-                                + "FROM `builtin`.`default`.`t1` AS `t1`',\n"
                                 + "  MODIFY DISTRIBUTED BY HASH(`b`) INTO 4 
BUCKETS");
     }
 
+    @Test
+    void testCreateOrAlterMaterializedTableWithDroppedConstraint() {
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt \n"
+                        + "COMMENT 'New materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        
assertThat(op.getTableChanges()).containsExactly(TableChange.dropConstraint("ct1"));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt\n"
+                                + "  DROP CONSTRAINT ct1");
+    }
+
+    @Test
+    void testCreateOrAlterMaterializedTableWithChangedConstraint() {
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt (\n"
+                        + "   CONSTRAINT new_constraint PRIMARY KEY(a) NOT 
ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'New materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges())
+                .containsExactly(
+                        TableChange.modify(
+                                UniqueConstraint.primaryKey("new_constraint", 
List.of("a"))));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt\n"
+                                + "  MODIFY CONSTRAINT `new_constraint` 
PRIMARY KEY (`a`) NOT ENFORCED");
+    }
+
+    @Test
+    void testCreateOrAlterMaterializedTableWithNewConstraint()
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        final String prepSql =
+                "CREATE MATERIALIZED TABLE mt1 (\n"
+                        + "   id INT NOT NULL, t TIMESTAMP_LTZ(3)\n"
+                        + ")\n"
+                        + "AS SELECT 1 as id, current_timestamp as t";
+        createMaterializedTableInCatalog(prepSql, "mt1");
+
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt1 (\n"
+                        + "   id INT NOT NULL, t TIMESTAMP_LTZ(3),\n"
+                        + "   CONSTRAINT new_constraint PRIMARY KEY(id) NOT 
ENFORCED"
+                        + ")\n"
+                        + "AS SELECT 1 as id, current_timestamp as t";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges())
+                .containsExactly(
+                        TableChange.add(
+                                UniqueConstraint.primaryKey("new_constraint", 
List.of("id"))));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt1\n"
+                                + "  ADD CONSTRAINT `new_constraint` PRIMARY 
KEY (`id`) NOT ENFORCED");
+    }
+
     private void createMaterializedTableInCatalog(String sql, String 
materializedTableName)
             throws TableAlreadyExistException, DatabaseNotExistException {
         final ObjectPath objectPath =

Reply via email to