This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d257de5ca9 [flink] Don't allow converting nullable columns and fields to non nullable (#5630)
d257de5ca9 is described below

commit d257de5ca9d3eb567ef5e6c2407f0ff4f65a1897
Author: Ashish Khatkar <[email protected]>
AuthorDate: Wed May 21 11:18:08 2025 +0100

    [flink] Don't allow converting nullable columns and fields to non nullable (#5630)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  12 ++
 .../org/apache/paimon/schema/SchemaManager.java    |  46 ++++++-
 .../org/apache/paimon/catalog/CatalogTestBase.java |   7 +
 .../apache/paimon/flink/SchemaChangeITCase.java    | 152 +++++++++++++++++++++
 5 files changed, 217 insertions(+), 6 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 4cd943f8b2..b8fdc7b9ac 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -32,6 +32,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to remove the whole row in aggregation engine when -D 
records are received.</td>
         </tr>
+        <tr>
+            <td><h5>alter-column-null-to-not-null.disabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If true, it disables altering column type from null to not 
null. Default is true. Users can disable this option to explicitly convert null 
column type to not null.</td>
+        </tr>
         <tr>
             <td><h5>async-file-write</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index f3db58af6b..6473ca6240 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1721,6 +1721,14 @@ public class CoreOptions implements Serializable {
                     .noDefaultValue()
                     .withDescription("The serialized refresh handler of 
materialized table.");
 
+    public static final ConfigOption<Boolean> 
DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL =
+            ConfigOptions.key("alter-column-null-to-not-null.disabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "If true, it disables altering column type from 
null to not null. Default is true. "
+                                    + "Users can disable this option to 
explicitly convert null column type to not null.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -2232,6 +2240,10 @@ public class CoreOptions implements Serializable {
         return options.get(MANIFEST_DELETE_FILE_DROP_STATS);
     }
 
+    public boolean disableNullToNotNull() {
+        return options.get(DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL);
+    }
+
     public LookupStrategy lookupStrategy() {
         return LookupStrategy.from(
                 mergeEngine().equals(MergeEngine.FIRST_ROW),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index c31a3a616d..cf60da9cf6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -301,6 +301,13 @@ public class SchemaManager implements Serializable {
             throws Catalog.ColumnAlreadyExistException, 
Catalog.ColumnNotExistException {
         Map<String, String> oldOptions = new 
HashMap<>(oldTableSchema.options());
         Map<String, String> newOptions = new 
HashMap<>(oldTableSchema.options());
+        boolean disableNullToNotNull =
+                Boolean.parseBoolean(
+                        oldOptions.getOrDefault(
+                                
CoreOptions.DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL.key(),
+                                
CoreOptions.DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL
+                                        .defaultValue()
+                                        .toString()));
         List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
         AtomicInteger highestFieldId = new 
AtomicInteger(oldTableSchema.highestFieldId());
         String newComment = oldTableSchema.comment();
@@ -413,6 +420,12 @@ public class SchemaManager implements Serializable {
                             DataType targetType = update.newDataType();
                             if (update.keepNullability()) {
                                 targetType = 
targetType.copy(field.type().isNullable());
+                            } else {
+                                assertNullabilityChange(
+                                        field.type().isNullable(),
+                                        update.newDataType().isNullable(),
+                                        
StringUtils.join(Arrays.asList(update.fieldNames()), "."),
+                                        disableNullToNotNull);
                             }
                             checkState(
                                     
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
@@ -435,12 +448,18 @@ public class SchemaManager implements Serializable {
                 updateNestedColumn(
                         newFields,
                         update.fieldNames(),
-                        (field) ->
-                                new DataField(
-                                        field.id(),
-                                        field.name(),
-                                        
field.type().copy(update.newNullability()),
-                                        field.description()));
+                        (field) -> {
+                            assertNullabilityChange(
+                                    field.type().isNullable(),
+                                    update.newNullability(),
+                                    
StringUtils.join(Arrays.asList(update.fieldNames()), "."),
+                                    disableNullToNotNull);
+                            return new DataField(
+                                    field.id(),
+                                    field.name(),
+                                    field.type().copy(update.newNullability()),
+                                    field.description());
+                        });
             } else if (change instanceof UpdateColumnComment) {
                 UpdateColumnComment update = (UpdateColumnComment) change;
                 updateNestedColumn(
@@ -483,6 +502,21 @@ public class SchemaManager implements Serializable {
                 newSchema.comment());
     }
 
+    private void assertNullabilityChange(
+            boolean oldNullability,
+            boolean newNullability,
+            String fieldName,
+            boolean disableNullToNotNull) {
+        if (disableNullToNotNull && oldNullability && !newNullability) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Cannot update column type from nullable to non 
nullable for %s. "
+                                    + "You can set table configuration option 
'alter-column-null-to-not-null.disabled' = 'false' "
+                                    + "to allow converting null columns to not 
null",
+                            fieldName));
+        }
+    }
+
     public void applyMove(List<DataField> newFields, SchemaChange.Move move) {
         Map<String, Integer> map = new HashMap<>();
         for (int i = 0; i < newFields.size(); i++) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index da60f47b8b..a877408e41 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -1011,6 +1011,13 @@ public abstract class CatalogTestBase {
                         ""),
                 false);
 
+        catalog.alterTable(
+                identifier,
+                Lists.newArrayList(
+                        SchemaChange.setOption(
+                                
CoreOptions.DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL.key(), "false")),
+                false);
+
         catalog.alterTable(
                 identifier,
                 
Lists.newArrayList(SchemaChange.updateColumnNullability("col1", false)),
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index eafa3b6916..4dcdf3284c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -700,6 +700,7 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
                                 + "  `e` FLOAT");
 
         // Nullable -> not null
+        sql("ALTER TABLE T SET ('alter-column-null-to-not-null.disabled' = 
'false')");
         sql("ALTER TABLE T MODIFY c STRING NOT NULL");
         result = sql("SHOW CREATE TABLE T");
         assertThat(result.toString())
@@ -1199,4 +1200,155 @@ public class SchemaChangeITCase extends 
CatalogITCaseBase {
                                 },
                                 3));
     }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateNullabilityPrimitiveType(String formatType) {
+        sql(
+                "CREATE TABLE T "
+                        + "( k INT, v INT NOT NULL, PRIMARY KEY (k) NOT 
ENFORCED ) "
+                        + "WITH ( 'bucket' = '1', 'file.format' = '"
+                        + formatType
+                        + "' )");
+        sql("INSERT INTO T VALUES (1, 100), (2, 200)");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200));
+
+        sql("ALTER TABLE T MODIFY v INT"); // convert non nullable to nullable
+        sql("INSERT INTO T VALUES " + "(3, CAST(NULL AS INT))");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(1, 100), Row.of(2, 200), 
Row.of(3, null));
+
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY v INT NOT NULL"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v");
+    }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateNullabilityRowType(String formatType) {
+        sql(
+                "CREATE TABLE T "
+                        + "( k INT, v ROW(f1 INT, f2 INT NOT NULL) NOT NULL, 
PRIMARY KEY (k) NOT ENFORCED ) "
+                        + "WITH ( 'bucket' = '1', 'file.format' = '"
+                        + formatType
+                        + "' )");
+        sql("INSERT INTO T VALUES (1, ROW(10, 100)), (2, ROW(20, 200))");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(1, Row.of(10, 100)), 
Row.of(2, Row.of(20, 200)));
+
+        sql("ALTER TABLE T MODIFY (v ROW(f1 INT, f2 INT) NOT NULL)"); // 
convert non nullable
+        // field in row to
+        // nullable
+        sql("INSERT INTO T VALUES " + "(3, ROW(30, CAST(NULL AS INT)))");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, Row.of(10, 100)),
+                        Row.of(2, Row.of(20, 200)),
+                        Row.of(3, Row.of(30, null)));
+
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 INT NOT NULL, 
f2 INT) NOT NULL)"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v.f1");
+
+        sql("ALTER TABLE T MODIFY (v ROW(f1 INT, f2 INT))"); // convert entire 
row to nullable
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 INT, f2 INT) 
NOT NULL)"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v");
+    }
+
+    @ParameterizedTest()
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateNullabilityArrayAndMapType(String formatType) {
+        sql(
+                "CREATE TABLE T "
+                        + "( k INT, v1 ARRAY<ROW(f1 INT, f2 INT) NOT NULL>, v2 
MAP<INT, ROW(f1 INT, f2 INT) NOT NULL>, PRIMARY KEY (k) NOT ENFORCED ) "
+                        + "WITH ( 'bucket' = '1', 'file.format' = '"
+                        + formatType
+                        + "' )");
+        sql(
+                "INSERT INTO T VALUES "
+                        + "(1, ARRAY[ROW(10, 100), ROW(20, 200)], MAP[11, 
ROW(10, 100), 12, ROW(11, 110)]), "
+                        + "(2, ARRAY[ROW(30, 300), ROW(40, 400)], MAP[21, 
ROW(20, 200), 22, ROW(21, 210)])");
+        Map<Integer, Row> map1 = new HashMap<>();
+        map1.put(11, Row.of(10, 100));
+        map1.put(12, Row.of(11, 110));
+
+        Map<Integer, Row> map2 = new HashMap<>();
+        map2.put(21, Row.of(20, 200));
+        map2.put(22, Row.of(21, 210));
+
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, new Row[] {Row.of(10, 100), Row.of(20, 
200)}, map1),
+                        Row.of(2, new Row[] {Row.of(30, 300), Row.of(40, 
400)}, map2));
+
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "ALTER TABLE T MODIFY (v1 ARRAY<ROW(f1 
INT, f2 INT) NOT NULL> NOT NULL)"))
+                .hasRootCauseMessage(
+                        "Cannot update column type from nullable to non 
nullable for v1. You can set table configuration option 
'alter-column-null-to-not-null.disabled' = 'false' to allow converting null 
columns to not null");
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "ALTER TABLE T MODIFY (v1 ARRAY<ROW(f1 
INT NOT NULL, f2 INT) NOT NULL>)"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v1.element.f1");
+
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "ALTER TABLE T MODIFY (v2 MAP<INT, 
ROW(f1 INT, f2 INT) NOT NULL> NOT NULL)"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v2");
+
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "ALTER TABLE T MODIFY (v2 MAP<INT, 
ROW(f1 INT, f2 INT NOT NULL) NOT NULL>)"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v2.value.f2");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"orc", "avro", "parquet"})
+    public void testUpdateNullabilityByEnablingNullToNotNullOption(String 
formatType) {
+        sql(
+                "CREATE TABLE T "
+                        + "( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) "
+                        + "WITH ( 'bucket' = '1', 'file.format' = '"
+                        + formatType
+                        + "' )");
+
+        sql("INSERT INTO T VALUES (1, 10), (2, 20)");
+        assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 
10), Row.of(2, 20));
+
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY v INT NOT NULL"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v");
+
+        // enable null to not null option
+        sql("ALTER TABLE T SET ('alter-column-null-to-not-null.disabled' = 
'false')");
+        sql("ALTER TABLE T MODIFY v INT NOT NULL");
+        assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 
10), Row.of(2, 20));
+    }
+
+    @Test
+    public void testAlterColumnTypeWithNullabilityUpdate() {
+        sql("CREATE TABLE T ( k INT, v INT, PRIMARY KEY(k) NOT ENFORCED )");
+
+        sql("INSERT INTO T VALUES (1, 10), (2, 20)");
+        assertThat(sql("SELECT * FROM T")).containsExactlyInAnyOrder(Row.of(1, 
10), Row.of(2, 20));
+
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY v BIGINT NOT NULL"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v");
+
+        // enable null to not null option
+        sql("ALTER TABLE T SET ('alter-column-null-to-not-null.disabled' = 
'false')");
+        sql("ALTER TABLE T MODIFY v BIGINT NOT NULL");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(1, 10L), Row.of(2, 20L));
+    }
 }

Reply via email to