This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08648c53ee738f29e4beb7fa1911800c3d973680
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Jan 22 22:04:53 2026 +0100

    [FLINK-38963][table] Altering query should be possible if `MATERIALIZED TABLE` schema contains non persisted columns
    
    This closes #27460.
---
 .../planner/utils/MaterializedTableUtils.java      |  23 +++--
 ...erializedTableNodeToOperationConverterTest.java | 115 ++++++++++++++-------
 2 files changed, 92 insertions(+), 46 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 42dea1379a2..922c846fc60 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /** The utils for materialized table. */
 @Internal
@@ -116,9 +117,10 @@ public class MaterializedTableUtils {
 
     public static List<Column> validateAndExtractNewColumns(
             ResolvedSchema oldSchema, ResolvedSchema newSchema) {
-        List<Column> newAddedColumns = new ArrayList<>();
-        int originalColumnSize = oldSchema.getColumns().size();
-        int newColumnSize = newSchema.getColumns().size();
+        final List<Column> newColumns = getPersistedColumns(newSchema);
+        final List<Column> oldColumns = getPersistedColumns(oldSchema);
+        final int originalColumnSize = oldColumns.size();
+        final int newColumnSize = newColumns.size();
 
         if (originalColumnSize > newColumnSize) {
             throw new ValidationException(
@@ -129,9 +131,9 @@ public class MaterializedTableUtils {
                             originalColumnSize, newColumnSize));
         }
 
-        for (int i = 0; i < oldSchema.getColumns().size(); i++) {
-            Column oldColumn = oldSchema.getColumns().get(i);
-            Column newColumn = newSchema.getColumns().get(i);
+        for (int i = 0; i < oldColumns.size(); i++) {
+            Column oldColumn = oldColumns.get(i);
+            Column newColumn = newColumns.get(i);
             if (!oldColumn.equals(newColumn)) {
                 throw new ValidationException(
                         String.format(
@@ -142,7 +144,8 @@ public class MaterializedTableUtils {
             }
         }
 
-        for (int i = oldSchema.getColumns().size(); i < 
newSchema.getColumns().size(); i++) {
+        final List<Column> newAddedColumns = new ArrayList<>();
+        for (int i = oldColumns.size(); i < newColumns.size(); i++) {
             Column newColumn = newSchema.getColumns().get(i);
             
newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
         }
@@ -203,6 +206,12 @@ public class MaterializedTableUtils {
         }
     }
 
+    private static List<Column> getPersistedColumns(ResolvedSchema schema) {
+        return schema.getColumns().stream()
+                .filter(Column::isPersisted)
+                .collect(Collectors.toList());
+    }
+
     private static void throwPersistedColumnNotUsedException(String type, 
String columnName) {
         throw new ValidationException(
                 String.format(PERSISTED_COLUMN_NOT_USED_IN_QUERY, type, 
columnName));
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 73c2fe2a16e..a6736a6f1d2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -59,7 +59,6 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -94,7 +93,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 CatalogTable.newBuilder()
                         .schema(tableSchema)
                         .comment("")
-                        .partitionKeys(Arrays.asList("b", "c"))
+                        .partitionKeys(List.of("b", "c"))
                         .options(options)
                         .build();
         catalog.createTable(path3, catalogTable, true);
@@ -167,6 +166,18 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         + "AS SELECT t1.* FROM t1";
 
         createMaterializedTableInCatalog(sqlWithoutConstraint, 
"base_mtbl_without_constraint");
+
+        // MATERIALIZED TABLE with non persisted columns
+        final String sqlWithNonPersisted =
+                "CREATE MATERIALIZED TABLE base_mtbl_with_non_persisted (\n"
+                        + "   m STRING METADATA VIRTUAL,"
+                        + "   calc AS 'a' || 'b'"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT 1";
+
+        createMaterializedTableInCatalog(sqlWithNonPersisted, 
"base_mtbl_with_non_persisted");
     }
 
     @Test
@@ -503,15 +514,12 @@ class SqlMaterializedTableNodeToOperationConverterTest
         AlterMaterializedTableAsQueryOperation op =
                 (AlterMaterializedTableAsQueryOperation) operation;
         assertThat(op.getTableChanges())
-                .isEqualTo(
-                        Arrays.asList(
-                                TableChange.add(
-                                        Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                                TableChange.add(
-                                        Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                                TableChange.modifyDefinitionQuery(
-                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
-                                                + "FROM 
`builtin`.`default`.`t3` AS `t3`")));
+                .containsExactly(
+                        TableChange.add(Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.add(Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
                         "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS 
STRING) AS `f`\n"
@@ -547,12 +555,11 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         .collect(Collectors.toList());
         // added column should be a nullable column.
         assertThat(addedColumn)
-                .isEqualTo(
-                        Arrays.asList(
-                                new Schema.UnresolvedPhysicalColumn(
-                                        "e", 
DataTypes.VARCHAR(Integer.MAX_VALUE)),
-                                new Schema.UnresolvedPhysicalColumn(
-                                        "f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))));
+                .containsExactly(
+                        new Schema.UnresolvedPhysicalColumn(
+                                "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new Schema.UnresolvedPhysicalColumn(
+                                "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
     }
 
     @Test
@@ -562,12 +569,11 @@ class SqlMaterializedTableNodeToOperationConverterTest
                 (AlterMaterializedTableAsQueryOperation) parse(sql5);
 
         assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
-                .isEqualTo(
-                        Arrays.asList(
-                                TableChange.add(Column.physical("a0", 
DataTypes.INT())),
-                                TableChange.modifyDefinitionQuery(
-                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`c` AS `a`\n"
-                                                + "FROM 
`builtin`.`default`.`t3` AS `t3`")));
+                .containsExactly(
+                        TableChange.add(Column.physical("a0", 
DataTypes.INT())),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`c` AS `a`\n"
+                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
     }
 
     @Test
@@ -648,15 +654,12 @@ class SqlMaterializedTableNodeToOperationConverterTest
         AlterMaterializedTableAsQueryOperation op =
                 (AlterMaterializedTableAsQueryOperation) operation;
         assertThat(op.getTableChanges())
-                .isEqualTo(
-                        Arrays.asList(
-                                TableChange.add(
-                                        Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                                TableChange.add(
-                                        Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                                TableChange.modifyDefinitionQuery(
-                                        "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
-                                                + "FROM 
`builtin`.`default`.`t3` AS `t3`")));
+                .containsExactly(
+                        TableChange.add(Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.add(Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
                         "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS 
STRING) AS `f`\n"
@@ -692,12 +695,11 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         .collect(Collectors.toList());
         // added column should be a nullable column.
         assertThat(addedColumn)
-                .isEqualTo(
-                        Arrays.asList(
-                                new Schema.UnresolvedPhysicalColumn(
-                                        "e", 
DataTypes.VARCHAR(Integer.MAX_VALUE)),
-                                new Schema.UnresolvedPhysicalColumn(
-                                        "f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))));
+                .containsExactly(
+                        new Schema.UnresolvedPhysicalColumn(
+                                "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new Schema.UnresolvedPhysicalColumn(
+                                "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
     }
 
     private static Collection<TestSpec> 
testDataForCreateAlterMaterializedTableFailedCase() {
@@ -1051,6 +1053,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
         list.addAll(alterAddSchemaSuccessCase());
         list.addAll(alterModifySchemaSuccessCase());
         list.addAll(alterDropSchemaSuccessCase());
+        list.addAll(alterQuerySuccessCase());
         return list;
     }
 
@@ -1177,6 +1180,40 @@ class SqlMaterializedTableNodeToOperationConverterTest
         return list;
     }
 
+    private static Collection<TestSpec> alterQuerySuccessCase() {
+        final Collection<TestSpec> list = new ArrayList<>();
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted 
AS SELECT 1",
+                        "(\n"
+                                + "  `m` STRING METADATA VIRTUAL,\n"
+                                + "  `calc` AS 'a' || 'b',\n"
+                                + "  `EXPR$0` INT NOT NULL\n"
+                                + ")"));
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "ALTER MATERIALIZED TABLE base_mtbl_with_non_persisted 
AS SELECT 2, 'a' AS sec",
+                        "(\n"
+                                + "  `m` STRING METADATA VIRTUAL,\n"
+                                + "  `calc` AS 'a' || 'b',\n"
+                                + "  `EXPR$0` INT NOT NULL,\n"
+                                + "  `sec` CHAR(1)\n"
+                                + ")"));
+
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
base_mtbl_with_non_persisted AS SELECT 2, 'a' AS sec",
+                        "(\n"
+                                + "  `m` STRING METADATA VIRTUAL,\n"
+                                + "  `calc` AS 'a' || 'b',\n"
+                                + "  `EXPR$0` INT NOT NULL,\n"
+                                + "  `sec` CHAR(1)\n"
+                                + ")"));
+        return list;
+    }
+
     private static Collection<Arguments> 
testDataWithDifferentSchemasSuccessCase() {
         final Collection<Arguments> list = new ArrayList<>();
         list.addAll(createOrAlter(CREATE_OPERATION));
@@ -1259,7 +1296,7 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 .build())
                 .comment("materialized table comment")
                 .options(Map.of("connector", "filesystem", "format", "json"))
-                .partitionKeys(Arrays.asList("a", "d"))
+                .partitionKeys(List.of("a", "d"))
                 .originalQuery("SELECT *\nFROM `t1`")
                 .expandedQuery(
                         "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"

Reply via email to