This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 60f6611e5e [flink] Support updating row type to another row type in Flink (#4499)
60f6611e5e is described below
commit 60f6611e5ea07404fb000c94565b9186057ec302
Author: tsreaper <[email protected]>
AuthorDate: Tue Nov 12 22:21:54 2024 +0800
[flink] Support updating row type to another row type in Flink (#4499)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 3 +-
.../org/apache/paimon/schema/SchemaChange.java | 56 +++++-----
.../org/apache/paimon/schema/SchemaManager.java | 22 ++--
.../apache/paimon/schema/SchemaManagerTest.java | 20 ++--
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 8 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 120 ++++++++++++++++++---
.../apache/paimon/flink/SchemaChangeITCase.java | 51 ++++++++-
.../java/org/apache/paimon/spark/SparkCatalog.java | 8 +-
8 files changed, 213 insertions(+), 75 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c2e4afe5d5..a1cf941cda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -48,6 +48,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -560,7 +561,7 @@ public abstract class AbstractCatalog implements Catalog {
for (SchemaChange change : changes) {
if (change instanceof SchemaChange.AddColumn) {
SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn)
change;
- fieldNames.addAll(addColumn.fieldNames());
+ fieldNames.addAll(Arrays.asList(addColumn.fieldNames()));
} else if (change instanceof SchemaChange.RenameColumn) {
SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn)
change;
fieldNames.add(rename.newName());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 1c1d601bce..cefa3c6eb9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -25,8 +25,6 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.Objects;
/**
@@ -54,46 +52,45 @@ public interface SchemaChange extends Serializable {
}
static SchemaChange addColumn(String fieldName, DataType dataType, String
comment) {
- return new AddColumn(Collections.singletonList(fieldName), dataType,
comment, null);
+ return new AddColumn(new String[] {fieldName}, dataType, comment,
null);
}
static SchemaChange addColumn(String fieldName, DataType dataType, String
comment, Move move) {
- return new AddColumn(Collections.singletonList(fieldName), dataType,
comment, move);
+ return new AddColumn(new String[] {fieldName}, dataType, comment,
move);
}
static SchemaChange addColumn(
- List<String> fieldNames, DataType dataType, String comment, Move
move) {
+ String[] fieldNames, DataType dataType, String comment, Move move)
{
return new AddColumn(fieldNames, dataType, comment, move);
}
static SchemaChange renameColumn(String fieldName, String newName) {
- return new RenameColumn(Collections.singletonList(fieldName), newName);
+ return new RenameColumn(new String[] {fieldName}, newName);
}
- static SchemaChange renameColumn(List<String> fieldNames, String newName) {
+ static SchemaChange renameColumn(String[] fieldNames, String newName) {
return new RenameColumn(fieldNames, newName);
}
static SchemaChange dropColumn(String fieldName) {
- return new DropColumn(Collections.singletonList(fieldName));
+ return new DropColumn(new String[] {fieldName});
}
- static SchemaChange dropColumn(List<String> fieldNames) {
+ static SchemaChange dropColumn(String[] fieldNames) {
return new DropColumn(fieldNames);
}
static SchemaChange updateColumnType(String fieldName, DataType
newDataType) {
- return new UpdateColumnType(Collections.singletonList(fieldName),
newDataType, false);
+ return new UpdateColumnType(new String[] {fieldName}, newDataType,
false);
}
static SchemaChange updateColumnType(
String fieldName, DataType newDataType, boolean keepNullability) {
- return new UpdateColumnType(
- Collections.singletonList(fieldName), newDataType,
keepNullability);
+ return new UpdateColumnType(new String[] {fieldName}, newDataType,
keepNullability);
}
static SchemaChange updateColumnType(
- List<String> fieldNames, DataType newDataType, boolean
keepNullability) {
+ String[] fieldNames, DataType newDataType, boolean
keepNullability) {
return new UpdateColumnType(fieldNames, newDataType, keepNullability);
}
@@ -228,20 +225,19 @@ public interface SchemaChange extends Serializable {
private static final long serialVersionUID = 1L;
- private final List<String> fieldNames;
+ private final String[] fieldNames;
private final DataType dataType;
private final String description;
private final Move move;
- private AddColumn(
- List<String> fieldNames, DataType dataType, String
description, Move move) {
+ private AddColumn(String[] fieldNames, DataType dataType, String
description, Move move) {
this.fieldNames = fieldNames;
this.dataType = dataType;
this.description = description;
this.move = move;
}
- public List<String> fieldNames() {
+ public String[] fieldNames() {
return fieldNames;
}
@@ -268,7 +264,7 @@ public interface SchemaChange extends Serializable {
return false;
}
AddColumn addColumn = (AddColumn) o;
- return Objects.equals(fieldNames, addColumn.fieldNames)
+ return Arrays.equals(fieldNames, addColumn.fieldNames)
&& dataType.equals(addColumn.dataType)
&& Objects.equals(description, addColumn.description)
&& move.equals(addColumn.move);
@@ -288,15 +284,15 @@ public interface SchemaChange extends Serializable {
private static final long serialVersionUID = 1L;
- private final List<String> fieldNames;
+ private final String[] fieldNames;
private final String newName;
- private RenameColumn(List<String> fieldNames, String newName) {
+ private RenameColumn(String[] fieldNames, String newName) {
this.fieldNames = fieldNames;
this.newName = newName;
}
- public List<String> fieldNames() {
+ public String[] fieldNames() {
return fieldNames;
}
@@ -313,7 +309,7 @@ public interface SchemaChange extends Serializable {
return false;
}
RenameColumn that = (RenameColumn) o;
- return Objects.equals(fieldNames, that.fieldNames)
+ return Arrays.equals(fieldNames, that.fieldNames)
&& Objects.equals(newName, that.newName);
}
@@ -330,13 +326,13 @@ public interface SchemaChange extends Serializable {
private static final long serialVersionUID = 1L;
- private final List<String> fieldNames;
+ private final String[] fieldNames;
- private DropColumn(List<String> fieldNames) {
+ private DropColumn(String[] fieldNames) {
this.fieldNames = fieldNames;
}
- public List<String> fieldNames() {
+ public String[] fieldNames() {
return fieldNames;
}
@@ -349,7 +345,7 @@ public interface SchemaChange extends Serializable {
return false;
}
DropColumn that = (DropColumn) o;
- return Objects.equals(fieldNames, that.fieldNames);
+ return Arrays.equals(fieldNames, that.fieldNames);
}
@Override
@@ -363,19 +359,19 @@ public interface SchemaChange extends Serializable {
private static final long serialVersionUID = 1L;
- private final List<String> fieldNames;
+ private final String[] fieldNames;
private final DataType newDataType;
// If true, do not change the target field nullability
private final boolean keepNullability;
private UpdateColumnType(
- List<String> fieldNames, DataType newDataType, boolean
keepNullability) {
+ String[] fieldNames, DataType newDataType, boolean
keepNullability) {
this.fieldNames = fieldNames;
this.newDataType = newDataType;
this.keepNullability = keepNullability;
}
- public List<String> fieldNames() {
+ public String[] fieldNames() {
return fieldNames;
}
@@ -396,7 +392,7 @@ public interface SchemaChange extends Serializable {
return false;
}
UpdateColumnType that = (UpdateColumnType) o;
- return Objects.equals(fieldNames, that.fieldNames)
+ return Arrays.equals(fieldNames, that.fieldNames)
&& newDataType.equals(that.newDataType);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 86ed96d5b0..28cc69cf99 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -290,7 +290,7 @@ public class SchemaManager implements Serializable {
DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(),
highestFieldId);
- new
NestedColumnModifier(addColumn.fieldNames().toArray(new String[0])) {
+ new NestedColumnModifier(addColumn.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField>
newFields, String fieldName)
throws Catalog.ColumnAlreadyExistException {
@@ -320,7 +320,7 @@ public class SchemaManager implements Serializable {
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
assertNotUpdatingPrimaryKeys(oldTableSchema,
rename.fieldNames(), "rename");
- new NestedColumnModifier(rename.fieldNames().toArray(new
String[0])) {
+ new NestedColumnModifier(rename.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField>
newFields, String fieldName)
throws Catalog.ColumnNotExistException,
@@ -347,7 +347,7 @@ public class SchemaManager implements Serializable {
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
dropColumnValidation(oldTableSchema, drop);
- new NestedColumnModifier(drop.fieldNames().toArray(new
String[0])) {
+ new NestedColumnModifier(drop.fieldNames()) {
@Override
protected void updateLastColumn(List<DataField>
newFields, String fieldName)
throws Catalog.ColumnNotExistException {
@@ -364,7 +364,7 @@ public class SchemaManager implements Serializable {
assertNotUpdatingPrimaryKeys(oldTableSchema,
update.fieldNames(), "update");
updateNestedColumn(
newFields,
- update.fieldNames().toArray(new String[0]),
+ update.fieldNames(),
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
@@ -558,8 +558,8 @@ public class SchemaManager implements Serializable {
Map<String, String> columnNames = Maps.newHashMap();
for (RenameColumn renameColumn : renames) {
- if (renameColumn.fieldNames().size() == 1) {
- columnNames.put(renameColumn.fieldNames().get(0),
renameColumn.newName());
+ if (renameColumn.fieldNames().length == 1) {
+ columnNames.put(renameColumn.fieldNames()[0],
renameColumn.newName());
}
}
@@ -571,10 +571,10 @@ public class SchemaManager implements Serializable {
private static void dropColumnValidation(TableSchema schema, DropColumn
change) {
// primary keys and partition keys can't be nested columns
- if (change.fieldNames().size() > 1) {
+ if (change.fieldNames().length > 1) {
return;
}
- String columnToDrop = change.fieldNames().get(0);
+ String columnToDrop = change.fieldNames()[0];
if (schema.partitionKeys().contains(columnToDrop)
|| schema.primaryKeys().contains(columnToDrop)) {
throw new UnsupportedOperationException(
@@ -583,12 +583,12 @@ public class SchemaManager implements Serializable {
}
private static void assertNotUpdatingPrimaryKeys(
- TableSchema schema, List<String> fieldNames, String operation) {
+ TableSchema schema, String[] fieldNames, String operation) {
// partition keys can't be nested columns
- if (fieldNames.size() > 1) {
+ if (fieldNames.length > 1) {
return;
}
- String columnToRename = fieldNames.get(0);
+ String columnToRename = fieldNames[0];
if (schema.partitionKeys().contains(columnToRename)) {
throw new UnsupportedOperationException(
String.format(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index ac8d4cd91e..088cb72f92 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -555,7 +555,7 @@ public class SchemaManagerTest {
SchemaChange addColumn =
SchemaChange.addColumn(
- Arrays.asList("v", "f2", "f3"),
+ new String[] {"v", "f2", "f3"},
DataTypes.STRING(),
"",
SchemaChange.Move.after("f3", "f1"));
@@ -579,11 +579,11 @@ public class SchemaManagerTest {
.hasMessageContaining("Column v.f2.f3 already exists");
SchemaChange middleColumnNotExistAddColumn =
SchemaChange.addColumn(
- Arrays.asList("v", "invalid", "f4"),
DataTypes.STRING(), "", null);
+ new String[] {"v", "invalid", "f4"},
DataTypes.STRING(), "", null);
assertThatCode(() ->
manager.commitChanges(middleColumnNotExistAddColumn))
.hasMessageContaining("Column v.invalid does not exist");
- SchemaChange dropColumn = SchemaChange.dropColumn(Arrays.asList("v",
"f2", "f1"));
+ SchemaChange dropColumn = SchemaChange.dropColumn(new String[] {"v",
"f2", "f1"});
manager.commitChanges(dropColumn);
innerType =
@@ -602,7 +602,7 @@ public class SchemaManagerTest {
assertThatCode(() -> manager.commitChanges(dropColumn))
.hasMessageContaining("Column v.f2.f1 does not exist");
SchemaChange middleColumnNotExistDropColumn =
- SchemaChange.dropColumn(Arrays.asList("v", "invalid", "f2"));
+ SchemaChange.dropColumn(new String[] {"v", "invalid", "f2"});
assertThatCode(() ->
manager.commitChanges(middleColumnNotExistDropColumn))
.hasMessageContaining("Column v.invalid does not exist");
}
@@ -632,7 +632,7 @@ public class SchemaManagerTest {
manager.createTable(schema);
SchemaChange renameColumn =
- SchemaChange.renameColumn(Arrays.asList("v", "f2", "f1"),
"f100");
+ SchemaChange.renameColumn(new String[] {"v", "f2", "f1"},
"f100");
manager.commitChanges(renameColumn);
innerType =
@@ -649,17 +649,17 @@ public class SchemaManagerTest {
assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
SchemaChange middleColumnNotExistRenameColumn =
- SchemaChange.renameColumn(Arrays.asList("v", "invalid", "f2"),
"f200");
+ SchemaChange.renameColumn(new String[] {"v", "invalid", "f2"},
"f200");
assertThatCode(() ->
manager.commitChanges(middleColumnNotExistRenameColumn))
.hasMessageContaining("Column v.invalid does not exist");
SchemaChange lastColumnNotExistRenameColumn =
- SchemaChange.renameColumn(Arrays.asList("v", "f2", "invalid"),
"new_invalid");
+ SchemaChange.renameColumn(new String[] {"v", "f2", "invalid"},
"new_invalid");
assertThatCode(() ->
manager.commitChanges(lastColumnNotExistRenameColumn))
.hasMessageContaining("Column v.f2.invalid does not exist");
SchemaChange newNameAlreadyExistRenameColumn =
- SchemaChange.renameColumn(Arrays.asList("v", "f2", "f2"),
"f100");
+ SchemaChange.renameColumn(new String[] {"v", "f2", "f2"},
"f100");
assertThatCode(() ->
manager.commitChanges(newNameAlreadyExistRenameColumn))
.hasMessageContaining("Column v.f2.f100 already exists");
}
@@ -690,7 +690,7 @@ public class SchemaManagerTest {
SchemaChange updateColumnType =
SchemaChange.updateColumnType(
- Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(),
true);
+ new String[] {"v", "f2", "f1"}, DataTypes.BIGINT(),
true);
manager.commitChanges(updateColumnType);
innerType =
@@ -708,7 +708,7 @@ public class SchemaManagerTest {
SchemaChange middleColumnNotExistUpdateColumnType =
SchemaChange.updateColumnType(
- Arrays.asList("v", "invalid", "f1"),
DataTypes.BIGINT(), true);
+ new String[] {"v", "invalid", "f1"},
DataTypes.BIGINT(), true);
assertThatCode(() ->
manager.commitChanges(middleColumnNotExistUpdateColumnType))
.hasMessageContaining("Column v.invalid does not exist");
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 0e93fdb073..c2e928bd4a 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -101,7 +101,7 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
SchemaChange.UpdateColumnType updateColumnType =
(SchemaChange.UpdateColumnType) schemaChange;
Preconditions.checkState(
- updateColumnType.fieldNames().size() == 1,
+ updateColumnType.fieldNames().length == 1,
"Paimon CDC currently does not support nested type schema
evolution.");
TableSchema schema =
schemaManager
@@ -110,11 +110,11 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
() ->
new RuntimeException(
"Table does not exist.
This is unexpected."));
- int idx =
schema.fieldNames().indexOf(updateColumnType.fieldNames().get(0));
+ int idx =
schema.fieldNames().indexOf(updateColumnType.fieldNames()[0]);
Preconditions.checkState(
idx >= 0,
"Field name "
- + updateColumnType.fieldNames().get(0)
+ + updateColumnType.fieldNames()[0]
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
@@ -126,7 +126,7 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
throw new UnsupportedOperationException(
String.format(
"Cannot convert field %s from type %s to
%s of Paimon table %s.",
- updateColumnType.fieldNames().get(0),
+ updateColumnType.fieldNames()[0],
oldType,
newType,
identifier.getFullName()));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index cae6e6f0e3..ae30fa569d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -37,6 +37,8 @@ import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
@@ -98,7 +100,6 @@ import
org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,11 +111,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -595,17 +598,12 @@ public class FlinkCatalog extends AbstractCatalog {
if (!oldTableNonPhysicalColumnIndex.containsKey(
((ModifyPhysicalColumnType)
change).getOldColumn().getName())) {
ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType)
change;
- LogicalType newColumnType =
modify.getNewType().getLogicalType();
- LogicalType oldColumnType =
modify.getOldColumn().getDataType().getLogicalType();
- if (newColumnType.isNullable() != oldColumnType.isNullable()) {
- schemaChanges.add(
- SchemaChange.updateColumnNullability(
- modify.getNewColumn().getName(),
newColumnType.isNullable()));
- }
- schemaChanges.add(
- SchemaChange.updateColumnType(
- modify.getOldColumn().getName(),
-
LogicalTypeConversion.toDataType(newColumnType)));
+ generateNestedColumnUpdates(
+
Collections.singletonList(modify.getOldColumn().getName()),
+ LogicalTypeConversion.toDataType(
+
modify.getOldColumn().getDataType().getLogicalType()),
+
LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()),
+ schemaChanges);
}
return schemaChanges;
} else if (change instanceof ModifyColumnPosition) {
@@ -670,6 +668,104 @@ public class FlinkCatalog extends AbstractCatalog {
throw new UnsupportedOperationException("Change is not supported: " +
change.getClass());
}
+ private void generateNestedColumnUpdates(
+ List<String> fieldNames,
+ org.apache.paimon.types.DataType oldType,
+ org.apache.paimon.types.DataType newType,
+ List<SchemaChange> schemaChanges) {
+ if (oldType.getTypeRoot() == DataTypeRoot.ROW) {
+ Preconditions.checkArgument(
+ newType.getTypeRoot() == DataTypeRoot.ROW,
+ "Column "
+ + String.join(".", fieldNames)
+ + " can only be updated to row type, and cannot be
updated to "
+ + newType
+ + " type");
+ org.apache.paimon.types.RowType oldRowType =
(org.apache.paimon.types.RowType) oldType;
+ org.apache.paimon.types.RowType newRowType =
(org.apache.paimon.types.RowType) newType;
+
+ // check that existing fields have same order
+ Map<String, Integer> oldFieldOrders = new HashMap<>();
+ for (int i = 0; i < oldRowType.getFieldCount(); i++) {
+ oldFieldOrders.put(oldRowType.getFields().get(i).name(), i);
+ }
+ int lastIdx = -1;
+ String lastFieldName = "";
+ for (DataField newField : newRowType.getFields()) {
+ String name = newField.name();
+ if (oldFieldOrders.containsKey(name)) {
+ int idx = oldFieldOrders.get(name);
+ Preconditions.checkState(
+ lastIdx < idx,
+ "Order of existing fields in column %s must be
kept the same. "
+ + "However, field %s and %s have changed
their orders.",
+ String.join(".", fieldNames),
+ lastFieldName,
+ name);
+ lastIdx = idx;
+ lastFieldName = name;
+ }
+ }
+
+ // drop fields
+ Set<String> newFieldNames = new
HashSet<>(newRowType.getFieldNames());
+ for (String name : oldRowType.getFieldNames()) {
+ if (!newFieldNames.contains(name)) {
+ List<String> dropColumnNames = new ArrayList<>(fieldNames);
+ dropColumnNames.add(name);
+ schemaChanges.add(
+
SchemaChange.dropColumn(dropColumnNames.toArray(new String[0])));
+ }
+ }
+
+ for (int i = 0; i < newRowType.getFieldCount(); i++) {
+ DataField field = newRowType.getFields().get(i);
+ String name = field.name();
+ List<String> fullFieldNames = new ArrayList<>(fieldNames);
+ fullFieldNames.add(name);
+ if (!oldFieldOrders.containsKey(name)) {
+ // add fields
+ SchemaChange.Move move;
+ if (i == 0) {
+ move = SchemaChange.Move.first(name);
+ } else {
+ String lastName = newRowType.getFields().get(i -
1).name();
+ move = SchemaChange.Move.after(name, lastName);
+ }
+ schemaChanges.add(
+ SchemaChange.addColumn(
+ fullFieldNames.toArray(new String[0]),
+ field.type(),
+ field.description(),
+ move));
+ } else {
+ // update existing fields
+ DataField oldField =
oldRowType.getFields().get(oldFieldOrders.get(name));
+ if (!Objects.equals(oldField.description(),
field.description())) {
+ schemaChanges.add(
+ SchemaChange.updateColumnComment(
+ fullFieldNames.toArray(new String[0]),
+ field.description()));
+ }
+ generateNestedColumnUpdates(
+ fullFieldNames, oldField.type(), field.type(),
schemaChanges);
+ }
+ }
+ } else {
+ if (!oldType.equalsIgnoreNullable(newType)) {
+ schemaChanges.add(
+ SchemaChange.updateColumnType(
+ fieldNames.toArray(new String[0]), newType,
false));
+ }
+ }
+
+ if (oldType.isNullable() != newType.isNullable()) {
+ schemaChanges.add(
+ SchemaChange.updateColumnNullability(
+ fieldNames.toArray(new String[0]),
newType.isNullable()));
+ }
+ }
+
/**
* Try handle change related to materialized table.
*
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 08f79efccb..ba161fe840 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.format.DateTimeFormatter;
import java.util.List;
@@ -35,6 +37,7 @@ import java.util.stream.Collectors;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for schema changes. */
@@ -1015,7 +1018,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase
{
sql("INSERT INTO T1 VALUES ('a', 'b', 'l')");
sql("INSERT INTO T1 VALUES ('a', 'd', 'n')");
sql("INSERT INTO T1 VALUES ('a', 'e', 'm')");
- List<Row> sql = sql("select * from T1");
assertThat(sql("select * from T1").toString()).isEqualTo("[+I[a, d,
n]]");
// test for get small record
@@ -1024,7 +1026,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase
{
sql("INSERT INTO T2 VALUES ('a', 'b', 1)");
sql("INSERT INTO T2 VALUES ('a', 'd', 3)");
sql("INSERT INTO T2 VALUES ('a', 'e', 2)");
- sql = sql("select * from T2");
assertThat(sql("select * from T2").toString()).isEqualTo("[+I[a, b,
1]]");
// test for get largest record
@@ -1033,7 +1034,6 @@ public class SchemaChangeITCase extends CatalogITCaseBase
{
sql("INSERT INTO T3 VALUES ('a', 'b', 1.0)");
sql("INSERT INTO T3 VALUES ('a', 'd', 3.0)");
sql("INSERT INTO T3 VALUES ('a', 'e', 2.0)");
- sql = sql("select * from T3");
assertThat(sql("select * from T3").toString()).isEqualTo("[+I[a, d,
3.0]]");
}
@@ -1089,4 +1089,49 @@ public class SchemaChangeITCase extends
CatalogITCaseBase {
UnsupportedOperationException.class,
"Cannot change bucket to -1."));
}
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateNestedColumn(String formatType) {
+ sql(
+ "CREATE TABLE T "
+ + "( k INT, v ROW(f1 INT, f2 ROW(f1 STRING, f2 INT NOT
NULL)), PRIMARY KEY (k) NOT ENFORCED ) "
+ + "WITH ( 'bucket' = '1', 'file.format' = '"
+ + formatType
+ + "' )");
+ sql(
+ "INSERT INTO T VALUES (1, ROW(10, ROW('apple', 100))), (2,
ROW(20, ROW('banana', 200)))");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, Row.of(10, Row.of("apple", 100))),
+ Row.of(2, Row.of(20, Row.of("banana", 200))));
+
+ sql("ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2 ROW(f3 DOUBLE, f2 INT),
f3 STRING))");
+ sql(
+ "INSERT INTO T VALUES "
+ + "(1, ROW(1000000000001, ROW(101.0, 101), 'cat')), "
+ + "(3, ROW(3000000000001, ROW(301.0, CAST(NULL AS
INT)), 'dog'))");
+ assertThat(sql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, Row.of(1000000000001L, Row.of(101.0, 101),
"cat")),
+ Row.of(2, Row.of(20L, Row.of(null, 200), null)),
+ Row.of(3, Row.of(3000000000001L, Row.of(301.0, null),
"dog")));
+
+ sql(
+ "ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2 ROW(f3 DOUBLE, f1
STRING, f2 INT), f3 STRING))");
+ sql(
+ "INSERT INTO T VALUES "
+ + "(1, ROW(1000000000002, ROW(102.0, 'APPLE', 102),
'cat')), "
+ + "(4, ROW(4000000000002, ROW(402.0, 'LEMON', 402),
'tiger'))");
+ assertThat(sql("SELECT k, v.f2.f1, v.f3 FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, "APPLE", "cat"),
+ Row.of(2, null, null),
+ Row.of(3, null, "dog"),
+ Row.of(4, "LEMON", "tiger"));
+
+ assertThatCode(() -> sql("ALTER TABLE T MODIFY (v ROW(f1 BIGINT, f2
INT, f3 STRING))"))
+ .hasRootCauseMessage(
+ "Column v.f2 can only be updated to row type, and
cannot be updated to INT type");
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 5fde2c5659..89448c1f43 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -373,20 +373,20 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
TableChange.AddColumn add = (TableChange.AddColumn) change;
SchemaChange.Move move = getMove(add.position(), add.fieldNames());
return SchemaChange.addColumn(
- Arrays.asList(add.fieldNames()),
+ add.fieldNames(),
toPaimonType(add.dataType()).copy(add.isNullable()),
add.comment(),
move);
} else if (change instanceof TableChange.RenameColumn) {
TableChange.RenameColumn rename = (TableChange.RenameColumn)
change;
- return
SchemaChange.renameColumn(Arrays.asList(rename.fieldNames()), rename.newName());
+ return SchemaChange.renameColumn(rename.fieldNames(),
rename.newName());
} else if (change instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn delete = (TableChange.DeleteColumn)
change;
- return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
+ return SchemaChange.dropColumn(delete.fieldNames());
} else if (change instanceof TableChange.UpdateColumnType) {
TableChange.UpdateColumnType update =
(TableChange.UpdateColumnType) change;
return SchemaChange.updateColumnType(
- Arrays.asList(update.fieldNames()),
toPaimonType(update.newDataType()), true);
+ update.fieldNames(), toPaimonType(update.newDataType()),
true);
} else if (change instanceof TableChange.UpdateColumnNullability) {
TableChange.UpdateColumnNullability update =
(TableChange.UpdateColumnNullability) change;