This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 564840f7f45 [FLINK-39272][table] Changing of `REFRESH_MODE` for 
`CREATE OF ALTER MATERIALIZED TABLE` should fail
564840f7f45 is described below

commit 564840f7f45175070c5a9cdc733e1a8e1a6d405d
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Mar 19 22:22:02 2026 +0100

    [FLINK-39272][table] Changing of `REFRESH_MODE` for `CREATE OF ALTER 
MATERIALIZED TABLE` should fail
---
 .../AbstractCreateMaterializedTableConverter.java  |  2 ++
 ...SqlCreateOrAlterMaterializedTableConverter.java | 13 +++++++++++
 ...erializedTableNodeToOperationConverterTest.java | 26 ++++++++++++++++++++++
 3 files changed, 41 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index e04d98fb798..7f3e6d2b826 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -70,6 +70,8 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
         String getMergedExpandedQuery();
 
         ResolvedSchema getMergedQuerySchema();
+
+        RefreshMode getMergedRefreshMode();
     }
 
     protected abstract MergeContext getMergeContext(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 3e7094580db..c7ce5f9f173 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMateria
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
+import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
@@ -144,6 +145,12 @@ public class SqlCreateOrAlterMaterializedTableConverter
                 }
             }
 
+            final RefreshMode oldRefreshMode = oldTable.getRefreshMode();
+            final RefreshMode newRefreshMode = 
mergeContext.getMergedRefreshMode();
+            if (oldRefreshMode != newRefreshMode && newRefreshMode != null) {
+                throw new ValidationException("Changing of REFRESH MODE is 
unsupported");
+            }
+
             return changes;
         };
     }
@@ -228,6 +235,12 @@ public class SqlCreateOrAlterMaterializedTableConverter
             public ResolvedSchema getMergedQuerySchema() {
                 return this.querySchema;
             }
+
+            @Override
+            public RefreshMode getMergedRefreshMode() {
+                return getDerivedRefreshMode(
+                        
getDerivedLogicalRefreshMode(sqlCreateMaterializedTable));
+            }
         };
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 9b928b4333d..02d4275c8e4 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -649,6 +649,32 @@ class SqlMaterializedTableNodeToOperationConverterTest
         assertThat(materializedTable.getOrigin()).isEqualTo(expected);
     }
 
+    @Test
+    void testCreateOrAlterMaterializedTableWithChangingRefreshMode() {
+        // Changing refresh mode is not supported
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = CONTINUOUS\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        // Will be invoked while operation#execute
+        assertThatThrownBy(op::getTableChanges)
+                .isInstanceOf(ValidationException.class)
+                .hasMessage("Changing of REFRESH MODE is unsupported");
+    }
+
     @Test
     void testCreateOrAlterMaterializedTableForExistingTable() throws 
TableNotExistException {
         final String sql =

Reply via email to