This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 37004bb3953 [FLINK-39276][table] `CREATE OR ALTER MATERIALIZED TABLE` 
should respect nullability defined in schema
37004bb3953 is described below

commit 37004bb3953fe06015ef4af2902d658c1ee1eb4c
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 20 15:12:54 2026 +0100

    [FLINK-39276][table] `CREATE OR ALTER MATERIALIZED TABLE` should respect 
nullability defined in schema
---
 .../AbstractCreateMaterializedTableConverter.java  |  2 +
 ...SqlCreateOrAlterMaterializedTableConverter.java | 18 +++++-
 .../planner/utils/MaterializedTableUtils.java      |  7 ++-
 ...erializedTableNodeToOperationConverterTest.java | 64 +++++++++++++++++++++-
 4 files changed, 84 insertions(+), 7 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index 7f3e6d2b826..65076bc8503 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -57,6 +57,8 @@ public abstract class 
AbstractCreateMaterializedTableConverter<T extends SqlCrea
         implements SqlNodeConverter<T> {
     /** Context of create table converters while merging source and derived 
items. */
     protected interface MergeContext {
+        boolean hasSchemaDefinition();
+
         Schema getMergedSchema();
 
         Map<String, String> getMergedTableOptions();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index c7ce5f9f173..a9eba27c5b2 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.SchemaResolver;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.operations.Operation;
@@ -103,7 +104,9 @@ public class SqlCreateOrAlterMaterializedTableConverter
             final ObjectIdentifier identifier) {
         final MergeContext mergeContext = 
getMergeContext(sqlCreateOrAlterTable, context);
         return new FullAlterMaterializedTableOperation(
-                identifier, buildTableChanges(mergeContext), oldTable);
+                identifier,
+                buildTableChanges(mergeContext, 
context.getCatalogManager().getSchemaResolver()),
+                oldTable);
     }
 
     private Operation handleCreate(
@@ -117,14 +120,16 @@ public class SqlCreateOrAlterMaterializedTableConverter
     }
 
     private Function<ResolvedCatalogMaterializedTable, List<TableChange>> 
buildTableChanges(
-            final MergeContext mergeContext) {
+            final MergeContext mergeContext, final SchemaResolver 
schemaResolver) {
         return oldTable -> {
             final List<TableChange> changes = new ArrayList<>();
 
             final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
             final List<Column> newColumns =
                     MaterializedTableUtils.validateAndExtractNewColumns(
-                            oldSchema, mergeContext.getMergedQuerySchema());
+                            oldSchema,
+                            
schemaResolver.resolve(mergeContext.getMergedSchema()),
+                            mergeContext.hasSchemaDefinition());
 
             newColumns.forEach(column -> changes.add(TableChange.add(column)));
             changes.add(
@@ -173,6 +178,13 @@ public class SqlCreateOrAlterMaterializedTableConverter
                     
SqlCreateOrAlterMaterializedTableConverter.this.getQueryResolvedSchema(
                             sqlCreateMaterializedTable, context);
 
+            @Override
+            public boolean hasSchemaDefinition() {
+                final SqlNodeList sqlNodeList = 
sqlCreateMaterializedTable.getColumnList();
+                return !sqlNodeList.getList().isEmpty()
+                        && sqlNodeList.getList().get(0) instanceof 
SqlRegularColumn;
+            }
+
             @Override
             public Schema getMergedSchema() {
                 final Set<String> querySchemaColumnNames =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 954f876240b..7d3b2898124 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -279,7 +279,7 @@ public class MaterializedTableUtils {
     }
 
     public static List<Column> validateAndExtractNewColumns(
-            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+            ResolvedSchema oldSchema, ResolvedSchema newSchema, boolean 
schemaDefinedInQuery) {
         final List<Column> newColumns = getPersistedColumns(newSchema);
         final List<Column> oldColumns = getPersistedColumns(oldSchema);
         final int originalColumnSize = oldColumns.size();
@@ -310,7 +310,10 @@ public class MaterializedTableUtils {
         final List<Column> newAddedColumns = new ArrayList<>();
         for (int i = oldColumns.size(); i < newColumns.size(); i++) {
             Column newColumn = newColumns.get(i);
-            
newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
+            newAddedColumns.add(
+                    schemaDefinedInQuery
+                            ? newColumn
+                            : 
newColumn.copy(newColumn.getDataType().nullable()));
         }
 
         return newAddedColumns;
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 02d4275c8e4..bcb1705eb03 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -583,9 +583,9 @@ class SqlMaterializedTableNodeToOperationConverterTest
 
     @Test
     void testAlterMaterializedTableAsQueryWithConflictColumnName() {
-        String sql5 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, 
d, c as a FROM t3";
+        String sql = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d, 
c as a FROM t3";
         AlterMaterializedTableAsQueryOperation 
sqlAlterMaterializedTableAsQuery =
-                (AlterMaterializedTableAsQueryOperation) parse(sql5);
+                (AlterMaterializedTableAsQueryOperation) parse(sql);
 
         assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
                 .containsExactly(
@@ -596,6 +596,48 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                         + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
     }
 
+    @Test
+    void testAlterMaterializedTableAsQueryWithDefinedSchema() {
+        String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl ("
+                        + "`a` BIGINT NOT NULL, `b` STRING, `c` INT, `d` 
STRING, `a1` BIGINT NOT NULL, `f` INT) "
+                        + "AS SELECT a, b, c, d, a as `a1`, 3 as f FROM t3";
+        FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery =
+                (FullAlterMaterializedTableOperation) parse(sql);
+
+        assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
+                .containsExactly(
+                        // If NOT NULL is defined in schema, it should stay
+                        TableChange.add(Column.physical("a1", 
DataTypes.BIGINT().notNull())),
+                        TableChange.add(Column.physical("f", DataTypes.INT())),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS 
`f`\nFROM `t3`",
+                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`a` AS `a1`, 3 AS `f`\n"
+                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"),
+                        TableChange.reset("connector"),
+                        TableChange.reset("format"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryWithoutDefinedSchema() {
+        String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl "
+                        + "AS SELECT a, b, c, d, a as `a1` FROM t3";
+        FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery =
+                (FullAlterMaterializedTableOperation) parse(sql);
+
+        assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
+                .containsExactly(
+                        // No explicit schema, so nullable will be used
+                        TableChange.add(Column.physical("a1", 
DataTypes.BIGINT())),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM 
`t3`",
+                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`a` AS `a1`\n"
+                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"),
+                        TableChange.reset("connector"),
+                        TableChange.reset("format"));
+    }
+
     @Test
     void testDropMaterializedTable() {
         final String sql = "DROP MATERIALIZED TABLE mtbl1";
@@ -1291,6 +1333,16 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                 + "  `sec` CHAR(1)\n"
                                 + ")"));
 
+        list.add(
+                TestSpec.withExpectedSchema(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
base_mtbl_with_non_persisted (`EXPR$0` INT NOT NULL, `sec` CHAR(1)) AS SELECT 
2, 'a' AS sec",
+                        "(\n"
+                                + "  `m` STRING METADATA VIRTUAL,\n"
+                                + "  `calc` AS ['a' || 'b'],\n"
+                                + "  `EXPR$0` INT NOT NULL,\n"
+                                + "  `sec` CHAR(1)\n"
+                                + ")"));
+
         list.add(
                 TestSpec.withExpectedSchema(
                         // Schema doesn't change, so should be ok
@@ -1343,6 +1395,14 @@ class SqlMaterializedTableNodeToOperationConverterTest
                         ResolvedSchema.of(
                                 Column.physical("shop_id", DataTypes.BIGINT()),
                                 Column.physical("user_id", DataTypes.INT()))),
+                Arguments.of(
+                        operation
+                                + "MATERIALIZED TABLE users_shops (user_id 
INT, shop_id BIGINT NOT NULL)"
+                                + " FRESHNESS = INTERVAL '30' SECOND"
+                                + " AS SELECT 1 AS shop_id, 2 AS user_id",
+                        ResolvedSchema.of(
+                                Column.physical("shop_id", 
DataTypes.BIGINT().notNull()),
+                                Column.physical("user_id", DataTypes.INT()))),
                 Arguments.of(
                         operation
                                 + "MATERIALIZED TABLE users_shops (user_id 
INT, shop_id BIGINT, PRIMARY KEY(user_id) NOT ENFORCED)"

Reply via email to