This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dc6d1288fb1 [FLINK-39358][table] Constraint and watermark should stay
while CoA MT if there is no explicit change with schema or constraint definition
dc6d1288fb1 is described below
commit dc6d1288fb1ae2493e8e697d6bbfe32c2ef82cac
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 31 11:19:34 2026 +0200
[FLINK-39358][table] Constraint and watermark should stay while CoA MT if
there is no explicit change with schema or constraint definition
---
.../SqlCreateMaterializedTable.java | 4 ++
.../AbstractCreateMaterializedTableConverter.java | 4 ++
...SqlCreateOrAlterMaterializedTableConverter.java | 47 +++++++++++++++++-----
...reateOrAlterMaterializedTableConverterTest.java | 31 +++++++++++---
4 files changed, 69 insertions(+), 17 deletions(-)
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
index 33bb12c06c2..a1d51cf6a93 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/materializedtable/SqlCreateMaterializedTable.java
@@ -118,6 +118,10 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
return columnList;
}
+ public List<SqlTableConstraint> getTableConstraints() {
+ return tableConstraints;
+ }
+
public Optional<SqlWatermark> getWatermark() {
return Optional.ofNullable(watermark);
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index 65076bc8503..9c4d1969601 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -59,6 +59,10 @@ public abstract class
AbstractCreateMaterializedTableConverter<T extends SqlCrea
protected interface MergeContext {
boolean hasSchemaDefinition();
+ // A separate from schema definition method is required
+ // as current syntax allows to specify constraints only without the
whole schema
+ boolean hasConstraintDefinition();
+
Schema getMergedSchema();
Map<String, String> getMergedTableOptions();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index ce005434122..198a747406b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -198,23 +198,40 @@ public class SqlCreateOrAlterMaterializedTableConverter
final ResolvedCatalogMaterializedTable oldTable) {
final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
final ResolvedSchema newSchema =
schemaResolver.resolve(mergeContext.getMergedSchema());
+ final boolean hasSchemaDefinition = mergeContext.hasSchemaDefinition();
final List<TableChange> changes =
new ArrayList<>(
MaterializedTableUtils.validateAndExtractColumnChanges(
- oldSchema, newSchema,
mergeContext.hasSchemaDefinition()));
+ oldSchema, newSchema, hasSchemaDefinition));
+ getConstraintChange(oldSchema, newSchema,
mergeContext.hasConstraintDefinition())
+ .ifPresent(changes::add);
+ getWatermarkChange(oldSchema, newSchema,
hasSchemaDefinition).ifPresent(changes::add);
+ return changes;
+ }
+
+ private Optional<TableChange> getConstraintChange(
+ final ResolvedSchema oldSchema,
+ final ResolvedSchema newSchema,
+ boolean hasConstraintDefinition) {
final UniqueConstraint oldConstraint =
oldSchema.getPrimaryKey().orElse(null);
final UniqueConstraint newConstraint =
newSchema.getPrimaryKey().orElse(null);
- if (!Objects.equals(oldConstraint, newConstraint)) {
+ if (hasConstraintDefinition && !Objects.equals(oldConstraint,
newConstraint)) {
if (newConstraint == null) {
-
changes.add(TableChange.dropConstraint(oldConstraint.getName()));
+ return
Optional.of(TableChange.dropConstraint(oldConstraint.getName()));
} else if (oldConstraint == null) {
- changes.add(TableChange.add(newConstraint));
+ return Optional.of(TableChange.add(newConstraint));
} else {
- changes.add(TableChange.modify(newConstraint));
+ return Optional.of(TableChange.modify(newConstraint));
}
}
+ return Optional.empty();
+ }
+ private Optional<TableChange> getWatermarkChange(
+ final ResolvedSchema oldSchema,
+ final ResolvedSchema newSchema,
+ boolean hasSchemaDefinition) {
final WatermarkSpec oldWatermarkSpec =
oldSchema.getWatermarkSpecs().isEmpty()
? null
@@ -223,17 +240,16 @@ public class SqlCreateOrAlterMaterializedTableConverter
newSchema.getWatermarkSpecs().isEmpty()
? null
: newSchema.getWatermarkSpecs().get(0);
- if (!Objects.equals(oldWatermarkSpec, newWatermarkSpec)) {
+ if (hasSchemaDefinition && !Objects.equals(oldWatermarkSpec,
newWatermarkSpec)) {
if (newWatermarkSpec == null) {
- changes.add(TableChange.dropWatermark());
+ return Optional.of(TableChange.dropWatermark());
} else if (oldWatermarkSpec == null) {
- changes.add(TableChange.add(newWatermarkSpec));
+ return Optional.of(TableChange.add(newWatermarkSpec));
} else {
- changes.add(TableChange.modify(newWatermarkSpec));
+ return Optional.of(TableChange.modify(newWatermarkSpec));
}
}
-
- return changes;
+ return Optional.empty();
}
@Override
@@ -261,6 +277,15 @@ public class SqlCreateOrAlterMaterializedTableConverter
&& sqlNodeList.getList().get(0) instanceof
SqlRegularColumn;
}
+ @Override
+ public boolean hasConstraintDefinition() {
+ if
(!sqlCreateMaterializedTable.getTableConstraints().isEmpty()) {
+ return true;
+ }
+
+ return hasSchemaDefinition();
+ }
+
@Override
public Schema getMergedSchema() {
final Set<String> querySchemaColumnNames =
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
index 95e7d1714cb..9ba20168766 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -104,7 +104,6 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
.containsExactly(
// No explicit schema, so nullable will be used
TableChange.add(Column.physical("a1",
DataTypes.BIGINT())),
- TableChange.dropConstraint("ct1"),
TableChange.modifyDefinitionQuery(
"SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM
`t1`",
"SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`,
`t1`.`d`, `t1`.`a` AS `a1`\n"
@@ -283,7 +282,7 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
@Test
void testCreateOrAlterMaterializedTableWithDroppedConstraint() {
final String sql =
- "CREATE OR ALTER MATERIALIZED TABLE mt \n"
+ "CREATE OR ALTER MATERIALIZED TABLE mt (a BIGINT NOT NULL, b
STRING, c INT, d STRING)\n"
+ "COMMENT 'New materialized table comment'\n"
+ "PARTITIONED BY (a, d)\n"
+ "WITH (\n"
@@ -304,6 +303,28 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
+ " DROP CONSTRAINT ct1");
}
+ @Test
+ void testCreateOrAlterMaterializedTableWithoutSchemaConstraintShouldStay()
{
+ final String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE mt \n"
+ + "COMMENT 'New materialized table comment'\n"
+ + "PARTITIONED BY (a, d)\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem', \n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "REFRESH_MODE = FULL\n"
+ + "AS SELECT * FROM t1";
+ Operation operation = parse(sql);
+
+
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+ FullAlterMaterializedTableOperation op =
(FullAlterMaterializedTableOperation) operation;
+ assertThat(op.getTableChanges()).isEmpty();
+ assertThat(operation.asSummaryString())
+ .isEqualTo("CREATE OR ALTER MATERIALIZED TABLE
builtin.default.mt\n");
+ }
+
@Test
void testCreateOrAlterMaterializedTableWithChangedConstraint() {
final String sql =
@@ -444,7 +465,7 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
}
@Test
- void
testCreateOrAlterMaterializedTableWithDroppedWatermarkByNoExplicitSchema()
+ void testCreateOrAlterMaterializedTableWithoutSchemaWatermarkShouldStay()
throws TableAlreadyExistException, DatabaseNotExistException {
final String prepSql =
"CREATE MATERIALIZED TABLE mt1 (\n"
@@ -462,9 +483,7 @@ class
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
assertThat(operation.asSummaryString())
- .isEqualTo(
- "CREATE OR ALTER MATERIALIZED TABLE
builtin.default.mt1\n"
- + " DROP WATERMARK");
+ .isEqualTo("CREATE OR ALTER MATERIALIZED TABLE
builtin.default.mt1\n");
}
@Test