This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dc6d1288fb1 [FLINK-39358][table] Constraint and watermark should stay 
while CoA MT if there is no explicit change with schema or constraint definition
dc6d1288fb1 is described below

commit dc6d1288fb1ae2493e8e697d6bbfe32c2ef82cac
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 31 11:19:34 2026 +0200

    [FLINK-39358][table] Constraint and watermark should stay while CoA MT if 
there is no explicit change with schema or constraint definition
---
 .../SqlCreateMaterializedTable.java                |  4 ++
 .../AbstractCreateMaterializedTableConverter.java  |  4 ++
 ...SqlCreateOrAlterMaterializedTableConverter.java | 47 +++++++++++++++++-----
 ...reateOrAlterMaterializedTableConverterTest.java | 31 +++++++++++---
 4 files changed, 69 insertions(+), 17 deletions(-)

diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
index 33bb12c06c2..a1d51cf6a93 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
@@ -118,6 +118,10 @@ public class SqlCreateMaterializedTable extends 
SqlCreateObject implements Exten
         return columnList;
     }
 
+    public List<SqlTableConstraint> getTableConstraints() {
+        return tableConstraints;
+    }
+
     public Optional<SqlWatermark> getWatermark() {
         return Optional.ofNullable(watermark);
     }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index 65076bc8503..9c4d1969601 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -59,6 +59,10 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
     protected interface MergeContext {
         boolean hasSchemaDefinition();
 
+        // A method separate from hasSchemaDefinition() is required because the
+        // current syntax allows specifying constraints alone, without the whole schema.
+        boolean hasConstraintDefinition();
+
         Schema getMergedSchema();
 
         Map<String, String> getMergedTableOptions();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index ce005434122..198a747406b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -198,23 +198,40 @@ public class SqlCreateOrAlterMaterializedTableConverter
             final ResolvedCatalogMaterializedTable oldTable) {
         final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
         final ResolvedSchema newSchema = 
schemaResolver.resolve(mergeContext.getMergedSchema());
+        final boolean hasSchemaDefinition = mergeContext.hasSchemaDefinition();
         final List<TableChange> changes =
                 new ArrayList<>(
                         MaterializedTableUtils.validateAndExtractColumnChanges(
-                                oldSchema, newSchema, 
mergeContext.hasSchemaDefinition()));
+                                oldSchema, newSchema, hasSchemaDefinition));
 
+        getConstraintChange(oldSchema, newSchema, 
mergeContext.hasConstraintDefinition())
+                .ifPresent(changes::add);
+        getWatermarkChange(oldSchema, newSchema, 
hasSchemaDefinition).ifPresent(changes::add);
+        return changes;
+    }
+
+    private Optional<TableChange> getConstraintChange(
+            final ResolvedSchema oldSchema,
+            final ResolvedSchema newSchema,
+            boolean hasConstraintDefinition) {
         final UniqueConstraint oldConstraint = 
oldSchema.getPrimaryKey().orElse(null);
         final UniqueConstraint newConstraint = 
newSchema.getPrimaryKey().orElse(null);
-        if (!Objects.equals(oldConstraint, newConstraint)) {
+        if (hasConstraintDefinition && !Objects.equals(oldConstraint, 
newConstraint)) {
             if (newConstraint == null) {
-                
changes.add(TableChange.dropConstraint(oldConstraint.getName()));
+                return 
Optional.of(TableChange.dropConstraint(oldConstraint.getName()));
             } else if (oldConstraint == null) {
-                changes.add(TableChange.add(newConstraint));
+                return Optional.of(TableChange.add(newConstraint));
             } else {
-                changes.add(TableChange.modify(newConstraint));
+                return Optional.of(TableChange.modify(newConstraint));
             }
         }
+        return Optional.empty();
+    }
 
+    private Optional<TableChange> getWatermarkChange(
+            final ResolvedSchema oldSchema,
+            final ResolvedSchema newSchema,
+            boolean hasSchemaDefinition) {
         final WatermarkSpec oldWatermarkSpec =
                 oldSchema.getWatermarkSpecs().isEmpty()
                         ? null
@@ -223,17 +240,16 @@ public class SqlCreateOrAlterMaterializedTableConverter
                 newSchema.getWatermarkSpecs().isEmpty()
                         ? null
                         : newSchema.getWatermarkSpecs().get(0);
-        if (!Objects.equals(oldWatermarkSpec, newWatermarkSpec)) {
+        if (hasSchemaDefinition && !Objects.equals(oldWatermarkSpec, 
newWatermarkSpec)) {
             if (newWatermarkSpec == null) {
-                changes.add(TableChange.dropWatermark());
+                return Optional.of(TableChange.dropWatermark());
             } else if (oldWatermarkSpec == null) {
-                changes.add(TableChange.add(newWatermarkSpec));
+                return Optional.of(TableChange.add(newWatermarkSpec));
             } else {
-                changes.add(TableChange.modify(newWatermarkSpec));
+                return Optional.of(TableChange.modify(newWatermarkSpec));
             }
         }
-
-        return changes;
+        return Optional.empty();
     }
 
     @Override
@@ -261,6 +277,15 @@ public class SqlCreateOrAlterMaterializedTableConverter
                         && sqlNodeList.getList().get(0) instanceof 
SqlRegularColumn;
             }
 
+            @Override
+            public boolean hasConstraintDefinition() {
+                if 
(!sqlCreateMaterializedTable.getTableConstraints().isEmpty()) {
+                    return true;
+                }
+
+                return hasSchemaDefinition();
+            }
+
             @Override
             public Schema getMergedSchema() {
                 final Set<String> querySchemaColumnNames =
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
index 95e7d1714cb..9ba20168766 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -104,7 +104,6 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                 .containsExactly(
                         // No explicit schema, so nullable will be used
                         TableChange.add(Column.physical("a1", 
DataTypes.BIGINT())),
-                        TableChange.dropConstraint("ct1"),
                         TableChange.modifyDefinitionQuery(
                                 "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM 
`t1`",
                                 "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`a` AS `a1`\n"
@@ -283,7 +282,7 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
     @Test
     void testCreateOrAlterMaterializedTableWithDroppedConstraint() {
         final String sql =
-                "CREATE OR ALTER MATERIALIZED TABLE mt \n"
+                "CREATE OR ALTER MATERIALIZED TABLE mt (a BIGINT NOT NULL, b 
STRING, c INT, d STRING)\n"
                         + "COMMENT 'New materialized table comment'\n"
                         + "PARTITIONED BY (a, d)\n"
                         + "WITH (\n"
@@ -304,6 +303,28 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                                 + "  DROP CONSTRAINT ct1");
     }
 
+    @Test
+    void testCreateOrAlterMaterializedTableWithoutSchemaConstraintShouldStay() 
{
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt \n"
+                        + "COMMENT 'New materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges()).isEmpty();
+        assertThat(operation.asSummaryString())
+                .isEqualTo("CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt\n");
+    }
+
     @Test
     void testCreateOrAlterMaterializedTableWithChangedConstraint() {
         final String sql =
@@ -444,7 +465,7 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
     }
 
     @Test
-    void 
testCreateOrAlterMaterializedTableWithDroppedWatermarkByNoExplicitSchema()
+    void testCreateOrAlterMaterializedTableWithoutSchemaWatermarkShouldStay()
             throws TableAlreadyExistException, DatabaseNotExistException {
         final String prepSql =
                 "CREATE MATERIALIZED TABLE mt1 (\n"
@@ -462,9 +483,7 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
         
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
 
         assertThat(operation.asSummaryString())
-                .isEqualTo(
-                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt1\n"
-                                + "  DROP WATERMARK");
+                .isEqualTo("CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt1\n");
     }
 
     @Test

Reply via email to