This is an automated email from the ASF dual-hosted git repository. AHeise pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ac4c4a08e0574f92d75885c872865a69e838312 Author: Arvid Heise <[email protected]> AuthorDate: Mon May 18 21:13:26 2026 +0200 [FLINK-39700][table-planner] Name-based column matching; CREATE OR ALTER drops absent non-persisted columns validateAndExtractColumnChanges switches from a position-based comparison (which threw on any dropped, renamed, or reordered column) to a name-based diff: columns are matched by name; new columns emit AddColumn; matched columns emit ModifyPhysicalColumnType for a physical type change, ModifyColumn for a change of column kind or definition (metadata key/virtual flag, computed expression), and ModifyColumnComment for a comment change; absent columns emit DropColumn (except non-persisted columns when schemaDefinedInQuery=false, which are retained). This allows physical DDL columns to be declared in a different order than the AS SELECT projection. AlterMaterializedTableChangeOperation gains a protected computeNewTable() hook so subclasses can plug in a different builder without shadowing the cached newTable field. FullAlterMaterializedTableOperation overrides the hook with a newTableBuilder lambda supplied by SqlCreateOrAlterMaterializedTableConverter, so non-persisted columns absent from the new DDL are dropped (declarative CREATE OR ALTER semantics). Unit tests for validateAndExtractColumnChanges are added in ValidateAndExtractColumnChangesTest. 
--- .../AlterMaterializedTableChangeOperation.java | 105 ++++---- .../FullAlterMaterializedTableOperation.java | 14 +- .../AbstractCreateMaterializedTableConverter.java | 6 + ...SqlCreateOrAlterMaterializedTableConverter.java | 130 +++++++--- .../planner/utils/MaterializedTableUtils.java | 137 ++++++---- ...erializedTableNodeToOperationConverterTest.java | 9 +- .../utils/ValidateAndExtractColumnChangesTest.java | 281 +++++++++++++++++++++ 7 files changed, 539 insertions(+), 143 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java index 2961dc88a10..823cacb4327 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java @@ -18,7 +18,6 @@ package org.apache.flink.table.operations.materializedtable; -import org.apache.flink.annotation.Confluent; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.api.ValidationException; @@ -82,33 +81,23 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl public CatalogMaterializedTable getNewTable() { if (newTable == null) { - newTable = - MaterializedTableChangeHandler.buildNewMaterializedTable( - getHandlerWithChanges()); + newTable = computeNewTable(); } return newTable; } - @Confluent public ResolvedCatalogMaterializedTable getOldTable() { return oldTable; } - @Confluent public void setOldTable(final ResolvedCatalogMaterializedTable oldTable) { this.oldTable = oldTable; + // All caches are derived from oldTable; invalidate them 
together. + this.tableChanges = null; + this.handler = null; this.newTable = null; } - protected MaterializedTableChangeHandler getHandlerWithChanges() { - if (handler == null) { - handler = - MaterializedTableChangeHandler.getHandlerWithChanges( - oldTable, getTableChanges()); - } - return handler; - } - @VisibleForTesting public void validateChanges() { final List<TableChange> changes = getTableChanges(); @@ -137,18 +126,55 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl } } + @Override + public TableResultInternal execute(Context ctx) { + validateChanges(); + ctx.getCatalogManager() + .alterTable(getNewTable(), getTableChanges(), getTableIdentifier(), false); + return TableResultImpl.TABLE_RESULT_OK; + } + + @Override + public String asSummaryString() { + String changes = + getTableChanges().stream() + .map(AlterMaterializedTableChangeOperation::toString) + .collect(Collectors.joining(",\n")); + return String.format( + "%s %s\n%s", getOperationName(), tableIdentifier.asSummaryString(), changes); + } + + /** Hook for subclasses to provide a different new-table builder. 
*/ + protected CatalogMaterializedTable computeNewTable() { + return MaterializedTableChangeHandler.buildNewMaterializedTable(getHandlerWithChanges()); + } + + protected MaterializedTableChangeHandler getHandlerWithChanges() { + if (handler == null) { + handler = + MaterializedTableChangeHandler.getHandlerWithChanges( + oldTable, getTableChanges()); + } + return handler; + } + + protected String getOperationName() { + return "ALTER MATERIALIZED TABLE"; + } + private void checkDroppedColumn( DropColumn change, List<Column> oldColumns, Map<String, Integer> columnIndex, List<String> errors) { - final Integer idx = columnIndex.get(change.getColumnName()); - if (idx != null && oldColumns.get(idx).isPersisted()) { - errors.add( - String.format( - "Dropping of persisted column `%s` is not supported.", - change.getColumnName())); + final int idx = columnIndex.getOrDefault(change.getColumnName(), -1); + if (idx < 0 || !oldColumns.get(idx).isPersisted()) { + return; } + errors.add( + String.format( + "Dropping of persisted column `%s` is not supported.", + change.getColumnName())); } private void checkPositionChange( @@ -172,14 +198,15 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl List<Column> oldColumns, Map<String, Integer> columnIndex, List<String> errors) { - final Integer idx = columnIndex.get(change.getOldColumn().getName()); - if (idx != null) { - errors.add( - positionChangeError( - change.getOldColumn().asSummaryString(), - change.getNewColumn().asSummaryString(), - idx)); + final int idx = columnIndex.getOrDefault(change.getOldColumn().getName(), -1); + if (idx < 0) { + return; } + errors.add( + positionChangeError( + change.getOldColumn().asSummaryString(), + change.getNewColumn().asSummaryString(), + idx)); } /** @@ -205,28 +232,6 @@ public class AlterMaterializedTableChangeOperation extends AlterMaterializedTabl position + 1, oldColumn, newColumn); } - @Override - public TableResultInternal execute(Context ctx) { - 
validateChanges(); - ctx.getCatalogManager() - .alterTable(getNewTable(), getTableChanges(), getTableIdentifier(), false); - return TableResultImpl.TABLE_RESULT_OK; - } - - @Override - public String asSummaryString() { - String changes = - getTableChanges().stream() - .map(AlterMaterializedTableChangeOperation::toString) - .collect(Collectors.joining(",\n")); - return String.format( - "%s %s\n%s", getOperationName(), tableIdentifier.asSummaryString(), changes); - } - - protected String getOperationName() { - return "ALTER MATERIALIZED TABLE"; - } - private static String toString(TableChange tableChange) { if (tableChange instanceof ModifyRefreshStatus) { ModifyRefreshStatus refreshStatus = (ModifyRefreshStatus) tableChange; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java index 4b1d4ae395d..ade473ddeac 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java @@ -19,6 +19,7 @@ package org.apache.flink.table.operations.materializedtable; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.TableChange; @@ -33,11 +34,22 @@ import java.util.function.Function; @Internal public class FullAlterMaterializedTableOperation extends AlterMaterializedTableChangeOperation { + private final Function<ResolvedCatalogMaterializedTable, CatalogMaterializedTable> + newTableBuilder; + 
public FullAlterMaterializedTableOperation( final ObjectIdentifier tableIdentifier, final Function<ResolvedCatalogMaterializedTable, List<TableChange>> tableChangeForTable, - final ResolvedCatalogMaterializedTable oldTable) { + final ResolvedCatalogMaterializedTable oldTable, + final Function<ResolvedCatalogMaterializedTable, CatalogMaterializedTable> + newTableBuilder) { super(tableIdentifier, tableChangeForTable, oldTable); + this.newTableBuilder = newTableBuilder; + } + + @Override + protected CatalogMaterializedTable computeNewTable() { + return newTableBuilder.apply(getOldTable()); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java index 2d6d10bdc53..b788f0560e3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java @@ -81,7 +81,13 @@ public abstract class AbstractCreateMaterializedTableConverter<T extends SqlCrea RefreshMode getMergedRefreshMode(); + LogicalRefreshMode getMergedLogicalRefreshMode(); + StartMode getMergedStartMode(); + + String getMergedComment(); + + IntervalFreshness getMergedFreshness(); } protected abstract MergeContext getMergeContext( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java index 0d81346262c..c1e35838ead 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java @@ -23,7 +23,10 @@ import org.apache.flink.sql.parser.ddl.materializedtable.SqlCreateOrAlterMateria import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; @@ -37,6 +40,7 @@ import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.MaterializedTableChangeHandler; import org.apache.flink.table.planner.operations.converters.MergeTableAsUtil; import org.apache.flink.table.planner.utils.MaterializedTableUtils; @@ -50,7 +54,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.function.Function; /** A 
converter for {@link SqlCreateOrAlterMaterializedTable}. */ public class SqlCreateOrAlterMaterializedTableConverter @@ -105,11 +108,40 @@ public class SqlCreateOrAlterMaterializedTableConverter final ResolvedCatalogMaterializedTable oldTable, final ConvertContext context, final ObjectIdentifier identifier) { + final SchemaResolver schemaResolver = context.getCatalogManager().getSchemaResolver(); final MergeContext mergeContext = getMergeContext(sqlCreateOrAlterTable, context); return new FullAlterMaterializedTableOperation( identifier, - buildTableChanges(mergeContext, context.getCatalogManager().getSchemaResolver()), - oldTable); + currentTable -> buildTableChanges(currentTable, mergeContext, schemaResolver), + oldTable, + currentTable -> buildNewTable(currentTable, mergeContext, schemaResolver)); + } + + private CatalogMaterializedTable buildNewTable( + final ResolvedCatalogMaterializedTable currentTable, + final MergeContext mergeContext, + final SchemaResolver schemaResolver) { + return CatalogMaterializedTable.newBuilder() + .schema( + MaterializedTableChangeHandler.getHandlerWithChanges( + currentTable, + getSchemaTableChanges( + mergeContext, schemaResolver, currentTable)) + .retrieveSchema()) + .comment(mergeContext.getMergedComment()) + .partitionKeys(mergeContext.getMergedPartitionKeys()) + .options(mergeContext.getMergedTableOptions()) + .originalQuery(mergeContext.getMergedOriginalQuery()) + .expandedQuery(mergeContext.getMergedExpandedQuery()) + .distribution(mergeContext.getMergedTableDistribution().orElse(null)) + .freshness(mergeContext.getMergedFreshness()) + .logicalRefreshMode(mergeContext.getMergedLogicalRefreshMode()) + .refreshMode(mergeContext.getMergedRefreshMode()) + .refreshStatus(currentTable.getRefreshStatus()) + .refreshHandlerDescription(currentTable.getRefreshHandlerDescription().orElse(null)) + .serializedRefreshHandler(currentTable.getSerializedRefreshHandler()) + .startMode(mergeContext.getMergedStartMode()) + .build(); } private 
Operation handleCreate( @@ -122,33 +154,37 @@ public class SqlCreateOrAlterMaterializedTableConverter return new CreateMaterializedTableOperation(identifier, resolvedTable); } - private Function<ResolvedCatalogMaterializedTable, List<TableChange>> buildTableChanges( - final MergeContext mergeContext, final SchemaResolver schemaResolver) { - return oldTable -> { - final List<TableChange> changes = - getSchemaTableChanges(mergeContext, schemaResolver, oldTable); + private List<TableChange> buildTableChanges( + final ResolvedCatalogMaterializedTable oldTable, + final MergeContext mergeContext, + final SchemaResolver schemaResolver) { + final List<TableChange> changes = + getSchemaTableChanges(mergeContext, schemaResolver, oldTable); - changes.addAll(getQueryTableChanges(mergeContext, oldTable)); - changes.addAll(getOptionsTableChanges(mergeContext, oldTable)); - changes.addAll(getDistributionTableChanges(mergeContext, oldTable)); + changes.addAll(getQueryTableChanges(mergeContext, oldTable)); + changes.addAll(getOptionsTableChanges(mergeContext, oldTable)); + changes.addAll(getDistributionTableChanges(mergeContext, oldTable)); - final RefreshMode oldRefreshMode = oldTable.getRefreshMode(); - final RefreshMode newRefreshMode = mergeContext.getMergedRefreshMode(); - if (oldRefreshMode != newRefreshMode && newRefreshMode != null) { - throw new ValidationException("Changing of REFRESH MODE is unsupported"); - } + final RefreshMode oldRefreshMode = oldTable.getRefreshMode(); + final RefreshMode newRefreshMode = mergeContext.getMergedRefreshMode(); + if (oldRefreshMode != newRefreshMode && newRefreshMode != null) { + throw new ValidationException("Changing of REFRESH MODE is unsupported"); + } - final StartMode newStartMode = mergeContext.getMergedStartMode(); + final StartMode newStartMode = mergeContext.getMergedStartMode(); + if (newStartMode != null) { final StartMode oldStartMode = oldTable.getStartMode() .orElseThrow( - () -> new ValidationException("START_MODE must 
not be null")); + () -> + new ValidationException( + "Start mode must be set on materialized table.")); if (!Objects.equals(oldStartMode, newStartMode)) { changes.add(TableChange.modifyStartMode(newStartMode)); } + } - return changes; - }; + return changes; } private List<TableChange> getDistributionTableChanges( @@ -295,21 +331,12 @@ public class SqlCreateOrAlterMaterializedTableConverter @Override public Schema getMergedSchema() { - final Set<String> querySchemaColumnNames = - new HashSet<>(querySchema.getColumnNames()); final SqlNodeList sqlNodeList = sqlCreateMaterializedTable.getColumnList(); - for (SqlNode column : sqlNodeList) { - if (!(column instanceof SqlRegularColumn)) { - continue; - } - - SqlRegularColumn physicalColumn = (SqlRegularColumn) column; - if (!querySchemaColumnNames.contains(physicalColumn.getName().getSimple())) { - throw new ValidationException( - String.format( - "Invalid as physical column '%s' is defined in the DDL, but is not used in a query column.", - physicalColumn.getName().getSimple())); - } + if (createOrAlterOperation(sqlCreateMaterializedTable)) { + MaterializedTableUtils.validatePersistedColumnsUsedByQuery( + sqlNodeList, querySchema); + } else { + validatePhysicalColumnsUsedByQuery(sqlNodeList, querySchema); } if (sqlCreateMaterializedTable.isSchemaWithColumnsIdentifiersOnly()) { // If only column identifiers are provided, then these are used to @@ -358,14 +385,45 @@ public class SqlCreateOrAlterMaterializedTableConverter @Override public RefreshMode getMergedRefreshMode() { - return getDerivedRefreshMode( - getDerivedLogicalRefreshMode(sqlCreateMaterializedTable)); + return getDerivedRefreshMode(getMergedLogicalRefreshMode()); + } + + @Override + public LogicalRefreshMode getMergedLogicalRefreshMode() { + return getDerivedLogicalRefreshMode(sqlCreateMaterializedTable); } @Override public StartMode getMergedStartMode() { return getStartMode(sqlCreateMaterializedTable, context); } + + @Override + public String 
getMergedComment() { + return getComment(sqlCreateMaterializedTable); + } + + @Override + public IntervalFreshness getMergedFreshness() { + return getDerivedFreshness(sqlCreateMaterializedTable); + } }; } + + private static void validatePhysicalColumnsUsedByQuery( + SqlNodeList sqlNodeList, ResolvedSchema querySchema) { + final Set<String> querySchemaColumnNames = new HashSet<>(querySchema.getColumnNames()); + for (SqlNode column : sqlNodeList) { + if (!(column instanceof SqlRegularColumn)) { + continue; + } + final SqlRegularColumn physicalColumn = (SqlRegularColumn) column; + if (!querySchemaColumnNames.contains(physicalColumn.getName().getSimple())) { + throw new ValidationException( + String.format( + "Invalid as physical column '%s' is defined in the DDL, but is not used in a query column.", + physicalColumn.getName().getSimple())); + } + } + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java index 99fd660e836..e454eb7dd8c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java @@ -54,7 +54,6 @@ import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlTimestampLiteral; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.TimestampString; -import org.apache.commons.lang3.StringUtils; import java.math.BigDecimal; import java.time.Instant; @@ -447,68 +446,106 @@ public class MaterializedTableUtils { public static List<TableChange> validateAndExtractColumnChanges( ResolvedSchema oldSchema, ResolvedSchema newSchema, boolean schemaDefinedInQuery) { - final List<Column> newColumns = getPersistedColumns(newSchema); - final List<Column> oldColumns = 
getPersistedColumns(oldSchema); - final int originalColumnSize = oldColumns.size(); - final int newColumnSize = newColumns.size(); - - if (originalColumnSize > newColumnSize) { - throw new ValidationException( - String.format( - "Failed to modify query because drop column is unsupported. " - + "When modifying a query, you can only append new columns at the end of original schema. " - + "The original schema has %d columns, but the newly derived schema from the query has %d columns.", - originalColumnSize, newColumnSize)); - } - - final List<TableChange> columnChanges = new ArrayList<>(); + final List<Column> oldColumns = oldSchema.getColumns(); + final Map<String, Tuple2<Column, Integer>> oldByName = new HashMap<>(); for (int i = 0; i < oldColumns.size(); i++) { - final Column oldColumn = oldColumns.get(i); - final Column newColumn = newColumns.get(i); - final DataType newColumnDataType = - getNewColumnDatatype(oldColumn, newColumns.get(i), schemaDefinedInQuery); - if (!oldColumn.equals(newColumn)) { - if (!oldColumn.getName().equals(newColumn.getName()) - || !oldColumn.getDataType().equals(newColumnDataType)) { - throw new ValidationException( - String.format( - "When modifying the query of a materialized table, " - + "currently only support appending columns at the end of original schema, dropping, renaming, and reordering columns are not supported.\n" - + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", - i + 1, oldColumn, newColumn)); - } + oldByName.put(oldColumns.get(i).getName(), Tuple2.of(oldColumns.get(i), i)); + } + final Set<String> seen = new HashSet<>(); + final List<Column> newColumns = newSchema.getColumns(); + final List<TableChange> changes = new ArrayList<>(); + for (int newIndex = 0; newIndex < newColumns.size(); newIndex++) { + final Column newColumn = newColumns.get(newIndex); + seen.add(newColumn.getName()); + final Tuple2<Column, Integer> oldEntry = oldByName.get(newColumn.getName()); + if (oldEntry == null) 
{ + changes.add(addChange(newColumn, schemaDefinedInQuery)); + continue; + } + final Column oldColumn = oldEntry.f0; + // No position diff: DDL order is arbitrary; query-driven reorders are caught by + // buildSchemaTableChanges on the ALTER MT AS path. + if (oldColumn.isPhysical() + && newColumn.isPhysical() + && typeChanged(oldColumn, newColumn, schemaDefinedInQuery)) { + final DataType newType = + schemaDefinedInQuery + ? newColumn.getDataType() + : newColumn.getDataType().nullable(); + changes.add(TableChange.modifyPhysicalColumnType(oldColumn, newType)); + // Type changed; still check whether the comment also changed. final String oldComment = oldColumn.getComment().orElse(null); final String newComment = newColumn.getComment().orElse(null); - - if (StringUtils.isEmpty(oldComment) != StringUtils.isEmpty(newComment) - || StringUtils.isNotEmpty(oldComment) - && !Objects.equals(oldComment, newComment)) { - columnChanges.add(TableChange.modifyColumnComment(oldColumn, newComment)); + if (!Objects.equals(oldComment, newComment)) { + changes.add(TableChange.modifyColumnComment(oldColumn, newComment)); } + continue; + } + if (oldColumn.getClass() != newColumn.getClass() + || !definitionEquals(oldColumn, newColumn)) { + changes.add( + new TableChange.ModifyColumn( + oldColumn, + normalizedColumn(newColumn, schemaDefinedInQuery), + null)); + continue; + } + final String oldComment = oldColumn.getComment().orElse(null); + final String newComment = newColumn.getComment().orElse(null); + if (!Objects.equals(oldComment, newComment)) { + changes.add(TableChange.modifyColumnComment(oldColumn, newComment)); } } - for (int i = oldColumns.size(); i < newColumns.size(); i++) { - Column newColumn = newColumns.get(i); - columnChanges.add( - TableChange.add( - schemaDefinedInQuery - ? 
newColumn - : newColumn.copy(newColumn.getDataType().nullable()))); + for (Map.Entry<String, Tuple2<Column, Integer>> entry : oldByName.entrySet()) { + if (seen.contains(entry.getKey())) { + continue; + } + // Without an explicit DDL column list the new schema only reflects the query + // projection, so old non-persisted columns are retained, not dropped. + if (!schemaDefinedInQuery && !entry.getValue().f0.isPersisted()) { + continue; + } + changes.add(TableChange.dropColumn(entry.getKey())); } + return changes; + } - return columnChanges; + private static TableChange.AddColumn addChange(Column column, boolean schemaDefinedInQuery) { + return TableChange.add(normalizedColumn(column, schemaDefinedInQuery)); } - private static DataType getNewColumnDatatype( - Column oldColumn, Column newColumn, boolean schemaDefinedInQuery) { - if (schemaDefinedInQuery) { - return newColumn.getDataType(); + private static Column normalizedColumn(Column column, boolean schemaDefinedInQuery) { + return schemaDefinedInQuery ? 
column : column.copy(column.getDataType().nullable()); + } + + private static boolean definitionEquals(Column oldColumn, Column newColumn) { + if (oldColumn instanceof MetadataColumn && newColumn instanceof MetadataColumn) { + final MetadataColumn oldMeta = (MetadataColumn) oldColumn; + final MetadataColumn newMeta = (MetadataColumn) newColumn; + return oldMeta.isVirtual() == newMeta.isVirtual() + && Objects.equals( + oldMeta.getMetadataKey().orElse(null), + newMeta.getMetadataKey().orElse(null)) + && oldMeta.getDataType().equals(newMeta.getDataType()); } - if (oldColumn.getDataType().nullable().equals(newColumn.getDataType().nullable())) { - return oldColumn.getDataType(); + if (oldColumn instanceof ComputedColumn && newColumn instanceof ComputedColumn) { + return Objects.equals( + ((ComputedColumn) oldColumn).getExpression(), + ((ComputedColumn) newColumn).getExpression()); } - return newColumn.getDataType(); + return true; + } + + private static boolean typeChanged( + Column oldColumn, Column newColumn, boolean schemaDefinedInQuery) { + final DataType oldType = oldColumn.getDataType(); + final DataType newType = newColumn.getDataType(); + // schemaDefinedInQuery=false: schema is inferred from the query, which may flip + // nullability without intent — only the base type difference is a real change. + return schemaDefinedInQuery + ? 
!oldType.equals(newType) + : !oldType.nullable().equals(newType.nullable()); } public static ResolvedSchema getQueryOperationResolvedSchema( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index caaf0578db2..6bb5fc5503d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -1266,13 +1266,10 @@ class SqlMaterializedTableNodeToOperationConverterTest list.add( TestSpec.withExpectedSchema( + // Explicit DDL omits `m` and `calc` — CREATE OR ALTER is declarative, + // so non-persisted columns absent from the DDL are dropped. "CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted (`EXPR$0` INT NOT NULL, `sec` CHAR(1)) AS SELECT 2, 'a' AS sec", - "(\n" - + " `m` STRING METADATA VIRTUAL,\n" - + " `calc` AS ['a' || 'b'],\n" - + " `EXPR$0` INT NOT NULL,\n" - + " `sec` CHAR(1)\n" - + ")")); + "(\n" + " `EXPR$0` INT NOT NULL,\n" + " `sec` CHAR(1)\n" + ")")); list.add( TestSpec.withExpectedSchema( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ValidateAndExtractColumnChangesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ValidateAndExtractColumnChangesTest.java new file mode 100644 index 00000000000..3d3f177af24 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ValidateAndExtractColumnChangesTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.utils; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.utils.ResolvedExpressionMock; +import org.apache.flink.table.types.DataType; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collection; +import java.util.List; + +import static org.apache.flink.table.catalog.Column.computed; +import static org.apache.flink.table.catalog.Column.metadata; +import static org.apache.flink.table.catalog.Column.physical; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link MaterializedTableUtils#validateAndExtractColumnChanges}. 
*/
+class ValidateAndExtractColumnChangesTest {
+
+    /**
+     * Runs one scenario: diffs {@code oldSchema} against {@code newSchema} and checks the emitted
+     * {@link TableChange}s. Emission order is deliberately not asserted.
+     */
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("input")
+    void test(TestSpec spec) {
+        assertThat(
+                        MaterializedTableUtils.validateAndExtractColumnChanges(
+                                spec.oldSchema, spec.newSchema, spec.schemaDefinedInQuery))
+                .containsExactlyInAnyOrderElementsOf(spec.expected);
+    }
+
+    /** Scenarios for {@link #test(TestSpec)}; the first argument of each spec is its display name. */
+    private static Collection<TestSpec> input() {
+        return List.of(
+                TestSpec.of(
+                        "identical schemas",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of()),
+                TestSpec.of(
+                        "comment added",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("a", DataTypes.INT()).withComment("hello")),
+                        true,
+                        List.of(
+                                TableChange.modifyColumnComment(
+                                        physical("a", DataTypes.INT()), "hello"))),
+                TestSpec.of(
+                        "comment removed",
+                        schema(physical("a", DataTypes.INT()).withComment("hello")),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(
+                                TableChange.modifyColumnComment(
+                                        physical("a", DataTypes.INT()).withComment("hello"),
+                                        null))),
+                TestSpec.of(
+                        "comment changed",
+                        schema(physical("a", DataTypes.INT()).withComment("old")),
+                        schema(physical("a", DataTypes.INT()).withComment("new")),
+                        true,
+                        List.of(
+                                TableChange.modifyColumnComment(
+                                        physical("a", DataTypes.INT()).withComment("old"), "new"))),
+                TestSpec.of(
+                        "single column appended",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("a", DataTypes.INT()), physical("b", DataTypes.STRING())),
+                        true,
+                        List.of(TableChange.add(physical("b", DataTypes.STRING())))),
+                TestSpec.of(
+                        "multiple columns appended",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                physical("b", DataTypes.STRING()),
+                                physical("c", DataTypes.BOOLEAN())),
+                        true,
+                        List.of(
+                                TableChange.add(physical("b", DataTypes.STRING())),
+                                TableChange.add(physical("c", DataTypes.BOOLEAN())))),
+                TestSpec.of(
+                        "nullability differs but schema is not defined in query",
+                        schema(physical("a", DataTypes.INT().notNull())),
+                        schema(physical("a", DataTypes.INT())),
+                        false,
+                        List.of()),
+                TestSpec.of(
+                        "computed columns are ignored in persisted comparison",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                physical("b", DataTypes.STRING()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        true,
+                        List.of(TableChange.add(physical("b", DataTypes.STRING())))),
+                TestSpec.of(
+                        "virtual metadata column drop emits dropColumn",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("v", DataTypes.INT(), null, true)),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(TableChange.dropColumn("v"))),
+                TestSpec.of(
+                        "non-virtual metadata column treated as persisted",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("m", DataTypes.STRING(), null, false)),
+                        true,
+                        List.of(TableChange.add(metadata("m", DataTypes.STRING(), null, false)))),
+                TestSpec.of(
+                        "schemaDefinedInQuery=false makes added column nullable",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                physical("b", DataTypes.STRING().notNull())),
+                        false,
+                        List.of(TableChange.add(physical("b", DataTypes.STRING())))),
+                TestSpec.of(
+                        "drop persisted column emits dropColumn",
+                        schema(physical("a", DataTypes.INT()), physical("b", DataTypes.STRING())),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(TableChange.dropColumn("b"))),
+                TestSpec.of(
+                        "rename persisted column emits drop + add",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("b", DataTypes.INT())),
+                        true,
+                        List.of(
+                                TableChange.dropColumn("a"),
+                                TableChange.add(physical("b", DataTypes.INT())))),
+                TestSpec.of(
+                        "persisted type change emits modifyPhysicalColumnType",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(physical("a", DataTypes.STRING())),
+                        true,
+                        List.of(
+                                TableChange.modifyPhysicalColumnType(
+                                        physical("a", DataTypes.INT()), DataTypes.STRING()))),
+                TestSpec.of(
+                        "nullability change with schemaDefinedInQuery=true emits modifyPhysicalColumnType",
+                        schema(physical("a", DataTypes.INT().notNull())),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(
+                                TableChange.modifyPhysicalColumnType(
+                                        physical("a", DataTypes.INT().notNull()),
+                                        DataTypes.INT()))),
+                TestSpec.of(
+                        "type change + comment change both emitted",
+                        schema(physical("a", DataTypes.INT()).withComment("old")),
+                        schema(physical("a", DataTypes.STRING()).withComment("new")),
+                        true,
+                        List.of(
+                                TableChange.modifyPhysicalColumnType(
+                                        physical("a", DataTypes.INT()).withComment("old"),
+                                        DataTypes.STRING()),
+                                TableChange.modifyColumnComment(
+                                        physical("a", DataTypes.INT()).withComment("old"), "new"))),
+                TestSpec.of(
+                        "reorder persisted columns is silent (DDL order is arbitrary)",
+                        schema(physical("a", DataTypes.INT()), physical("b", DataTypes.STRING())),
+                        schema(physical("b", DataTypes.STRING()), physical("a", DataTypes.INT())),
+                        true,
+                        List.of()),
+                TestSpec.of(
+                        "add computed column",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        true,
+                        List.of(TableChange.add(computed("comp", expr(DataTypes.INT()))))),
+                TestSpec.of(
+                        "add virtual metadata column",
+                        schema(physical("a", DataTypes.INT())),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("v", DataTypes.STRING(), null, true)),
+                        true,
+                        List.of(TableChange.add(metadata("v", DataTypes.STRING(), null, true)))),
+                TestSpec.of(
+                        "drop computed column",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(TableChange.dropColumn("comp"))),
+                TestSpec.of(
+                        "drop virtual metadata column",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("v", DataTypes.STRING(), null, true)),
+                        schema(physical("a", DataTypes.INT())),
+                        true,
+                        List.of(TableChange.dropColumn("v"))),
+                TestSpec.of(
+                        "modify computed column expression emits modifyColumn",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.BIGINT()))),
+                        true,
+                        List.of(
+                                new TableChange.ModifyColumn(
+                                        computed("comp", expr(DataTypes.INT())),
+                                        computed("comp", expr(DataTypes.BIGINT())),
+                                        null))),
+                // Absent-column handling when the schema is not defined in the query: only
+                // non-persisted (computed / virtual metadata) columns are exempt from dropping.
+                TestSpec.of(
+                        "absent computed column kept when schemaDefinedInQuery=false",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                computed("comp", expr(DataTypes.INT()))),
+                        schema(physical("a", DataTypes.INT())),
+                        false,
+                        List.of()),
+                TestSpec.of(
+                        "absent virtual metadata column kept when schemaDefinedInQuery=false",
+                        schema(
+                                physical("a", DataTypes.INT()),
+                                metadata("v", DataTypes.STRING(), null, true)),
+                        schema(physical("a", DataTypes.INT())),
+                        false,
+                        List.of()),
+                TestSpec.of(
+                        "absent persisted column dropped even when schemaDefinedInQuery=false",
+                        schema(physical("a", DataTypes.INT()), physical("b", DataTypes.STRING())),
+                        schema(physical("a", DataTypes.INT())),
+                        false,
+                        List.of(TableChange.dropColumn("b"))));
+    }
+
+    /** Builds a {@link ResolvedSchema} from the given columns, in the given order. */
+    private static ResolvedSchema schema(Column... columns) {
+        return ResolvedSchema.of(columns);
+    }
+
+    /**
+     * Creates a {@link ResolvedExpressionMock} of the given type. Every mock uses the same
+     * constant string supplier ({@code "1"}), so mocks built here differ only by {@code type}.
+     */
+    private static ResolvedExpression expr(DataType type) {
+        return new ResolvedExpressionMock(type, () -> "1");
+    }
+
+    /**
+     * One parameterized scenario: the old and new schemas, the {@code schemaDefinedInQuery}
+     * flag, and the expected changes. {@link #toString()} returns {@code name}, which the
+     * {@code @ParameterizedTest(name = "{0}")} display name picks up.
+     */
+    private static class TestSpec {
+        private final String name;
+        private final ResolvedSchema oldSchema;
+        private final ResolvedSchema newSchema;
+        private final boolean schemaDefinedInQuery;
+        private final List<TableChange> expected;
+
+        TestSpec(
+                String name,
+                ResolvedSchema oldSchema,
+                ResolvedSchema newSchema,
+                boolean schemaDefinedInQuery,
+                List<TableChange> expected) {
+            this.name = name;
+            this.oldSchema = oldSchema;
+            this.newSchema = newSchema;
+            this.schemaDefinedInQuery = schemaDefinedInQuery;
+            this.expected = expected;
+        }
+
+        /** Static factory mirroring the constructor; keeps the scenario table compact. */
+        static TestSpec of(
+                String name,
+                ResolvedSchema oldSchema,
+                ResolvedSchema newSchema,
+                boolean schemaDefinedInQuery,
+                List<TableChange> expected) {
+            return new TestSpec(name, oldSchema, newSchema, schemaDefinedInQuery, expected);
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+}
