This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a30a23794ef3bfd19ea723bb986e56ee33feec3
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 24 11:00:09 2026 +0100

    [FLINK-39284][table] Nullify types for columns while `CREATE OR ALTER MATERIALIZED TABLE` similar to `CREATE`
---
 ...SqlCreateOrAlterMaterializedTableConverter.java | 12 ++----
 .../planner/utils/MaterializedTableUtils.java      | 48 +++++++++++++++-------
 ...reateOrAlterMaterializedTableConverterTest.java | 25 +++++++++++
 3 files changed, 62 insertions(+), 23 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index 16cb77b0ffd..7b2ac0abfba 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
-import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -124,15 +123,12 @@ public class SqlCreateOrAlterMaterializedTableConverter
     private Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
buildTableChanges(
             final MergeContext mergeContext, final SchemaResolver 
schemaResolver) {
         return oldTable -> {
-            final List<TableChange> changes = new ArrayList<>();
-
             final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
             final ResolvedSchema newSchema = 
schemaResolver.resolve(mergeContext.getMergedSchema());
-            final List<Column> newColumns =
-                    MaterializedTableUtils.validateAndExtractNewColumns(
-                            oldSchema, newSchema, 
mergeContext.hasSchemaDefinition());
-
-            newColumns.forEach(column -> changes.add(TableChange.add(column)));
+            final List<TableChange> changes =
+                    new ArrayList<>(
+                            
MaterializedTableUtils.validateAndExtractColumnChanges(
+                                    oldSchema, newSchema, 
mergeContext.hasSchemaDefinition()));
 
             final UniqueConstraint oldConstraint = 
oldSchema.getPrimaryKey().orElse(null);
             final UniqueConstraint newConstraint = 
newSchema.getPrimaryKey().orElse(null);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 7d3b2898124..ea578a2744a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableChange.ColumnPosition;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import 
org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
+import org.apache.flink.table.types.DataType;
 
 import org.apache.calcite.sql.SqlIntervalLiteral;
 import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue;
@@ -278,7 +279,7 @@ public class MaterializedTableUtils {
         }
     }
 
-    public static List<Column> validateAndExtractNewColumns(
+    public static List<TableChange> validateAndExtractColumnChanges(
             ResolvedSchema oldSchema, ResolvedSchema newSchema, boolean 
schemaDefinedInQuery) {
         final List<Column> newColumns = getPersistedColumns(newSchema);
         final List<Column> oldColumns = getPersistedColumns(oldSchema);
@@ -294,29 +295,46 @@ public class MaterializedTableUtils {
                             originalColumnSize, newColumnSize));
         }
 
+        final List<TableChange> columnChanges = new ArrayList<>();
         for (int i = 0; i < oldColumns.size(); i++) {
-            Column oldColumn = oldColumns.get(i);
-            Column newColumn = newColumns.get(i);
+            final Column oldColumn = oldColumns.get(i);
+            final Column newColumn = newColumns.get(i);
+            final DataType newColumnDataType =
+                    getNewColumnDatatype(oldColumn, newColumns.get(i), 
schemaDefinedInQuery);
             if (!oldColumn.equals(newColumn)) {
-                throw new ValidationException(
-                        String.format(
-                                "When modifying the query of a materialized 
table, "
-                                        + "currently only support appending 
columns at the end of original schema, dropping, renaming, and reordering 
columns are not supported.\n"
-                                        + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
-                                i, oldColumn, newColumn));
+                if (!oldColumn.getName().equals(newColumn.getName())
+                        || !oldColumn.getDataType().equals(newColumnDataType)) 
{
+                    throw new ValidationException(
+                            String.format(
+                                    "When modifying the query of a 
materialized table, "
+                                            + "currently only support 
appending columns at the end of original schema, dropping, renaming, and 
reordering columns are not supported.\n"
+                                            + "Column mismatch at position %d: 
Original column is [%s], but new column is [%s].",
+                                    i + 1, oldColumn, newColumn));
+                }
             }
         }
 
-        final List<Column> newAddedColumns = new ArrayList<>();
         for (int i = oldColumns.size(); i < newColumns.size(); i++) {
             Column newColumn = newColumns.get(i);
-            newAddedColumns.add(
-                    schemaDefinedInQuery
-                            ? newColumn
-                            : 
newColumn.copy(newColumn.getDataType().nullable()));
+            columnChanges.add(
+                    TableChange.add(
+                            schemaDefinedInQuery
+                                    ? newColumn
+                                    : 
newColumn.copy(newColumn.getDataType().nullable())));
         }
 
-        return newAddedColumns;
+        return columnChanges;
+    }
+
+    private static DataType getNewColumnDatatype(
+            Column oldColumn, Column newColumn, boolean schemaDefinedInQuery) {
+        if (schemaDefinedInQuery) {
+            return newColumn.getDataType();
+        }
+        if 
(oldColumn.getDataType().nullable().equals(newColumn.getDataType().nullable())) 
{
+            return oldColumn.getDataType();
+        }
+        return newColumn.getDataType();
     }
 
     public static ResolvedSchema getQueryOperationResolvedSchema(
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
index b9e569cd235..8fbdc6413b8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -145,6 +145,31 @@ class 
SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
                                 + "The source column has type 'STRING', while 
the target column has type 'INT'."));
     }
 
+    @Test
+    void testCreateOrAlterMaterializedTableForExistingTableNoChanges() {
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'filesystem', \n"
+                        + "  'format' = 'json'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT * FROM t1";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges()).isEmpty();
+        assertThat(operation.asSummaryString())
+                .isEqualTo("CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt\n");
+    }
+
     @Test
     void testCreateOrAlterMaterializedTableForExistingTable() throws 
TableNotExistException {
         final String sql =

Reply via email to