This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e2eff2f9d1 [core] fix column rename when columns referenced by table options. (#5964)
e2eff2f9d1 is described below
commit e2eff2f9d12ed7d274105033feab73843d7f986e
Author: Yujiang Zhong <[email protected]>
AuthorDate: Thu Jul 31 13:42:09 2025 +0800
[core] fix column rename when columns referenced by table options. (#5964)
---
.../org/apache/paimon/schema/SchemaManager.java | 115 +++++++++++++++++-
.../apache/paimon/table/SchemaEvolutionTest.java | 132 +++++++++++++++++++++
2 files changed, 241 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 2d8a6052fd..64275c9b4e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -51,9 +51,11 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;
-import org.apache.paimon.shade.guava30.com.google.common.base.Joiner;
+import org.apache.paimon.shade.guava30.com.google.common.collect.FluentIterable;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Streams;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -72,14 +74,23 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
+import static org.apache.paimon.CoreOptions.AGG_FUNCTION;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.DISTINCT;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+import static org.apache.paimon.CoreOptions.IGNORE_RETRACT;
+import static org.apache.paimon.CoreOptions.LIST_AGG_DELIMITER;
+import static org.apache.paimon.CoreOptions.NESTED_KEY;
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
+import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -715,17 +726,104 @@ public class SchemaManager implements Serializable {
private static Map<String, String> applySchemaChanges(
Map<String, String> options, Iterable<SchemaChange> changes) {
+ Iterable<RenameColumn> renameColumns =
+ FluentIterable.from(changes).filter(RenameColumn.class);
+
+ if (Iterables.isEmpty(renameColumns)) {
+ return options;
+ }
+
Map<String, String> newOptions = Maps.newHashMap(options);
+
+ Map<String, String> renameMappings =
+ Streams.stream(renameColumns)
+ .collect(
+ Collectors.toMap(
+                                        // currently only non-nested columns are supported
+ rename -> rename.fieldNames()[0],
+ RenameColumn::newName));
+
+ // case 1: the option key is fixed and only value may contain field
names
+
+ // bucket key rename
String bucketKeysStr = options.get(BUCKET_KEY.key());
if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) {
List<String> bucketColumns =
Arrays.asList(bucketKeysStr.split(","));
List<String> newBucketColumns =
- applyNotNestedColumnRename(
- bucketColumns, Iterables.filter(changes,
RenameColumn.class));
- newOptions.put(BUCKET_KEY.key(),
Joiner.on(',').join(newBucketColumns));
+ applyNotNestedColumnRename(bucketColumns, renameMappings);
+ newOptions.put(BUCKET_KEY.key(), String.join(",",
newBucketColumns));
+ }
+
+ // sequence field rename
+ String sequenceFieldsStr = options.get(SEQUENCE_FIELD.key());
+ if (!StringUtils.isNullOrWhitespaceOnly(sequenceFieldsStr)) {
+ List<String> sequenceFields =
Arrays.asList(sequenceFieldsStr.split(","));
+ List<String> newSequenceFields =
+ applyNotNestedColumnRename(sequenceFields, renameMappings);
+ newOptions.put(SEQUENCE_FIELD.key(), String.join(",",
newSequenceFields));
+ }
+
+ // case 2: the option key is composed of certain fixed prefixes,
suffixes, and the field
+ // name, while the option value doesn't contain field names.
+ List<Function<String, String>> fieldNameToOptionKeys =
+ ImmutableList.of(
+ // NESTED_KEY is not added since renaming nested
columns is not supported
+ // currently
+ fieldName -> FIELDS_PREFIX + "." + fieldName + "." +
AGG_FUNCTION,
+ fieldName -> FIELDS_PREFIX + "." + fieldName + "." +
IGNORE_RETRACT,
+ fieldName -> FIELDS_PREFIX + "." + fieldName + "." +
DISTINCT,
+ fieldName -> FIELDS_PREFIX + "." + fieldName + "." +
LIST_AGG_DELIMITER);
+
+ for (RenameColumn rename : renameColumns) {
+ String fieldName = rename.fieldNames()[0];
+ String newFieldName = rename.newName();
+
+ for (Function<String, String> fieldNameToKey :
fieldNameToOptionKeys) {
+ String key = fieldNameToKey.apply(fieldName);
+ if (newOptions.containsKey(key)) {
+ String value = newOptions.remove(key);
+ newOptions.put(fieldNameToKey.apply(newFieldName), value);
+ }
+ }
+ }
+
+ // case 3: both option key and option value may contain field names
+ for (String key : options.keySet()) {
+ if (key.startsWith(FIELDS_PREFIX)) {
+ String matchedSuffix = null;
+ if (key.endsWith(SEQUENCE_GROUP)) {
+ matchedSuffix = SEQUENCE_GROUP;
+ } else if (key.endsWith(NESTED_KEY)) {
+ matchedSuffix = NESTED_KEY;
+ }
+
+ if (matchedSuffix != null) {
+ // Both the key and value may contain field names. If we
were to perform a
+ // "match then replace" operation, the conditions would
become quite complex.
+ // Instead, we directly make a replacement across all
instances
+ String keyFieldsStr =
+ key.substring(
+ FIELDS_PREFIX.length() + 1,
+ key.length() - matchedSuffix.length() - 1);
+ List<String> keyFields =
Arrays.asList(keyFieldsStr.split(","));
+ List<String> newKeyFields =
+ applyNotNestedColumnRename(keyFields,
renameMappings);
+
+ String valueFieldsStr = newOptions.remove(key);
+ List<String> valueFields =
Arrays.asList(valueFieldsStr.split(","));
+ List<String> newValueFields =
+ applyNotNestedColumnRename(valueFields,
renameMappings);
+ newOptions.put(
+ FIELDS_PREFIX
+ + "."
+ + String.join(",", newKeyFields)
+ + "."
+ + matchedSuffix,
+ String.join(",", newValueFields));
+ }
+ }
}
- // TODO: Apply changes to other options that contain column names,
such as `sequence.field`
return newOptions;
}
@@ -743,10 +841,15 @@ public class SchemaManager implements Serializable {
columnNames.put(renameColumn.fieldNames()[0],
renameColumn.newName());
}
}
+ return applyNotNestedColumnRename(columns, columnNames);
+ }
+
+ private static List<String> applyNotNestedColumnRename(
+ List<String> columns, Map<String, String> renameMapping) {
// The order of the column names will be preserved, as a non-parallel
stream is used here.
return columns.stream()
- .map(column -> columnNames.getOrDefault(column, column))
+ .map(column -> renameMapping.getOrDefault(column, column))
.collect(Collectors.toList());
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 24955dc94a..ad540d58b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -39,6 +39,7 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -58,9 +59,17 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
+import static org.apache.paimon.CoreOptions.AGG_FUNCTION;
+import static org.apache.paimon.CoreOptions.DISTINCT;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+import static org.apache.paimon.CoreOptions.IGNORE_RETRACT;
+import static org.apache.paimon.CoreOptions.NESTED_KEY;
+import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD;
+import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -460,6 +469,129 @@ public class SchemaEvolutionTest {
assertThat(rows).containsExactlyInAnyOrder("2, 2", "3, 3");
}
+ @Test
+ public void testRenameFieldReferencedByOptions() throws Exception {
+ ImmutableMap.Builder<String, String> mapBuilder =
ImmutableMap.builder();
+
+ Schema schema =
+ new Schema(
+ ImmutableList.of(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.INT()),
+ new DataField(3, "f3", DataTypes.INT()),
+ new DataField(
+ 4,
+ "f4",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ new DataField(5, "f5",
DataTypes.INT())))),
+ new DataField(6, "f6",
DataTypes.ARRAY(DataTypes.INT()))),
+ ImmutableList.of("f0"),
+ ImmutableList.of(),
+ mapBuilder
+ .put(SEQUENCE_FIELD.key(), "f1,f2")
+ .put(FIELDS_PREFIX + "." + "f3" + "." +
IGNORE_RETRACT, "true")
+ .put(
+ FIELDS_PREFIX + "." + "f4" + "." +
AGG_FUNCTION,
+ "nested_update")
+ .put(FIELDS_PREFIX + "." + "f4" + "." +
NESTED_KEY, "f5")
+ .put(FIELDS_PREFIX + "." + "f6" + "." +
AGG_FUNCTION, "collect")
+ .put(FIELDS_PREFIX + "." + "f6" + "." +
DISTINCT, "true")
+ .build(),
+ "");
+
+ schemaManager.createTable(schema);
+
+ TableSchema newSchema =
+ schemaManager.commitChanges(
+ SchemaChange.renameColumn("f1", "f1_"),
+ SchemaChange.renameColumn("f2", "f2_"),
+ SchemaChange.renameColumn("f3", "f3_"),
+ SchemaChange.renameColumn("f4", "f4_"),
+ // doesn't support rename nested columns currently
+ // SchemaChange.renameColumn("f5", "f5_"),
+ SchemaChange.renameColumn("f6", "f6_"));
+
+ assertThat(newSchema.fieldNames()).containsExactly("f0", "f1_", "f2_",
"f3_", "f4_", "f6_");
+
+ assertThat(newSchema.options())
+ .doesNotContainKeys(
+ FIELDS_PREFIX + "." + "f3" + "." + IGNORE_RETRACT,
+ FIELDS_PREFIX + "." + "f4" + "." + AGG_FUNCTION,
+ FIELDS_PREFIX + "." + "f4" + "." + NESTED_KEY,
+ FIELDS_PREFIX + "." + "f6" + "." + AGG_FUNCTION,
+ FIELDS_PREFIX + "." + "f6" + "." + DISTINCT);
+
+ Map.Entry[] entries =
+ ImmutableMap.of(
+ SEQUENCE_FIELD.key(),
+ "f1_,f2_",
+ FIELDS_PREFIX + "." + "f3_" + "." +
IGNORE_RETRACT,
+ "true",
+ FIELDS_PREFIX + "." + "f4_" + "." +
AGG_FUNCTION,
+ "nested_update",
+ FIELDS_PREFIX + "." + "f6_" + "." +
AGG_FUNCTION,
+ "collect",
+ FIELDS_PREFIX + "." + "f6_" + "." + DISTINCT,
+ "true")
+ .entrySet()
+ .toArray(new Map.Entry[0]);
+
+ assertThat(newSchema.options()).contains(entries);
+ }
+
+ @Test
+ public void testRenameSeqGroupFields() throws Exception {
+ ImmutableMap.Builder<String, String> mapBuilder =
ImmutableMap.builder();
+
+ Schema schema =
+ new Schema(
+ ImmutableList.of(
+ new DataField(0, "f0", DataTypes.INT()),
+ new DataField(1, "f1", DataTypes.INT()),
+ new DataField(2, "f2", DataTypes.INT()),
+ new DataField(3, "f3", DataTypes.INT()),
+ new DataField(4, "f4", DataTypes.INT()),
+ new DataField(5, "f5", DataTypes.INT()),
+ new DataField(6, "f6", DataTypes.INT())),
+ ImmutableList.of("f0"),
+ ImmutableList.of(),
+ mapBuilder
+ .put(FIELDS_PREFIX + "." + "f1,f2" + "." +
SEQUENCE_GROUP, "f3")
+ .put(FIELDS_PREFIX + "." + "f4" + "." +
SEQUENCE_GROUP, "f5,f6")
+ .build(),
+ "");
+
+ schemaManager.createTable(schema);
+
+ TableSchema newSchema =
+ schemaManager.commitChanges(
+ SchemaChange.renameColumn("f1", "f1_"),
+ SchemaChange.renameColumn("f2", "f2_"),
+ SchemaChange.renameColumn("f3", "f3_"),
+ SchemaChange.renameColumn("f4", "f4_"),
+ SchemaChange.renameColumn("f5", "f5_"),
+ SchemaChange.renameColumn("f6", "f6_"));
+
+ assertThat(newSchema.fieldNames())
+ .containsExactly("f0", "f1_", "f2_", "f3_", "f4_", "f5_",
"f6_");
+
+ assertThat(newSchema.options())
+ .doesNotContainKeys(
+ FIELDS_PREFIX + "." + "f1,f2" + "." + SEQUENCE_GROUP,
+ FIELDS_PREFIX + "." + "f4" + "." + SEQUENCE_GROUP);
+
+ Map.Entry[] entries =
+ ImmutableMap.of(
+ FIELDS_PREFIX + "." + "f1_,f2_" + "." +
SEQUENCE_GROUP, "f3_",
+ FIELDS_PREFIX + "." + "f4_" + "." +
SEQUENCE_GROUP, "f5_,f6_")
+ .entrySet()
+ .toArray(new Map.Entry[0]);
+
+ assertThat(newSchema.options()).contains(entries);
+ }
+
private List<String> readRecords(FileStoreTable table, Predicate filter)
throws IOException {
List<String> results = new ArrayList<>();
forEachRemaining(