This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 37004bb3953 [FLINK-39276][table] `CREATE OR ALTER MATERIALIZED TABLE`
should respect nullability defined in schema
37004bb3953 is described below
commit 37004bb3953fe06015ef4af2902d658c1ee1eb4c
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Fri Mar 20 15:12:54 2026 +0100
[FLINK-39276][table] `CREATE OR ALTER MATERIALIZED TABLE` should respect
nullability defined in schema
---
.../AbstractCreateMaterializedTableConverter.java | 2 +
...SqlCreateOrAlterMaterializedTableConverter.java | 18 +++++-
.../planner/utils/MaterializedTableUtils.java | 7 ++-
...erializedTableNodeToOperationConverterTest.java | 64 +++++++++++++++++++++-
4 files changed, 84 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
index 7f3e6d2b826..65076bc8503 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/AbstractCreateMaterializedTableConverter.java
@@ -57,6 +57,8 @@ public abstract class
AbstractCreateMaterializedTableConverter<T extends SqlCrea
implements SqlNodeConverter<T> {
/** Context of create table converters while merging source and derived
items. */
protected interface MergeContext {
+ boolean hasSchemaDefinition();
+
Schema getMergedSchema();
Map<String, String> getMergedTableOptions();
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
index c7ce5f9f173..a9eba27c5b2 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlCreateOrAlterMaterializedTableConverter.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.SchemaResolver;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.operations.Operation;
@@ -103,7 +104,9 @@ public class SqlCreateOrAlterMaterializedTableConverter
final ObjectIdentifier identifier) {
final MergeContext mergeContext =
getMergeContext(sqlCreateOrAlterTable, context);
return new FullAlterMaterializedTableOperation(
- identifier, buildTableChanges(mergeContext), oldTable);
+ identifier,
+ buildTableChanges(mergeContext,
context.getCatalogManager().getSchemaResolver()),
+ oldTable);
}
private Operation handleCreate(
@@ -117,14 +120,16 @@ public class SqlCreateOrAlterMaterializedTableConverter
}
private Function<ResolvedCatalogMaterializedTable, List<TableChange>>
buildTableChanges(
- final MergeContext mergeContext) {
+ final MergeContext mergeContext, final SchemaResolver
schemaResolver) {
return oldTable -> {
final List<TableChange> changes = new ArrayList<>();
final ResolvedSchema oldSchema = oldTable.getResolvedSchema();
final List<Column> newColumns =
MaterializedTableUtils.validateAndExtractNewColumns(
- oldSchema, mergeContext.getMergedQuerySchema());
+ oldSchema,
+
schemaResolver.resolve(mergeContext.getMergedSchema()),
+ mergeContext.hasSchemaDefinition());
newColumns.forEach(column -> changes.add(TableChange.add(column)));
changes.add(
@@ -173,6 +178,13 @@ public class SqlCreateOrAlterMaterializedTableConverter
SqlCreateOrAlterMaterializedTableConverter.this.getQueryResolvedSchema(
sqlCreateMaterializedTable, context);
+ @Override
+ public boolean hasSchemaDefinition() {
+ final SqlNodeList sqlNodeList =
sqlCreateMaterializedTable.getColumnList();
+ return !sqlNodeList.getList().isEmpty()
+ && sqlNodeList.getList().get(0) instanceof
SqlRegularColumn;
+ }
+
@Override
public Schema getMergedSchema() {
final Set<String> querySchemaColumnNames =
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
index 954f876240b..7d3b2898124 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/MaterializedTableUtils.java
@@ -279,7 +279,7 @@ public class MaterializedTableUtils {
}
public static List<Column> validateAndExtractNewColumns(
- ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+ ResolvedSchema oldSchema, ResolvedSchema newSchema, boolean
schemaDefinedInQuery) {
final List<Column> newColumns = getPersistedColumns(newSchema);
final List<Column> oldColumns = getPersistedColumns(oldSchema);
final int originalColumnSize = oldColumns.size();
@@ -310,7 +310,10 @@ public class MaterializedTableUtils {
final List<Column> newAddedColumns = new ArrayList<>();
for (int i = oldColumns.size(); i < newColumns.size(); i++) {
Column newColumn = newColumns.get(i);
-
newAddedColumns.add(newColumn.copy(newColumn.getDataType().nullable()));
+ newAddedColumns.add(
+ schemaDefinedInQuery
+ ? newColumn
+ :
newColumn.copy(newColumn.getDataType().nullable()));
}
return newAddedColumns;
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 02d4275c8e4..bcb1705eb03 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -583,9 +583,9 @@ class SqlMaterializedTableNodeToOperationConverterTest
@Test
void testAlterMaterializedTableAsQueryWithConflictColumnName() {
- String sql5 = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c,
d, c as a FROM t3";
+ String sql = "ALTER MATERIALIZED TABLE base_mtbl AS SELECT a, b, c, d,
c as a FROM t3";
AlterMaterializedTableAsQueryOperation
sqlAlterMaterializedTableAsQuery =
- (AlterMaterializedTableAsQueryOperation) parse(sql5);
+ (AlterMaterializedTableAsQueryOperation) parse(sql);
assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
.containsExactly(
@@ -596,6 +596,48 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ "FROM `builtin`.`default`.`t3` AS
`t3`"));
}
+ @Test
+ void testAlterMaterializedTableAsQueryWithDefinedSchema() {
+ String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE base_mtbl ("
+ + "`a` BIGINT NOT NULL, `b` STRING, `c` INT, `d`
STRING, `a1` BIGINT NOT NULL, `f` INT) "
+ + "AS SELECT a, b, c, d, a as `a1`, 3 as f FROM t3";
+ FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery =
+ (FullAlterMaterializedTableOperation) parse(sql);
+
+ assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
+ .containsExactly(
+ // If NOT NULL is defined in schema, it should stay
+ TableChange.add(Column.physical("a1",
DataTypes.BIGINT().notNull())),
+ TableChange.add(Column.physical("f", DataTypes.INT())),
+ TableChange.modifyDefinitionQuery(
+ "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS
`f`\nFROM `t3`",
+ "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`,
`t3`.`d`, `t3`.`a` AS `a1`, 3 AS `f`\n"
+ + "FROM `builtin`.`default`.`t3` AS
`t3`"),
+ TableChange.reset("connector"),
+ TableChange.reset("format"));
+ }
+
+ @Test
+ void testAlterMaterializedTableAsQueryWithoutDefinedSchema() {
+ String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE base_mtbl "
+ + "AS SELECT a, b, c, d, a as `a1` FROM t3";
+ FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery =
+ (FullAlterMaterializedTableOperation) parse(sql);
+
+ assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
+ .containsExactly(
+ // No explicit schema, so nullable will be used
+ TableChange.add(Column.physical("a1",
DataTypes.BIGINT())),
+ TableChange.modifyDefinitionQuery(
+ "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM
`t3`",
+ "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`,
`t3`.`d`, `t3`.`a` AS `a1`\n"
+ + "FROM `builtin`.`default`.`t3` AS
`t3`"),
+ TableChange.reset("connector"),
+ TableChange.reset("format"));
+ }
+
@Test
void testDropMaterializedTable() {
final String sql = "DROP MATERIALIZED TABLE mtbl1";
@@ -1291,6 +1333,16 @@ class SqlMaterializedTableNodeToOperationConverterTest
+ " `sec` CHAR(1)\n"
+ ")"));
+ list.add(
+ TestSpec.withExpectedSchema(
+ "CREATE OR ALTER MATERIALIZED TABLE
base_mtbl_with_non_persisted (`EXPR$0` INT NOT NULL, `sec` CHAR(1)) AS SELECT
2, 'a' AS sec",
+ "(\n"
+ + " `m` STRING METADATA VIRTUAL,\n"
+ + " `calc` AS ['a' || 'b'],\n"
+ + " `EXPR$0` INT NOT NULL,\n"
+ + " `sec` CHAR(1)\n"
+ + ")"));
+
list.add(
TestSpec.withExpectedSchema(
// Schema doesn't change, so should be ok
@@ -1343,6 +1395,14 @@ class SqlMaterializedTableNodeToOperationConverterTest
ResolvedSchema.of(
Column.physical("shop_id", DataTypes.BIGINT()),
Column.physical("user_id", DataTypes.INT()))),
+ Arguments.of(
+ operation
+ + "MATERIALIZED TABLE users_shops (user_id
INT, shop_id BIGINT NOT NULL)"
+ + " FRESHNESS = INTERVAL '30' SECOND"
+ + " AS SELECT 1 AS shop_id, 2 AS user_id",
+ ResolvedSchema.of(
+ Column.physical("shop_id",
DataTypes.BIGINT().notNull()),
+ Column.physical("user_id", DataTypes.INT()))),
Arguments.of(
operation
+ "MATERIALIZED TABLE users_shops (user_id
INT, shop_id BIGINT, PRIMARY KEY(user_id) NOT ENFORCED)"