This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 08648c53ee738f29e4beb7fa1911800c3d973680
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Jan 22 22:04:53 2026 +0100

    [FLINK-38963][table] Altering query should be possible if `MATERIALIZED TABLE` schema contains non persisted columns
    
    This closes #27460.
---
 .../planner/utils/MaterializedTableUtils.java      | 23 +++--
 ...erializedTableNodeToOperationConverterTest.java | 115 ++++++++++++++-------
 2 files changed, 92 insertions(+), 46 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 42dea1379a2..922c846fc60 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The utils for materialized table. */
 @Internal
@@ -116,9 +117,10 @@ public class MaterializedTableUtils {
 
     public static List<Column> validateAndExtractNewColumns(
             ResolvedSchema oldSchema, ResolvedSchema newSchema) {
-        List<Column> newAddedColumns = new ArrayList<>();
-        int originalColumnSize = oldSchema.getColumns().size();
-        int newColumnSize = newSchema.getColumns().size();
+        final List<Column> newColumns = getPersistedColumns(newSchema);
+        final List<Column> oldColumns = getPersistedColumns(oldSchema);
+        final int originalColumnSize = oldColumns.size();
+        final int newColumnSize = newColumns.size();
 
         if (originalColumnSize > newColumnSize) {
             throw new ValidationException(
@@ -129,9 +131,9 @@ public class MaterializedTableUtils {
                             originalColumnSize, newColumnSize));
         }
 
-        for (int i = 0; i < oldSchema.getColumns().size(); i++) {
-            Column oldColumn = oldSchema.getColumns().get(i);
-            Column newColumn = newSchema.getColumns().get(i);
+        for (int i = 0; i < oldColumns.size(); i++) {
+            Column oldColumn = oldColumns.get(i);
+            Column newColumn = newColumns.get(i);
             if (!oldColumn.equals(newColumn)) {
                 throw new ValidationException(
                         String.format(
@@ -142,7 +144,8 @@ public class MaterializedTableUtils {
             }
         }
 
-        for (int i = oldSchema.getColumns().size(); i < newSchema.getColumns().size(); i++) {
+        final List<Column> newAddedColumns = new ArrayList<>();
+        for (int i = oldColumns.size(); i < newColumns.size(); i++) {
             Column newColumn = newSchema.getColumns().get(i);
             newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
         }
@@ -203,6 +206,12 @@ public class MaterializedTableUtils {
         }
     }
 
+    private static List<Column> getPersistedColumns(ResolvedSchema schema) {
+        return schema.getColumns().stream()
+                .filter(Column::isPersisted)
+                .collect(Collectors.toList());
+    }
+
     private static void throwPersistedColumnNotUsedException(String type, String columnName) {
         throw new ValidationException(
                 String.format(PERSISTED_COLUMN_NOT_USED_IN_QUERY, type, columnName));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 73c2fe2a16e..a6736a6f1d2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -59,7 +59,6 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -94,7 +93,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 CatalogTable.newBuilder()
                         .schema(tableSchema)
                         .comment("")
-                        .partitionKeys(Arrays.asList("b", "c"))
+                        .partitionKeys(List.of("b", "c"))
                         .options(options)
                         .build();
         catalog.createTable(path3, catalogTable, true);
@@ -167,6 +166,18 @@ class SqlMaterializedTableNodeToOperationConverterTest
                        + "AS SELECT t1.* FROM t1";
 
         createMaterializedTableInCatalog(sqlWithoutConstraint, "base_mtbl_without_constraint");
+
+        // MATERIALIZED TABLE with non persisted columns
+        final String sqlWithNonPersisted =
+                "CREATE MATERIALIZED TABLE base_mtbl_with_non_persisted (\n"
+                        + "  m STRING METADATA VIRTUAL,"
+                        + "  calc AS 'a' || 'b'"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT 1";
+
+        createMaterializedTableInCatalog(sqlWithNonPersisted, "base_mtbl_with_non_persisted");
     }
 
     @Test
@@ -503,15 +514,12 @@ class SqlMaterializedTableNodeToOperationConverterTest
         AlterMaterializedTableAsQueryOperation op =
                 (AlterMaterializedTableAsQueryOperation) operation;
         assertThat(op.getTableChanges())
-                .isEqualTo(
-                        Arrays.asList(
-                                TableChange.add(
-                                        Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                                TableChange.add(
-                                        Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                                TableChange.modifyDefinitionQuery(
-                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
-                                                + "FROM `builtin`.`default`.`t3` AS `t3`")));
+                .containsExactly(
+                        TableChange.add(Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.add(Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                        + "FROM `builtin`.`default`.`t3` AS `t3`"));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
                         "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
@@ -547,12 +555,11 @@ class SqlMaterializedTableNodeToOperationConverterTest
                        .collect(Collectors.toList());
         // added column should be a nullable column.
         assertThat(addedColumn)
-                .isEqualTo(
-                        Arrays.asList(
-                                new Schema.UnresolvedPhysicalColumn(
-                                        "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
-                                new Schema.UnresolvedPhysicalColumn(
-                                        "f", DataTypes.VARCHAR(Integer.MAX_VALUE))));
+                .containsExactly(
+                        new Schema.UnresolvedPhysicalColumn(
+                                "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new Schema.UnresolvedPhysicalColumn(
+                                "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
     }
 
     @Test
@@ -562,12 +569,11 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 (AlterMaterializedTableAsQueryOperation) parse(sql5);
 
         assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
-                .isEqualTo(
-                        Arrays.asList(
-                                TableChange.add(Column.physical("a0", DataTypes.INT())),
-                                TableChange.modifyDefinitionQuery(
-                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`c` AS `a`\n"
-                                                + "FROM `builtin`.`default`.`t3` AS `t3`")));
+                .containsExactly(
+                        TableChange.add(Column.physical("a0", DataTypes.INT())),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`c` AS `a`\n"
+                                        + "FROM `builtin`.`default`.`t3` AS `t3`"));
     }
 
     @Test
@@ -648,15 +654,12 @@ class SqlMaterializedTableNodeToOperationConverterTest
         AlterMaterializedTableAsQueryOperation op =
                 (AlterMaterializedTableAsQueryOperation) operation;
         assertThat(op.getTableChanges())
-                .isEqualTo(
-                        Arrays.asList(
-                                TableChange.add(
-                                        Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                                TableChange.add(
-                                        Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                                TableChange.modifyDefinitionQuery(
-                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
-                                                + "FROM `builtin`.`default`.`t3` AS `t3`")));
+                .containsExactly(
+                        TableChange.add(Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.add(Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                        + "FROM `builtin`.`default`.`t3` AS `t3`"));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
                         "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
@@ -692,12 +695,11 @@ class SqlMaterializedTableNodeToOperationConverterTest
                        .collect(Collectors.toList());
         // added column should be a nullable column.
         assertThat(addedColumn)
-                .isEqualTo(
-                        Arrays.asList(
-                                new Schema.UnresolvedPhysicalColumn(
-                                        "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
-                                new Schema.UnresolvedPhysicalColumn(
-                                        "f", DataTypes.VARCHAR(Integer.MAX_VALUE))));
+                .containsExactly(
+                        new Schema.UnresolvedPhysicalColumn(
+                                "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new Schema.UnresolvedPhysicalColumn(
+                                "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
     }
 
     private static Collection<TestSpec> testDataForCreateAlterMaterializedTableFailedCase() {
@@ -1051,6 +1053,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
         list.addAll(alterAddSchemaSuccessCase());
         list.addAll(alterModifySchemaSuccessCase());
         list.addAll(alterDropSchemaSuccessCase());
+        list.addAll(alterQuerySuccessCase());
         return list;
     }
 
@@ -1177,6 +1180,40 @@ class SqlMaterializedTableNodeToOperationConverterTest
         return list;
     }
 
+    private static Collection<TestSpec> alterQuerySuccessCase() {
+        final Collection<TestSpec> list = new ArrayList<>();
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted AS SELECT 1",
+                        "(\n"
+                                + "  `m` STRING METADATA VIRTUAL,\n"
+                                + "  `calc` AS 'a' || 'b',\n"
+                                + "  `EXPR$0` INT NOT NULL\n"
+                                + ")"));
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted AS SELECT 2, 'a' AS sec",
+                        "(\n"
+                                + "  `m` STRING METADATA VIRTUAL,\n"
+                                + "  `calc` AS 'a' || 'b',\n"
+                                + "  `EXPR$0` INT NOT NULL,\n"
+                                + "  `sec` CHAR(1)\n"
+                                + ")"));
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted AS SELECT 2, 'a' AS sec",
+                        "(\n"
+                                + "  `m` STRING METADATA VIRTUAL,\n"
+                                + "  `calc` AS 'a' || 'b',\n"
+                                + "  `EXPR$0` INT NOT NULL,\n"
+                                + "  `sec` CHAR(1)\n"
+                                + ")"));
+        return list;
+    }
+
     private static Collection<Arguments> testDataWithDifferentSchemasSuccessCase() {
         final Collection<Arguments> list = new ArrayList<>();
         list.addAll(createOrAlter(CREATE_OPERATION));
@@ -1259,7 +1296,7 @@
                                        .build())
                        .comment("materialized table comment")
                        .options(Map.of("connector", "filesystem", "format", "json"))
-                        .partitionKeys(Arrays.asList("a", "d"))
+                        .partitionKeys(List.of("a", "d"))
                        .originalQuery("SELECT *\nFROM `t1`")
                        .expandedQuery(
                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
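
For reference, a minimal standalone sketch (not part of the commit) of the Column::isPersisted filtering that the change relies on. The class and helper names below are illustrative only, and the schemas mirror the base_mtbl_with_non_persisted test case:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;

import java.util.List;
import java.util.stream.Collectors;

public class PersistedColumnFilterSketch {

    public static void main(String[] args) {
        // Existing materialized table schema: a virtual metadata column plus one query-derived column.
        ResolvedSchema oldSchema =
                ResolvedSchema.of(
                        Column.metadata("m", DataTypes.STRING(), null, true),
                        Column.physical("EXPR$0", DataTypes.INT().notNull()));

        // Schema derived from the new definition query: same first column plus one appended column.
        ResolvedSchema newSchema =
                ResolvedSchema.of(
                        Column.physical("EXPR$0", DataTypes.INT().notNull()),
                        Column.physical("sec", DataTypes.CHAR(1)));

        // Same filter as getPersistedColumns(): virtual metadata and computed columns are skipped,
        // so the prefix comparison in validateAndExtractNewColumns only sees persisted columns.
        List<Column> oldPersisted = persistedColumnsOf(oldSchema); // only EXPR$0
        List<Column> newPersisted = persistedColumnsOf(newSchema); // EXPR$0 and sec

        System.out.println(oldPersisted);
        System.out.println(newPersisted);
    }

    private static List<Column> persistedColumnsOf(ResolvedSchema schema) {
        return schema.getColumns().stream()
                .filter(Column::isPersisted)
                .collect(Collectors.toList());
    }
}

With that filter in place, the column-by-column comparison no longer trips over the non persisted columns m and calc, which is what lets the ALTER MATERIALIZED TABLE ... AS statements in alterQuerySuccessCase() pass.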
