This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 769e531e0e [flink-cdc] Added support for nested schema evolution in CDC  (#5682)
769e531e0e is described below

commit 769e531e0edd3c2af8dca5a6271161d7bcb695e4
Author: 0dunay0 <[email protected]>
AuthorDate: Mon Jun 9 08:51:49 2025 +0100

    [flink-cdc] Added support for nested schema evolution in CDC  (#5682)
---
 .../apache/paimon/schema/NestedSchemaUtils.java    | 272 +++++++++
 .../paimon/schema/NestedSchemaUtilsTest.java       | 669 +++++++++++++++++++++
 ...MultiTableUpdatedDataFieldsProcessFunction.java |   2 +-
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |   2 +-
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  | 144 ++++-
 .../UpdatedDataFieldsProcessFunctionBaseTest.java  | 512 ++++++++++++++++
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 131 +---
 7 files changed, 1595 insertions(+), 137 deletions(-)
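
At a glance, the new NestedSchemaUtils entry point compares an old and a new type and appends the derived SchemaChange objects to a caller-supplied list. A minimal sketch of driving it, assuming the paimon-core module from this commit is on the classpath (the wrapper class and printing are illustrative only; the API and types are taken from the diff below):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.paimon.schema.NestedSchemaUtils;
    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.types.ArrayType;
    import org.apache.paimon.types.DataTypes;

    public class NestedEvolutionSketch {
        public static void main(String[] args) {
            // Old: ARRAY<INT>, new: ARRAY<BIGINT> -- widening the element type.
            ArrayType oldType = new ArrayType(true, DataTypes.INT());
            ArrayType newType = new ArrayType(true, DataTypes.BIGINT());

            List<SchemaChange> changes = new ArrayList<>();
            NestedSchemaUtils.generateNestedColumnUpdates(
                    Collections.singletonList("arr_column"), oldType, newType, changes);

            // Expected: one UpdateColumnType addressed to ["arr_column", "element"],
            // per the "element" path marker used for array elements.
            changes.forEach(System.out::println);
        }
    }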

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/NestedSchemaUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/NestedSchemaUtils.java
new file mode 100644
index 0000000000..56d5fd99b3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/NestedSchemaUtils.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Utility class for handling nested column schema changes. This provides shared logic for both
+ * FlinkCatalog and CDC sources to handle schema evolution for nested types (arrays, maps,
+ * multisets, rows) consistently.
+ */
+public class NestedSchemaUtils {
+
+    /**
+     * Generates nested column updates for schema evolution. Handles all nested types: ROW, ARRAY,
+     * MAP, MULTISET. Creates proper field paths with markers like "element" for arrays and "value"
+     * for maps.
+     *
+     * @param fieldNames The current field path as a list of field names
+     * @param oldType The old data type
+     * @param newType The new data type
+     * @param schemaChanges List to collect the generated schema changes
+     */
+    public static void generateNestedColumnUpdates(
+            List<String> fieldNames,
+            DataType oldType,
+            DataType newType,
+            List<SchemaChange> schemaChanges) {
+
+        if (oldType.getTypeRoot() == DataTypeRoot.ROW) {
+            handleRowTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+        } else if (oldType.getTypeRoot() == DataTypeRoot.ARRAY) {
+            handleArrayTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+        } else if (oldType.getTypeRoot() == DataTypeRoot.MAP) {
+            handleMapTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+        } else if (oldType.getTypeRoot() == DataTypeRoot.MULTISET) {
+            handleMultisetTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+        } else {
+            // For primitive types, update the column type directly
+            handlePrimitiveTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+        }
+
+        // Handle nullability changes for all types
+        handleNullabilityChange(fieldNames, oldType, newType, schemaChanges);
+    }
+
+    private static void handleRowTypeUpdate(
+            List<String> fieldNames,
+            DataType oldType,
+            DataType newType,
+            List<SchemaChange> schemaChanges) {
+
+        String joinedNames = String.join(".", fieldNames);
+
+        Preconditions.checkArgument(
+                newType.getTypeRoot() == DataTypeRoot.ROW,
+                "Column %s can only be updated to row type, and cannot be updated to %s type",
+                joinedNames,
+                newType.getTypeRoot());
+
+        RowType oldRowType = (RowType) oldType;
+        RowType newRowType = (RowType) newType;
+
+        // check that existing fields maintain their order
+        Map<String, Integer> oldFieldOrders = new HashMap<>();
+        for (int i = 0; i < oldRowType.getFieldCount(); i++) {
+            oldFieldOrders.put(oldRowType.getFields().get(i).name(), i);
+        }
+
+        int lastIdx = -1;
+        String lastFieldName = "";
+        for (DataField newField : newRowType.getFields()) {
+            String name = newField.name();
+            if (oldFieldOrders.containsKey(name)) {
+                int idx = oldFieldOrders.get(name);
+                Preconditions.checkState(
+                        lastIdx < idx,
+                        "Order of existing fields in column %s must be kept the same. "
+                                + "However, field %s and %s have changed their orders.",
+                        joinedNames,
+                        lastFieldName,
+                        name);
+                lastIdx = idx;
+                lastFieldName = name;
+            }
+        }
+
+        // drop fields
+        Set<String> newFieldNames = new HashSet<>(newRowType.getFieldNames());
+        for (String name : oldRowType.getFieldNames()) {
+            if (!newFieldNames.contains(name)) {
+                List<String> dropColumnNames = new ArrayList<>(fieldNames);
+                dropColumnNames.add(name);
+                schemaChanges.add(SchemaChange.dropColumn(dropColumnNames.toArray(new String[0])));
+            }
+        }
+
+        for (int i = 0; i < newRowType.getFieldCount(); i++) {
+            DataField field = newRowType.getFields().get(i);
+            String name = field.name();
+            List<String> fullFieldNames = new ArrayList<>(fieldNames);
+            fullFieldNames.add(name);
+
+            if (!oldFieldOrders.containsKey(name)) {
+                // add fields
+                SchemaChange.Move move;
+                if (i == 0) {
+                    move = SchemaChange.Move.first(name);
+                } else {
+                    String lastName = newRowType.getFields().get(i - 1).name();
+                    move = SchemaChange.Move.after(name, lastName);
+                }
+                schemaChanges.add(
+                        SchemaChange.addColumn(
+                                fullFieldNames.toArray(new String[0]),
+                                field.type(),
+                                field.description(),
+                                move));
+            } else {
+                // update existing fields
+                DataField oldField = oldRowType.getFields().get(oldFieldOrders.get(name));
+                if (!Objects.equals(oldField.description(), field.description())) {
+                    schemaChanges.add(
+                            SchemaChange.updateColumnComment(
+                                    fullFieldNames.toArray(new String[0]), field.description()));
+                }
+                generateNestedColumnUpdates(
+                        fullFieldNames, oldField.type(), field.type(), schemaChanges);
+            }
+        }
+    }
+
+    private static void handleArrayTypeUpdate(
+            List<String> fieldNames,
+            DataType oldType,
+            DataType newType,
+            List<SchemaChange> schemaChanges) {
+
+        String joinedNames = String.join(".", fieldNames);
+        Preconditions.checkArgument(
+                newType.getTypeRoot() == DataTypeRoot.ARRAY,
+                "Column %s can only be updated to array type, and cannot be updated to %s type",
+                joinedNames,
+                newType);
+
+        List<String> fullFieldNames = new ArrayList<>(fieldNames);
+        // add a dummy column name indicating the element of array
+        fullFieldNames.add("element");
+
+        generateNestedColumnUpdates(
+                fullFieldNames,
+                ((ArrayType) oldType).getElementType(),
+                ((ArrayType) newType).getElementType(),
+                schemaChanges);
+    }
+
+    private static void handleMapTypeUpdate(
+            List<String> fieldNames,
+            DataType oldType,
+            DataType newType,
+            List<SchemaChange> schemaChanges) {
+
+        String joinedNames = String.join(".", fieldNames);
+        Preconditions.checkArgument(
+                newType.getTypeRoot() == DataTypeRoot.MAP,
+                "Column %s can only be updated to map type, and cannot be updated to %s type",
+                joinedNames,
+                newType);
+
+        MapType oldMapType = (MapType) oldType;
+        MapType newMapType = (MapType) newType;
+
+        Preconditions.checkArgument(
+                oldMapType.getKeyType().equals(newMapType.getKeyType()),
+                "Cannot update key type of column %s from %s type to %s type",
+                joinedNames,
+                oldMapType.getKeyType(),
+                newMapType.getKeyType());
+
+        List<String> fullFieldNames = new ArrayList<>(fieldNames);
+        // add a dummy column name indicating the value of map
+        fullFieldNames.add("value");
+
+        generateNestedColumnUpdates(
+                fullFieldNames,
+                oldMapType.getValueType(),
+                newMapType.getValueType(),
+                schemaChanges);
+    }
+
+    private static void handleMultisetTypeUpdate(
+            List<String> fieldNames,
+            DataType oldType,
+            DataType newType,
+            List<SchemaChange> schemaChanges) {
+
+        String joinedNames = String.join(".", fieldNames);
+
+        Preconditions.checkArgument(
+                newType.getTypeRoot() == DataTypeRoot.MULTISET,
+                "Column %s can only be updated to multiset type, and cannot be updated to %s type",
+                joinedNames,
+                newType);
+
+        List<String> fullFieldNames = new ArrayList<>(fieldNames);
+        // Add the special "element" marker for multiset element access
+        fullFieldNames.add("element");
+
+        generateNestedColumnUpdates(
+                fullFieldNames,
+                ((MultisetType) oldType).getElementType(),
+                ((MultisetType) newType).getElementType(),
+                schemaChanges);
+    }
+
+    private static void handlePrimitiveTypeUpdate(
+            List<String> fieldNames,
+            DataType oldType,
+            DataType newType,
+            List<SchemaChange> schemaChanges) {
+
+        if (!oldType.equalsIgnoreNullable(newType)) {
+            schemaChanges.add(
+                    SchemaChange.updateColumnType(
+                            fieldNames.toArray(new String[0]), newType, false));
+        }
+    }
+
+    private static void handleNullabilityChange(
+            List<String> fieldNames,
+            DataType oldType,
+            DataType newType,
+            List<SchemaChange> schemaChanges) {
+
+        if (oldType.isNullable() != newType.isNullable()) {
+            schemaChanges.add(
+                    SchemaChange.updateColumnNullability(
+                            fieldNames.toArray(new String[0]), newType.isNullable()));
+        }
+    }
+}
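
The ROW branch above checks field order, drops removed fields, adds new fields with a Move hint, and recurses into retained fields. A hedged sketch of the output one would expect for a row that widens one field and appends another, mirroring the tests in the next file (the wrapper class is illustrative only):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.paimon.schema.NestedSchemaUtils;
    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    public class RowEvolutionSketch {
        public static void main(String[] args) {
            // Old: ROW<id INT, name STRING>
            RowType oldRow =
                    RowType.of(
                            new DataField(0, "id", DataTypes.INT()),
                            new DataField(1, "name", DataTypes.STRING()));
            // New: ROW<id BIGINT, name STRING, active BOOLEAN>
            RowType newRow =
                    RowType.of(
                            new DataField(0, "id", DataTypes.BIGINT()),
                            new DataField(1, "name", DataTypes.STRING()),
                            new DataField(2, "active", DataTypes.BOOLEAN()));

            List<SchemaChange> changes = new ArrayList<>();
            NestedSchemaUtils.generateNestedColumnUpdates(
                    Collections.singletonList("row_column"), oldRow, newRow, changes);

            // Expected: UpdateColumnType on ["row_column", "id"] to BIGINT, plus
            // AddColumn on ["row_column", "active"] placed after "name".
            changes.forEach(System.out::println);
        }
    }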
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/NestedSchemaUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/NestedSchemaUtilsTest.java
new file mode 100644
index 0000000000..ae5f7ada39
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/NestedSchemaUtilsTest.java
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link NestedSchemaUtils}. */
+public class NestedSchemaUtilsTest {
+
+    @Test
+    public void testPrimitiveTypeUpdate() {
+        List<String> fieldNames = Arrays.asList("column1");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Test type conversion (INT -> BIGINT)
+        NestedSchemaUtils.generateNestedColumnUpdates(
+                fieldNames, DataTypes.INT(), DataTypes.BIGINT(), schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+        SchemaChange.UpdateColumnType typeChange =
+                (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+        assertThat(typeChange.fieldNames()).containsExactly("column1");
+        assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+    }
+
+    @Test
+    public void testPrimitiveTypeUpdateWithSameType() {
+        List<String> fieldNames = Arrays.asList("column1");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Same type should not generate any changes
+        NestedSchemaUtils.generateNestedColumnUpdates(
+                fieldNames, DataTypes.INT(), DataTypes.INT(), schemaChanges);
+
+        assertThat(schemaChanges).isEmpty();
+    }
+
+    @Test
+    public void testNullabilityChangeOnly() {
+        List<String> fieldNames = Arrays.asList("column1");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Test nullable to non-nullable
+        NestedSchemaUtils.generateNestedColumnUpdates(
+                fieldNames, DataTypes.INT().nullable(), DataTypes.INT().notNull(), schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnNullability.class);
+        SchemaChange.UpdateColumnNullability nullabilityChange =
+                (SchemaChange.UpdateColumnNullability) schemaChanges.get(0);
+        assertThat(nullabilityChange.fieldNames()).containsExactly("column1");
+        assertThat(nullabilityChange.newNullability()).isFalse();
+    }
+
+    @Test
+    public void testTypeAndNullabilityChange() {
+        List<String> fieldNames = Arrays.asList("column1");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Test type change and nullability change together
+        NestedSchemaUtils.generateNestedColumnUpdates(
+                fieldNames,
+                DataTypes.INT().nullable(),
+                DataTypes.BIGINT().notNull(),
+                schemaChanges);
+
+        assertThat(schemaChanges).hasSize(2);
+
+        // Should have both type and nullability changes
+        assertThat(schemaChanges)
+                .anyMatch(change -> change instanceof SchemaChange.UpdateColumnType);
+        assertThat(schemaChanges)
+                .anyMatch(change -> change instanceof SchemaChange.UpdateColumnNullability);
+    }
+
+    @Test
+    public void testArrayTypeUpdateElement() {
+        List<String> fieldNames = Arrays.asList("arr_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        ArrayType oldType = new ArrayType(true, DataTypes.INT());
+        ArrayType newType = new ArrayType(true, DataTypes.BIGINT());
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+        SchemaChange.UpdateColumnType typeChange =
+                (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+        assertThat(typeChange.fieldNames()).containsExactly("arr_column", "element");
+        assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+    }
+
+    @Test
+    public void testArrayTypeUpdateNullability() {
+        List<String> fieldNames = Arrays.asList("arr_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        ArrayType oldType = new ArrayType(true, DataTypes.INT());
+        ArrayType newType = new ArrayType(false, DataTypes.INT());
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnNullability.class);
+        SchemaChange.UpdateColumnNullability nullabilityChange =
+                (SchemaChange.UpdateColumnNullability) schemaChanges.get(0);
+        assertThat(nullabilityChange.fieldNames()).containsExactly("arr_column");
+        assertThat(nullabilityChange.newNullability()).isFalse();
+    }
+
+    @Test
+    public void testArrayWithIncompatibleTypeThrowsException() {
+        List<String> fieldNames = Arrays.asList("arr_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        ArrayType oldType = new ArrayType(true, DataTypes.INT());
+        RowType newElementType = RowType.of(DataTypes.STRING()); // Incompatible type
+
+        assertThatThrownBy(
+                        () -> {
+                            NestedSchemaUtils.generateNestedColumnUpdates(
+                                    fieldNames, oldType, newElementType, schemaChanges);
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("can only be updated to array type");
+    }
+
+    @Test
+    public void testMapTypeUpdateValue() {
+        List<String> fieldNames = Arrays.asList("map_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+        MapType newType = new MapType(true, DataTypes.STRING(), DataTypes.BIGINT());
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+        SchemaChange.UpdateColumnType typeChange =
+                (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+        assertThat(typeChange.fieldNames()).containsExactly("map_column", "value");
+        assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+    }
+
+    @Test
+    public void testMapTypeUpdateNullability() {
+        List<String> fieldNames = Arrays.asList("map_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+        MapType newType = new MapType(false, DataTypes.STRING(), DataTypes.INT());
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnNullability.class);
+        SchemaChange.UpdateColumnNullability nullabilityChange =
+                (SchemaChange.UpdateColumnNullability) schemaChanges.get(0);
+        assertThat(nullabilityChange.fieldNames()).containsExactly("map_column");
+        assertThat(nullabilityChange.newNullability()).isFalse();
+    }
+
+    @Test
+    public void testMapWithIncompatibleKeyTypeThrowsException() {
+        List<String> fieldNames = Arrays.asList("map_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+        MapType newType = new MapType(true, DataTypes.INT(), DataTypes.INT()); // Different key type
+
+        assertThatThrownBy(
+                        () -> {
+                            NestedSchemaUtils.generateNestedColumnUpdates(
+                                    fieldNames, oldType, newType, schemaChanges);
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Cannot update key type");
+    }
+
+    @Test
+    public void testMapWithIncompatibleTypeThrowsException() {
+        List<String> fieldNames = Arrays.asList("map_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+        ArrayType newType = new ArrayType(true, DataTypes.INT()); // Incompatible type
+
+        assertThatThrownBy(
+                        () -> {
+                            NestedSchemaUtils.generateNestedColumnUpdates(
+                                    fieldNames, oldType, newType, schemaChanges);
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("can only be updated to map type");
+    }
+
+    @Test
+    public void testMultisetTypeUpdateElement() {
+        List<String> fieldNames = Arrays.asList("multiset_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        MultisetType oldType = new MultisetType(true, DataTypes.INT());
+        MultisetType newType = new MultisetType(true, DataTypes.BIGINT());
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+        SchemaChange.UpdateColumnType typeChange =
+                (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+        assertThat(typeChange.fieldNames()).containsExactly("multiset_column", "element");
+        assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+    }
+
+    @Test
+    public void testMultisetWithIncompatibleTypeThrowsException() {
+        List<String> fieldNames = Arrays.asList("multiset_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        MultisetType oldType = new MultisetType(true, DataTypes.INT());
+        MapType newType =
+                new MapType(true, DataTypes.STRING(), DataTypes.INT()); // Incompatible type
+
+        assertThatThrownBy(
+                        () -> {
+                            NestedSchemaUtils.generateNestedColumnUpdates(
+                                    fieldNames, oldType, newType, schemaChanges);
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("can only be updated to multiset type");
+    }
+
+    @Test
+    public void testRowTypeAddField() {
+        List<String> fieldNames = Arrays.asList("row_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        RowType oldType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT()),
+                        new DataField(1, "f2", DataTypes.STRING()));
+        RowType newType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT()),
+                        new DataField(1, "f2", DataTypes.STRING()),
+                        new DataField(2, "f3", DataTypes.DOUBLE()));
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.AddColumn.class);
+        SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) schemaChanges.get(0);
+        assertThat(addColumn.fieldNames()).containsExactly("row_column", "f3");
+        assertThat(addColumn.dataType()).isEqualTo(DataTypes.DOUBLE());
+    }
+
+    @Test
+    public void testRowTypeDropField() {
+        List<String> fieldNames = Arrays.asList("row_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        RowType oldType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT()),
+                        new DataField(1, "f2", DataTypes.STRING()),
+                        new DataField(2, "f3", DataTypes.DOUBLE()));
+        RowType newType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT()),
+                        new DataField(1, "f2", DataTypes.STRING()));
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.DropColumn.class);
+        SchemaChange.DropColumn dropColumn = (SchemaChange.DropColumn) schemaChanges.get(0);
+        assertThat(dropColumn.fieldNames()).containsExactly("row_column", "f3");
+    }
+
+    @Test
+    public void testRowTypeUpdateExistingField() {
+        List<String> fieldNames = Arrays.asList("row_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        RowType oldType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT()),
+                        new DataField(1, "f2", DataTypes.STRING()));
+        RowType newType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.BIGINT()), // Type changed
+                        new DataField(1, "f2", DataTypes.STRING()));
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+        SchemaChange.UpdateColumnType typeChange =
+                (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+        assertThat(typeChange.fieldNames()).containsExactly("row_column", "f1");
+        assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+    }
+
+    @Test
+    public void testRowTypeUpdateFieldComment() {
+        List<String> fieldNames = Arrays.asList("row_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        RowType oldType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT(), "old comment"),
+                        new DataField(1, "f2", DataTypes.STRING()));
+        RowType newType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT(), "new comment"),
+                        new DataField(1, "f2", DataTypes.STRING()));
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnComment.class);
+        SchemaChange.UpdateColumnComment commentChange =
+                (SchemaChange.UpdateColumnComment) schemaChanges.get(0);
+        assertThat(commentChange.fieldNames()).containsExactly("row_column", "f1");
+        assertThat(commentChange.newDescription()).isEqualTo("new comment");
+    }
+
+    @Test
+    public void testRowTypeReorderFields() {
+        List<String> fieldNames = Arrays.asList("row_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        RowType oldType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT()),
+                        new DataField(1, "f2", DataTypes.STRING()),
+                        new DataField(2, "f3", DataTypes.DOUBLE()));
+        RowType newType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT()),
+                        new DataField(2, "f3", DataTypes.DOUBLE()),
+                        new DataField(1, "f2", DataTypes.STRING()) // f2 and f3 swapped
+                        );
+
+        assertThatThrownBy(
+                        () -> {
+                            NestedSchemaUtils.generateNestedColumnUpdates(
+                                    fieldNames, oldType, newType, schemaChanges);
+                        })
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("Order of existing fields");
+    }
+
+    @Test
+    public void testRowWithIncompatibleTypeThrowsException() {
+        List<String> fieldNames = Arrays.asList("row_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        RowType oldType = RowType.of(new DataField(0, "f1", DataTypes.INT()));
+        ArrayType newType = new ArrayType(true, DataTypes.INT()); // Incompatible type
+
+        assertThatThrownBy(
+                        () -> {
+                            NestedSchemaUtils.generateNestedColumnUpdates(
+                                    fieldNames, oldType, newType, schemaChanges);
+                        })
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("can only be updated to row type");
+    }
+
+    @Test
+    public void testComplexNestedStructure() {
+        List<String> fieldNames = Arrays.asList("complex_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Old: ARRAY<MAP<STRING, ROW<f1 INT, f2 STRING>>>
+        RowType oldInnerRowType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT()),
+                        new DataField(1, "f2", DataTypes.STRING()));
+        MapType oldMapType = new MapType(true, DataTypes.STRING(), oldInnerRowType);
+        ArrayType oldType = new ArrayType(true, oldMapType);
+
+        // New: ARRAY<MAP<STRING, ROW<f1 BIGINT, f2 STRING, f3 DOUBLE>>>
+        RowType newInnerRowType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.BIGINT()), // Type changed
+                        new DataField(1, "f2", DataTypes.STRING()),
+                        new DataField(2, "f3", DataTypes.DOUBLE()) // New field added
+                        );
+        MapType newMapType = new MapType(true, DataTypes.STRING(), newInnerRowType);
+        ArrayType newType = new ArrayType(true, newMapType);
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(2);
+
+        // Should have one type update and one add column
+        SchemaChange.UpdateColumnType typeChange =
+                schemaChanges.stream()
+                        .filter(change -> change instanceof SchemaChange.UpdateColumnType)
+                        .map(change -> (SchemaChange.UpdateColumnType) change)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(typeChange).isNotNull();
+        assertThat(typeChange.fieldNames())
+                .containsExactly("complex_column", "element", "value", "f1");
+        assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+
+        SchemaChange.AddColumn addColumn =
+                schemaChanges.stream()
+                        .filter(change -> change instanceof SchemaChange.AddColumn)
+                        .map(change -> (SchemaChange.AddColumn) change)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(addColumn).isNotNull();
+        assertThat(addColumn.fieldNames())
+                .containsExactly("complex_column", "element", "value", "f3");
+        assertThat(addColumn.dataType()).isEqualTo(DataTypes.DOUBLE());
+    }
+
+    @Test
+    public void testNestedFieldPaths() {
+        List<String> fieldNames = Arrays.asList("outer", "inner");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Test that nested field names are properly combined
+        NestedSchemaUtils.generateNestedColumnUpdates(
+                fieldNames, DataTypes.INT(), DataTypes.BIGINT(), schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+        SchemaChange.UpdateColumnType typeChange =
+                (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+        assertThat(typeChange.fieldNames()).containsExactly("outer", "inner");
+    }
+
+    @Test
+    public void testEmptyFieldNames() {
+        List<String> fieldNames = Collections.emptyList();
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        NestedSchemaUtils.generateNestedColumnUpdates(
+                fieldNames, DataTypes.INT(), DataTypes.BIGINT(), schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+        SchemaChange.UpdateColumnType typeChange =
+                (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+        assertThat(typeChange.fieldNames()).isEmpty();
+    }
+
+    @Test
+    public void testVarCharTypeExtension() {
+        List<String> fieldNames = Arrays.asList("varchar_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        VarCharType oldType = new VarCharType(true, 10);
+        VarCharType newType = new VarCharType(true, 20);
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(1);
+        assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+        SchemaChange.UpdateColumnType typeChange =
+                (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+        assertThat(typeChange.fieldNames()).containsExactly("varchar_column");
+        assertThat(typeChange.newDataType()).isEqualTo(newType);
+    }
+
+    @Test
+    public void testMultipleNestedChanges() {
+        List<String> fieldNames = Arrays.asList("root");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Old: ROW<f1 INT NULL, f2 STRING>
+        RowType oldType =
+                RowType.of(
+                        new DataField(0, "f1", DataTypes.INT().nullable()),
+                        new DataField(1, "f2", DataTypes.STRING()));
+
+        // New: ROW<f1 BIGINT NOT NULL, f2 STRING, f3 DOUBLE>
+        RowType newType =
+                RowType.of(
+                        new DataField(
+                                0,
+                                "f1",
+                                DataTypes.BIGINT().notNull()), // Type and nullability changed
+                        new DataField(1, "f2", DataTypes.STRING()),
+                        new DataField(2, "f3", DataTypes.DOUBLE()) // New field
+                        );
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(3);
+
+        // Should have: add column, update type, update nullability
+        assertThat(schemaChanges).anyMatch(change -> change instanceof SchemaChange.AddColumn);
+        assertThat(schemaChanges)
+                .anyMatch(change -> change instanceof SchemaChange.UpdateColumnType);
+        assertThat(schemaChanges)
+                .anyMatch(change -> change instanceof SchemaChange.UpdateColumnNullability);
+    }
+
+    @Test
+    public void testArrayOfRowsWithFieldChanges() {
+        List<String> fieldNames = Arrays.asList("array_of_rows");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Old: ARRAY<ROW<id INT, name STRING>>
+        RowType oldRowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.INT()),
+                        new DataField(1, "name", DataTypes.STRING()));
+        ArrayType oldType = new ArrayType(true, oldRowType);
+
+        // New: ARRAY<ROW<id BIGINT, name STRING, active BOOLEAN>>
+        RowType newRowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.BIGINT()), // Type changed
+                        new DataField(1, "name", DataTypes.STRING()),
+                        new DataField(2, "active", DataTypes.BOOLEAN()) // New field
+                        );
+        ArrayType newType = new ArrayType(true, newRowType);
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(2);
+
+        // Verify field paths include "element" for array access
+        SchemaChange.UpdateColumnType typeChange =
+                schemaChanges.stream()
+                        .filter(change -> change instanceof SchemaChange.UpdateColumnType)
+                        .map(change -> (SchemaChange.UpdateColumnType) change)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(typeChange).isNotNull();
+        assertThat(typeChange.fieldNames()).containsExactly("array_of_rows", "element", "id");
+
+        SchemaChange.AddColumn addColumn =
+                schemaChanges.stream()
+                        .filter(change -> change instanceof SchemaChange.AddColumn)
+                        .map(change -> (SchemaChange.AddColumn) change)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(addColumn).isNotNull();
+        assertThat(addColumn.fieldNames()).containsExactly("array_of_rows", "element", "active");
+    }
+
+    @Test
+    public void testMapOfRowsWithFieldChanges() {
+        List<String> fieldNames = Arrays.asList("map_of_rows");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Old: MAP<STRING, ROW<id INT, name STRING>>
+        RowType oldRowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.INT()),
+                        new DataField(1, "name", DataTypes.STRING()));
+        MapType oldType = new MapType(true, DataTypes.STRING(), oldRowType);
+
+        // New: MAP<STRING, ROW<id BIGINT, name STRING, active BOOLEAN>>
+        RowType newRowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.BIGINT()), // Type changed
+                        new DataField(1, "name", DataTypes.STRING()),
+                        new DataField(2, "active", DataTypes.BOOLEAN()) // New field
+                        );
+        MapType newType = new MapType(true, DataTypes.STRING(), newRowType);
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(2);
+
+        // Verify field paths include "value" for map value access
+        SchemaChange.UpdateColumnType typeChange =
+                schemaChanges.stream()
+                        .filter(change -> change instanceof SchemaChange.UpdateColumnType)
+                        .map(change -> (SchemaChange.UpdateColumnType) change)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(typeChange).isNotNull();
+        assertThat(typeChange.fieldNames()).containsExactly("map_of_rows", "value", "id");
+
+        SchemaChange.AddColumn addColumn =
+                schemaChanges.stream()
+                        .filter(change -> change instanceof SchemaChange.AddColumn)
+                        .map(change -> (SchemaChange.AddColumn) change)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(addColumn).isNotNull();
+        assertThat(addColumn.fieldNames()).containsExactly("map_of_rows", "value", "active");
+    }
+
+    @Test
+    public void testMultisetOfComplexType() {
+        List<String> fieldNames = Arrays.asList("multiset_column");
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+
+        // Old: MULTISET<ROW<id INT>>
+        RowType oldRowType = RowType.of(new DataField(0, "id", DataTypes.INT()));
+        MultisetType oldType = new MultisetType(true, oldRowType);
+
+        // New: MULTISET<ROW<id BIGINT, name STRING>>
+        RowType newRowType =
+                RowType.of(
+                        new DataField(0, "id", DataTypes.BIGINT()),
+                        new DataField(1, "name", DataTypes.STRING()));
+        MultisetType newType = new MultisetType(true, newRowType);
+
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+        assertThat(schemaChanges).hasSize(2);
+
+        // Verify field paths include "element" for multiset element access
+        SchemaChange.UpdateColumnType typeChange =
+                schemaChanges.stream()
+                        .filter(change -> change instanceof SchemaChange.UpdateColumnType)
+                        .map(change -> (SchemaChange.UpdateColumnType) change)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(typeChange).isNotNull();
+        assertThat(typeChange.fieldNames()).containsExactly("multiset_column", "element", "id");
+
+        SchemaChange.AddColumn addColumn =
+                schemaChanges.stream()
+                        .filter(change -> change instanceof SchemaChange.AddColumn)
+                        .map(change -> (SchemaChange.AddColumn) change)
+                        .findFirst()
+                        .orElse(null);
+        assertThat(addColumn).isNotNull();
+        assertThat(addColumn.fieldNames()).containsExactly("multiset_column", "element", "name");
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 42effccf7f..503f3593c2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -101,7 +101,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
                         updatedSchema.f1.comment());
 
         for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
-            applySchemaChange(schemaManager, schemaChange, tableId);
+            applySchemaChange(schemaManager, schemaChange, tableId, actualUpdatedSchema);
         }
         /*
          * Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index e1b78926cc..d4319e868e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -74,7 +74,7 @@ public class UpdatedDataFieldsProcessFunction
                         updatedSchema.primaryKeys(),
                         updatedSchema.comment());
         for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
-            applySchemaChange(schemaManager, schemaChange, identifier);
+            applySchemaChange(schemaManager, schemaChange, identifier, actualUpdatedSchema);
         }
         /*
          * Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 410425a7da..bb49de2ac1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -22,14 +22,18 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.schema.NestedSchemaUtils;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.FieldIdentifier;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
@@ -98,7 +102,10 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
     }
 
     protected void applySchemaChange(
-            SchemaManager schemaManager, SchemaChange schemaChange, Identifier identifier)
+            SchemaManager schemaManager,
+            SchemaChange schemaChange,
+            Identifier identifier,
+            CdcSchema actualUpdatedSchema)
             throws Exception {
         if (schemaChange instanceof SchemaChange.AddColumn) {
             try {
@@ -118,9 +125,6 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         } else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
             SchemaChange.UpdateColumnType updateColumnType =
                     (SchemaChange.UpdateColumnType) schemaChange;
-            Preconditions.checkState(
-                    updateColumnType.fieldNames().length == 1,
-                    "Paimon CDC currently does not support nested type schema evolution.");
             TableSchema schema =
                     schemaManager
                             .latest()
@@ -136,6 +140,19 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                             + " does not exist in table. This is unexpected.");
             DataType oldType = schema.fields().get(idx).type();
             DataType newType = updateColumnType.newDataType();
+
+            // For complex types, extract the full new type from actualUpdatedSchema
+            // to preserve type context (e.g., ARRAY<BIGINT> instead of just BIGINT)
+            if (actualUpdatedSchema != null) {
+                String fieldName = updateColumnType.fieldNames()[0];
+                for (DataField field : actualUpdatedSchema.fields()) {
+                    if (fieldName.equals(field.name())) {
+                        newType = field.type();
+                        break;
+                    }
+                }
+            }
+
             switch (canConvert(oldType, newType, typeMapping)) {
                 case CONVERT:
                     catalog.alterTable(identifier, schemaChange, false);
@@ -165,9 +182,41 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
     public static ConvertAction canConvert(
             DataType oldType, DataType newType, TypeMapping typeMapping) {
         if (oldType.equalsIgnoreNullable(newType)) {
+            if (oldType.isNullable() && !newType.isNullable()) {
+                return ConvertAction.EXCEPTION; // Cannot make nullable field non-nullable
+            }
             return ConvertAction.CONVERT;
         }
 
+        if (oldType.getTypeRoot() == DataTypeRoot.ARRAY
+                && newType.getTypeRoot() == DataTypeRoot.ARRAY) {
+
+            ArrayType oldArrayType = (ArrayType) oldType;
+            ArrayType newArrayType = (ArrayType) newType;
+            return canConvertArray(oldArrayType, newArrayType, typeMapping);
+        }
+
+        if (oldType.getTypeRoot() == DataTypeRoot.MAP
+                && newType.getTypeRoot() == DataTypeRoot.MAP) {
+            MapType oldMapType = (MapType) oldType;
+            MapType newMapType = (MapType) newType;
+
+            return canConvertMap(oldMapType, newMapType, typeMapping);
+        }
+
+        if (oldType.getTypeRoot() == DataTypeRoot.MULTISET
+                && newType.getTypeRoot() == DataTypeRoot.MULTISET) {
+            MultisetType oldMultisetType = (MultisetType) oldType;
+            MultisetType newMultisetType = (MultisetType) newType;
+
+            return canConvertMultisetType(oldMultisetType, newMultisetType, typeMapping);
+        }
+
+        if (oldType.getTypeRoot() == DataTypeRoot.ROW
+                && newType.getTypeRoot() == DataTypeRoot.ROW) {
+            return canConvertRowType((RowType) oldType, (RowType) newType, typeMapping);
+        }
+
         int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
         int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
         if (oldIdx >= 0 && newIdx >= 0) {
@@ -223,6 +272,88 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
         return ConvertAction.EXCEPTION;
     }
 
+    private static ConvertAction canConvertArray(
+            ArrayType oldArrayType, ArrayType newArrayType, TypeMapping typeMapping) {
+        if (oldArrayType.isNullable() && !newArrayType.isNullable()) {
+            return ConvertAction.EXCEPTION;
+        }
+
+        return canConvert(
+                oldArrayType.getElementType(), newArrayType.getElementType(), typeMapping);
+    }
+
+    private static ConvertAction canConvertMap(
+            MapType oldMapType, MapType newMapType, TypeMapping typeMapping) {
+        if (oldMapType.isNullable() && !newMapType.isNullable()) {
+            return ConvertAction.EXCEPTION;
+        }
+
+        // For map keys, don't allow key type evolution
+        // hashcode will be different even if the value of the key is the same
+        if (!oldMapType.getKeyType().equals(newMapType.getKeyType())) {
+            return ConvertAction.EXCEPTION;
+        }
+
+        return canConvert(oldMapType.getValueType(), newMapType.getValueType(), typeMapping);
+    }
+
+    private static ConvertAction canConvertRowType(
+            RowType oldRowType, RowType newRowType, TypeMapping typeMapping) {
+        Map<String, DataField> oldFieldMap = new HashMap<>();
+        for (DataField field : oldRowType.getFields()) {
+            oldFieldMap.put(field.name(), field);
+        }
+
+        Map<String, DataField> newFieldMap = new HashMap<>();
+        for (DataField field : newRowType.getFields()) {
+            newFieldMap.put(field.name(), field);
+        }
+
+        // Rule 1: Check all non-nullable fields in the old type must exist in the new type
+        for (DataField oldField : oldRowType.getFields()) {
+            if (!oldField.type().isNullable()) {
+                if (!newFieldMap.containsKey(oldField.name())) {
+                    return ConvertAction.EXCEPTION;
+                }
+            }
+        }
+
+        // Rule 2: All fields common to both schemas must have compatible types
+        boolean needsConversion = false;
+        for (DataField newField : newRowType.getFields()) {
+            DataField oldField = oldFieldMap.get(newField.name());
+            if (oldField != null) {
+                ConvertAction fieldAction =
+                        canConvert(oldField.type(), newField.type(), typeMapping);
+                if (fieldAction == ConvertAction.EXCEPTION) {
+                    return ConvertAction.EXCEPTION;
+                }
+                if (fieldAction == ConvertAction.CONVERT) {
+                    needsConversion = true;
+                }
+            } else {
+                // Rule 3: New fields must be nullable
+                if (!newField.type().isNullable()) {
+                    return ConvertAction.EXCEPTION;
+                }
+                needsConversion = true;
+            }
+        }
+
+        return needsConversion ? ConvertAction.CONVERT : ConvertAction.IGNORE;
+    }
+
+    private static ConvertAction canConvertMultisetType(
+            MultisetType oldMultisetType, MultisetType newMultisetType, TypeMapping typeMapping) {
+
+        if (oldMultisetType.isNullable() && !newMultisetType.isNullable()) {
+            return ConvertAction.EXCEPTION;
+        }
+
+        return canConvert(
+                oldMultisetType.getElementType(), newMultisetType.getElementType(), typeMapping);
+    }
+
     protected List<SchemaChange> extractSchemaChanges(
             SchemaManager schemaManager, CdcSchema updatedSchema) {
         TableSchema oldTableSchema = schemaManager.latest().get();
@@ -258,8 +389,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                     if (oldField.type().is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) {
                         continue;
                     }
-                    // update column type
-                    result.add(SchemaChange.updateColumnType(newFieldName, newField.type()));
+                    // Generate nested column updates if needed
+                    NestedSchemaUtils.generateNestedColumnUpdates(
+                            Arrays.asList(newFieldName), oldField.type(), newField.type(), result);
                     // update column comment
                     if (newField.description() != null) {
                         result.add(
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
index 24d0a51453..a201e8308c 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
@@ -19,15 +19,23 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.SmallIntType;
 import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.types.VarCharType;
 
 import org.junit.Test;
 
+import java.util.Arrays;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** IT cases for {@link UpdatedDataFieldsProcessFunctionBase}. */
@@ -104,4 +112,508 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
 
         assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction);
     }
+
+    @Test
+    public void testArrayNullabilityEvolution() {
+        // Test 1: Nullable array to non-nullable array (should fail)
+        ArrayType oldType = new ArrayType(true, DataTypes.INT());
+        ArrayType newType = new ArrayType(false, DataTypes.INT());
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+
+        // Test 2: Non-nullable array to nullable array (should succeed)
+        oldType = new ArrayType(false, DataTypes.INT());
+        newType = new ArrayType(true, DataTypes.INT());
+
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+
+        // Test 3: Array with nullable elements to non-nullable elements (should fail)
+        oldType = new ArrayType(true, new IntType(true));
+        newType = new ArrayType(true, new IntType(false));
+
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testNestedRowNullabilityEvolution() {
+        // Old type: ROW(f1 ROW(inner1 INT NULL, inner2 STRING) NULL)
+        RowType oldInnerType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "inner1", new IntType(true)),
+                                new DataField(2, "inner2", DataTypes.STRING())));
+        RowType oldRowType = new RowType(true, Arrays.asList(new DataField(1, "f1", oldInnerType)));
+
+        // Test 1: Making nested row non-nullable (should fail)
+        RowType newInnerType =
+                new RowType(
+                        false, // Changed to non-nullable
+                        Arrays.asList(
+                                new DataField(1, "inner1", new IntType(true)),
+                                new DataField(2, "inner2", DataTypes.STRING())));
+        RowType newRowType = new RowType(true, Arrays.asList(new DataField(1, "f1", newInnerType)));
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+
+        // Test 2: Making nested field non-nullable (should fail)
+        newInnerType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(
+                                        1, "inner1", new IntType(false)), // Changed to non-nullable
+                                new DataField(2, "inner2", DataTypes.STRING())));
+        newRowType = new RowType(true, Arrays.asList(new DataField(1, "f1", newInnerType)));
+
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testMultisetNullabilityEvolution() {
+        // Test 1: Nullable multiset to non-nullable multiset (should fail)
+        MultisetType oldType = new MultisetType(true, DataTypes.INT());
+        MultisetType newType = new MultisetType(false, DataTypes.INT());
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+
+        // Test 2: Non-nullable multiset to nullable multiset (should succeed)
+        oldType = new MultisetType(false, DataTypes.INT());
+        newType = new MultisetType(true, DataTypes.INT());
+
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+
+        // Test 3: Multiset with nullable elements to non-nullable elements (should fail)
+        oldType = new MultisetType(true, new IntType(true));
+        newType = new MultisetType(true, new IntType(false));
+
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testComplexNestedNullabilityEvolution() {
+        // Old type: ROW(
+        //   f1 ARRAY<ROW<x INT NULL, y STRING> NULL> NULL,
+        //   f2 MAP<STRING, ROW<a INT NULL, b STRING> NULL> NULL
+        // )
+        RowType oldArrayInnerType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "x", new IntType(true)),
+                                new DataField(2, "y", DataTypes.STRING())));
+        ArrayType oldArrayType = new ArrayType(true, oldArrayInnerType);
+
+        RowType oldMapValueType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "a", new IntType(true)),
+                                new DataField(2, "b", DataTypes.STRING())));
+        MapType oldMapType = new MapType(true, DataTypes.STRING(), oldMapValueType);
+
+        RowType oldRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", oldArrayType),
+                                new DataField(2, "f2", oldMapType)));
+
+        // Test 1: Making nested array's row type non-nullable (should fail)
+        RowType newArrayInnerType =
+                new RowType(
+                        false, // Changed to non-nullable
+                        Arrays.asList(
+                                new DataField(1, "x", new IntType(true)),
+                                new DataField(2, "y", DataTypes.STRING())));
+        ArrayType newArrayType = new ArrayType(true, newArrayInnerType);
+        MapType newMapType = oldMapType; // Keep the same
+
+        RowType newRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", newArrayType),
+                                new DataField(2, "f2", newMapType)));
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+
+        // Test 2: Making map's value type's field non-nullable (should fail)
+        newArrayType = oldArrayType; // Restore to original
+        RowType newMapValueType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(
+                                        1, "a", new IntType(false)), // Changed to non-nullable
+                                new DataField(2, "b", DataTypes.STRING())));
+        newMapType = new MapType(true, DataTypes.STRING(), newMapValueType);
+
+        newRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", newArrayType),
+                                new DataField(2, "f2", newMapType)));
+
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testNestedRowTypeConversion() {
+        // Old type: ROW(f1 INT, f2 STRING)
+        RowType oldRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", DataTypes.INT()),
+                                new DataField(2, "f2", DataTypes.STRING())));
+
+        // New type: ROW(f1 BIGINT, f2 STRING, f3 DOUBLE)
+        RowType newRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", DataTypes.BIGINT()),
+                                new DataField(2, "f2", DataTypes.STRING()),
+                                new DataField(3, "f3", DataTypes.DOUBLE())));
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+    }
+
+    @Test
+    public void testNestedArrayTypeConversion() {
+        // Old type: ARRAY<INT>
+        ArrayType oldArrayType = new ArrayType(true, DataTypes.INT());
+
+        // New type: ARRAY<BIGINT>
+        ArrayType newArrayType = new ArrayType(true, DataTypes.BIGINT());
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldArrayType, newArrayType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+    }
+
+    @Test
+    public void testNestedMapTypeConversion() {
+        // Old type: MAP<STRING, INT>
+        MapType oldMapType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+
+        // New type: MAP<STRING, BIGINT>
+        MapType newMapType = new MapType(true, DataTypes.STRING(), DataTypes.BIGINT());
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldMapType, newMapType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+    }
+
+    @Test
+    public void testMapWithNullableComplexTypes() {
+        // Old type: MAP<STRING, ROW<f1 ARRAY<INT> NULL, f2 ROW<x INT, y STRING> NULL>>
+        RowType oldInnerRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "x", DataTypes.INT()),
+                                new DataField(2, "y", DataTypes.STRING())));
+
+        RowType oldValueType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", new ArrayType(true, DataTypes.INT())),
+                                new DataField(2, "f2", oldInnerRowType)));
+
+        MapType oldType = new MapType(true, DataTypes.STRING(), oldValueType);
+
+        // New type: MAP<STRING, ROW<f1 ARRAY<BIGINT> NULL, f2 ROW<x BIGINT, y STRING, z DOUBLE> NULL>>
+        RowType newInnerRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "x", DataTypes.BIGINT()),
+                                new DataField(2, "y", DataTypes.STRING()),
+                                new DataField(3, "z", DataTypes.DOUBLE())));
+
+        RowType newValueType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", new ArrayType(true, DataTypes.BIGINT())),
+                                new DataField(2, "f2", newInnerRowType)));
+
+        MapType newType = new MapType(true, DataTypes.STRING(), newValueType);
+
+        // Test compatible evolution
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+
+        // Test incompatible element type
+        RowType incompatibleValueType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(
+                                        1,
+                                        "f1",
+                                        new ArrayType(
+                                                true, DataTypes.STRING())), // 
INT to STRING is
+                                // incompatible
+                                new DataField(2, "f2", newInnerRowType)));
+
+        MapType incompatibleType = new MapType(true, DataTypes.STRING(), incompatibleValueType);
+
+        convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, incompatibleType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testComplexNestedTypeConversion() {
+        // Old type: ARRAY<MAP<INT, ROW(f1 INT, f2 STRING)>>
+        RowType oldInnerRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", DataTypes.INT()),
+                                new DataField(2, "f2", DataTypes.STRING())));
+        MapType oldMapType = new MapType(true, DataTypes.INT(), oldInnerRowType);
+        ArrayType oldType = new ArrayType(true, oldMapType);
+
+        // New type: ARRAY<MAP<INT, ROW(f1 BIGINT, f2 STRING, f3 DOUBLE)>>
+        RowType newInnerRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", DataTypes.BIGINT()),
+                                new DataField(2, "f2", DataTypes.STRING()),
+                                new DataField(3, "f3", DataTypes.DOUBLE())));
+        MapType newMapType = new MapType(true, DataTypes.INT(), newInnerRowType);
+        ArrayType newType = new ArrayType(true, newMapType);
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+    }
+
+    @Test
+    public void testRowTypeWithMultipleChanges() {
+        // Old type: ROW(f1 INT, f2 STRING, f3 DOUBLE)
+        RowType oldRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", DataTypes.INT()),
+                                new DataField(2, "f2", DataTypes.STRING()),
+                                new DataField(3, "f3", DataTypes.DOUBLE())));
+
+        // New type: ROW(f1 BIGINT, f2 VARCHAR(100), f4 INT)
+        RowType newRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", DataTypes.BIGINT()),
+                                new DataField(2, "f2", new VarCharType(true, 100)),
+                                new DataField(4, "f4", DataTypes.INT())));
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+    }
+
+    @Test
+    public void testNonNullableFieldRemoval() {
+        // Old type: ROW(f1 INT NOT NULL, f2 STRING)
+        RowType oldRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", new IntType(false)),
+                                new DataField(2, "f2", DataTypes.STRING())));
+
+        // New type: ROW(f2 STRING)
+        RowType newRowType =
+                new RowType(true, Arrays.asList(new DataField(2, "f2", DataTypes.STRING())));
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testAddingNonNullableField() {
+        // Old type: ROW(f1 INT)
+        RowType oldRowType =
+                new RowType(true, Arrays.asList(new DataField(1, "f1", DataTypes.INT())));
+
+        // New type: ROW(f1 INT, f2 STRING NOT NULL)
+        RowType newRowType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "f1", DataTypes.INT()),
+                                new DataField(2, "f2", new VarCharType(false, 100))));
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testNestedRowWithinRowEvolution() {
+        // Old type: ROW(f1 ROW(inner1 INT, inner2 STRING))
+        RowType oldInnerType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "inner1", DataTypes.INT()),
+                                new DataField(2, "inner2", DataTypes.STRING())));
+        RowType oldRowType = new RowType(true, Arrays.asList(new DataField(1, "f1", oldInnerType)));
+
+        // New type: ROW(f1 ROW(inner1 BIGINT, inner2 STRING, inner3 DOUBLE))
+        RowType newInnerType =
+                new RowType(
+                        true,
+                        Arrays.asList(
+                                new DataField(1, "inner1", DataTypes.BIGINT()),
+                                new DataField(2, "inner2", DataTypes.STRING()),
+                                new DataField(3, "inner3", DataTypes.DOUBLE())));
+        RowType newRowType = new RowType(true, Arrays.asList(new DataField(1, "f1", newInnerType)));
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldRowType, newRowType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+    }
+
+    @Test
+    public void testArrayOfArraysEvolution() {
+        // Old type: ARRAY<ARRAY<INT>>
+        ArrayType oldInnerArray = new ArrayType(true, DataTypes.INT());
+        ArrayType oldType = new ArrayType(true, oldInnerArray);
+
+        // New type: ARRAY<ARRAY<BIGINT>>
+        ArrayType newInnerArray = new ArrayType(true, DataTypes.BIGINT());
+        ArrayType newType = new ArrayType(true, newInnerArray);
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+    }
+
+    @Test
+    public void testArrayWithIncompatibleElementType() {
+        // Old type: ARRAY<INT>
+        ArrayType oldType = new ArrayType(true, DataTypes.INT());
+
+        // New type: ARRAY<STRING>
+        ArrayType newType = new ArrayType(true, DataTypes.STRING());
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testMapOfMapsEvolution() {
+        // Old type: MAP<STRING, MAP<INT, STRING>>
+        MapType oldInnerMap = new MapType(true, DataTypes.INT(), DataTypes.STRING());
+        MapType oldType = new MapType(true, DataTypes.STRING(), oldInnerMap);
+
+        // New type: MAP<STRING, MAP<INT, VARCHAR(100)>>
+        MapType newInnerMap = new MapType(true, DataTypes.INT(), new VarCharType(true, 100));
+        MapType newType = new MapType(true, DataTypes.STRING(), newInnerMap);
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction);
+    }
+
+    @Test
+    public void testMapWithIncompatibleValueType() {
+        // Old type: MAP<STRING, INT>
+        MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+
+        // New type: MAP<STRING, BOOLEAN>
+        MapType newType = new MapType(true, DataTypes.STRING(), DataTypes.BOOLEAN());
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
+
+    @Test
+    public void testMultisetTypeEvolution() {
+        // Old type: MULTISET<INT>
+        MultisetType oldType = new MultisetType(true, DataTypes.INT());
+
+        // New type: MULTISET<BIGINT>
+        MultisetType newType = new MultisetType(true, DataTypes.BIGINT());
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+    }
+
+    @Test
+    public void testIncompatibleMultisetTypeEvolution() {
+        // Old type: MULTISET<INT>
+        MultisetType oldType = new MultisetType(true, DataTypes.INT());
+
+        // New type: MULTISET<STRING>
+        MultisetType newType = new MultisetType(true, DataTypes.STRING());
+
+        UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+                UpdatedDataFieldsProcessFunctionBase.canConvert(
+                        oldType, newType, TypeMapping.defaultMapping());
+        assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+    }
 }
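
The tests above pin down the three-valued contract of canConvert. A compact sketch of that contract, reusing only types and expectations that appear in the tests (variable names are made up; imports are the ones shown in this file's hunk):

    TypeMapping tm = TypeMapping.defaultMapping();
    // Widening a nested element type is accepted:
    //   ARRAY<INT> -> ARRAY<BIGINT>  => ConvertAction.CONVERT
    // An unrelated element type is rejected:
    //   ARRAY<INT> -> ARRAY<STRING>  => ConvertAction.EXCEPTION
    UpdatedDataFieldsProcessFunctionBase.ConvertAction action =
            UpdatedDataFieldsProcessFunctionBase.canConvert(
                    new ArrayType(true, DataTypes.INT()),
                    new ArrayType(true, DataTypes.BIGINT()),
                    tm);
    // action == ConvertAction.CONVERT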
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 10f437e8bd..d791d33d89 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -34,6 +34,7 @@ import org.apache.paimon.function.FunctionDefinition;
 import org.apache.paimon.function.FunctionImpl;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.NestedSchemaUtils;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -43,8 +44,6 @@ import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
@@ -122,13 +121,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -672,132 +669,8 @@ public class FlinkCatalog extends AbstractCatalog {
             org.apache.paimon.types.DataType oldType,
             org.apache.paimon.types.DataType newType,
             List<SchemaChange> schemaChanges) {
-        String joinedNames = String.join(".", fieldNames);
-        if (oldType.getTypeRoot() == DataTypeRoot.ROW) {
-            Preconditions.checkArgument(
-                    newType.getTypeRoot() == DataTypeRoot.ROW,
-                    "Column %s can only be updated to row type, and cannot be updated to %s type",
-                    joinedNames,
-                    newType.getTypeRoot());
-            org.apache.paimon.types.RowType oldRowType = (org.apache.paimon.types.RowType) oldType;
-            org.apache.paimon.types.RowType newRowType = (org.apache.paimon.types.RowType) newType;
-
-            // check that existing fields have same order
-            Map<String, Integer> oldFieldOrders = new HashMap<>();
-            for (int i = 0; i < oldRowType.getFieldCount(); i++) {
-                oldFieldOrders.put(oldRowType.getFields().get(i).name(), i);
-            }
-            int lastIdx = -1;
-            String lastFieldName = "";
-            for (DataField newField : newRowType.getFields()) {
-                String name = newField.name();
-                if (oldFieldOrders.containsKey(name)) {
-                    int idx = oldFieldOrders.get(name);
-                    Preconditions.checkState(
-                            lastIdx < idx,
-                            "Order of existing fields in column %s must be kept the same. "
-                                    + "However, field %s and %s have changed their orders.",
-                            joinedNames,
-                            lastFieldName,
-                            name);
-                    lastIdx = idx;
-                    lastFieldName = name;
-                }
-            }
 
-            // drop fields
-            Set<String> newFieldNames = new HashSet<>(newRowType.getFieldNames());
-            for (String name : oldRowType.getFieldNames()) {
-                if (!newFieldNames.contains(name)) {
-                    List<String> dropColumnNames = new ArrayList<>(fieldNames);
-                    dropColumnNames.add(name);
-                    schemaChanges.add(
-                            SchemaChange.dropColumn(dropColumnNames.toArray(new String[0])));
-                }
-            }
-
-            for (int i = 0; i < newRowType.getFieldCount(); i++) {
-                DataField field = newRowType.getFields().get(i);
-                String name = field.name();
-                List<String> fullFieldNames = new ArrayList<>(fieldNames);
-                fullFieldNames.add(name);
-                if (!oldFieldOrders.containsKey(name)) {
-                    // add fields
-                    SchemaChange.Move move;
-                    if (i == 0) {
-                        move = SchemaChange.Move.first(name);
-                    } else {
-                        String lastName = newRowType.getFields().get(i - 1).name();
-                        move = SchemaChange.Move.after(name, lastName);
-                    }
-                    schemaChanges.add(
-                            SchemaChange.addColumn(
-                                    fullFieldNames.toArray(new String[0]),
-                                    field.type(),
-                                    field.description(),
-                                    move));
-                } else {
-                    // update existing fields
-                    DataField oldField = oldRowType.getFields().get(oldFieldOrders.get(name));
-                    if (!Objects.equals(oldField.description(), field.description())) {
-                        schemaChanges.add(
-                                SchemaChange.updateColumnComment(
-                                        fullFieldNames.toArray(new String[0]),
-                                        field.description()));
-                    }
-                    generateNestedColumnUpdates(
-                            fullFieldNames, oldField.type(), field.type(), schemaChanges);
-                }
-            }
-        } else if (oldType.getTypeRoot() == DataTypeRoot.ARRAY) {
-            Preconditions.checkArgument(
-                    newType.getTypeRoot() == DataTypeRoot.ARRAY,
-                    "Column %s can only be updated to array type, and cannot be updated to %s type",
-                    joinedNames,
-                    newType);
-            List<String> fullFieldNames = new ArrayList<>(fieldNames);
-            // add a dummy column name indicating the element of array
-            fullFieldNames.add("element");
-            generateNestedColumnUpdates(
-                    fullFieldNames,
-                    ((org.apache.paimon.types.ArrayType) oldType).getElementType(),
-                    ((org.apache.paimon.types.ArrayType) newType).getElementType(),
-                    schemaChanges);
-        } else if (oldType.getTypeRoot() == DataTypeRoot.MAP) {
-            Preconditions.checkArgument(
-                    newType.getTypeRoot() == DataTypeRoot.MAP,
-                    "Column %s can only be updated to map type, and cannot be updated to %s type",
-                    joinedNames,
-                    newType);
-            org.apache.paimon.types.MapType oldMapType = (org.apache.paimon.types.MapType) oldType;
-            org.apache.paimon.types.MapType newMapType = (org.apache.paimon.types.MapType) newType;
-            Preconditions.checkArgument(
-                    oldMapType.getKeyType().equals(newMapType.getKeyType()),
-                    "Cannot update key type of column %s from %s type to %s type",
-                    joinedNames,
-                    oldMapType.getKeyType(),
-                    newMapType.getKeyType());
-            List<String> fullFieldNames = new ArrayList<>(fieldNames);
-            // add a dummy column name indicating the value of map
-            fullFieldNames.add("value");
-            generateNestedColumnUpdates(
-                    fullFieldNames,
-                    oldMapType.getValueType(),
-                    newMapType.getValueType(),
-                    schemaChanges);
-        } else {
-            if (!oldType.equalsIgnoreNullable(newType)) {
-                schemaChanges.add(
-                        SchemaChange.updateColumnType(
-                                fieldNames.toArray(new String[0]), newType, false));
-            }
-        }
-
-        if (oldType.isNullable() != newType.isNullable()) {
-            schemaChanges.add(
-                    SchemaChange.updateColumnNullability(
-                            fieldNames.toArray(new String[0]), newType.isNullable()));
-        }
+        NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
     }
 
     /**
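
One behavior worth noting from the removed block: map keys are invariant, and only the value side of MAP<K, V> may evolve, via the "value" path marker. A hedged sketch, assuming the consolidated utility keeps the key-type check that the deleted Preconditions call performed; the column name "m" is made up:

    List<SchemaChange> changes = new ArrayList<>();
    MapType oldMap = new MapType(true, DataTypes.INT(), DataTypes.INT());
    MapType newMap = new MapType(true, DataTypes.STRING(), DataTypes.INT());
    // Expected to fail along the lines of:
    //   "Cannot update key type of column m from INT type to STRING type"
    NestedSchemaUtils.generateNestedColumnUpdates(Arrays.asList("m"), oldMap, newMap, changes);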

