This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c44c0e6974 [core] Add support for map value type and array element 
type evolution (#5673)
c44c0e6974 is described below

commit c44c0e6974be3b0871521ea2238c341eb72b8db0
Author: Ashish Khatkar <[email protected]>
AuthorDate: Tue Jun 3 12:06:49 2025 +0100

    [core] Add support for map value type and array element type evolution 
(#5673)
---
 .../org/apache/paimon/schema/SchemaManager.java    | 165 +++++++++++-----
 .../apache/paimon/flink/SchemaChangeITCase.java    | 210 +++++++++++++++++++++
 2 files changed, 333 insertions(+), 42 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index cf60da9cf6..59c3c42dd5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -70,7 +70,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
+import java.util.function.BiFunction;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
@@ -344,7 +344,8 @@ public class SchemaManager implements Serializable {
 
                 new NestedColumnModifier(addColumn.fieldNames()) {
                     @Override
-                    protected void updateLastColumn(List<DataField> newFields, 
String fieldName)
+                    protected void updateLastColumn(
+                            int depth, List<DataField> newFields, String 
fieldName)
                             throws Catalog.ColumnAlreadyExistException {
                         assertColumnNotExists(newFields, fieldName);
 
@@ -374,7 +375,8 @@ public class SchemaManager implements Serializable {
                 assertNotUpdatingPrimaryKeys(oldTableSchema, 
rename.fieldNames(), "rename");
                 new NestedColumnModifier(rename.fieldNames()) {
                     @Override
-                    protected void updateLastColumn(List<DataField> newFields, 
String fieldName)
+                    protected void updateLastColumn(
+                            int depth, List<DataField> newFields, String 
fieldName)
                             throws Catalog.ColumnNotExistException,
                                     Catalog.ColumnAlreadyExistException {
                         assertColumnExists(newFields, fieldName);
@@ -401,7 +403,8 @@ public class SchemaManager implements Serializable {
                 dropColumnValidation(oldTableSchema, drop);
                 new NestedColumnModifier(drop.fieldNames()) {
                     @Override
-                    protected void updateLastColumn(List<DataField> newFields, 
String fieldName)
+                    protected void updateLastColumn(
+                            int depth, List<DataField> newFields, String 
fieldName)
                             throws Catalog.ColumnNotExistException {
                         assertColumnExists(newFields, fieldName);
                         newFields.removeIf(f -> f.name().equals(fieldName));
@@ -416,26 +419,37 @@ public class SchemaManager implements Serializable {
                 updateNestedColumn(
                         newFields,
                         update.fieldNames(),
-                        (field) -> {
-                            DataType targetType = update.newDataType();
+                        (field, depth) -> {
+                            // find the dataType at depth and update the type 
for it
+                            DataType sourceRootType =
+                                    getRootType(field.type(), depth, 
update.fieldNames().length);
+                            DataType targetRootType = update.newDataType();
                             if (update.keepNullability()) {
-                                targetType = 
targetType.copy(field.type().isNullable());
+                                targetRootType = 
targetRootType.copy(sourceRootType.isNullable());
                             } else {
                                 assertNullabilityChange(
-                                        field.type().isNullable(),
-                                        update.newDataType().isNullable(),
+                                        sourceRootType.isNullable(),
+                                        targetRootType.isNullable(),
                                         
StringUtils.join(Arrays.asList(update.fieldNames()), "."),
                                         disableNullToNotNull);
                             }
                             checkState(
-                                    
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
-                                            && 
CastExecutors.resolve(field.type(), targetType)
+                                    DataTypeCasts.supportsExplicitCast(
+                                                    sourceRootType, 
targetRootType)
+                                            && 
CastExecutors.resolve(sourceRootType, targetRootType)
                                                     != null,
                                     String.format(
                                             "Column type %s[%s] cannot be 
converted to %s without loosing information.",
-                                            field.name(), field.type(), 
targetType));
+                                            field.name(), sourceRootType, 
targetRootType));
                             return new DataField(
-                                    field.id(), field.name(), targetType, 
field.description());
+                                    field.id(),
+                                    field.name(),
+                                    getArrayMapTypeWithTargetTypeRoot(
+                                            field.type(),
+                                            targetRootType,
+                                            depth,
+                                            update.fieldNames().length),
+                                    field.description());
                         });
             } else if (change instanceof UpdateColumnNullability) {
                 UpdateColumnNullability update = (UpdateColumnNullability) 
change;
@@ -448,16 +462,24 @@ public class SchemaManager implements Serializable {
                 updateNestedColumn(
                         newFields,
                         update.fieldNames(),
-                        (field) -> {
+                        (field, depth) -> {
+                            // find the DataType at depth and update that 
DataTypes nullability
+                            DataType sourceRootType =
+                                    getRootType(field.type(), depth, 
update.fieldNames().length);
                             assertNullabilityChange(
-                                    field.type().isNullable(),
+                                    sourceRootType.isNullable(),
                                     update.newNullability(),
                                     
StringUtils.join(Arrays.asList(update.fieldNames()), "."),
                                     disableNullToNotNull);
+                            sourceRootType = 
sourceRootType.copy(update.newNullability());
                             return new DataField(
                                     field.id(),
                                     field.name(),
-                                    field.type().copy(update.newNullability()),
+                                    getArrayMapTypeWithTargetTypeRoot(
+                                            field.type(),
+                                            sourceRootType,
+                                            depth,
+                                            update.fieldNames().length),
                                     field.description());
                         });
             } else if (change instanceof UpdateColumnComment) {
@@ -465,7 +487,7 @@ public class SchemaManager implements Serializable {
                 updateNestedColumn(
                         newFields,
                         update.fieldNames(),
-                        (field) ->
+                        (field, depth) ->
                                 new DataField(
                                         field.id(),
                                         field.name(),
@@ -502,6 +524,58 @@ public class SchemaManager implements Serializable {
                 newSchema.comment());
     }
 
+    // gets the rootType at the defined depth
+    // ex: ARRAY<MAP<STRING, ARRAY<INT>>>
+    // if we want to update ARRAY<INT> -> ARRAY<BIGINT>
+    // the maxDepth will be based on updateFieldNames
+    // which in the case will be [v, element, value, element],
+    // so maxDepth is 4 and return DataType will be INT
+    private DataType getRootType(DataType type, int currDepth, int maxDepth) {
+        if (currDepth == maxDepth - 1) {
+            return type;
+        }
+        switch (type.getTypeRoot()) {
+            case ARRAY:
+                return getRootType(((ArrayType) type).getElementType(), 
currDepth + 1, maxDepth);
+            case MAP:
+                return getRootType(((MapType) type).getValueType(), currDepth 
+ 1, maxDepth);
+            default:
+                return type;
+        }
+    }
+
+    // builds the targetType from source type based on the maxDepth which 
needs to be updated
+    // ex: ARRAY<MAP<STRING, ARRAY<INT>>> -> ARRAY<MAP<STRING, ARRAY<BIGINT>>>
+    // here we only need to update type of ARRAY<INT> to ARRAY<BIGINT> and 
rest of the type
+    // remains same. This function achieves this.
+    private DataType getArrayMapTypeWithTargetTypeRoot(
+            DataType source, DataType target, int currDepth, int maxDepth) {
+        if (currDepth == maxDepth - 1) {
+            return target;
+        }
+        switch (source.getTypeRoot()) {
+            case ARRAY:
+                return new ArrayType(
+                        source.isNullable(),
+                        getArrayMapTypeWithTargetTypeRoot(
+                                ((ArrayType) source).getElementType(),
+                                target,
+                                currDepth + 1,
+                                maxDepth));
+            case MAP:
+                return new MapType(
+                        source.isNullable(),
+                        ((MapType) source).getKeyType(),
+                        getArrayMapTypeWithTargetTypeRoot(
+                                ((MapType) source).getValueType(),
+                                target,
+                                currDepth + 1,
+                                maxDepth));
+            default:
+                return target;
+        }
+    }
+
     private void assertNullabilityChange(
             boolean oldNullability,
             boolean newNullability,
@@ -671,10 +745,22 @@ public class SchemaManager implements Serializable {
             this.updateFieldNames = updateFieldNames;
         }
 
-        public void updateIntermediateColumn(List<DataField> newFields, int 
depth)
+        private void updateIntermediateColumn(
+                List<DataField> newFields, List<DataField> previousFields, int 
depth, int prevDepth)
                 throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException {
             if (depth == updateFieldNames.length - 1) {
-                updateLastColumn(newFields, updateFieldNames[depth]);
+                updateLastColumn(depth, newFields, updateFieldNames[depth]);
+                return;
+            } else if (depth >= updateFieldNames.length) {
+                // to handle the case of ARRAY or MAP type evolution
+                // for instance : ARRAY<INT> -> ARRAY<BIGINT>
+                // the updateFieldNames in this case is [v, element] where v 
is array field name
+                // the depth returned by extractRowDataFields is 2 which will 
overflow.
+                // So the logic is to go to previous depth and update the 
column using previous
+                // fields which will have DataFields from prevDepth
+                // The reason for this handling is the addition of element and 
value for array
+                // and map type in FlinkCatalog as dummy column name
+                updateLastColumn(prevDepth, previousFields, 
updateFieldNames[prevDepth]);
                 return;
             }
 
@@ -683,13 +769,10 @@ public class SchemaManager implements Serializable {
                 if (!field.name().equals(updateFieldNames[depth])) {
                     continue;
                 }
-
-                String fullFieldName =
-                        String.join(".", 
Arrays.asList(updateFieldNames).subList(0, depth + 1));
                 List<DataField> nestedFields = new ArrayList<>();
-                int newDepth =
-                        depth + extractRowDataFields(field.type(), 
fullFieldName, nestedFields);
-                updateIntermediateColumn(nestedFields, newDepth);
+                int newDepth = depth + extractRowDataFields(field.type(), 
nestedFields);
+                updateIntermediateColumn(nestedFields, newFields, newDepth, 
depth);
+                field = newFields.get(i);
                 newFields.set(
                         i,
                         new DataField(
@@ -705,25 +788,23 @@ public class SchemaManager implements Serializable {
                     String.join(".", 
Arrays.asList(updateFieldNames).subList(0, depth + 1)));
         }
 
-        private int extractRowDataFields(
-                DataType type, String fullFieldName, List<DataField> 
nestedFields) {
+        public void updateIntermediateColumn(List<DataField> newFields, int 
depth)
+                throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException {
+            updateIntermediateColumn(newFields, newFields, depth, depth);
+        }
+
+        private int extractRowDataFields(DataType type, List<DataField> 
nestedFields) {
             switch (type.getTypeRoot()) {
                 case ROW:
                     nestedFields.addAll(((RowType) type).getFields());
                     return 1;
                 case ARRAY:
-                    return extractRowDataFields(
-                                    ((ArrayType) type).getElementType(),
-                                    fullFieldName,
-                                    nestedFields)
+                    return extractRowDataFields(((ArrayType) 
type).getElementType(), nestedFields)
                             + 1;
                 case MAP:
-                    return extractRowDataFields(
-                                    ((MapType) type).getValueType(), 
fullFieldName, nestedFields)
-                            + 1;
+                    return extractRowDataFields(((MapType) 
type).getValueType(), nestedFields) + 1;
                 default:
-                    throw new IllegalArgumentException(
-                            fullFieldName + " is not a structured type.");
+                    return 1;
             }
         }
 
@@ -742,12 +823,12 @@ public class SchemaManager implements Serializable {
                             mapType.getKeyType(),
                             wrapNewRowType(mapType.getValueType(), 
nestedFields));
                 default:
-                    throw new IllegalStateException(
-                            "Trying to wrap a row type in " + type + ". This 
is unexpected.");
+                    return type;
             }
         }
 
-        protected abstract void updateLastColumn(List<DataField> newFields, 
String fieldName)
+        protected abstract void updateLastColumn(
+                int depth, List<DataField> newFields, String fieldName)
                 throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException;
 
         protected void assertColumnExists(List<DataField> newFields, String 
fieldName)
@@ -786,11 +867,11 @@ public class SchemaManager implements Serializable {
     private void updateNestedColumn(
             List<DataField> newFields,
             String[] updateFieldNames,
-            Function<DataField, DataField> updateFunc)
+            BiFunction<DataField, Integer, DataField> updateFunc)
             throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException {
         new NestedColumnModifier(updateFieldNames) {
             @Override
-            protected void updateLastColumn(List<DataField> newFields, String 
fieldName)
+            protected void updateLastColumn(int depth, List<DataField> 
newFields, String fieldName)
                     throws Catalog.ColumnNotExistException {
                 for (int i = 0; i < newFields.size(); i++) {
                     DataField field = newFields.get(i);
@@ -798,7 +879,7 @@ public class SchemaManager implements Serializable {
                         continue;
                     }
 
-                    newFields.set(i, updateFunc.apply(field));
+                    newFields.set(i, updateFunc.apply(field, depth));
                     return;
                 }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 4dcdf3284c..03b568202e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -1351,4 +1351,214 @@ public class SchemaChangeITCase extends 
CatalogITCaseBase {
         assertThat(sql("SELECT * FROM T"))
                 .containsExactlyInAnyOrder(Row.of(1, 10L), Row.of(2, 20L));
     }
+
+    @Test
+    public void testAlterColumnTypeNestedArrayAndMap() {
+        sql("CREATE TABLE T ( k INT, v ARRAY<ARRAY<ARRAY<INT>>>, PRIMARY 
KEY(k) NOT ENFORCED )");
+        sql("INSERT INTO T VALUES (1, ARRAY[ARRAY[ARRAY[1, 2]]]), (2, 
ARRAY[ARRAY[ARRAY[3, 4]]])");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, new Integer[][][] {{{1, 2}}}),
+                        Row.of(2, new Integer[][][] {{{3, 4}}}));
+        sql("ALTER TABLE T MODIFY v ARRAY<ARRAY<ARRAY<BIGINT>>>");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, new Long[][][] {{{1L, 2L}}}),
+                        Row.of(2, new Long[][][] {{{3L, 4L}}}));
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY v 
ARRAY<ARRAY<ARRAY<BIGINT NOT NULL>>>"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v.element.element.element");
+
+        sql("DROP TABLE T");
+
+        sql(
+                "CREATE TABLE T ( k INT, v MAP<STRING, MAP<STRING, MAP<STRING, 
INT NOT NULL>>>, PRIMARY KEY(k) NOT ENFORCED )");
+        Map<String, Map<String, Map<String, Integer>>> mp1 = new HashMap<>();
+        Map<String, Map<String, Integer>> l1Mp1 = new HashMap<>();
+        Map<String, Integer> l2Mp1 = new HashMap<>();
+        l2Mp1.put("aaa", 1);
+        l2Mp1.put("aab", 2);
+        l1Mp1.put("aa", l2Mp1);
+        mp1.put("a", l1Mp1);
+        Map<String, Map<String, Map<String, Integer>>> mp2 = new HashMap<>();
+        Map<String, Map<String, Integer>> l1Mp2 = new HashMap<>();
+        Map<String, Integer> l2Mp2 = new HashMap<>();
+        l2Mp2.put("bbb", 3);
+        l2Mp2.put("bbc", 4);
+        l1Mp2.put("bb", l2Mp2);
+        mp2.put("b", l1Mp2);
+        sql(
+                "INSERT INTO T VALUES (1, MAP['a', MAP['aa', MAP['aaa', 1, 
'aab', 2]]]), (2, MAP['b', MAP['bb', MAP['bbb', 3, 'bbc', 4]]])");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(1, mp1), Row.of(2, mp2));
+        sql("ALTER TABLE T MODIFY v MAP<STRING, MAP<STRING, MAP<STRING, 
BIGINT>>>");
+        Map<String, Map<String, Map<String, Long>>> mp3 = new HashMap<>();
+        Map<String, Map<String, Long>> l1Mp3 = new HashMap<>();
+        Map<String, Long> l2Mp3 = new HashMap<>();
+        l2Mp3.put("aaa", 1L);
+        l2Mp3.put("aab", 2L);
+        l1Mp3.put("aa", l2Mp3);
+        mp3.put("a", l1Mp3);
+        Map<String, Map<String, Map<String, Long>>> mp4 = new HashMap<>();
+        Map<String, Map<String, Long>> l1Mp4 = new HashMap<>();
+        Map<String, Long> l2Mp4 = new HashMap<>();
+        l2Mp4.put("bbb", 3L);
+        l2Mp4.put("bbc", 4L);
+        l1Mp4.put("bb", l2Mp4);
+        mp4.put("b", l1Mp4);
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(1, mp3), Row.of(2, mp4));
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "ALTER TABLE T MODIFY v MAP<STRING, 
MAP<STRING, MAP<STRING, BIGINT NOT NULL>>>"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for v.value.value.value");
+
+        sql("DROP TABLE T");
+
+        sql(
+                "CREATE TABLE T ( k INT, a ARRAY<INT NOT NULL>, b MAP<STRING, 
INT NOT NULL>, PRIMARY KEY(k) NOT ENFORCED )");
+        sql("INSERT INTO T VALUES (1, ARRAY[1, 2, 3, 4], MAP['a', 1, 'b', 
2])");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                1,
+                                new Integer[] {1, 2, 3, 4},
+                                new HashMap<String, Integer>() {
+                                    {
+                                        put("a", 1);
+                                    }
+
+                                    {
+                                        put("b", 2);
+                                    }
+                                }));
+        sql("ALTER TABLE T MODIFY a ARRAY<BIGINT>");
+        sql("INSERT INTO T VALUES (2, ARRAY[5, 6, 7, CAST(NULL AS BIGINT)], 
MAP['c', 3, 'd', 4])");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                1,
+                                new Long[] {1L, 2L, 3L, 4L},
+                                new HashMap<String, Integer>() {
+                                    {
+                                        put("a", 1);
+                                    }
+
+                                    {
+                                        put("b", 2);
+                                    }
+                                }),
+                        Row.of(
+                                2,
+                                new Long[] {5L, 6L, 7L, null},
+                                new HashMap<String, Integer>() {
+                                    {
+                                        put("c", 3);
+                                    }
+
+                                    {
+                                        put("d", 4);
+                                    }
+                                }));
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY a ARRAY<BIGINT NOT 
NULL>"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for a.element");
+        sql("ALTER TABLE T MODIFY b MAP<STRING, BIGINT>");
+        sql(
+                "INSERT INTO T VALUES (2, ARRAY[5, 6, 7, CAST(NULL AS 
BIGINT)], MAP['c', 3, 'd', CAST(NULL AS BIGINT)])");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                1,
+                                new Long[] {1L, 2L, 3L, 4L},
+                                new HashMap<String, Long>() {
+                                    {
+                                        put("a", 1L);
+                                    }
+
+                                    {
+                                        put("b", 2L);
+                                    }
+                                }),
+                        Row.of(
+                                2,
+                                new Long[] {5L, 6L, 7L, null},
+                                new HashMap<String, Long>() {
+                                    {
+                                        put("c", 3L);
+                                    }
+
+                                    {
+                                        put("d", null);
+                                    }
+                                }));
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY b MAP<STRING, BIGINT 
NOT NULL>"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for b.value");
+
+        sql("DROP TABLE T");
+        sql(
+                "CREATE TABLE T ( k INT, a MAP<STRING, ARRAY<INT NOT NULL>>, 
PRIMARY KEY(k) NOT ENFORCED )");
+        sql("INSERT INTO T VALUES (1, MAP['a', ARRAY[1, 2, 3]])");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                1,
+                                new HashMap<String, Integer[]>() {
+                                    {
+                                        put("a", new Integer[] {1, 2, 3});
+                                    }
+                                }));
+        sql("ALTER TABLE T MODIFY a MAP<STRING, ARRAY<BIGINT>>");
+        sql(
+                "INSERT INTO T VALUES(1, MAP['a', ARRAY[1, 2, 3], 'b', 
ARRAY[2, 3, CAST(NULL AS BIGINT)]])");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(
+                                1,
+                                new HashMap<String, Long[]>() {
+                                    {
+                                        put("a", new Long[] {1L, 2L, 3L});
+                                    }
+
+                                    {
+                                        put("b", new Long[] {2L, 3L, null});
+                                    }
+                                }));
+        assertThatCode(() -> sql("ALTER TABLE T MODIFY a MAP<STRING, 
ARRAY<BIGINT NOT NULL>>"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for a.value.element");
+
+        sql("DROP TABLE T");
+
+        sql(
+                "CREATE TABLE T ( k INT, a ROW(c1 DOUBLE, c2 ARRAY<BOOLEAN> 
NOT NULL) NOT NULL, PRIMARY KEY(k) NOT ENFORCED )");
+        sql("INSERT INTO T VALUES (1, ROW(1.0, ARRAY[true, false]))");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(Row.of(1, Row.of(1.0, new Boolean[] 
{true, false})));
+        sql("ALTER TABLE T MODIFY a ROW(c1 DOUBLE, c2 ARRAY<BOOLEAN>) NOT 
NULL");
+        sql("INSERT INTO T VALUES (2, ROW(2.0, CAST(NULL AS 
ARRAY<BOOLEAN>)))");
+        assertThat(sql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, Row.of(1.0, new Boolean[] {true, false})),
+                        Row.of(2, Row.of(2.0, null)));
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "ALTER TABLE T MODIFY a ROW(c1 DOUBLE, 
c2 ARRAY<BOOLEAN> NOT NULL) NOT NULL"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for a.c2");
+        sql(
+                "ALTER TABLE T MODIFY a ROW(c1 DOUBLE, c2 ARRAY<BOOLEAN>, c3 
ARRAY<MAP<STRING, BOOLEAN NOT NULL>>) NOT NULL");
+        sql(
+                "ALTER TABLE T MODIFY a ROW(c1 DOUBLE, c2 ARRAY<BOOLEAN>, c3 
ARRAY<MAP<STRING, BOOLEAN>>) NOT NULL");
+        assertThatCode(
+                        () ->
+                                sql(
+                                        "ALTER TABLE T MODIFY a ROW(c1 DOUBLE, 
c2 ARRAY<BOOLEAN>, c3 ARRAY<MAP<STRING, BOOLEAN NOT NULL>>) NOT NULL"))
+                .hasStackTraceContaining(
+                        "Cannot update column type from nullable to non 
nullable for a.c3.element.value");
+    }
 }

Reply via email to