This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e2eff2f9d1 [core] fix column rename when columns referenced by table options. (#5964)
e2eff2f9d1 is described below

commit e2eff2f9d12ed7d274105033feab73843d7f986e
Author: Yujiang Zhong <[email protected]>
AuthorDate: Thu Jul 31 13:42:09 2025 +0800

    [core] fix column rename when columns referenced by table options. (#5964)
---
 .../org/apache/paimon/schema/SchemaManager.java    | 115 +++++++++++++++++-
 .../apache/paimon/table/SchemaEvolutionTest.java   | 132 +++++++++++++++++++++
 2 files changed, 241 insertions(+), 6 deletions(-)
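
For context (not part of the commit message or the patch below): the fix makes a column rename such as f1 -> f1_ propagate into table options that reference that column. Using the literal option names that the patched code refers to via constants (an assumption about their string values), with illustrative column names and values, the rewrite covers entries like:

    sequence.field = f1,f2                  ->  sequence.field = f1_,f2
    bucket-key = f1                         ->  bucket-key = f1_
    fields.f1.aggregate-function = collect  ->  fields.f1_.aggregate-function = collect
    fields.f1,f2.sequence-group = f3        ->  fields.f1_,f2.sequence-group = f3

Renaming nested columns remains unsupported, so a nested value such as the f5 in fields.f4.nested-key = f5 is left untouched, while the outer column in the key (f4) is still renamed.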

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 2d8a6052fd..64275c9b4e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -51,9 +51,11 @@ import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.StringUtils;
 
-import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;
+import org.apache.paimon.shade.guava30.com.google.common.collect.FluentIterable;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Streams;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -72,14 +74,23 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static org.apache.paimon.CoreOptions.AGG_FUNCTION;
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.DISTINCT;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+import static org.apache.paimon.CoreOptions.IGNORE_RETRACT;
+import static org.apache.paimon.CoreOptions.LIST_AGG_DELIMITER;
+import static org.apache.paimon.CoreOptions.NESTED_KEY;
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
 import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
 import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
+import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
 import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
@@ -715,17 +726,104 @@ public class SchemaManager implements Serializable {
 
     private static Map<String, String> applySchemaChanges(
             Map<String, String> options, Iterable<SchemaChange> changes) {
+        Iterable<RenameColumn> renameColumns =
+                FluentIterable.from(changes).filter(RenameColumn.class);
+
+        if (Iterables.isEmpty(renameColumns)) {
+            return options;
+        }
+
         Map<String, String> newOptions = Maps.newHashMap(options);
+
+        Map<String, String> renameMappings =
+                Streams.stream(renameColumns)
+                        .collect(
+                                Collectors.toMap(
+                                        // currently only non-nested columns are supported
+                                        rename -> rename.fieldNames()[0],
+                                        RenameColumn::newName));
+
+        // case 1: the option key is fixed and only value may contain field names
+
+        // bucket key rename
         String bucketKeysStr = options.get(BUCKET_KEY.key());
         if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) {
             List<String> bucketColumns = Arrays.asList(bucketKeysStr.split(","));
             List<String> newBucketColumns =
-                    applyNotNestedColumnRename(
-                            bucketColumns, Iterables.filter(changes, RenameColumn.class));
-            newOptions.put(BUCKET_KEY.key(), Joiner.on(',').join(newBucketColumns));
+                    applyNotNestedColumnRename(bucketColumns, renameMappings);
+            newOptions.put(BUCKET_KEY.key(), String.join(",", newBucketColumns));
+        }
+
+        // sequence field rename
+        String sequenceFieldsStr = options.get(SEQUENCE_FIELD.key());
+        if (!StringUtils.isNullOrWhitespaceOnly(sequenceFieldsStr)) {
+            List<String> sequenceFields = Arrays.asList(sequenceFieldsStr.split(","));
+            List<String> newSequenceFields =
+                    applyNotNestedColumnRename(sequenceFields, renameMappings);
+            newOptions.put(SEQUENCE_FIELD.key(), String.join(",", newSequenceFields));
+        }
+
+        // case 2: the option key is composed of certain fixed prefixes, suffixes, and the field
+        // name, while the option value doesn't contain field names.
+        List<Function<String, String>> fieldNameToOptionKeys =
+                ImmutableList.of(
+                        // NESTED_KEY is not added since renaming nested columns is not supported
+                        // currently
+                        fieldName -> FIELDS_PREFIX + "." + fieldName + "." + AGG_FUNCTION,
+                        fieldName -> FIELDS_PREFIX + "." + fieldName + "." + IGNORE_RETRACT,
+                        fieldName -> FIELDS_PREFIX + "." + fieldName + "." + DISTINCT,
+                        fieldName -> FIELDS_PREFIX + "." + fieldName + "." + LIST_AGG_DELIMITER);
+
+        for (RenameColumn rename : renameColumns) {
+            String fieldName = rename.fieldNames()[0];
+            String newFieldName = rename.newName();
+
+            for (Function<String, String> fieldNameToKey : fieldNameToOptionKeys) {
+                String key = fieldNameToKey.apply(fieldName);
+                if (newOptions.containsKey(key)) {
+                    String value = newOptions.remove(key);
+                    newOptions.put(fieldNameToKey.apply(newFieldName), value);
+                }
+            }
+        }
+
+        // case 3: both option key and option value may contain field names
+        for (String key : options.keySet()) {
+            if (key.startsWith(FIELDS_PREFIX)) {
+                String matchedSuffix = null;
+                if (key.endsWith(SEQUENCE_GROUP)) {
+                    matchedSuffix = SEQUENCE_GROUP;
+                } else if (key.endsWith(NESTED_KEY)) {
+                    matchedSuffix = NESTED_KEY;
+                }
+
+                if (matchedSuffix != null) {
+                    // Both the key and value may contain field names. If we were to perform a
+                    // "match then replace" operation, the conditions would become quite complex.
+                    // Instead, we directly make a replacement across all instances
+                    String keyFieldsStr =
+                            key.substring(
+                                    FIELDS_PREFIX.length() + 1,
+                                    key.length() - matchedSuffix.length() - 1);
+                    List<String> keyFields = Arrays.asList(keyFieldsStr.split(","));
+                    List<String> newKeyFields =
+                            applyNotNestedColumnRename(keyFields, renameMappings);
+
+                    String valueFieldsStr = newOptions.remove(key);
+                    List<String> valueFields = Arrays.asList(valueFieldsStr.split(","));
+                    List<String> newValueFields =
+                            applyNotNestedColumnRename(valueFields, renameMappings);
+                    newOptions.put(
+                            FIELDS_PREFIX
+                                    + "."
+                                    + String.join(",", newKeyFields)
+                                    + "."
+                                    + matchedSuffix,
+                            String.join(",", newValueFields));
+                }
+            }
         }
 
-        // TODO: Apply changes to other options that contain column names, such as `sequence.field`
         return newOptions;
     }
 
@@ -743,10 +841,15 @@ public class SchemaManager implements Serializable {
                 columnNames.put(renameColumn.fieldNames()[0], renameColumn.newName());
             }
         }
+        return applyNotNestedColumnRename(columns, columnNames);
+    }
+
+    private static List<String> applyNotNestedColumnRename(
+            List<String> columns, Map<String, String> renameMapping) {
 
         // The order of the column names will be preserved, as a non-parallel stream is used here.
         return columns.stream()
-                .map(column -> columnNames.getOrDefault(column, column))
+                .map(column -> renameMapping.getOrDefault(column, column))
                 .collect(Collectors.toList());
     }
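
To make the key rewrite in the sequence-group / nested-key branch of applySchemaChanges easier to follow, here is a stand-alone sketch of the same substring-and-rename logic outside of SchemaManager. The class name, the literal strings "fields" and "sequence-group", and the column names are illustrative assumptions, not code taken from the commit:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class SequenceGroupKeyRenameSketch {
        public static void main(String[] args) {
            // Rename mapping, as built from the RenameColumn changes in the patch.
            Map<String, String> renameMappings = new HashMap<>();
            renameMappings.put("f1", "f1_");
            renameMappings.put("f2", "f2_");

            String prefix = "fields";         // assumed value of FIELDS_PREFIX
            String suffix = "sequence-group"; // assumed value of SEQUENCE_GROUP
            String key = "fields.f1,f2.sequence-group";

            // Same bounds as the patch: skip "fields." and drop ".sequence-group".
            String keyFieldsStr =
                    key.substring(prefix.length() + 1, key.length() - suffix.length() - 1);

            // Rename every (non-nested) field found between prefix and suffix.
            String newKeyFields =
                    Arrays.stream(keyFieldsStr.split(","))
                            .map(f -> renameMappings.getOrDefault(f, f))
                            .collect(Collectors.joining(","));

            // Prints "fields.f1_,f2_.sequence-group"; the option value ("f3" etc.)
            // is rewritten with the same mapping.
            System.out.println(prefix + "." + newKeyFields + "." + suffix);
        }
    }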
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 24955dc94a..ad540d58b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -39,6 +39,7 @@ import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -58,9 +59,17 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 
+import static org.apache.paimon.CoreOptions.AGG_FUNCTION;
+import static org.apache.paimon.CoreOptions.DISTINCT;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+import static org.apache.paimon.CoreOptions.IGNORE_RETRACT;
+import static org.apache.paimon.CoreOptions.NESTED_KEY;
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
+import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
 import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
 import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
 import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -460,6 +469,129 @@ public class SchemaEvolutionTest {
         assertThat(rows).containsExactlyInAnyOrder("2, 2", "3, 3");
     }
 
+    @Test
+    public void testRenameFieldReferencedByOptions() throws Exception {
+        ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+
+        Schema schema =
+                new Schema(
+                        ImmutableList.of(
+                                new DataField(0, "f0", DataTypes.INT()),
+                                new DataField(1, "f1", DataTypes.INT()),
+                                new DataField(2, "f2", DataTypes.INT()),
+                                new DataField(3, "f3", DataTypes.INT()),
+                                new DataField(
+                                        4,
+                                        "f4",
+                                        DataTypes.ARRAY(
+                                                DataTypes.ROW(
+                                                        new DataField(5, "f5", DataTypes.INT())))),
+                                new DataField(6, "f6", DataTypes.ARRAY(DataTypes.INT()))),
+                        ImmutableList.of("f0"),
+                        ImmutableList.of(),
+                        mapBuilder
+                                .put(SEQUENCE_FIELD.key(), "f1,f2")
+                                .put(FIELDS_PREFIX + "." + "f3" + "." + IGNORE_RETRACT, "true")
+                                .put(
+                                        FIELDS_PREFIX + "." + "f4" + "." + AGG_FUNCTION,
+                                        "nested_update")
+                                .put(FIELDS_PREFIX + "." + "f4" + "." + NESTED_KEY, "f5")
+                                .put(FIELDS_PREFIX + "." + "f6" + "." + AGG_FUNCTION, "collect")
+                                .put(FIELDS_PREFIX + "." + "f6" + "." + DISTINCT, "true")
+                                .build(),
+                        "");
+
+        schemaManager.createTable(schema);
+
+        TableSchema newSchema =
+                schemaManager.commitChanges(
+                        SchemaChange.renameColumn("f1", "f1_"),
+                        SchemaChange.renameColumn("f2", "f2_"),
+                        SchemaChange.renameColumn("f3", "f3_"),
+                        SchemaChange.renameColumn("f4", "f4_"),
+                        // doesn't support rename nested columns currently
+                        // SchemaChange.renameColumn("f5", "f5_"),
+                        SchemaChange.renameColumn("f6", "f6_"));
+
+        assertThat(newSchema.fieldNames()).containsExactly("f0", "f1_", "f2_", "f3_", "f4_", "f6_");
+
+        assertThat(newSchema.options())
+                .doesNotContainKeys(
+                        FIELDS_PREFIX + "." + "f3" + "." + IGNORE_RETRACT,
+                        FIELDS_PREFIX + "." + "f4" + "." + AGG_FUNCTION,
+                        FIELDS_PREFIX + "." + "f4" + "." + NESTED_KEY,
+                        FIELDS_PREFIX + "." + "f6" + "." + AGG_FUNCTION,
+                        FIELDS_PREFIX + "." + "f6" + "." + DISTINCT);
+
+        Map.Entry[] entries =
+                ImmutableMap.of(
+                                SEQUENCE_FIELD.key(),
+                                "f1_,f2_",
+                                FIELDS_PREFIX + "." + "f3_" + "." + IGNORE_RETRACT,
+                                "true",
+                                FIELDS_PREFIX + "." + "f4_" + "." + AGG_FUNCTION,
+                                "nested_update",
+                                FIELDS_PREFIX + "." + "f6_" + "." + AGG_FUNCTION,
+                                "collect",
+                                FIELDS_PREFIX + "." + "f6_" + "." + DISTINCT,
+                                "true")
+                        .entrySet()
+                        .toArray(new Map.Entry[0]);
+
+        assertThat(newSchema.options()).contains(entries);
+    }
+
+    @Test
+    public void testRenameSeqGroupFields() throws Exception {
+        ImmutableMap.Builder<String, String> mapBuilder = ImmutableMap.builder();
+
+        Schema schema =
+                new Schema(
+                        ImmutableList.of(
+                                new DataField(0, "f0", DataTypes.INT()),
+                                new DataField(1, "f1", DataTypes.INT()),
+                                new DataField(2, "f2", DataTypes.INT()),
+                                new DataField(3, "f3", DataTypes.INT()),
+                                new DataField(4, "f4", DataTypes.INT()),
+                                new DataField(5, "f5", DataTypes.INT()),
+                                new DataField(6, "f6", DataTypes.INT())),
+                        ImmutableList.of("f0"),
+                        ImmutableList.of(),
+                        mapBuilder
+                                .put(FIELDS_PREFIX + "." + "f1,f2" + "." + SEQUENCE_GROUP, "f3")
+                                .put(FIELDS_PREFIX + "." + "f4" + "." + SEQUENCE_GROUP, "f5,f6")
+                                .build(),
+                        "");
+
+        schemaManager.createTable(schema);
+
+        TableSchema newSchema =
+                schemaManager.commitChanges(
+                        SchemaChange.renameColumn("f1", "f1_"),
+                        SchemaChange.renameColumn("f2", "f2_"),
+                        SchemaChange.renameColumn("f3", "f3_"),
+                        SchemaChange.renameColumn("f4", "f4_"),
+                        SchemaChange.renameColumn("f5", "f5_"),
+                        SchemaChange.renameColumn("f6", "f6_"));
+
+        assertThat(newSchema.fieldNames())
+                .containsExactly("f0", "f1_", "f2_", "f3_", "f4_", "f5_", "f6_");
+
+        assertThat(newSchema.options())
+                .doesNotContainKeys(
+                        FIELDS_PREFIX + "." + "f1,f2" + "." + SEQUENCE_GROUP,
+                        FIELDS_PREFIX + "." + "f4" + "." + SEQUENCE_GROUP);
+
+        Map.Entry[] entries =
+                ImmutableMap.of(
+                                FIELDS_PREFIX + "." + "f1_,f2_" + "." + SEQUENCE_GROUP, "f3_",
+                                FIELDS_PREFIX + "." + "f4_" + "." + SEQUENCE_GROUP, "f5_,f6_")
+                        .entrySet()
+                        .toArray(new Map.Entry[0]);
+
+        assertThat(newSchema.options()).contains(entries);
+    }
+
     private List<String> readRecords(FileStoreTable table, Predicate filter) throws IOException {
         List<String> results = new ArrayList<>();
         forEachRemaining(
