This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9a30a23794ef3bfd19ea723bb986e56ee33feec3 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Tue Mar 24 11:00:09 2026 +0100 [FLINK-39284][table] Nullify types for columns while `CREATE OR ALTER MATERIALIZED TABLE` similar to `CREATE` --- ...SqlCreateOrAlterMaterializedTableConverter.java | 12 ++---- .../planner/utils/MaterializedTableUtils.java | 48 +++++++++++++++------- ...reateOrAlterMaterializedTableConverterTest.java | 25 +++++++++++ 3 files changed, 62 insertions(+), 23 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java index 16cb77b0ffd..7b2ac0abfba 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java @@ -24,7 +24,6 @@ import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; -import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; @@ -124,15 +123,12 @@ public class SqlCreateOrAlterMaterializedTableConverter private Function<ResolvedCatalogMaterializedTable, List<TableChange>> buildTableChanges( final MergeContext mergeContext, final SchemaResolver schemaResolver) { return oldTable -> { - final List<TableChange> changes = new ArrayList<>(); - final ResolvedSchema oldSchema = oldTable.getResolvedSchema(); final ResolvedSchema newSchema = schemaResolver.resolve(mergeContext.getMergedSchema()); - final List<Column> newColumns = - MaterializedTableUtils.validateAndExtractNewColumns( - oldSchema, newSchema, mergeContext.hasSchemaDefinition()); - - newColumns.forEach(column -> changes.add(TableChange.add(column))); + final List<TableChange> changes = + new ArrayList<>( + MaterializedTableUtils.validateAndExtractColumnChanges( + oldSchema, newSchema, mergeContext.hasSchemaDefinition())); final UniqueConstraint oldConstraint = oldSchema.getPrimaryKey().orElse(null); final UniqueConstraint newConstraint = newSchema.getPrimaryKey().orElse(null); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java index 7d3b2898124..ea578a2744a 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java @@ -38,6 +38,7 @@ import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableChange.ColumnPosition; import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext; +import org.apache.flink.table.types.DataType; import org.apache.calcite.sql.SqlIntervalLiteral; import org.apache.calcite.sql.SqlIntervalLiteral.IntervalValue; @@ -278,7 +279,7 @@ public class MaterializedTableUtils { } } - public static List<Column> validateAndExtractNewColumns( + public static List<TableChange> validateAndExtractColumnChanges( ResolvedSchema oldSchema, ResolvedSchema newSchema, boolean schemaDefinedInQuery) { final List<Column> newColumns = getPersistedColumns(newSchema); final List<Column> oldColumns = getPersistedColumns(oldSchema); @@ -294,29 +295,46 @@ public class MaterializedTableUtils { originalColumnSize, newColumnSize)); } + final List<TableChange> columnChanges = new ArrayList<>(); for (int i = 0; i < oldColumns.size(); i++) { - Column oldColumn = oldColumns.get(i); - Column newColumn = newColumns.get(i); + final Column oldColumn = oldColumns.get(i); + final Column newColumn = newColumns.get(i); + final DataType newColumnDataType = + getNewColumnDatatype(oldColumn, newColumns.get(i), schemaDefinedInQuery); if (!oldColumn.equals(newColumn)) { - throw new ValidationException( - String.format( - "When modifying the query of a materialized table, " - + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" - + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", - i, oldColumn, newColumn)); + if (!oldColumn.getName().equals(newColumn.getName()) + || !oldColumn.getDataType().equals(newColumnDataType)) { + throw new ValidationException( + String.format( + "When modifying the query of a materialized table, " + + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" + + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", + i + 1, oldColumn, newColumn)); + } } } - final List<Column> newAddedColumns = new ArrayList<>(); for (int i = oldColumns.size(); i < newColumns.size(); i++) { Column newColumn = newColumns.get(i); - newAddedColumns.add( - schemaDefinedInQuery - ? newColumn - : newColumn.copy(newColumn.getDataType().nullable())); + columnChanges.add( + TableChange.add( + schemaDefinedInQuery + ? newColumn + : newColumn.copy(newColumn.getDataType().nullable()))); } - return newAddedColumns; + return columnChanges; + } + + private static DataType getNewColumnDatatype( + Column oldColumn, Column newColumn, boolean schemaDefinedInQuery) { + if (schemaDefinedInQuery) { + return newColumn.getDataType(); + } + if (oldColumn.getDataType().nullable().equals(newColumn.getDataType().nullable())) { + return oldColumn.getDataType(); + } + return newColumn.getDataType(); } public static ResolvedSchema getQueryOperationResolvedSchema( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java index b9e569cd235..8fbdc6413b8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java @@ -145,6 +145,31 @@ class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest + "The source column has type 'STRING', while the target column has type 'INT'.")); } + @Test + void testCreateOrAlterMaterializedTableForExistingTableNoChanges() { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()).isEmpty(); + assertThat(operation.asSummaryString()) + .isEqualTo("CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n"); + } + @Test void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistException { final String sql =
