This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 769e531e0e [flink-cdc] Added support for nested schema evolution in CDC (#5682)
769e531e0e is described below
commit 769e531e0edd3c2af8dca5a6271161d7bcb695e4
Author: 0dunay0 <[email protected]>
AuthorDate: Mon Jun 9 08:51:49 2025 +0100
[flink-cdc] Added support for nested schema evolution in CDC (#5682)
---
.../apache/paimon/schema/NestedSchemaUtils.java | 272 +++++++++
.../paimon/schema/NestedSchemaUtilsTest.java | 669 +++++++++++++++++++++
...MultiTableUpdatedDataFieldsProcessFunction.java | 2 +-
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 2 +-
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 144 ++++-
.../UpdatedDataFieldsProcessFunctionBaseTest.java | 512 ++++++++++++++++
.../java/org/apache/paimon/flink/FlinkCatalog.java | 131 +---
7 files changed, 1595 insertions(+), 137 deletions(-)
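
For orientation, the core entry point added by this commit is
NestedSchemaUtils.generateNestedColumnUpdates(...). A minimal sketch of the behavior it
enables, assembled from the types and assertions exercised in the new tests (assumes
paimon-core on the classpath; the example class name is illustrative, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.paimon.schema.NestedSchemaUtils;
    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.types.ArrayType;
    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    public class NestedEvolutionSketch {
        public static void main(String[] args) {
            // Old: ARRAY<ROW<id INT, name STRING>>; new: ARRAY<ROW<id BIGINT, name STRING>>.
            RowType oldRow =
                    RowType.of(
                            new DataField(0, "id", DataTypes.INT()),
                            new DataField(1, "name", DataTypes.STRING()));
            RowType newRow =
                    RowType.of(
                            new DataField(0, "id", DataTypes.BIGINT()),
                            new DataField(1, "name", DataTypes.STRING()));

            List<SchemaChange> changes = new ArrayList<>();
            NestedSchemaUtils.generateNestedColumnUpdates(
                    Arrays.asList("c"),
                    new ArrayType(true, oldRow),
                    new ArrayType(true, newRow),
                    changes);
            // Per the tests in this patch, this yields one UpdateColumnType
            // addressed as the path ["c", "element", "id"].
            System.out.println(changes);
        }
    }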
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/NestedSchemaUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/NestedSchemaUtils.java
new file mode 100644
index 0000000000..56d5fd99b3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/NestedSchemaUtils.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Utility class for handling nested column schema changes. This provides shared logic for both
+ * FlinkCatalog and CDC sources to handle schema evolution for nested types (arrays, maps,
+ * multisets, rows) consistently.
+ */
+public class NestedSchemaUtils {
+
+ /**
+ * Generates nested column updates for schema evolution. Handles all nested types: ROW, ARRAY,
+ * MAP, MULTISET. Creates proper field paths with markers like "element" for arrays and "value"
+ * for maps.
+ *
+ * @param fieldNames The current field path as a list of field names
+ * @param oldType The old data type
+ * @param newType The new data type
+ * @param schemaChanges List to collect the generated schema changes
+ */
+ public static void generateNestedColumnUpdates(
+ List<String> fieldNames,
+ DataType oldType,
+ DataType newType,
+ List<SchemaChange> schemaChanges) {
+
+ if (oldType.getTypeRoot() == DataTypeRoot.ROW) {
+ handleRowTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+ } else if (oldType.getTypeRoot() == DataTypeRoot.ARRAY) {
+ handleArrayTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+ } else if (oldType.getTypeRoot() == DataTypeRoot.MAP) {
+ handleMapTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+ } else if (oldType.getTypeRoot() == DataTypeRoot.MULTISET) {
+ handleMultisetTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+ } else {
+ // For primitive types, update the column type directly
+ handlePrimitiveTypeUpdate(fieldNames, oldType, newType, schemaChanges);
+ }
+
+ // Handle nullability changes for all types
+ handleNullabilityChange(fieldNames, oldType, newType, schemaChanges);
+ }
+
+ private static void handleRowTypeUpdate(
+ List<String> fieldNames,
+ DataType oldType,
+ DataType newType,
+ List<SchemaChange> schemaChanges) {
+
+ String joinedNames = String.join(".", fieldNames);
+
+ Preconditions.checkArgument(
+ newType.getTypeRoot() == DataTypeRoot.ROW,
+ "Column %s can only be updated to row type, and cannot be
updated to %s type",
+ joinedNames,
+ newType.getTypeRoot());
+
+ RowType oldRowType = (RowType) oldType;
+ RowType newRowType = (RowType) newType;
+
+ // check that existing fields maintain their order
+ Map<String, Integer> oldFieldOrders = new HashMap<>();
+ for (int i = 0; i < oldRowType.getFieldCount(); i++) {
+ oldFieldOrders.put(oldRowType.getFields().get(i).name(), i);
+ }
+
+ int lastIdx = -1;
+ String lastFieldName = "";
+ for (DataField newField : newRowType.getFields()) {
+ String name = newField.name();
+ if (oldFieldOrders.containsKey(name)) {
+ int idx = oldFieldOrders.get(name);
+ Preconditions.checkState(
+ lastIdx < idx,
+ "Order of existing fields in column %s must be kept
the same. "
+ + "However, field %s and %s have changed their
orders.",
+ joinedNames,
+ lastFieldName,
+ name);
+ lastIdx = idx;
+ lastFieldName = name;
+ }
+ }
+
+ // drop fields
+ Set<String> newFieldNames = new HashSet<>(newRowType.getFieldNames());
+ for (String name : oldRowType.getFieldNames()) {
+ if (!newFieldNames.contains(name)) {
+ List<String> dropColumnNames = new ArrayList<>(fieldNames);
+ dropColumnNames.add(name);
+ schemaChanges.add(SchemaChange.dropColumn(dropColumnNames.toArray(new String[0])));
+ }
+ }
+
+ for (int i = 0; i < newRowType.getFieldCount(); i++) {
+ DataField field = newRowType.getFields().get(i);
+ String name = field.name();
+ List<String> fullFieldNames = new ArrayList<>(fieldNames);
+ fullFieldNames.add(name);
+
+ if (!oldFieldOrders.containsKey(name)) {
+ // add fields
+ SchemaChange.Move move;
+ if (i == 0) {
+ move = SchemaChange.Move.first(name);
+ } else {
+ String lastName = newRowType.getFields().get(i - 1).name();
+ move = SchemaChange.Move.after(name, lastName);
+ }
+ schemaChanges.add(
+ SchemaChange.addColumn(
+ fullFieldNames.toArray(new String[0]),
+ field.type(),
+ field.description(),
+ move));
+ } else {
+ // update existing fields
+ DataField oldField = oldRowType.getFields().get(oldFieldOrders.get(name));
+ if (!Objects.equals(oldField.description(), field.description())) {
+ schemaChanges.add(
+ SchemaChange.updateColumnComment(
+ fullFieldNames.toArray(new String[0]), field.description()));
+ }
+ generateNestedColumnUpdates(
+ fullFieldNames, oldField.type(), field.type(), schemaChanges);
+ }
+ }
+ }
+
+ private static void handleArrayTypeUpdate(
+ List<String> fieldNames,
+ DataType oldType,
+ DataType newType,
+ List<SchemaChange> schemaChanges) {
+
+ String joinedNames = String.join(".", fieldNames);
+ Preconditions.checkArgument(
+ newType.getTypeRoot() == DataTypeRoot.ARRAY,
+ "Column %s can only be updated to array type, and cannot be
updated to %s type",
+ joinedNames,
+ newType);
+
+ List<String> fullFieldNames = new ArrayList<>(fieldNames);
+ // add a dummy column name indicating the element of array
+ fullFieldNames.add("element");
+
+ generateNestedColumnUpdates(
+ fullFieldNames,
+ ((ArrayType) oldType).getElementType(),
+ ((ArrayType) newType).getElementType(),
+ schemaChanges);
+ }
+
+ private static void handleMapTypeUpdate(
+ List<String> fieldNames,
+ DataType oldType,
+ DataType newType,
+ List<SchemaChange> schemaChanges) {
+
+ String joinedNames = String.join(".", fieldNames);
+ Preconditions.checkArgument(
+ newType.getTypeRoot() == DataTypeRoot.MAP,
+ "Column %s can only be updated to map type, and cannot be
updated to %s type",
+ joinedNames,
+ newType);
+
+ MapType oldMapType = (MapType) oldType;
+ MapType newMapType = (MapType) newType;
+
+ Preconditions.checkArgument(
+ oldMapType.getKeyType().equals(newMapType.getKeyType()),
+ "Cannot update key type of column %s from %s type to %s type",
+ joinedNames,
+ oldMapType.getKeyType(),
+ newMapType.getKeyType());
+
+ List<String> fullFieldNames = new ArrayList<>(fieldNames);
+ // add a dummy column name indicating the value of map
+ fullFieldNames.add("value");
+
+ generateNestedColumnUpdates(
+ fullFieldNames,
+ oldMapType.getValueType(),
+ newMapType.getValueType(),
+ schemaChanges);
+ }
+
+ private static void handleMultisetTypeUpdate(
+ List<String> fieldNames,
+ DataType oldType,
+ DataType newType,
+ List<SchemaChange> schemaChanges) {
+
+ String joinedNames = String.join(".", fieldNames);
+
+ Preconditions.checkArgument(
+ newType.getTypeRoot() == DataTypeRoot.MULTISET,
+ "Column %s can only be updated to multiset type, and cannot be
updated to %s type",
+ joinedNames,
+ newType);
+
+ List<String> fullFieldNames = new ArrayList<>(fieldNames);
+ // Add the special "element" marker for multiset element access
+ fullFieldNames.add("element");
+
+ generateNestedColumnUpdates(
+ fullFieldNames,
+ ((MultisetType) oldType).getElementType(),
+ ((MultisetType) newType).getElementType(),
+ schemaChanges);
+ }
+
+ private static void handlePrimitiveTypeUpdate(
+ List<String> fieldNames,
+ DataType oldType,
+ DataType newType,
+ List<SchemaChange> schemaChanges) {
+
+ if (!oldType.equalsIgnoreNullable(newType)) {
+ schemaChanges.add(
+ SchemaChange.updateColumnType(
+ fieldNames.toArray(new String[0]), newType, false));
+ }
+ }
+
+ private static void handleNullabilityChange(
+ List<String> fieldNames,
+ DataType oldType,
+ DataType newType,
+ List<SchemaChange> schemaChanges) {
+
+ if (oldType.isNullable() != newType.isNullable()) {
+ schemaChanges.add(
+ SchemaChange.updateColumnNullability(
+ fieldNames.toArray(new String[0]), newType.isNullable()));
+ }
+ }
+}
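
The synthetic path segments used above ("element" for array and multiset elements, "value"
for map values) are how nested updates are addressed. A short sketch of the map case,
mirroring handleMapTypeUpdate (the example class name is illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.paimon.schema.NestedSchemaUtils;
    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.MapType;

    public class MapValuePathSketch {
        public static void main(String[] args) {
            List<SchemaChange> changes = new ArrayList<>();
            // MAP<STRING, INT> -> MAP<STRING, BIGINT>: key types must stay fixed,
            // so only the value evolves, addressed as ["scores", "value"].
            NestedSchemaUtils.generateNestedColumnUpdates(
                    Arrays.asList("scores"),
                    new MapType(true, DataTypes.STRING(), DataTypes.INT()),
                    new MapType(true, DataTypes.STRING(), DataTypes.BIGINT()),
                    changes);
            System.out.println(changes); // one UpdateColumnType at ["scores", "value"]
        }
    }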
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/NestedSchemaUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/NestedSchemaUtilsTest.java
new file mode 100644
index 0000000000..ae5f7ada39
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/NestedSchemaUtilsTest.java
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link NestedSchemaUtils}. */
+public class NestedSchemaUtilsTest {
+
+ @Test
+ public void testPrimitiveTypeUpdate() {
+ List<String> fieldNames = Arrays.asList("column1");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Test type conversion (INT -> BIGINT)
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, DataTypes.INT(), DataTypes.BIGINT(), schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+ SchemaChange.UpdateColumnType typeChange =
+ (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+ assertThat(typeChange.fieldNames()).containsExactly("column1");
+ assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+ }
+
+ @Test
+ public void testPrimitiveTypeUpdateWithSameType() {
+ List<String> fieldNames = Arrays.asList("column1");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Same type should not generate any changes
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, DataTypes.INT(), DataTypes.INT(), schemaChanges);
+
+ assertThat(schemaChanges).isEmpty();
+ }
+
+ @Test
+ public void testNullabilityChangeOnly() {
+ List<String> fieldNames = Arrays.asList("column1");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Test nullable to non-nullable
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, DataTypes.INT().nullable(), DataTypes.INT().notNull(), schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnNullability.class);
+ SchemaChange.UpdateColumnNullability nullabilityChange =
+ (SchemaChange.UpdateColumnNullability) schemaChanges.get(0);
+ assertThat(nullabilityChange.fieldNames()).containsExactly("column1");
+ assertThat(nullabilityChange.newNullability()).isFalse();
+ }
+
+ @Test
+ public void testTypeAndNullabilityChange() {
+ List<String> fieldNames = Arrays.asList("column1");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Test type change and nullability change together
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames,
+ DataTypes.INT().nullable(),
+ DataTypes.BIGINT().notNull(),
+ schemaChanges);
+
+ assertThat(schemaChanges).hasSize(2);
+
+ // Should have both type and nullability changes
+ assertThat(schemaChanges)
+ .anyMatch(change -> change instanceof SchemaChange.UpdateColumnType);
+ assertThat(schemaChanges)
+ .anyMatch(change -> change instanceof SchemaChange.UpdateColumnNullability);
+ }
+
+ @Test
+ public void testArrayTypeUpdateElement() {
+ List<String> fieldNames = Arrays.asList("arr_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ ArrayType oldType = new ArrayType(true, DataTypes.INT());
+ ArrayType newType = new ArrayType(true, DataTypes.BIGINT());
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+ SchemaChange.UpdateColumnType typeChange =
+ (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+ assertThat(typeChange.fieldNames()).containsExactly("arr_column", "element");
+ assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+ }
+
+ @Test
+ public void testArrayTypeUpdateNullability() {
+ List<String> fieldNames = Arrays.asList("arr_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ ArrayType oldType = new ArrayType(true, DataTypes.INT());
+ ArrayType newType = new ArrayType(false, DataTypes.INT());
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnNullability.class);
+ SchemaChange.UpdateColumnNullability nullabilityChange =
+ (SchemaChange.UpdateColumnNullability) schemaChanges.get(0);
+ assertThat(nullabilityChange.fieldNames()).containsExactly("arr_column");
+ assertThat(nullabilityChange.newNullability()).isFalse();
+ }
+
+ @Test
+ public void testArrayWithIncompatibleTypeThrowsException() {
+ List<String> fieldNames = Arrays.asList("arr_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ ArrayType oldType = new ArrayType(true, DataTypes.INT());
+ RowType newElementType = RowType.of(DataTypes.STRING()); // Incompatible type
+
+ assertThatThrownBy(
+ () -> {
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, oldType, newElementType, schemaChanges);
+ })
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("can only be updated to array type");
+ }
+
+ @Test
+ public void testMapTypeUpdateValue() {
+ List<String> fieldNames = Arrays.asList("map_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+ MapType newType = new MapType(true, DataTypes.STRING(), DataTypes.BIGINT());
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+ SchemaChange.UpdateColumnType typeChange =
+ (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+ assertThat(typeChange.fieldNames()).containsExactly("map_column", "value");
+ assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+ }
+
+ @Test
+ public void testMapTypeUpdateNullability() {
+ List<String> fieldNames = Arrays.asList("map_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+ MapType newType = new MapType(false, DataTypes.STRING(), DataTypes.INT());
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnNullability.class);
+ SchemaChange.UpdateColumnNullability nullabilityChange =
+ (SchemaChange.UpdateColumnNullability) schemaChanges.get(0);
+ assertThat(nullabilityChange.fieldNames()).containsExactly("map_column");
+ assertThat(nullabilityChange.newNullability()).isFalse();
+ }
+
+ @Test
+ public void testMapWithIncompatibleKeyTypeThrowsException() {
+ List<String> fieldNames = Arrays.asList("map_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+ MapType newType = new MapType(true, DataTypes.INT(), DataTypes.INT()); // Different key type
+
+ assertThatThrownBy(
+ () -> {
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, oldType, newType, schemaChanges);
+ })
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Cannot update key type");
+ }
+
+ @Test
+ public void testMapWithIncompatibleTypeThrowsException() {
+ List<String> fieldNames = Arrays.asList("map_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+ ArrayType newType = new ArrayType(true, DataTypes.INT()); // Incompatible type
+
+ assertThatThrownBy(
+ () -> {
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, oldType, newType, schemaChanges);
+ })
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("can only be updated to map type");
+ }
+
+ @Test
+ public void testMultisetTypeUpdateElement() {
+ List<String> fieldNames = Arrays.asList("multiset_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ MultisetType oldType = new MultisetType(true, DataTypes.INT());
+ MultisetType newType = new MultisetType(true, DataTypes.BIGINT());
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+ SchemaChange.UpdateColumnType typeChange =
+ (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+ assertThat(typeChange.fieldNames()).containsExactly("multiset_column", "element");
+ assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+ }
+
+ @Test
+ public void testMultisetWithIncompatibleTypeThrowsException() {
+ List<String> fieldNames = Arrays.asList("multiset_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ MultisetType oldType = new MultisetType(true, DataTypes.INT());
+ MapType newType =
+ new MapType(true, DataTypes.STRING(), DataTypes.INT()); // Incompatible type
+
+ assertThatThrownBy(
+ () -> {
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, oldType, newType, schemaChanges);
+ })
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("can only be updated to multiset type");
+ }
+
+ @Test
+ public void testRowTypeAddField() {
+ List<String> fieldNames = Arrays.asList("row_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ RowType oldType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT()),
+ new DataField(1, "f2", DataTypes.STRING()));
+ RowType newType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT()),
+ new DataField(1, "f2", DataTypes.STRING()),
+ new DataField(2, "f3", DataTypes.DOUBLE()));
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.AddColumn.class);
+ SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) schemaChanges.get(0);
+ assertThat(addColumn.fieldNames()).containsExactly("row_column", "f3");
+ assertThat(addColumn.dataType()).isEqualTo(DataTypes.DOUBLE());
+ }
+
+ @Test
+ public void testRowTypeDropField() {
+ List<String> fieldNames = Arrays.asList("row_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ RowType oldType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT()),
+ new DataField(1, "f2", DataTypes.STRING()),
+ new DataField(2, "f3", DataTypes.DOUBLE()));
+ RowType newType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT()),
+ new DataField(1, "f2", DataTypes.STRING()));
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.DropColumn.class);
+ SchemaChange.DropColumn dropColumn = (SchemaChange.DropColumn) schemaChanges.get(0);
+ assertThat(dropColumn.fieldNames()).containsExactly("row_column", "f3");
+ }
+
+ @Test
+ public void testRowTypeUpdateExistingField() {
+ List<String> fieldNames = Arrays.asList("row_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ RowType oldType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT()),
+ new DataField(1, "f2", DataTypes.STRING()));
+ RowType newType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.BIGINT()), // Type changed
+ new DataField(1, "f2", DataTypes.STRING()));
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+ SchemaChange.UpdateColumnType typeChange =
+ (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+ assertThat(typeChange.fieldNames()).containsExactly("row_column", "f1");
+ assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+ }
+
+ @Test
+ public void testRowTypeUpdateFieldComment() {
+ List<String> fieldNames = Arrays.asList("row_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ RowType oldType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT(), "old comment"),
+ new DataField(1, "f2", DataTypes.STRING()));
+ RowType newType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT(), "new comment"),
+ new DataField(1, "f2", DataTypes.STRING()));
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnComment.class);
+ SchemaChange.UpdateColumnComment commentChange =
+ (SchemaChange.UpdateColumnComment) schemaChanges.get(0);
+ assertThat(commentChange.fieldNames()).containsExactly("row_column", "f1");
+ assertThat(commentChange.newDescription()).isEqualTo("new comment");
+ }
+
+ @Test
+ public void testRowTypeReorderFields() {
+ List<String> fieldNames = Arrays.asList("row_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ RowType oldType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT()),
+ new DataField(1, "f2", DataTypes.STRING()),
+ new DataField(2, "f3", DataTypes.DOUBLE()));
+ RowType newType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT()),
+ new DataField(2, "f3", DataTypes.DOUBLE()),
+ new DataField(1, "f2", DataTypes.STRING()) // f2 and
f3 swapped
+ );
+
+ assertThatThrownBy(
+ () -> {
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, oldType, newType,
schemaChanges);
+ })
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Order of existing fields");
+ }
+
+ @Test
+ public void testRowWithIncompatibleTypeThrowsException() {
+ List<String> fieldNames = Arrays.asList("row_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ RowType oldType = RowType.of(new DataField(0, "f1", DataTypes.INT()));
+ ArrayType newType = new ArrayType(true, DataTypes.INT()); // Incompatible type
+
+ assertThatThrownBy(
+ () -> {
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, oldType, newType, schemaChanges);
+ })
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("can only be updated to row type");
+ }
+
+ @Test
+ public void testComplexNestedStructure() {
+ List<String> fieldNames = Arrays.asList("complex_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Old: ARRAY<MAP<STRING, ROW<f1 INT, f2 STRING>>>
+ RowType oldInnerRowType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT()),
+ new DataField(1, "f2", DataTypes.STRING()));
+ MapType oldMapType = new MapType(true, DataTypes.STRING(), oldInnerRowType);
+ ArrayType oldType = new ArrayType(true, oldMapType);
+
+ // New: ARRAY<MAP<STRING, ROW<f1 BIGINT, f2 STRING, f3 DOUBLE>>>
+ RowType newInnerRowType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.BIGINT()), // Type changed
+ new DataField(1, "f2", DataTypes.STRING()),
+ new DataField(2, "f3", DataTypes.DOUBLE()) // New field added
+ );
+ MapType newMapType = new MapType(true, DataTypes.STRING(), newInnerRowType);
+ ArrayType newType = new ArrayType(true, newMapType);
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(2);
+
+ // Should have one type update and one add column
+ SchemaChange.UpdateColumnType typeChange =
+ schemaChanges.stream()
+ .filter(change -> change instanceof SchemaChange.UpdateColumnType)
+ .map(change -> (SchemaChange.UpdateColumnType) change)
+ .findFirst()
+ .orElse(null);
+ assertThat(typeChange).isNotNull();
+ assertThat(typeChange.fieldNames())
+ .containsExactly("complex_column", "element", "value", "f1");
+ assertThat(typeChange.newDataType()).isEqualTo(DataTypes.BIGINT());
+
+ SchemaChange.AddColumn addColumn =
+ schemaChanges.stream()
+ .filter(change -> change instanceof SchemaChange.AddColumn)
+ .map(change -> (SchemaChange.AddColumn) change)
+ .findFirst()
+ .orElse(null);
+ assertThat(addColumn).isNotNull();
+ assertThat(addColumn.fieldNames())
+ .containsExactly("complex_column", "element", "value", "f3");
+ assertThat(addColumn.dataType()).isEqualTo(DataTypes.DOUBLE());
+ }
+
+ @Test
+ public void testNestedFieldPaths() {
+ List<String> fieldNames = Arrays.asList("outer", "inner");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Test that nested field names are properly combined
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, DataTypes.INT(), DataTypes.BIGINT(), schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+ SchemaChange.UpdateColumnType typeChange =
+ (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+ assertThat(typeChange.fieldNames()).containsExactly("outer", "inner");
+ }
+
+ @Test
+ public void testEmptyFieldNames() {
+ List<String> fieldNames = Collections.emptyList();
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ fieldNames, DataTypes.INT(), DataTypes.BIGINT(), schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+ SchemaChange.UpdateColumnType typeChange =
+ (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+ assertThat(typeChange.fieldNames()).isEmpty();
+ }
+
+ @Test
+ public void testVarCharTypeExtension() {
+ List<String> fieldNames = Arrays.asList("varchar_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ VarCharType oldType = new VarCharType(true, 10);
+ VarCharType newType = new VarCharType(true, 20);
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(1);
+ assertThat(schemaChanges.get(0)).isInstanceOf(SchemaChange.UpdateColumnType.class);
+ SchemaChange.UpdateColumnType typeChange =
+ (SchemaChange.UpdateColumnType) schemaChanges.get(0);
+ assertThat(typeChange.fieldNames()).containsExactly("varchar_column");
+ assertThat(typeChange.newDataType()).isEqualTo(newType);
+ }
+
+ @Test
+ public void testMultipleNestedChanges() {
+ List<String> fieldNames = Arrays.asList("root");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Old: ROW<f1 INT NULL, f2 STRING>
+ RowType oldType =
+ RowType.of(
+ new DataField(0, "f1", DataTypes.INT().nullable()),
+ new DataField(1, "f2", DataTypes.STRING()));
+
+ // New: ROW<f1 BIGINT NOT NULL, f2 STRING, f3 DOUBLE>
+ RowType newType =
+ RowType.of(
+ new DataField(
+ 0,
+ "f1",
+ DataTypes.BIGINT().notNull()), // Type and nullability changed
+ new DataField(1, "f2", DataTypes.STRING()),
+ new DataField(2, "f3", DataTypes.DOUBLE()) // New field
+ );
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(3);
+
+ // Should have: add column, update type, update nullability
+ assertThat(schemaChanges).anyMatch(change -> change instanceof SchemaChange.AddColumn);
+ assertThat(schemaChanges)
+ .anyMatch(change -> change instanceof SchemaChange.UpdateColumnType);
+ assertThat(schemaChanges)
+ .anyMatch(change -> change instanceof SchemaChange.UpdateColumnNullability);
+ }
+
+ @Test
+ public void testArrayOfRowsWithFieldChanges() {
+ List<String> fieldNames = Arrays.asList("array_of_rows");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Old: ARRAY<ROW<id INT, name STRING>>
+ RowType oldRowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING()));
+ ArrayType oldType = new ArrayType(true, oldRowType);
+
+ // New: ARRAY<ROW<id BIGINT, name STRING, active BOOLEAN>>
+ RowType newRowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.BIGINT()), // Type
changed
+ new DataField(1, "name", DataTypes.STRING()),
+ new DataField(2, "active", DataTypes.BOOLEAN()) // New
field
+ );
+ ArrayType newType = new ArrayType(true, newRowType);
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType,
newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(2);
+
+ // Verify field paths include "element" for array access
+ SchemaChange.UpdateColumnType typeChange =
+ schemaChanges.stream()
+ .filter(change -> change instanceof SchemaChange.UpdateColumnType)
+ .map(change -> (SchemaChange.UpdateColumnType) change)
+ .findFirst()
+ .orElse(null);
+ assertThat(typeChange).isNotNull();
+ assertThat(typeChange.fieldNames()).containsExactly("array_of_rows",
"element", "id");
+
+ SchemaChange.AddColumn addColumn =
+ schemaChanges.stream()
+ .filter(change -> change instanceof SchemaChange.AddColumn)
+ .map(change -> (SchemaChange.AddColumn) change)
+ .findFirst()
+ .orElse(null);
+ assertThat(addColumn).isNotNull();
+ assertThat(addColumn.fieldNames()).containsExactly("array_of_rows",
"element", "active");
+ }
+
+ @Test
+ public void testMapOfRowsWithFieldChanges() {
+ List<String> fieldNames = Arrays.asList("map_of_rows");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Old: MAP<STRING, ROW<id INT, name STRING>>
+ RowType oldRowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING()));
+ MapType oldType = new MapType(true, DataTypes.STRING(), oldRowType);
+
+ // New: MAP<STRING, ROW<id BIGINT, name STRING, active BOOLEAN>>
+ RowType newRowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.BIGINT()), // Type
changed
+ new DataField(1, "name", DataTypes.STRING()),
+ new DataField(2, "active", DataTypes.BOOLEAN()) // New
field
+ );
+ MapType newType = new MapType(true, DataTypes.STRING(), newRowType);
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType,
newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(2);
+
+ // Verify field paths include "value" for map value access
+ SchemaChange.UpdateColumnType typeChange =
+ schemaChanges.stream()
+ .filter(change -> change instanceof SchemaChange.UpdateColumnType)
+ .map(change -> (SchemaChange.UpdateColumnType) change)
+ .findFirst()
+ .orElse(null);
+ assertThat(typeChange).isNotNull();
+ assertThat(typeChange.fieldNames()).containsExactly("map_of_rows",
"value", "id");
+
+ SchemaChange.AddColumn addColumn =
+ schemaChanges.stream()
+ .filter(change -> change instanceof SchemaChange.AddColumn)
+ .map(change -> (SchemaChange.AddColumn) change)
+ .findFirst()
+ .orElse(null);
+ assertThat(addColumn).isNotNull();
+ assertThat(addColumn.fieldNames()).containsExactly("map_of_rows",
"value", "active");
+ }
+
+ @Test
+ public void testMultisetOfComplexType() {
+ List<String> fieldNames = Arrays.asList("multiset_column");
+ List<SchemaChange> schemaChanges = new ArrayList<>();
+
+ // Old: MULTISET<ROW<id INT>>
+ RowType oldRowType = RowType.of(new DataField(0, "id",
DataTypes.INT()));
+ MultisetType oldType = new MultisetType(true, oldRowType);
+
+ // New: MULTISET<ROW<id BIGINT, name STRING>>
+ RowType newRowType =
+ RowType.of(
+ new DataField(0, "id", DataTypes.BIGINT()),
+ new DataField(1, "name", DataTypes.STRING()));
+ MultisetType newType = new MultisetType(true, newRowType);
+
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
+
+ assertThat(schemaChanges).hasSize(2);
+
+ // Verify field paths include "element" for multiset element access
+ SchemaChange.UpdateColumnType typeChange =
+ schemaChanges.stream()
+ .filter(change -> change instanceof SchemaChange.UpdateColumnType)
+ .map(change -> (SchemaChange.UpdateColumnType) change)
+ .findFirst()
+ .orElse(null);
+ assertThat(typeChange).isNotNull();
+ assertThat(typeChange.fieldNames()).containsExactly("multiset_column",
"element", "id");
+
+ SchemaChange.AddColumn addColumn =
+ schemaChanges.stream()
+ .filter(change -> change instanceof SchemaChange.AddColumn)
+ .map(change -> (SchemaChange.AddColumn) change)
+ .findFirst()
+ .orElse(null);
+ assertThat(addColumn).isNotNull();
+ assertThat(addColumn.fieldNames()).containsExactly("multiset_column",
"element", "name");
+ }
+}
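
As the tests above show, every nested change is expressed against a full field path rather
than a single column name. The same convention applies when a nested field is dropped; a
small self-contained sketch (assumes paimon-core on the classpath; the class name is
illustrative):

    import org.apache.paimon.schema.SchemaChange;

    public class NestedDropSketch {
        public static void main(String[] args) {
            // Dropping row_column.f3 is a dropColumn over the whole path, exactly as
            // handleRowTypeUpdate builds it from fieldNames plus the removed field name.
            SchemaChange drop = SchemaChange.dropColumn(new String[] {"row_column", "f3"});
            System.out.println(drop);
        }
    }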
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 42effccf7f..503f3593c2 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -101,7 +101,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
updatedSchema.f1.comment());
for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
- applySchemaChange(schemaManager, schemaChange, tableId);
+ applySchemaChange(schemaManager, schemaChange, tableId, actualUpdatedSchema);
}
/*
* Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index e1b78926cc..d4319e868e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -74,7 +74,7 @@ public class UpdatedDataFieldsProcessFunction
updatedSchema.primaryKeys(),
updatedSchema.comment());
for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
- applySchemaChange(schemaManager, schemaChange, identifier);
+ applySchemaChange(schemaManager, schemaChange, identifier, actualUpdatedSchema);
}
/*
* Here, actualUpdatedDataFields cannot be used to update latestFields because there is a
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 410425a7da..bb49de2ac1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -22,14 +22,18 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.schema.NestedSchemaUtils;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.FieldIdentifier;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -98,7 +102,10 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
protected void applySchemaChange(
- SchemaManager schemaManager, SchemaChange schemaChange, Identifier identifier)
+ SchemaManager schemaManager,
+ SchemaChange schemaChange,
+ Identifier identifier,
+ CdcSchema actualUpdatedSchema)
throws Exception {
if (schemaChange instanceof SchemaChange.AddColumn) {
try {
@@ -118,9 +125,6 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
} else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
SchemaChange.UpdateColumnType updateColumnType =
(SchemaChange.UpdateColumnType) schemaChange;
- Preconditions.checkState(
- updateColumnType.fieldNames().length == 1,
- "Paimon CDC currently does not support nested type schema
evolution.");
TableSchema schema =
schemaManager
.latest()
@@ -136,6 +140,19 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
+
+ // For complex types, extract the full new type from actualUpdatedSchema
+ // to preserve type context (e.g., ARRAY<BIGINT> instead of just BIGINT)
+ if (actualUpdatedSchema != null) {
+ String fieldName = updateColumnType.fieldNames()[0];
+ for (DataField field : actualUpdatedSchema.fields()) {
+ if (fieldName.equals(field.name())) {
+ newType = field.type();
+ break;
+ }
+ }
+ }
+
switch (canConvert(oldType, newType, typeMapping)) {
case CONVERT:
catalog.alterTable(identifier, schemaChange, false);
@@ -165,9 +182,41 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
public static ConvertAction canConvert(
DataType oldType, DataType newType, TypeMapping typeMapping) {
if (oldType.equalsIgnoreNullable(newType)) {
+ if (oldType.isNullable() && !newType.isNullable()) {
+ return ConvertAction.EXCEPTION; // Cannot make nullable field non-nullable
+ }
return ConvertAction.CONVERT;
}
+ if (oldType.getTypeRoot() == DataTypeRoot.ARRAY
+ && newType.getTypeRoot() == DataTypeRoot.ARRAY) {
+
+ ArrayType oldArrayType = (ArrayType) oldType;
+ ArrayType newArrayType = (ArrayType) newType;
+ return canConvertArray(oldArrayType, newArrayType, typeMapping);
+ }
+
+ if (oldType.getTypeRoot() == DataTypeRoot.MAP
+ && newType.getTypeRoot() == DataTypeRoot.MAP) {
+ MapType oldMapType = (MapType) oldType;
+ MapType newMapType = (MapType) newType;
+
+ return canConvertMap(oldMapType, newMapType, typeMapping);
+ }
+
+ if (oldType.getTypeRoot() == DataTypeRoot.MULTISET
+ && newType.getTypeRoot() == DataTypeRoot.MULTISET) {
+ MultisetType oldMultisetType = (MultisetType) oldType;
+ MultisetType newMultisetType = (MultisetType) newType;
+
+ return canConvertMultisetType(oldMultisetType, newMultisetType, typeMapping);
+ }
+
+ if (oldType.getTypeRoot() == DataTypeRoot.ROW
+ && newType.getTypeRoot() == DataTypeRoot.ROW) {
+ return canConvertRowType((RowType) oldType, (RowType) newType, typeMapping);
+ }
+
int oldIdx = STRING_TYPES.indexOf(oldType.getTypeRoot());
int newIdx = STRING_TYPES.indexOf(newType.getTypeRoot());
if (oldIdx >= 0 && newIdx >= 0) {
@@ -223,6 +272,88 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
return ConvertAction.EXCEPTION;
}
+ private static ConvertAction canConvertArray(
+ ArrayType oldArrayType, ArrayType newArrayType, TypeMapping typeMapping) {
+ if (oldArrayType.isNullable() && !newArrayType.isNullable()) {
+ return ConvertAction.EXCEPTION;
+ }
+
+ return canConvert(
+ oldArrayType.getElementType(), newArrayType.getElementType(), typeMapping);
+ }
+
+ private static ConvertAction canConvertMap(
+ MapType oldMapType, MapType newMapType, TypeMapping typeMapping) {
+ if (oldMapType.isNullable() && !newMapType.isNullable()) {
+ return ConvertAction.EXCEPTION;
+ }
+
+ // For map keys, don't allow key type evolution
+ // hashcode will be different even if the value of the key is the same
+ if (!oldMapType.getKeyType().equals(newMapType.getKeyType())) {
+ return ConvertAction.EXCEPTION;
+ }
+
+ return canConvert(oldMapType.getValueType(), newMapType.getValueType(), typeMapping);
+ }
+
+ private static ConvertAction canConvertRowType(
+ RowType oldRowType, RowType newRowType, TypeMapping typeMapping) {
+ Map<String, DataField> oldFieldMap = new HashMap<>();
+ for (DataField field : oldRowType.getFields()) {
+ oldFieldMap.put(field.name(), field);
+ }
+
+ Map<String, DataField> newFieldMap = new HashMap<>();
+ for (DataField field : newRowType.getFields()) {
+ newFieldMap.put(field.name(), field);
+ }
+
+ // Rule 1: All non-nullable fields in the old type must still exist in the new type
+ for (DataField oldField : oldRowType.getFields()) {
+ if (!oldField.type().isNullable()) {
+ if (!newFieldMap.containsKey(oldField.name())) {
+ return ConvertAction.EXCEPTION;
+ }
+ }
+ }
+
+ // Rule 2: All fields common to both schemas must have compatible types
+ boolean needsConversion = false;
+ for (DataField newField : newRowType.getFields()) {
+ DataField oldField = oldFieldMap.get(newField.name());
+ if (oldField != null) {
+ ConvertAction fieldAction =
+ canConvert(oldField.type(), newField.type(), typeMapping);
+ if (fieldAction == ConvertAction.EXCEPTION) {
+ return ConvertAction.EXCEPTION;
+ }
+ if (fieldAction == ConvertAction.CONVERT) {
+ needsConversion = true;
+ }
+ } else {
+ // Rule 3: New fields must be nullable
+ if (!newField.type().isNullable()) {
+ return ConvertAction.EXCEPTION;
+ }
+ needsConversion = true;
+ }
+ }
+
+ return needsConversion ? ConvertAction.CONVERT : ConvertAction.IGNORE;
+ }
+
+ private static ConvertAction canConvertMultisetType(
+ MultisetType oldMultisetType, MultisetType newMultisetType, TypeMapping typeMapping) {
+
+ if (oldMultisetType.isNullable() && !newMultisetType.isNullable()) {
+ return ConvertAction.EXCEPTION;
+ }
+
+ return canConvert(
+ oldMultisetType.getElementType(), newMultisetType.getElementType(), typeMapping);
+ }
+
protected List<SchemaChange> extractSchemaChanges(
SchemaManager schemaManager, CdcSchema updatedSchema) {
TableSchema oldTableSchema = schemaManager.latest().get();
@@ -258,8 +389,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
if (oldField.type().is(DataTypeRoot.DECIMAL) && !allowDecimalTypeChange) {
continue;
}
- // update column type
- result.add(SchemaChange.updateColumnType(newFieldName, newField.type()));
+ // Generate nested column updates if needed
+ NestedSchemaUtils.generateNestedColumnUpdates(
+ Arrays.asList(newFieldName), oldField.type(), newField.type(), result);
// update column comment
if (newField.description() != null) {
result.add(
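
The tightened canConvert contract above (nullability may relax but never tighten; array and
multiset element types, map value types, and row fields recurse) can be exercised directly,
since canConvert is public static. A small sketch mirroring the new tests that follow (the
example class name is illustrative):

    import org.apache.paimon.flink.action.cdc.TypeMapping;
    import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase;
    import org.apache.paimon.types.ArrayType;
    import org.apache.paimon.types.DataTypes;

    public class CanConvertSketch {
        public static void main(String[] args) {
            // ARRAY<INT> -> ARRAY<BIGINT>: the element type widens, so CONVERT.
            System.out.println(
                    UpdatedDataFieldsProcessFunctionBase.canConvert(
                            new ArrayType(true, DataTypes.INT()),
                            new ArrayType(true, DataTypes.BIGINT()),
                            TypeMapping.defaultMapping()));

            // Nullable ARRAY -> non-nullable ARRAY: EXCEPTION, nullability cannot tighten.
            System.out.println(
                    UpdatedDataFieldsProcessFunctionBase.canConvert(
                            new ArrayType(true, DataTypes.INT()),
                            new ArrayType(false, DataTypes.INT()),
                            TypeMapping.defaultMapping()));
        }
    }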
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
index 24d0a51453..a201e8308c 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBaseTest.java
@@ -19,15 +19,23 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;
import org.junit.Test;
+import java.util.Arrays;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
/** IT cases for {@link UpdatedDataFieldsProcessFunctionBaseTest}. */
@@ -104,4 +112,508 @@ public class UpdatedDataFieldsProcessFunctionBaseTest {
assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction);
}
+
+ @Test
+ public void testArrayNullabilityEvolution() {
+ // Test 1: Nullable array to non-nullable array (should fail)
+ ArrayType oldType = new ArrayType(true, DataTypes.INT());
+ ArrayType newType = new ArrayType(false, DataTypes.INT());
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+
+ // Test 2: Non-nullable array to nullable array (should succeed)
+ oldType = new ArrayType(false, DataTypes.INT());
+ newType = new ArrayType(true, DataTypes.INT());
+
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+
+ // Test 3: Array with nullable elements to non-nullable elements (should fail)
+ oldType = new ArrayType(true, new IntType(true));
+ newType = new ArrayType(true, new IntType(false));
+
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testNestedRowNullabilityEvolution() {
+ // Old type: ROW(f1 ROW(inner1 INT NULL, inner2 STRING) NULL)
+ RowType oldInnerType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "inner1", new IntType(true)),
+ new DataField(2, "inner2",
DataTypes.STRING())));
+ RowType oldRowType = new RowType(true, Arrays.asList(new DataField(1,
"f1", oldInnerType)));
+
+ // Test 1: Making nested row non-nullable (should fail)
+ RowType newInnerType =
+ new RowType(
+ false, // Changed to non-nullable
+ Arrays.asList(
+ new DataField(1, "inner1", new IntType(true)),
+ new DataField(2, "inner2",
DataTypes.STRING())));
+ RowType newRowType = new RowType(true, Arrays.asList(new DataField(1,
"f1", newInnerType)));
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+
+ // Test 2: Making nested field non-nullable (should fail)
+ newInnerType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(
+ 1, "inner1", new IntType(false)), // Changed to non-nullable
+ new DataField(2, "inner2", DataTypes.STRING())));
+ newRowType = new RowType(true, Arrays.asList(new DataField(1, "f1", newInnerType)));
+
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testMultisetNullabilityEvolution() {
+ // Test 1: Nullable multiset to non-nullable multiset (should fail)
+ MultisetType oldType = new MultisetType(true, DataTypes.INT());
+ MultisetType newType = new MultisetType(false, DataTypes.INT());
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+
+ // Test 2: Non-nullable multiset to nullable multiset (should succeed)
+ oldType = new MultisetType(false, DataTypes.INT());
+ newType = new MultisetType(true, DataTypes.INT());
+
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+
+ // Test 3: Multiset with nullable elements to non-nullable elements (should fail)
+ oldType = new MultisetType(true, new IntType(true));
+ newType = new MultisetType(true, new IntType(false));
+
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testComplexNestedNullabilityEvolution() {
+ // Old type: ROW(
+ // f1 ARRAY<ROW<x INT NULL, y STRING> NULL> NULL,
+ // f2 MAP<STRING, ROW<a INT NULL, b STRING> NULL> NULL
+ // )
+ RowType oldArrayInnerType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "x", new IntType(true)),
+ new DataField(2, "y", DataTypes.STRING())));
+ ArrayType oldArrayType = new ArrayType(true, oldArrayInnerType);
+
+ RowType oldMapValueType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "a", new IntType(true)),
+ new DataField(2, "b", DataTypes.STRING())));
+ MapType oldMapType = new MapType(true, DataTypes.STRING(), oldMapValueType);
+
+ RowType oldRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", oldArrayType),
+ new DataField(2, "f2", oldMapType)));
+
+ // Test 1: Making nested array's row type non-nullable (should fail)
+ RowType newArrayInnerType =
+ new RowType(
+ false, // Changed to non-nullable
+ Arrays.asList(
+ new DataField(1, "x", new IntType(true)),
+ new DataField(2, "y", DataTypes.STRING())));
+ ArrayType newArrayType = new ArrayType(true, newArrayInnerType);
+ MapType newMapType = oldMapType; // Keep the same
+
+ RowType newRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", newArrayType),
+ new DataField(2, "f2", newMapType)));
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+
+ // Test 2: Making map's value type's field non-nullable (should fail)
+ newArrayType = oldArrayType; // Restore to original
+ RowType newMapValueType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(
+ 1, "a", new IntType(false)), // Changed to non-nullable
+ new DataField(2, "b", DataTypes.STRING())));
+ newMapType = new MapType(true, DataTypes.STRING(), newMapValueType);
+
+ newRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", newArrayType),
+ new DataField(2, "f2", newMapType)));
+
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testNestedRowTypeConversion() {
+ // Old type: ROW(f1 INT, f2 STRING)
+ RowType oldRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.STRING())));
+
+ // New type: ROW(f1 BIGINT, f2 STRING, f3 DOUBLE)
+ RowType newRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", DataTypes.BIGINT()),
+ new DataField(2, "f2", DataTypes.STRING()),
+ new DataField(3, "f3", DataTypes.DOUBLE())));
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+ }
+
+ @Test
+ public void testNestedArrayTypeConversion() {
+ // Old type: ARRAY<INT>
+ ArrayType oldArrayType = new ArrayType(true, DataTypes.INT());
+
+ // New type: ARRAY<BIGINT>
+ ArrayType newArrayType = new ArrayType(true, DataTypes.BIGINT());
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldArrayType, newArrayType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+ }
+
+ @Test
+ public void testNestedMapTypeConversion() {
+ // Old type: MAP<STRING, INT>
+ MapType oldMapType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+
+ // New type: MAP<STRING, BIGINT>
+ MapType newMapType = new MapType(true, DataTypes.STRING(), DataTypes.BIGINT());
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldMapType, newMapType, TypeMapping.defaultMapping());
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+ }
+
+ @Test
+ public void testMapWithNullableComplexTypes() {
+ // Old type: MAP<STRING, ROW<f1 ARRAY<INT> NULL, f2 ROW<x INT, y STRING> NULL>>
+ RowType oldInnerRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "x", DataTypes.INT()),
+ new DataField(2, "y", DataTypes.STRING())));
+
+ RowType oldValueType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", new ArrayType(true,
DataTypes.INT())),
+ new DataField(2, "f2", oldInnerRowType)));
+
+ MapType oldType = new MapType(true, DataTypes.STRING(), oldValueType);
+
+ // New type: MAP<STRING, ROW<f1 ARRAY<BIGINT> NULL, f2 ROW<x BIGINT, y STRING, z DOUBLE> NULL>>
+ RowType newInnerRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "x", DataTypes.BIGINT()),
+ new DataField(2, "y", DataTypes.STRING()),
+ new DataField(3, "z", DataTypes.DOUBLE())));
+
+ RowType newValueType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", new ArrayType(true,
DataTypes.BIGINT())),
+ new DataField(2, "f2", newInnerRowType)));
+
+ MapType newType = new MapType(true, DataTypes.STRING(), newValueType);
+
+ // Test compatible evolution
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+
+ // Test incompatible element type
+ RowType incompatibleValueType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(
+ 1,
+ "f1",
+ new ArrayType(
+ true, DataTypes.STRING())), // INT to STRING is
+ // incompatible
+ new DataField(2, "f2", newInnerRowType)));
+
+ MapType incompatibleType = new MapType(true, DataTypes.STRING(), incompatibleValueType);
+
+ convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, incompatibleType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testComplexNestedTypeConversion() {
+ // Old type: ARRAY<MAP<INT, ROW(f1 INT, f2 STRING)>>
+ RowType oldInnerRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.STRING())));
+ MapType oldMapType = new MapType(true, DataTypes.INT(), oldInnerRowType);
+ ArrayType oldType = new ArrayType(true, oldMapType);
+
+ // New type: ARRAY<MAP<INT, ROW(f1 BIGINT, f2 STRING, f3 DOUBLE)>>
+ RowType newInnerRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", DataTypes.BIGINT()),
+ new DataField(2, "f2", DataTypes.STRING()),
+ new DataField(3, "f3", DataTypes.DOUBLE())));
+ MapType newMapType = new MapType(true, DataTypes.INT(), newInnerRowType);
+ ArrayType newType = new ArrayType(true, newMapType);
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+ }
+
+ @Test
+ public void testRowTypeWithMultipleChanges() {
+ // Old type: ROW(f1 INT, f2 STRING, f3 DOUBLE)
+ RowType oldRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.STRING()),
+ new DataField(3, "f3", DataTypes.DOUBLE())));
+
+ // New type: ROW(f1 BIGINT, f2 VARCHAR(100), f4 INT)
+ RowType newRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", DataTypes.BIGINT()),
+ new DataField(2, "f2", new VarCharType(true,
100)),
+ new DataField(4, "f4", DataTypes.INT())));
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+ }
+
+ @Test
+ public void testNonNullableFieldRemoval() {
+ // Old type: ROW(f1 INT NOT NULL, f2 STRING)
+ RowType oldRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", new IntType(false)),
+ new DataField(2, "f2", DataTypes.STRING())));
+
+ // New type: ROW(f2 STRING)
+ RowType newRowType =
+ new RowType(true, Arrays.asList(new DataField(2, "f2", DataTypes.STRING())));
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testAddingNonNullableField() {
+ // Old type: ROW(f1 INT)
+ RowType oldRowType =
+ new RowType(true, Arrays.asList(new DataField(1, "f1", DataTypes.INT())));
+
+ // New type: ROW(f1 INT, f2 VARCHAR(100) NOT NULL)
+ RowType newRowType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", new VarCharType(false,
100))));
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testNestedRowWithinRowEvolution() {
+ // Old type: ROW(f1 ROW(inner1 INT, inner2 STRING))
+ RowType oldInnerType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "inner1", DataTypes.INT()),
+ new DataField(2, "inner2",
DataTypes.STRING())));
+ RowType oldRowType = new RowType(true, Arrays.asList(new DataField(1, "f1", oldInnerType)));
+
+ // New type: ROW(f1 ROW(inner1 BIGINT, inner2 STRING, inner3 DOUBLE))
+ RowType newInnerType =
+ new RowType(
+ true,
+ Arrays.asList(
+ new DataField(1, "inner1", DataTypes.BIGINT()),
+ new DataField(2, "inner2", DataTypes.STRING()),
+ new DataField(3, "inner3",
DataTypes.DOUBLE())));
+ RowType newRowType = new RowType(true, Arrays.asList(new DataField(1, "f1", newInnerType)));
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldRowType, newRowType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+ }
+
+ @Test
+ public void testArrayOfArraysEvolution() {
+ // Old type: ARRAY<ARRAY<INT>>
+ ArrayType oldInnerArray = new ArrayType(true, DataTypes.INT());
+ ArrayType oldType = new ArrayType(true, oldInnerArray);
+
+ // New type: ARRAY<ARRAY<BIGINT>>
+ ArrayType newInnerArray = new ArrayType(true, DataTypes.BIGINT());
+ ArrayType newType = new ArrayType(true, newInnerArray);
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+ }
+
+ @Test
+ public void testArrayWithIncompatibleElementType() {
+ // Old type: ARRAY<INT>
+ ArrayType oldType = new ArrayType(true, DataTypes.INT());
+
+ // New type: ARRAY<STRING>
+ ArrayType newType = new ArrayType(true, DataTypes.STRING());
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testMapOfMapsEvolution() {
+ // Old type: MAP<STRING, MAP<INT, STRING>>
+ MapType oldInnerMap = new MapType(true, DataTypes.INT(), DataTypes.STRING());
+ MapType oldType = new MapType(true, DataTypes.STRING(), oldInnerMap);
+
+ // New type: MAP<STRING, MAP<INT, VARCHAR(100)>>
+ MapType newInnerMap = new MapType(true, DataTypes.INT(), new VarCharType(true, 100));
+ MapType newType = new MapType(true, DataTypes.STRING(), newInnerMap);
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.IGNORE, convertAction);
+ }
+
+ @Test
+ public void testMapWithIncompatibleValueType() {
+ // Old type: MAP<STRING, INT>
+ MapType oldType = new MapType(true, DataTypes.STRING(), DataTypes.INT());
+
+ // New type: MAP<STRING, BOOLEAN>
+ MapType newType = new MapType(true, DataTypes.STRING(), DataTypes.BOOLEAN());
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+
+ @Test
+ public void testMultisetTypeEvolution() {
+ // Old type: MULTISET<INT>
+ MultisetType oldType = new MultisetType(true, DataTypes.INT());
+
+ // New type: MULTISET<BIGINT>
+ MultisetType newType = new MultisetType(true, DataTypes.BIGINT());
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.CONVERT, convertAction);
+ }
+
+ @Test
+ public void testIncompatibleMultisetTypeEvolution() {
+ // Old type: MULTISET<INT>
+ MultisetType oldType = new MultisetType(true, DataTypes.INT());
+
+ // New type: MULTISET<STRING>
+ MultisetType newType = new MultisetType(true, DataTypes.STRING());
+
+ UpdatedDataFieldsProcessFunctionBase.ConvertAction convertAction =
+ UpdatedDataFieldsProcessFunctionBase.canConvert(
+ oldType, newType, TypeMapping.defaultMapping());
+
+ assertEquals(UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION, convertAction);
+ }
+}
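A minimal sketch of how the shared utility introduced by this commit can be driven; the column name "address" and the two RowType variables are hypothetical placeholders, while the generateNestedColumnUpdates signature is the one added in NestedSchemaUtils above:

    // Assumed imports: org.apache.paimon.schema.NestedSchemaUtils,
    // org.apache.paimon.schema.SchemaChange, org.apache.paimon.types.RowType,
    // java.util.ArrayList, java.util.Collections, java.util.List.
    List<SchemaChange> schemaChanges = new ArrayList<>();
    NestedSchemaUtils.generateNestedColumnUpdates(
            Collections.singletonList("address"), // field path of the evolving column
            oldRowType,   // hypothetical: nested type currently in the table schema
            newRowType,   // hypothetical: incoming type from the CDC source or Flink DDL
            schemaChanges);
    // schemaChanges now holds the SchemaChange operations (add/drop column, type,
    // nullability, and comment updates) that the removed FlinkCatalog code below
    // used to build inline.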
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 10f437e8bd..d791d33d89 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -34,6 +34,7 @@ import org.apache.paimon.function.FunctionDefinition;
import org.apache.paimon.function.FunctionImpl;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.NestedSchemaUtils;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -43,8 +44,6 @@ import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
@@ -122,13 +121,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -672,132 +669,8 @@ public class FlinkCatalog extends AbstractCatalog {
org.apache.paimon.types.DataType oldType,
org.apache.paimon.types.DataType newType,
List<SchemaChange> schemaChanges) {
- String joinedNames = String.join(".", fieldNames);
- if (oldType.getTypeRoot() == DataTypeRoot.ROW) {
- Preconditions.checkArgument(
- newType.getTypeRoot() == DataTypeRoot.ROW,
- "Column %s can only be updated to row type, and cannot be
updated to %s type",
- joinedNames,
- newType.getTypeRoot());
- org.apache.paimon.types.RowType oldRowType = (org.apache.paimon.types.RowType) oldType;
- org.apache.paimon.types.RowType newRowType = (org.apache.paimon.types.RowType) newType;
-
- // check that existing fields have same order
- Map<String, Integer> oldFieldOrders = new HashMap<>();
- for (int i = 0; i < oldRowType.getFieldCount(); i++) {
- oldFieldOrders.put(oldRowType.getFields().get(i).name(), i);
- }
- int lastIdx = -1;
- String lastFieldName = "";
- for (DataField newField : newRowType.getFields()) {
- String name = newField.name();
- if (oldFieldOrders.containsKey(name)) {
- int idx = oldFieldOrders.get(name);
- Preconditions.checkState(
- lastIdx < idx,
- "Order of existing fields in column %s must be
kept the same. "
- + "However, field %s and %s have changed
their orders.",
- joinedNames,
- lastFieldName,
- name);
- lastIdx = idx;
- lastFieldName = name;
- }
- }
- // drop fields
- Set<String> newFieldNames = new HashSet<>(newRowType.getFieldNames());
- for (String name : oldRowType.getFieldNames()) {
- if (!newFieldNames.contains(name)) {
- List<String> dropColumnNames = new ArrayList<>(fieldNames);
- dropColumnNames.add(name);
- schemaChanges.add(
- SchemaChange.dropColumn(dropColumnNames.toArray(new String[0])));
- }
- }
-
- for (int i = 0; i < newRowType.getFieldCount(); i++) {
- DataField field = newRowType.getFields().get(i);
- String name = field.name();
- List<String> fullFieldNames = new ArrayList<>(fieldNames);
- fullFieldNames.add(name);
- if (!oldFieldOrders.containsKey(name)) {
- // add fields
- SchemaChange.Move move;
- if (i == 0) {
- move = SchemaChange.Move.first(name);
- } else {
- String lastName = newRowType.getFields().get(i - 1).name();
- move = SchemaChange.Move.after(name, lastName);
- }
- schemaChanges.add(
- SchemaChange.addColumn(
- fullFieldNames.toArray(new String[0]),
- field.type(),
- field.description(),
- move));
- } else {
- // update existing fields
- DataField oldField = oldRowType.getFields().get(oldFieldOrders.get(name));
- if (!Objects.equals(oldField.description(), field.description())) {
- schemaChanges.add(
- SchemaChange.updateColumnComment(
- fullFieldNames.toArray(new String[0]),
- field.description()));
- }
- generateNestedColumnUpdates(
- fullFieldNames, oldField.type(), field.type(), schemaChanges);
- }
- }
- } else if (oldType.getTypeRoot() == DataTypeRoot.ARRAY) {
- Preconditions.checkArgument(
- newType.getTypeRoot() == DataTypeRoot.ARRAY,
- "Column %s can only be updated to array type, and cannot
be updated to %s type",
- joinedNames,
- newType);
- List<String> fullFieldNames = new ArrayList<>(fieldNames);
- // add a dummy column name indicating the element of array
- fullFieldNames.add("element");
- generateNestedColumnUpdates(
- fullFieldNames,
- ((org.apache.paimon.types.ArrayType) oldType).getElementType(),
- ((org.apache.paimon.types.ArrayType) newType).getElementType(),
- schemaChanges);
- } else if (oldType.getTypeRoot() == DataTypeRoot.MAP) {
- Preconditions.checkArgument(
- newType.getTypeRoot() == DataTypeRoot.MAP,
- "Column %s can only be updated to map type, and cannot be
updated to %s type",
- joinedNames,
- newType);
- org.apache.paimon.types.MapType oldMapType = (org.apache.paimon.types.MapType) oldType;
- org.apache.paimon.types.MapType newMapType = (org.apache.paimon.types.MapType) newType;
- Preconditions.checkArgument(
- oldMapType.getKeyType().equals(newMapType.getKeyType()),
- "Cannot update key type of column %s from %s type to %s
type",
- joinedNames,
- oldMapType.getKeyType(),
- newMapType.getKeyType());
- List<String> fullFieldNames = new ArrayList<>(fieldNames);
- // add a dummy column name indicating the value of map
- fullFieldNames.add("value");
- generateNestedColumnUpdates(
- fullFieldNames,
- oldMapType.getValueType(),
- newMapType.getValueType(),
- schemaChanges);
- } else {
- if (!oldType.equalsIgnoreNullable(newType)) {
- schemaChanges.add(
- SchemaChange.updateColumnType(
- fieldNames.toArray(new String[0]), newType, false));
- }
- }
-
- if (oldType.isNullable() != newType.isNullable()) {
- schemaChanges.add(
- SchemaChange.updateColumnNullability(
- fieldNames.toArray(new String[0]), newType.isNullable()));
- }
+ NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
}
/**