This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 564840f7f45 [FLINK-39272][table] Changing of `REFRESH_MODE` for
`CREATE OF ALTER MATERIALIZED TABLE` should fail
564840f7f45 is described below
commit 564840f7f45175070c5a9cdc733e1a8e1a6d405d
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Mar 19 22:22:02 2026 +0100
[FLINK-39272][table] Changing of `REFRESH_MODE` for `CREATE OF ALTER
MATERIALIZED TABLE` should fail
---
.../AbstractCreateMaterializedTableConverter.java | 2 ++
...SqlCreateOrAlterMaterializedTableConverter.java | 13 +++++++++++
...erializedTableNodeToOperationConverterTest.java | 26 ++++++++++++++++++++++
3 files changed, 41 insertions(+)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index e04d98fb798..7f3e6d2b826 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -70,6 +70,8 @@ public abstract class
AbstractCreateMaterializedTableConverter<T extends SqlCrea
String getMergedExpandedQuery();
ResolvedSchema getMergedQuerySchema();
+
+ RefreshMode getMergedRefreshMode();
}
protected abstract MergeContext getMergeContext(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 3e7094580db..c7ce5f9f173 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -23,6 +23,7 @@ import
org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMateria
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
@@ -144,6 +145,12 @@ public class SqlCreateOrAlterMaterializedTableConverter
}
}
+ final RefreshMode oldRefreshMode = oldTable.getRefreshMode();
+ final RefreshMode newRefreshMode =
mergeContext.getMergedRefreshMode();
+ if (oldRefreshMode != newRefreshMode && newRefreshMode != null) {
+ throw new ValidationException("Changing of REFRESH MODE is
unsupported");
+ }
+
return changes;
};
}
@@ -228,6 +235,12 @@ public class SqlCreateOrAlterMaterializedTableConverter
public ResolvedSchema getMergedQuerySchema() {
return this.querySchema;
}
+
+ @Override
+ public RefreshMode getMergedRefreshMode() {
+ return getDerivedRefreshMode(
+
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable));
+ }
};
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 9b928b4333d..02d4275c8e4 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -649,6 +649,32 @@ class SqlMaterializedTableNodeToOperationConverterTest
assertThat(materializedTable.getOrigin()).isEqualTo(expected);
}
+ @Test
+ void testCreateOrAlterMaterializedTableWithChangingRefreshMode() {
+ // Changing refresh mode is not supported
+ final String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
+ + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+ + ")\n"
+ + "COMMENT 'materialized table comment'\n"
+ + "PARTITIONED BY (a, d)\n"
+ + "WITH (\n"
+ + " 'connector' = 'filesystem', \n"
+ + " 'format' = 'json'\n"
+ + ")\n"
+ + "FRESHNESS = INTERVAL '30' SECOND\n"
+ + "REFRESH_MODE = CONTINUOUS\n"
+ + "AS SELECT * FROM t1";
+ Operation operation = parse(sql);
+
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+ FullAlterMaterializedTableOperation op =
(FullAlterMaterializedTableOperation) operation;
+ // Will be invoked while operation#execute
+ assertThatThrownBy(op::getTableChanges)
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Changing of REFRESH MODE is unsupported");
+ }
+
@Test
void testCreateOrAlterMaterializedTableForExistingTable() throws
TableNotExistException {
final String sql =