This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 34153b386 [core][spark] Support renaming nested columns in Spark (#4489)
34153b386 is described below
commit 34153b386468d563250f3f3df7da5e57c69acf1d
Author: tsreaper <[email protected]>
AuthorDate: Mon Nov 11 15:19:15 2024 +0800
[core][spark] Support renaming nested columns in Spark (#4489)
---
.../org/apache/paimon/schema/SchemaChange.java | 20 ++--
.../org/apache/paimon/schema/SchemaManager.java | 121 ++++++++++++++-------
.../org/apache/paimon/schema/SchemaValidation.java | 8 +-
.../org/apache/paimon/catalog/CatalogTestBase.java | 10 +-
.../apache/paimon/schema/SchemaManagerTest.java | 57 ++++++++++
.../apache/paimon/table/SchemaEvolutionTest.java | 2 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 3 +-
.../paimon/spark/SparkSchemaEvolutionITCase.java | 29 +++++
8 files changed, 189 insertions(+), 61 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 1b4c58e30..7e94b0a77 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -67,7 +67,11 @@ public interface SchemaChange extends Serializable {
}
static SchemaChange renameColumn(String fieldName, String newName) {
- return new RenameColumn(fieldName, newName);
+ return new RenameColumn(Collections.singletonList(fieldName), newName);
+ }
+
+ static SchemaChange renameColumn(List<String> fieldNames, String newName) {
+ return new RenameColumn(fieldNames, newName);
}
static SchemaChange dropColumn(String fieldName) {
@@ -278,16 +282,16 @@ public interface SchemaChange extends Serializable {
private static final long serialVersionUID = 1L;
- private final String fieldName;
+ private final List<String> fieldNames;
private final String newName;
- private RenameColumn(String fieldName, String newName) {
- this.fieldName = fieldName;
+ private RenameColumn(List<String> fieldNames, String newName) {
+ this.fieldNames = fieldNames;
this.newName = newName;
}
- public String fieldName() {
- return fieldName;
+ public List<String> fieldNames() {
+ return fieldNames;
}
public String newName() {
@@ -303,14 +307,14 @@ public interface SchemaChange extends Serializable {
return false;
}
RenameColumn that = (RenameColumn) o;
- return Objects.equals(fieldName, that.fieldName)
+ return Objects.equals(fieldNames, that.fieldNames)
&& Objects.equals(newName, that.newName);
}
@Override
public int hashCode() {
int result = Objects.hash(newName);
- result = 31 * result + Objects.hashCode(fieldName);
+ result = 31 * result + Objects.hashCode(fieldNames);
return result;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 6b4127cee..5ffeca65d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -290,18 +290,11 @@ public class SchemaManager implements Serializable {
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(),
highestFieldId);
- new
NestedColumnModifier<Catalog.ColumnAlreadyExistException>(
- addColumn.fieldNames().toArray(new String[0])) {
+ new
NestedColumnModifier(addColumn.fieldNames().toArray(new String[0])) {
@Override
protected void updateLastColumn(List<DataField>
newFields, String fieldName)
throws Catalog.ColumnAlreadyExistException {
- for (DataField field : newFields) {
- if (field.name().equals(fieldName)) {
- throw new
Catalog.ColumnAlreadyExistException(
-
identifierFromPath(tableRoot.toString(), true, branch),
- String.join(".",
addColumn.fieldNames()));
- }
- }
+ assertColumnNotExists(newFields, fieldName);
DataField dataField =
new DataField(id, fieldName, dataType,
addColumn.description());
@@ -327,34 +320,39 @@ public class SchemaManager implements Serializable {
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
renameColumnValidation(oldTableSchema, rename);
- if (newFields.stream().anyMatch(f ->
f.name().equals(rename.newName()))) {
- throw new Catalog.ColumnAlreadyExistException(
- identifierFromPath(tableRoot.toString(), true,
branch),
- rename.fieldName());
- }
+ new NestedColumnModifier(rename.fieldNames().toArray(new
String[0])) {
+ @Override
+ protected void updateLastColumn(List<DataField>
newFields, String fieldName)
+ throws Catalog.ColumnNotExistException,
+ Catalog.ColumnAlreadyExistException {
+ assertColumnExists(newFields, fieldName);
+ assertColumnNotExists(newFields, rename.newName());
+ for (int i = 0; i < newFields.size(); i++) {
+ DataField field = newFields.get(i);
+ if (!field.name().equals(fieldName)) {
+ continue;
+ }
- updateNestedColumn(
- newFields,
- new String[] {rename.fieldName()},
- (field) ->
- new DataField(
- field.id(),
- rename.newName(),
- field.type(),
- field.description()));
+ DataField newField =
+ new DataField(
+ field.id(),
+ rename.newName(),
+ field.type(),
+ field.description());
+ newFields.set(i, newField);
+ return;
+ }
+ }
+ }.updateIntermediateColumn(newFields, 0);
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
dropColumnValidation(oldTableSchema, drop);
- new NestedColumnModifier<Catalog.ColumnNotExistException>(
- drop.fieldNames().toArray(new String[0])) {
+ new NestedColumnModifier(drop.fieldNames().toArray(new
String[0])) {
@Override
protected void updateLastColumn(List<DataField>
newFields, String fieldName)
throws Catalog.ColumnNotExistException {
- if (!newFields.removeIf(f ->
f.name().equals(fieldName))) {
- throw new Catalog.ColumnNotExistException(
-
identifierFromPath(tableRoot.toString(), true, branch),
- String.join(".", drop.fieldNames()));
- }
+ assertColumnExists(newFields, fieldName);
+ newFields.removeIf(f ->
f.name().equals(fieldName));
if (newFields.isEmpty()) {
throw new IllegalArgumentException(
"Cannot drop all fields in table");
@@ -438,7 +436,7 @@ public class SchemaManager implements Serializable {
new Schema(
newFields,
oldTableSchema.partitionKeys(),
- applyColumnRename(
+ applyNotNestedColumnRename(
oldTableSchema.primaryKeys(),
Iterables.filter(changes,
RenameColumn.class)),
applySchemaChanges(newOptions, changes),
@@ -553,7 +551,8 @@ public class SchemaManager implements Serializable {
if (!StringUtils.isNullOrWhitespaceOnly(bucketKeysStr)) {
List<String> bucketColumns =
Arrays.asList(bucketKeysStr.split(","));
List<String> newBucketColumns =
- applyColumnRename(bucketColumns, Iterables.filter(changes,
RenameColumn.class));
+ applyNotNestedColumnRename(
+ bucketColumns, Iterables.filter(changes,
RenameColumn.class));
newOptions.put(BUCKET_KEY.key(),
Joiner.on(',').join(newBucketColumns));
}
@@ -561,9 +560,9 @@ public class SchemaManager implements Serializable {
return newOptions;
}
- // Apply column rename changes to the list of column names, this will not
change the order of
- // the column names
- private static List<String> applyColumnRename(
+ // Apply column rename changes on not nested columns to the list of column
names, this will not
+ // change the order of the column names
+ private static List<String> applyNotNestedColumnRename(
List<String> columns, Iterable<RenameColumn> renames) {
if (Iterables.isEmpty(renames)) {
return columns;
@@ -571,7 +570,9 @@ public class SchemaManager implements Serializable {
Map<String, String> columnNames = Maps.newHashMap();
for (RenameColumn renameColumn : renames) {
- columnNames.put(renameColumn.fieldName(), renameColumn.newName());
+ if (renameColumn.fieldNames().size() == 1) {
+ columnNames.put(renameColumn.fieldNames().get(0),
renameColumn.newName());
+ }
}
// The order of the column names will be preserved, as a non-parallel
stream is used here.
@@ -594,14 +595,18 @@ public class SchemaManager implements Serializable {
}
private static void renameColumnValidation(TableSchema schema,
RenameColumn change) {
- String columnToRename = change.fieldName();
+ // partition keys can't be nested columns
+ if (change.fieldNames().size() > 1) {
+ return;
+ }
+ String columnToRename = change.fieldNames().get(0);
if (schema.partitionKeys().contains(columnToRename)) {
throw new UnsupportedOperationException(
String.format("Cannot rename partition column: [%s]",
columnToRename));
}
}
- private abstract class NestedColumnModifier<E extends Exception> {
+ private abstract class NestedColumnModifier {
private final String[] updateFieldNames;
@@ -610,7 +615,7 @@ public class SchemaManager implements Serializable {
}
public void updateIntermediateColumn(List<DataField> newFields, int
depth)
- throws Catalog.ColumnNotExistException, E {
+ throws Catalog.ColumnNotExistException,
Catalog.ColumnAlreadyExistException {
if (depth == updateFieldNames.length - 1) {
updateLastColumn(newFields, updateFieldNames[depth]);
return;
@@ -643,15 +648,47 @@ public class SchemaManager implements Serializable {
}
protected abstract void updateLastColumn(List<DataField> newFields,
String fieldName)
- throws E;
+ throws Catalog.ColumnNotExistException,
Catalog.ColumnAlreadyExistException;
+
+ protected void assertColumnExists(List<DataField> newFields, String
fieldName)
+ throws Catalog.ColumnNotExistException {
+ for (DataField field : newFields) {
+ if (field.name().equals(fieldName)) {
+ return;
+ }
+ }
+ throw new Catalog.ColumnNotExistException(
+ identifierFromPath(tableRoot.toString(), true, branch),
+ getLastFieldName(fieldName));
+ }
+
+ protected void assertColumnNotExists(List<DataField> newFields, String
fieldName)
+ throws Catalog.ColumnAlreadyExistException {
+ for (DataField field : newFields) {
+ if (field.name().equals(fieldName)) {
+ throw new Catalog.ColumnAlreadyExistException(
+ identifierFromPath(tableRoot.toString(), true,
branch),
+ getLastFieldName(fieldName));
+ }
+ }
+ }
+
+ private String getLastFieldName(String fieldName) {
+ List<String> fieldNames = new ArrayList<>();
+ for (int i = 0; i + 1 < updateFieldNames.length; i++) {
+ fieldNames.add(updateFieldNames[i]);
+ }
+ fieldNames.add(fieldName);
+ return String.join(".", fieldNames);
+ }
}
private void updateNestedColumn(
List<DataField> newFields,
String[] updateFieldNames,
Function<DataField, DataField> updateFunc)
- throws Catalog.ColumnNotExistException {
- new
NestedColumnModifier<Catalog.ColumnNotExistException>(updateFieldNames) {
+ throws Catalog.ColumnNotExistException,
Catalog.ColumnAlreadyExistException {
+ new NestedColumnModifier(updateFieldNames) {
@Override
protected void updateLastColumn(List<DataField> newFields, String
fieldName)
throws Catalog.ColumnNotExistException {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index bb9b440ce..20cbdea66 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -515,7 +515,7 @@ public class SchemaValidation {
private static void validateSequenceField(TableSchema schema, CoreOptions
options) {
List<String> sequenceField = options.sequenceField();
- if (sequenceField.size() > 0) {
+ if (!sequenceField.isEmpty()) {
Map<String, Integer> fieldCount =
sequenceField.stream()
.collect(Collectors.toMap(field -> field, field ->
1, Integer::sum));
@@ -596,12 +596,12 @@ public class SchemaValidation {
== MAP
||
dataField.type().getTypeRoot()
==
ROW))
- .map(dataField -> dataField.name())
+ .map(DataField::name)
.collect(Collectors.toList());
- if (nestedFields.size() > 0) {
+ if (!nestedFields.isEmpty()) {
throw new RuntimeException(
"nested type can not in bucket-key, in your table
these key are "
- + nestedFields.toString());
+ + nestedFields);
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 24eefbcb6..3bba6d562 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -513,7 +513,9 @@ public abstract class CatalogTestBase {
catalog.createTable(
identifier,
new Schema(
- Lists.newArrayList(new DataField(0, "col1",
DataTypes.STRING())),
+ Lists.newArrayList(
+ new DataField(0, "col1", DataTypes.STRING()),
+ new DataField(1, "col2", DataTypes.STRING())),
Collections.emptyList(),
Collections.emptyList(),
Maps.newHashMap(),
@@ -525,7 +527,7 @@ public abstract class CatalogTestBase {
false);
Table table = catalog.getTable(identifier);
- assertThat(table.rowType().getFields()).hasSize(1);
+ assertThat(table.rowType().getFields()).hasSize(2);
assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);
@@ -536,12 +538,12 @@ public abstract class CatalogTestBase {
catalog.alterTable(
identifier,
Lists.newArrayList(
-
SchemaChange.renameColumn("col1", "new_col1")),
+
SchemaChange.renameColumn("col2", "new_col1")),
false))
.satisfies(
anyCauseMatches(
Catalog.ColumnAlreadyExistException.class,
- "Column col1 already exists in the
test_db.test_table table."));
+ "Column new_col1 already exists in the
test_db.test_table table."));
// Alter table renames a column throws ColumnNotExistException when
column does not exist
assertThatThrownBy(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 1a175de24..5fb76387e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -606,4 +606,61 @@ public class SchemaManagerTest {
assertThatCode(() ->
manager.commitChanges(middleColumnNotExistDropColumn))
.hasMessageContaining("Column v.invalid does not exist");
}
+
+ @Test
+ public void testRenameNestedColumns() throws Exception {
+ RowType innerType =
+ RowType.of(
+ new DataField(4, "f1", DataTypes.INT()),
+ new DataField(5, "f2", DataTypes.BIGINT()));
+ RowType middleType =
+ RowType.of(
+ new DataField(2, "f1", DataTypes.STRING()),
+ new DataField(3, "f2", innerType));
+ RowType outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()), new
DataField(1, "v", middleType));
+
+ Schema schema =
+ new Schema(
+ outerType.getFields(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+ manager.createTable(schema);
+
+ SchemaChange renameColumn =
+ SchemaChange.renameColumn(Arrays.asList("v", "f2", "f1"),
"f100");
+ manager.commitChanges(renameColumn);
+
+ innerType =
+ RowType.of(
+ new DataField(4, "f100", DataTypes.INT()),
+ new DataField(5, "f2", DataTypes.BIGINT()));
+ middleType =
+ RowType.of(
+ new DataField(2, "f1", DataTypes.STRING()),
+ new DataField(3, "f2", innerType));
+ outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()), new
DataField(1, "v", middleType));
+
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+
+ SchemaChange middleColumnNotExistRenameColumn =
+ SchemaChange.renameColumn(Arrays.asList("v", "invalid", "f2"),
"f200");
+ assertThatCode(() ->
manager.commitChanges(middleColumnNotExistRenameColumn))
+ .hasMessageContaining("Column v.invalid does not exist");
+
+ SchemaChange lastColumnNotExistRenameColumn =
+ SchemaChange.renameColumn(Arrays.asList("v", "f2", "invalid"),
"new_invalid");
+ assertThatCode(() ->
manager.commitChanges(lastColumnNotExistRenameColumn))
+ .hasMessageContaining("Column v.f2.invalid does not exist");
+
+ SchemaChange newNameAlreadyExistRenameColumn =
+ SchemaChange.renameColumn(Arrays.asList("v", "f2", "f2"),
"f100");
+ assertThatCode(() ->
manager.commitChanges(newNameAlreadyExistRenameColumn))
+ .hasMessageContaining("Column v.f2.f100 already exists");
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 44c3beea7..951539299 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -353,7 +353,7 @@ public class SchemaEvolutionTest {
.hasMessage(
String.format(
"Column %s already exists in the %s table.",
- "f0", identifier.getFullName()));
+ "f1", identifier.getFullName()));
}
@Test
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 2ac1d032c..82c8939ea 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -379,8 +379,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
move);
} else if (change instanceof TableChange.RenameColumn) {
TableChange.RenameColumn rename = (TableChange.RenameColumn)
change;
- validateAlterNestedField(rename.fieldNames());
- return SchemaChange.renameColumn(rename.fieldNames()[0],
rename.newName());
+ return
SchemaChange.renameColumn(Arrays.asList(rename.fieldNames()), rename.newName());
} else if (change instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn delete = (TableChange.DeleteColumn)
change;
return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index e876a0027..ccae59e88 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -788,4 +788,33 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
"[4,[42,[402,null,4002],four]]",
"[5,[53,[503,500.03,5003],five]]");
}
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testRenameNestedColumn(String formatType) {
+ String tableName = "testRenameNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1:
STRING, f2: INT>>) "
+ + "TBLPROPERTIES ('file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2,
STRUCT(20, STRUCT('banana', 200)))");
+ assertThat(
+ spark.sql("SELECT v.f2.f1, k FROM paimon.default." +
tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
+
+ spark.sql("ALTER TABLE paimon.default." + tableName + " RENAME COLUMN
v.f2.f1 to f100");
+ assertThat(
+ spark.sql("SELECT v.f2.f100, k FROM paimon.default." +
tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
+ }
}