This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5b0774556 [core][spark] Support adding and dropping nested columns in Spark (#4483)
5b0774556 is described below
commit 5b0774556a9a08c3c0bb0cef54ea3c7ea0182094
Author: tsreaper <[email protected]>
AuthorDate: Fri Nov 8 19:37:06 2024 +0800
[core][spark] Support adding and dropping nested columns in Spark (#4483)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 2 +-
.../org/apache/paimon/schema/SchemaChange.java | 46 +++--
.../apache/paimon/schema/SchemaEvolutionUtil.java | 49 ++++-
.../org/apache/paimon/schema/SchemaManager.java | 229 ++++++++++++---------
.../org/apache/paimon/catalog/CatalogTestBase.java | 8 +-
.../apache/paimon/schema/SchemaManagerTest.java | 79 +++++++
.../java/org/apache/paimon/spark/SparkCatalog.java | 6 +-
.../org/apache/paimon/spark/SparkReadITCase.java | 5 +
.../paimon/spark/SparkSchemaEvolutionITCase.java | 83 ++++++++
9 files changed, 375 insertions(+), 132 deletions(-)
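For readers skimming this change: a minimal sketch of the new multi-part addColumn/dropColumn entry points in SchemaChange, mirroring the usage in the SchemaManagerTest added below. The table path, FileIO setup, and the import locations for Path/LocalFileIO are assumptions for illustration, not part of the patch.

import java.util.Arrays;

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.DataTypes;

public class NestedColumnSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative table location; any FileIO/Path pointing at an existing Paimon table works.
        SchemaManager manager =
                new SchemaManager(LocalFileIO.create(), new Path("/tmp/warehouse/default.db/t"));

        // Add a nested column v.f2.f3 of type STRING, placed after v.f2.f1.
        manager.commitChanges(
                SchemaChange.addColumn(
                        Arrays.asList("v", "f2", "f3"),
                        DataTypes.STRING(),
                        "",
                        SchemaChange.Move.after("f3", "f1")));

        // Drop the nested column v.f2.f1.
        manager.commitChanges(SchemaChange.dropColumn(Arrays.asList("v", "f2", "f1")));
    }
}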
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c5cea0c21..4c36ad4db 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -536,7 +536,7 @@ public abstract class AbstractCatalog implements Catalog {
for (SchemaChange change : changes) {
if (change instanceof SchemaChange.AddColumn) {
SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) change;
- fieldNames.add(addColumn.fieldName());
+ fieldNames.addAll(addColumn.fieldNames());
} else if (change instanceof SchemaChange.RenameColumn) {
SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) change;
fieldNames.add(rename.newName());
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 1e790bf65..1b4c58e30 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -25,6 +25,8 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Objects;
/**
@@ -52,11 +54,16 @@ public interface SchemaChange extends Serializable {
}
static SchemaChange addColumn(String fieldName, DataType dataType, String comment) {
- return new AddColumn(fieldName, dataType, comment, null);
+ return new AddColumn(Collections.singletonList(fieldName), dataType, comment, null);
}
static SchemaChange addColumn(String fieldName, DataType dataType, String comment, Move move) {
- return new AddColumn(fieldName, dataType, comment, move);
+ return new AddColumn(Collections.singletonList(fieldName), dataType, comment, move);
+ }
+
+ static SchemaChange addColumn(
+ List<String> fieldNames, DataType dataType, String comment, Move move) {
+ return new AddColumn(fieldNames, dataType, comment, move);
}
static SchemaChange renameColumn(String fieldName, String newName) {
@@ -64,7 +71,11 @@ public interface SchemaChange extends Serializable {
}
static SchemaChange dropColumn(String fieldName) {
- return new DropColumn(fieldName);
+ return new DropColumn(Collections.singletonList(fieldName));
+ }
+
+ static SchemaChange dropColumn(List<String> fieldNames) {
+ return new DropColumn(fieldNames);
}
static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
@@ -207,20 +218,21 @@ public interface SchemaChange extends Serializable {
private static final long serialVersionUID = 1L;
- private final String fieldName;
+ private final List<String> fieldNames;
private final DataType dataType;
private final String description;
private final Move move;
- private AddColumn(String fieldName, DataType dataType, String description, Move move) {
- this.fieldName = fieldName;
+ private AddColumn(
+ List<String> fieldNames, DataType dataType, String description, Move move) {
+ this.fieldNames = fieldNames;
this.dataType = dataType;
this.description = description;
this.move = move;
}
- public String fieldName() {
- return fieldName;
+ public List<String> fieldNames() {
+ return fieldNames;
}
public DataType dataType() {
@@ -246,7 +258,7 @@ public interface SchemaChange extends Serializable {
return false;
}
AddColumn addColumn = (AddColumn) o;
- return Objects.equals(fieldName, addColumn.fieldName)
+ return Objects.equals(fieldNames, addColumn.fieldNames)
&& dataType.equals(addColumn.dataType)
&& Objects.equals(description, addColumn.description)
&& move.equals(addColumn.move);
@@ -255,7 +267,7 @@ public interface SchemaChange extends Serializable {
@Override
public int hashCode() {
int result = Objects.hash(dataType, description);
- result = 31 * result + Objects.hashCode(fieldName);
+ result = 31 * result + Objects.hashCode(fieldNames);
result = 31 * result + Objects.hashCode(move);
return result;
}
@@ -308,14 +320,14 @@ public interface SchemaChange extends Serializable {
private static final long serialVersionUID = 1L;
- private final String fieldName;
+ private final List<String> fieldNames;
- private DropColumn(String fieldName) {
- this.fieldName = fieldName;
+ private DropColumn(List<String> fieldNames) {
+ this.fieldNames = fieldNames;
}
- public String fieldName() {
- return fieldName;
+ public List<String> fieldNames() {
+ return fieldNames;
}
@Override
@@ -327,12 +339,12 @@ public interface SchemaChange extends Serializable {
return false;
}
DropColumn that = (DropColumn) o;
- return Objects.equals(fieldName, that.fieldName);
+ return Objects.equals(fieldNames, that.fieldNames);
}
@Override
public int hashCode() {
- return Objects.hashCode(fieldName);
+ return Objects.hashCode(fieldNames);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
index 083d131ec..b5d730707 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java
@@ -22,6 +22,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.casting.CastFieldGetter;
+import org.apache.paimon.casting.CastedRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
@@ -67,8 +68,6 @@ public class SchemaEvolutionUtil {
* data fields, -1 is the index of 6->b in data fields and 1 is the index of 3->a in data
* fields.
*
- * <p>/// TODO should support nest index mapping when nest schema evolution is supported.
- *
* @param tableFields the fields of table
* @param dataFields the fields of underlying data
* @return the index mapping
@@ -394,18 +393,32 @@ public class SchemaEvolutionUtil {
checkState(
!(tableField.type() instanceof MapType
|| dataField.type() instanceof ArrayType
- || dataField.type() instanceof MultisetType
- || dataField.type() instanceof RowType),
- "Only support column type evolution in atomic data
type.");
+ || dataField.type() instanceof
MultisetType),
+ "Only support column type evolution in atomic and
row data type.");
+
+ CastExecutor<?, ?> castExecutor;
+ if (tableField.type() instanceof RowType
+ && dataField.type() instanceof RowType) {
+ castExecutor =
+ createRowCastExecutor(
+ (RowType) dataField.type(), (RowType) tableField.type());
+ } else {
+ castExecutor = CastExecutors.resolve(dataField.type(), tableField.type());
+ }
+ checkNotNull(
+ castExecutor,
+ "Cannot cast from type "
+ + dataField.type()
+ + " to type "
+ + tableField.type());
+
// Create getter with index i and projected row data will convert to underlying
// data
converterMapping[i] =
new CastFieldGetter(
InternalRowUtils.createNullCheckingFieldGetter(
dataField.type(), i),
- checkNotNull(
- CastExecutors.resolve(
- dataField.type(), tableField.type())));
+ castExecutor);
castExist = true;
}
}
@@ -413,4 +426,24 @@ public class SchemaEvolutionUtil {
return castExist ? converterMapping : null;
}
+
+ private static CastExecutor<InternalRow, InternalRow> createRowCastExecutor(
+ RowType inputType, RowType targetType) {
+ int[] indexMapping = createIndexMapping(targetType.getFields(), inputType.getFields());
+ CastFieldGetter[] castFieldGetters =
+ createCastFieldGetterMapping(
+ targetType.getFields(), inputType.getFields(), indexMapping);
+
+ ProjectedRow projectedRow = indexMapping == null ? null : ProjectedRow.from(indexMapping);
+ CastedRow castedRow = castFieldGetters == null ? null : CastedRow.from(castFieldGetters);
+ return value -> {
+ if (projectedRow != null) {
+ value = projectedRow.replaceRow(value);
+ }
+ if (castedRow != null) {
+ value = castedRow.replaceRow(value);
+ }
+ return value;
+ };
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 7b987b049..6b4127cee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -40,7 +40,6 @@ import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeCasts;
-import org.apache.paimon.types.DataTypeVisitor;
import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;
@@ -282,44 +281,52 @@ public class SchemaManager implements Serializable {
} else if (change instanceof AddColumn) {
AddColumn addColumn = (AddColumn) change;
SchemaChange.Move move = addColumn.move();
- if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) {
- throw new Catalog.ColumnAlreadyExistException(
- identifierFromPath(tableRoot.toString(), true, branch),
- addColumn.fieldName());
- }
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
"Column %s cannot specify NOT NULL in the %s table.",
- addColumn.fieldName(),
+ String.join(".", addColumn.fieldNames()),
identifierFromPath(tableRoot.toString(), true, branch).getFullName());
int id = highestFieldId.incrementAndGet();
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
- DataField dataField =
- new DataField(
- id, addColumn.fieldName(), dataType, addColumn.description());
-
- // key: name ; value : index
- Map<String, Integer> map = new HashMap<>();
- for (int i = 0; i < newFields.size(); i++) {
- map.put(newFields.get(i).name(), i);
- }
-
- if (null != move) {
- if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
- newFields.add(0, dataField);
- } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
- int fieldIndex = map.get(move.referenceFieldName());
- newFields.add(fieldIndex + 1, dataField);
+ new NestedColumnModifier<Catalog.ColumnAlreadyExistException>(
+ addColumn.fieldNames().toArray(new String[0])) {
+ @Override
+ protected void updateLastColumn(List<DataField> newFields, String fieldName)
+ throws Catalog.ColumnAlreadyExistException {
+ for (DataField field : newFields) {
+ if (field.name().equals(fieldName)) {
+ throw new Catalog.ColumnAlreadyExistException(
+ identifierFromPath(tableRoot.toString(), true, branch),
+ String.join(".", addColumn.fieldNames()));
+ }
+ }
+
+ DataField dataField =
+ new DataField(id, fieldName, dataType, addColumn.description());
+
+ // key: name ; value : index
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 0; i < newFields.size(); i++) {
+ map.put(newFields.get(i).name(), i);
+ }
+
+ if (null != move) {
+ if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+ newFields.add(0, dataField);
+ } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+ int fieldIndex = map.get(move.referenceFieldName());
+ newFields.add(fieldIndex + 1, dataField);
+ }
+ } else {
+ newFields.add(dataField);
+ }
}
- } else {
- newFields.add(dataField);
- }
-
+ }.updateIntermediateColumn(newFields, 0);
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
- columnChangeValidation(oldTableSchema, change);
+ renameColumnValidation(oldTableSchema, rename);
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
identifierFromPath(tableRoot.toString(), true, branch),
@@ -329,7 +336,6 @@ public class SchemaManager implements Serializable {
updateNestedColumn(
newFields,
new String[] {rename.fieldName()},
- 0,
(field) ->
new DataField(
field.id(),
@@ -338,16 +344,23 @@ public class SchemaManager implements Serializable {
field.description()));
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
- columnChangeValidation(oldTableSchema, change);
- if (!newFields.removeIf(
- f -> f.name().equals(((DropColumn) change).fieldName()))) {
- throw new Catalog.ColumnNotExistException(
- identifierFromPath(tableRoot.toString(), true, branch),
- drop.fieldName());
- }
- if (newFields.isEmpty()) {
- throw new IllegalArgumentException("Cannot drop all
fields in table");
- }
+ dropColumnValidation(oldTableSchema, drop);
+ new NestedColumnModifier<Catalog.ColumnNotExistException>(
+ drop.fieldNames().toArray(new String[0])) {
+ @Override
+ protected void updateLastColumn(List<DataField> newFields, String fieldName)
+ throws Catalog.ColumnNotExistException {
+ if (!newFields.removeIf(f -> f.name().equals(fieldName))) {
+ throw new Catalog.ColumnNotExistException(
+ identifierFromPath(tableRoot.toString(), true, branch),
+ String.join(".", drop.fieldNames()));
+ }
+ if (newFields.isEmpty()) {
+ throw new IllegalArgumentException("Cannot drop all fields in table");
+ }
+ }
+ }.updateIntermediateColumn(newFields, 0);
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
if (oldTableSchema.partitionKeys().contains(update.fieldName())) {
@@ -356,9 +369,9 @@ public class SchemaManager implements Serializable {
"Cannot update partition column [%s]
type in the table[%s].",
update.fieldName(),
tableRoot.getName()));
}
- updateColumn(
+ updateNestedColumn(
newFields,
- update.fieldName(),
+ new String[] {update.fieldName()},
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
@@ -392,7 +405,6 @@ public class SchemaManager implements Serializable {
updateNestedColumn(
newFields,
update.fieldNames(),
- 0,
(field) ->
new DataField(
field.id(),
@@ -404,7 +416,6 @@ public class SchemaManager implements Serializable {
updateNestedColumn(
newFields,
update.fieldNames(),
- 0,
(field) ->
new DataField(
field.id(),
@@ -569,74 +580,96 @@ public class SchemaManager implements Serializable {
.collect(Collectors.toList());
}
- private static void columnChangeValidation(TableSchema schema, SchemaChange change) {
- /// TODO support partition and primary keys schema evolution
- if (change instanceof DropColumn) {
- String columnToDrop = ((DropColumn) change).fieldName();
- if (schema.partitionKeys().contains(columnToDrop)
- || schema.primaryKeys().contains(columnToDrop)) {
- throw new UnsupportedOperationException(
- String.format(
- "Cannot drop partition key or primary key:
[%s]", columnToDrop));
- }
- } else if (change instanceof RenameColumn) {
- String columnToRename = ((RenameColumn) change).fieldName();
- if (schema.partitionKeys().contains(columnToRename)) {
- throw new UnsupportedOperationException(
- String.format("Cannot rename partition column: [%s]",
columnToRename));
- }
- } else {
- throw new IllegalArgumentException(
- String.format(
- "Validation for %s is not supported",
- change.getClass().getSimpleName()));
+ private static void dropColumnValidation(TableSchema schema, DropColumn change) {
+ // primary keys and partition keys can't be nested columns
+ if (change.fieldNames().size() > 1) {
+ return;
+ }
+ String columnToDrop = change.fieldNames().get(0);
+ if (schema.partitionKeys().contains(columnToDrop)
+ || schema.primaryKeys().contains(columnToDrop)) {
+ throw new UnsupportedOperationException(
+ String.format("Cannot drop partition key or primary key:
[%s]", columnToDrop));
}
}
- /** This method is hacky, newFields may be immutable. We should use {@link DataTypeVisitor}. */
- private void updateNestedColumn(
- List<DataField> newFields,
- String[] updateFieldNames,
- int index,
- Function<DataField, DataField> updateFunc)
- throws Catalog.ColumnNotExistException {
- boolean found = false;
- for (int i = 0; i < newFields.size(); i++) {
- DataField field = newFields.get(i);
- if (field.name().equals(updateFieldNames[index])) {
- found = true;
- if (index == updateFieldNames.length - 1) {
- newFields.set(i, updateFunc.apply(field));
- break;
- } else {
- List<DataField> nestedFields =
- new ArrayList<>(
- ((org.apache.paimon.types.RowType) field.type()).getFields());
- updateNestedColumn(nestedFields, updateFieldNames, index + 1, updateFunc);
- newFields.set(
- i,
- new DataField(
- field.id(),
- field.name(),
- new org.apache.paimon.types.RowType(
- field.type().isNullable(), nestedFields),
- field.description()));
+ private static void renameColumnValidation(TableSchema schema, RenameColumn change) {
+ String columnToRename = change.fieldName();
+ if (schema.partitionKeys().contains(columnToRename)) {
+ throw new UnsupportedOperationException(
+ String.format("Cannot rename partition column: [%s]",
columnToRename));
+ }
+ }
+
+ private abstract class NestedColumnModifier<E extends Exception> {
+
+ private final String[] updateFieldNames;
+
+ private NestedColumnModifier(String[] updateFieldNames) {
+ this.updateFieldNames = updateFieldNames;
+ }
+
+ public void updateIntermediateColumn(List<DataField> newFields, int depth)
+ throws Catalog.ColumnNotExistException, E {
+ if (depth == updateFieldNames.length - 1) {
+ updateLastColumn(newFields, updateFieldNames[depth]);
+ return;
+ }
+
+ for (int i = 0; i < newFields.size(); i++) {
+ DataField field = newFields.get(i);
+ if (!field.name().equals(updateFieldNames[depth])) {
+ continue;
}
+
+ List<DataField> nestedFields =
+ new ArrayList<>(
+ ((org.apache.paimon.types.RowType) field.type()).getFields());
+ updateIntermediateColumn(nestedFields, depth + 1);
+ newFields.set(
+ i,
+ new DataField(
+ field.id(),
+ field.name(),
+ new org.apache.paimon.types.RowType(
+ field.type().isNullable(), nestedFields),
+ field.description()));
+ return;
}
- }
- if (!found) {
+
throw new Catalog.ColumnNotExistException(
identifierFromPath(tableRoot.toString(), true, branch),
- Arrays.toString(updateFieldNames));
+ String.join(".",
Arrays.asList(updateFieldNames).subList(0, depth + 1)));
}
+
+ protected abstract void updateLastColumn(List<DataField> newFields, String fieldName)
+ throws E;
}
- private void updateColumn(
+ private void updateNestedColumn(
List<DataField> newFields,
- String updateFieldName,
+ String[] updateFieldNames,
Function<DataField, DataField> updateFunc)
throws Catalog.ColumnNotExistException {
- updateNestedColumn(newFields, new String[] {updateFieldName}, 0, updateFunc);
+ new NestedColumnModifier<Catalog.ColumnNotExistException>(updateFieldNames) {
+ @Override
+ protected void updateLastColumn(List<DataField> newFields, String fieldName)
+ throws Catalog.ColumnNotExistException {
+ for (int i = 0; i < newFields.size(); i++) {
+ DataField field = newFields.get(i);
+ if (!field.name().equals(fieldName)) {
+ continue;
+ }
+
+ newFields.set(i, updateFunc.apply(field));
+ return;
+ }
+
+ throw new Catalog.ColumnNotExistException(
+ identifierFromPath(tableRoot.toString(), true, branch),
+ String.join(".", updateFieldNames));
+ }
+ }.updateIntermediateColumn(newFields, 0);
}
@VisibleForTesting
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index dbeedcfe5..643e1372b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -541,7 +541,7 @@ public abstract class CatalogTestBase {
.satisfies(
anyCauseMatches(
Catalog.ColumnNotExistException.class,
- "Column [non_existing_col] does not exist in
the test_db.test_table table."));
+ "Column non_existing_col does not exist in the
test_db.test_table table."));
}
@Test
@@ -647,7 +647,7 @@ public abstract class CatalogTestBase {
.satisfies(
anyCauseMatches(
Catalog.ColumnNotExistException.class,
- "Column [non_existing_col] does not exist in
the test_db.test_table table."));
+ "Column non_existing_col does not exist in the
test_db.test_table table."));
// Alter table update a column type throws Exception when column is
partition columns
assertThatThrownBy(
() ->
@@ -718,7 +718,7 @@ public abstract class CatalogTestBase {
.satisfies(
anyCauseMatches(
Catalog.ColumnNotExistException.class,
- "Column [non_existing_col] does not exist in
the test_db.test_table table."));
+ "Column non_existing_col does not exist in the
test_db.test_table table."));
}
@Test
@@ -774,7 +774,7 @@ public abstract class CatalogTestBase {
.satisfies(
anyCauseMatches(
Catalog.ColumnNotExistException.class,
- "Column [non_existing_col] does not exist in
the test_db.test_table table."));
+ "Column non_existing_col does not exist in the
test_db.test_table table."));
// Alter table update a column nullability throws Exception when
column is pk columns
assertThatThrownBy(
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 4bd965268..1a175de24 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -65,6 +65,7 @@ import java.util.stream.IntStream;
import static org.apache.paimon.utils.FailingFileIO.retryArtificialException;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link SchemaManager}. */
@@ -527,4 +528,82 @@ public class SchemaManagerTest {
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage("Change 'merge-engine' is not supported yet.");
}
+
+ @Test
+ public void testAddAndDropNestedColumns() throws Exception {
+ RowType innerType =
+ RowType.of(
+ new DataField(4, "f1", DataTypes.INT()),
+ new DataField(5, "f2", DataTypes.BIGINT()));
+ RowType middleType =
+ RowType.of(
+ new DataField(2, "f1", DataTypes.STRING()),
+ new DataField(3, "f2", innerType));
+ RowType outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()), new
DataField(1, "v", middleType));
+
+ Schema schema =
+ new Schema(
+ outerType.getFields(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+ manager.createTable(schema);
+
+ SchemaChange addColumn =
+ SchemaChange.addColumn(
+ Arrays.asList("v", "f2", "f3"),
+ DataTypes.STRING(),
+ "",
+ SchemaChange.Move.after("f3", "f1"));
+ manager.commitChanges(addColumn);
+
+ innerType =
+ RowType.of(
+ new DataField(4, "f1", DataTypes.INT()),
+ new DataField(6, "f3", DataTypes.STRING(), ""),
+ new DataField(5, "f2", DataTypes.BIGINT()));
+ middleType =
+ RowType.of(
+ new DataField(2, "f1", DataTypes.STRING()),
+ new DataField(3, "f2", innerType));
+ outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()), new
DataField(1, "v", middleType));
+
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+
+ assertThatCode(() -> manager.commitChanges(addColumn))
+ .hasMessageContaining("Column v.f2.f3 already exists");
+ SchemaChange middleColumnNotExistAddColumn =
+ SchemaChange.addColumn(
+ Arrays.asList("v", "invalid", "f4"),
DataTypes.STRING(), "", null);
+ assertThatCode(() ->
manager.commitChanges(middleColumnNotExistAddColumn))
+ .hasMessageContaining("Column v.invalid does not exist");
+
+ SchemaChange dropColumn = SchemaChange.dropColumn(Arrays.asList("v",
"f2", "f1"));
+ manager.commitChanges(dropColumn);
+
+ innerType =
+ RowType.of(
+ new DataField(6, "f3", DataTypes.STRING(), ""),
+ new DataField(5, "f2", DataTypes.BIGINT()));
+ middleType =
+ RowType.of(
+ new DataField(2, "f1", DataTypes.STRING()),
+ new DataField(3, "f2", innerType));
+ outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()), new
DataField(1, "v", middleType));
+
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+
+ assertThatCode(() -> manager.commitChanges(dropColumn))
+ .hasMessageContaining("Column v.f2.f1 does not exist");
+ SchemaChange middleColumnNotExistDropColumn =
+ SchemaChange.dropColumn(Arrays.asList("v", "invalid", "f2"));
+ assertThatCode(() -> manager.commitChanges(middleColumnNotExistDropColumn))
+ .hasMessageContaining("Column v.invalid does not exist");
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index b500da8f1..2ac1d032c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -371,10 +371,9 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
}
} else if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn add = (TableChange.AddColumn) change;
- validateAlterNestedField(add.fieldNames());
SchemaChange.Move move = getMove(add.position(), add.fieldNames());
return SchemaChange.addColumn(
- add.fieldNames()[0],
+ Arrays.asList(add.fieldNames()),
toPaimonType(add.dataType()).copy(add.isNullable()),
add.comment(),
move);
@@ -384,8 +383,7 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
return SchemaChange.renameColumn(rename.fieldNames()[0], rename.newName());
} else if (change instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn delete = (TableChange.DeleteColumn) change;
- validateAlterNestedField(delete.fieldNames());
- return SchemaChange.dropColumn(delete.fieldNames()[0]);
+ return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
} else if (change instanceof TableChange.UpdateColumnType) {
TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
validateAlterNestedField(update.fieldNames());
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 32c3498a7..b4565447c 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -456,6 +456,11 @@ public class SparkReadITCase extends SparkReadTestBase {
"INSERT INTO paimon.default."
+ tableName
+ " VALUES (2, STRUCT(20, STRUCT('banana', 200)))");
+ assertThat(
+ spark.sql("SELECT v.f2.f1, k FROM paimon.default." +
tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
spark.sql(
"INSERT INTO paimon.default."
+ tableName
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 9d958931c..e876a0027 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -23,6 +23,8 @@ import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.List;
@@ -705,4 +707,85 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
","))
.collect(Collectors.toList());
}
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testAddAndDropNestedColumn(String formatType) {
+ String tableName = "testAddNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ + " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1:
STRING, f2: INT>>) "
+ + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k',
'file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2,
STRUCT(20, STRUCT('banana', 200)))");
+ assertThat(
+ spark.sql("SELECT * FROM paimon.default." +
tableName).collectAsList()
+ .stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[1,[10,[apple,100]]]",
"[2,[20,[banana,200]]]");
+ assertThat(
+ spark.sql("SELECT v.f2.f1, k FROM paimon.default." +
tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
+
+ spark.sql("ALTER TABLE paimon.default." + tableName + " ADD COLUMN
v.f3 STRING");
+ spark.sql("ALTER TABLE paimon.default." + tableName + " ADD COLUMN
v.f2.f3 BIGINT");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, STRUCT(11, STRUCT('APPLE', 101, 1001),
'one')), (3, STRUCT(31, STRUCT('CHERRY', 301, 3001), 'three'))");
+ assertThat(
+ spark.sql("SELECT * FROM paimon.default." +
tableName).collectAsList()
+ .stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "[1,[11,[APPLE,101,1001],one]]",
+ "[2,[20,[banana,200,null],null]]",
+ "[3,[31,[CHERRY,301,3001],three]]");
+ assertThat(
+ spark.sql("SELECT v.f2.f2, v.f3, k FROM
paimon.default." + tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[101,one,1]", "[200,null,2]",
"[301,three,3]");
+
+ spark.sql("ALTER TABLE paimon.default." + tableName + " DROP COLUMN
v.f2.f1");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, STRUCT(12, STRUCT(102, 1002), 'one')),
(4, STRUCT(42, STRUCT(402, 4002), 'four'))");
+ assertThat(
+ spark.sql("SELECT * FROM paimon.default." +
tableName).collectAsList()
+ .stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "[1,[12,[102,1002],one]]",
+ "[2,[20,[200,null],null]]",
+ "[3,[31,[301,3001],three]]",
+ "[4,[42,[402,4002],four]]");
+
+ spark.sql(
+ "ALTER TABLE paimon.default."
+ + tableName
+ + " ADD COLUMN v.f2.f1 DECIMAL(5, 2) AFTER f2");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ + " VALUES (1, STRUCT(13, STRUCT(103, 100.03, 1003),
'one')), (5, STRUCT(53, STRUCT(503, 500.03, 5003), 'five'))");
+ assertThat(
+ spark.sql("SELECT * FROM paimon.default." +
tableName).collectAsList()
+ .stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder(
+ "[1,[13,[103,100.03,1003],one]]",
+ "[2,[20,[200,null,null],null]]",
+ "[3,[31,[301,null,3001],three]]",
+ "[4,[42,[402,null,4002],four]]",
+ "[5,[53,[503,500.03,5003],five]]");
+ }
}
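As a closing usage note, the Spark-side capability added by this commit reduces to nested-field DDL. A minimal sketch in the style of the ITCase above, assuming a SparkSession already configured with the Paimon catalog (the catalog and table names are illustrative):

import org.apache.spark.sql.SparkSession;

public class NestedColumnSparkSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();

        spark.sql(
                "CREATE TABLE paimon.default.t "
                        + "(k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) "
                        + "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k')");

        // Newly supported by this commit: ADD/DROP COLUMN on nested fields.
        spark.sql("ALTER TABLE paimon.default.t ADD COLUMN v.f2.f3 BIGINT");
        spark.sql("ALTER TABLE paimon.default.t DROP COLUMN v.f2.f1");
    }
}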