This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.1 by this push:
new 9e715f195a [core] Extract generateTableSchema method to optimize code structure (#5482)
9e715f195a is described below
commit 9e715f195a00e2219fd7561f4e1fc35106b074e2
Author: Xiaohu <[email protected]>
AuthorDate: Thu Apr 17 13:20:53 2025 +0800
[core] Extract generateTableSchema method to optimize code structure (#5482)
---
.../org/apache/paimon/schema/SchemaManager.java | 368 +++++++++++----------
1 file changed, 185 insertions(+), 183 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index abcc21bb11..351be613ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -283,201 +283,203 @@ public class SchemaManager implements Serializable {
new Catalog.TableNotExistException(
identifierFromPath(
tableRoot.toString(), true, branch)));
- Map<String, String> oldOptions = new
HashMap<>(oldTableSchema.options());
- Map<String, String> newOptions = new
HashMap<>(oldTableSchema.options());
- List<DataField> newFields = new
ArrayList<>(oldTableSchema.fields());
- AtomicInteger highestFieldId = new
AtomicInteger(oldTableSchema.highestFieldId());
- String newComment = oldTableSchema.comment();
- for (SchemaChange change : changes) {
- if (change instanceof SetOption) {
- SetOption setOption = (SetOption) change;
- if (hasSnapshots.get()) {
- checkAlterTableOption(
- setOption.key(),
- oldOptions.get(setOption.key()),
- setOption.value(),
- false);
- }
- newOptions.put(setOption.key(), setOption.value());
- } else if (change instanceof RemoveOption) {
- RemoveOption removeOption = (RemoveOption) change;
- if (hasSnapshots.get()) {
- checkResetTableOption(removeOption.key());
- }
- newOptions.remove(removeOption.key());
- } else if (change instanceof UpdateComment) {
- UpdateComment updateComment = (UpdateComment) change;
- newComment = updateComment.comment();
- } else if (change instanceof AddColumn) {
- AddColumn addColumn = (AddColumn) change;
- SchemaChange.Move move = addColumn.move();
- Preconditions.checkArgument(
- addColumn.dataType().isNullable(),
- "Column %s cannot specify NOT NULL in the %s
table.",
- String.join(".", addColumn.fieldNames()),
- identifierFromPath(tableRoot.toString(), true,
branch).getFullName());
- int id = highestFieldId.incrementAndGet();
- DataType dataType =
- ReassignFieldId.reassign(addColumn.dataType(),
highestFieldId);
-
- new NestedColumnModifier(addColumn.fieldNames()) {
- @Override
- protected void updateLastColumn(List<DataField>
newFields, String fieldName)
- throws Catalog.ColumnAlreadyExistException {
- assertColumnNotExists(newFields, fieldName);
-
- DataField dataField =
- new DataField(id, fieldName, dataType,
addColumn.description());
-
- // key: name ; value : index
- Map<String, Integer> map = new HashMap<>();
- for (int i = 0; i < newFields.size(); i++) {
- map.put(newFields.get(i).name(), i);
- }
+ TableSchema newTableSchema = generateTableSchema(oldTableSchema,
changes, hasSnapshots);
+ try {
+ boolean success = commit(newTableSchema);
+ if (success) {
+ return newTableSchema;
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
- if (null != move) {
- if
(move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
- newFields.add(0, dataField);
- } else if
(move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
- int fieldIndex =
map.get(move.referenceFieldName());
- newFields.add(fieldIndex + 1, dataField);
- }
- } else {
- newFields.add(dataField);
- }
- }
- }.updateIntermediateColumn(newFields, 0);
- } else if (change instanceof RenameColumn) {
- RenameColumn rename = (RenameColumn) change;
- assertNotUpdatingPrimaryKeys(oldTableSchema,
rename.fieldNames(), "rename");
- new NestedColumnModifier(rename.fieldNames()) {
- @Override
- protected void updateLastColumn(List<DataField>
newFields, String fieldName)
- throws Catalog.ColumnNotExistException,
- Catalog.ColumnAlreadyExistException {
- assertColumnExists(newFields, fieldName);
- assertColumnNotExists(newFields, rename.newName());
- for (int i = 0; i < newFields.size(); i++) {
- DataField field = newFields.get(i);
- if (!field.name().equals(fieldName)) {
- continue;
- }
-
- DataField newField =
- new DataField(
- field.id(),
- rename.newName(),
- field.type(),
- field.description());
- newFields.set(i, newField);
- return;
- }
+ public TableSchema generateTableSchema(
+ TableSchema oldTableSchema, List<SchemaChange> changes,
LazyField<Boolean> hasSnapshots)
+ throws Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
+ Map<String, String> oldOptions = new
HashMap<>(oldTableSchema.options());
+ Map<String, String> newOptions = new
HashMap<>(oldTableSchema.options());
+ List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
+ AtomicInteger highestFieldId = new
AtomicInteger(oldTableSchema.highestFieldId());
+ String newComment = oldTableSchema.comment();
+ for (SchemaChange change : changes) {
+ if (change instanceof SetOption) {
+ SetOption setOption = (SetOption) change;
+ if (hasSnapshots.get()) {
+ checkAlterTableOption(
+ setOption.key(),
+ oldOptions.get(setOption.key()),
+ setOption.value(),
+ false);
+ }
+ newOptions.put(setOption.key(), setOption.value());
+ } else if (change instanceof RemoveOption) {
+ RemoveOption removeOption = (RemoveOption) change;
+ if (hasSnapshots.get()) {
+ checkResetTableOption(removeOption.key());
+ }
+ newOptions.remove(removeOption.key());
+ } else if (change instanceof UpdateComment) {
+ UpdateComment updateComment = (UpdateComment) change;
+ newComment = updateComment.comment();
+ } else if (change instanceof AddColumn) {
+ AddColumn addColumn = (AddColumn) change;
+ SchemaChange.Move move = addColumn.move();
+ Preconditions.checkArgument(
+ addColumn.dataType().isNullable(),
+ "Column %s cannot specify NOT NULL in the %s table.",
+ String.join(".", addColumn.fieldNames()),
+ identifierFromPath(tableRoot.toString(), true,
branch).getFullName());
+ int id = highestFieldId.incrementAndGet();
+ DataType dataType =
ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
+
+ new NestedColumnModifier(addColumn.fieldNames()) {
+ @Override
+ protected void updateLastColumn(List<DataField> newFields,
String fieldName)
+ throws Catalog.ColumnAlreadyExistException {
+ assertColumnNotExists(newFields, fieldName);
+
+ DataField dataField =
+ new DataField(id, fieldName, dataType,
addColumn.description());
+
+ // key: name ; value : index
+ Map<String, Integer> map = new HashMap<>();
+ for (int i = 0; i < newFields.size(); i++) {
+ map.put(newFields.get(i).name(), i);
}
- }.updateIntermediateColumn(newFields, 0);
- } else if (change instanceof DropColumn) {
- DropColumn drop = (DropColumn) change;
- dropColumnValidation(oldTableSchema, drop);
- new NestedColumnModifier(drop.fieldNames()) {
- @Override
- protected void updateLastColumn(List<DataField>
newFields, String fieldName)
- throws Catalog.ColumnNotExistException {
- assertColumnExists(newFields, fieldName);
- newFields.removeIf(f ->
f.name().equals(fieldName));
- if (newFields.isEmpty()) {
- throw new IllegalArgumentException(
- "Cannot drop all fields in table");
+
+ if (null != move) {
+ if
(move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+ newFields.add(0, dataField);
+ } else if
(move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+ int fieldIndex =
map.get(move.referenceFieldName());
+ newFields.add(fieldIndex + 1, dataField);
}
+ } else {
+ newFields.add(dataField);
}
- }.updateIntermediateColumn(newFields, 0);
- } else if (change instanceof UpdateColumnType) {
- UpdateColumnType update = (UpdateColumnType) change;
- assertNotUpdatingPrimaryKeys(oldTableSchema,
update.fieldNames(), "update");
- updateNestedColumn(
- newFields,
- update.fieldNames(),
- (field) -> {
- DataType targetType = update.newDataType();
- if (update.keepNullability()) {
- targetType =
targetType.copy(field.type().isNullable());
- }
- checkState(
-
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
- &&
CastExecutors.resolve(field.type(), targetType)
- != null,
- String.format(
- "Column type %s[%s] cannot be
converted to %s without loosing information.",
- field.name(), field.type(),
targetType));
- return new DataField(
- field.id(), field.name(), targetType,
field.description());
- });
- } else if (change instanceof UpdateColumnNullability) {
- UpdateColumnNullability update = (UpdateColumnNullability)
change;
- if (update.fieldNames().length == 1
- && update.newNullability()
- &&
oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
- throw new UnsupportedOperationException(
- "Cannot change nullability of primary key");
}
- updateNestedColumn(
- newFields,
- update.fieldNames(),
- (field) ->
- new DataField(
- field.id(),
- field.name(),
-
field.type().copy(update.newNullability()),
- field.description()));
- } else if (change instanceof UpdateColumnComment) {
- UpdateColumnComment update = (UpdateColumnComment) change;
- updateNestedColumn(
- newFields,
- update.fieldNames(),
- (field) ->
+ }.updateIntermediateColumn(newFields, 0);
+ } else if (change instanceof RenameColumn) {
+ RenameColumn rename = (RenameColumn) change;
+ assertNotUpdatingPrimaryKeys(oldTableSchema,
rename.fieldNames(), "rename");
+ new NestedColumnModifier(rename.fieldNames()) {
+ @Override
+ protected void updateLastColumn(List<DataField> newFields,
String fieldName)
+ throws Catalog.ColumnNotExistException,
+ Catalog.ColumnAlreadyExistException {
+ assertColumnExists(newFields, fieldName);
+ assertColumnNotExists(newFields, rename.newName());
+ for (int i = 0; i < newFields.size(); i++) {
+ DataField field = newFields.get(i);
+ if (!field.name().equals(fieldName)) {
+ continue;
+ }
+
+ DataField newField =
new DataField(
field.id(),
- field.name(),
+ rename.newName(),
field.type(),
- update.newDescription()));
- } else if (change instanceof UpdateColumnPosition) {
- UpdateColumnPosition update = (UpdateColumnPosition)
change;
- SchemaChange.Move move = update.move();
- applyMove(newFields, move);
- } else {
+ field.description());
+ newFields.set(i, newField);
+ return;
+ }
+ }
+ }.updateIntermediateColumn(newFields, 0);
+ } else if (change instanceof DropColumn) {
+ DropColumn drop = (DropColumn) change;
+ dropColumnValidation(oldTableSchema, drop);
+ new NestedColumnModifier(drop.fieldNames()) {
+ @Override
+ protected void updateLastColumn(List<DataField> newFields,
String fieldName)
+ throws Catalog.ColumnNotExistException {
+ assertColumnExists(newFields, fieldName);
+ newFields.removeIf(f -> f.name().equals(fieldName));
+ if (newFields.isEmpty()) {
+ throw new IllegalArgumentException("Cannot drop
all fields in table");
+ }
+ }
+ }.updateIntermediateColumn(newFields, 0);
+ } else if (change instanceof UpdateColumnType) {
+ UpdateColumnType update = (UpdateColumnType) change;
+ assertNotUpdatingPrimaryKeys(oldTableSchema,
update.fieldNames(), "update");
+ updateNestedColumn(
+ newFields,
+ update.fieldNames(),
+ (field) -> {
+ DataType targetType = update.newDataType();
+ if (update.keepNullability()) {
+ targetType =
targetType.copy(field.type().isNullable());
+ }
+ checkState(
+
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
+ &&
CastExecutors.resolve(field.type(), targetType)
+ != null,
+ String.format(
+ "Column type %s[%s] cannot be
converted to %s without loosing information.",
+ field.name(), field.type(),
targetType));
+ return new DataField(
+ field.id(), field.name(), targetType,
field.description());
+ });
+ } else if (change instanceof UpdateColumnNullability) {
+ UpdateColumnNullability update = (UpdateColumnNullability)
change;
+ if (update.fieldNames().length == 1
+ && update.newNullability()
+ &&
oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
throw new UnsupportedOperationException(
- "Unsupported change: " + change.getClass());
- }
- }
-
- // We change TableSchema to Schema, because we want to deal with
primary-key and
- // partition in options.
- Schema newSchema =
- new Schema(
- newFields,
- oldTableSchema.partitionKeys(),
- applyNotNestedColumnRename(
- oldTableSchema.primaryKeys(),
- Iterables.filter(changes,
RenameColumn.class)),
- applySchemaChanges(newOptions, changes),
- newComment);
- TableSchema newTableSchema =
- new TableSchema(
- oldTableSchema.id() + 1,
- newSchema.fields(),
- highestFieldId.get(),
- newSchema.partitionKeys(),
- newSchema.primaryKeys(),
- newSchema.options(),
- newSchema.comment());
-
- try {
- boolean success = commit(newTableSchema);
- if (success) {
- return newTableSchema;
+ "Cannot change nullability of primary key");
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+ updateNestedColumn(
+ newFields,
+ update.fieldNames(),
+ (field) ->
+ new DataField(
+ field.id(),
+ field.name(),
+
field.type().copy(update.newNullability()),
+ field.description()));
+ } else if (change instanceof UpdateColumnComment) {
+ UpdateColumnComment update = (UpdateColumnComment) change;
+ updateNestedColumn(
+ newFields,
+ update.fieldNames(),
+ (field) ->
+ new DataField(
+ field.id(),
+ field.name(),
+ field.type(),
+ update.newDescription()));
+ } else if (change instanceof UpdateColumnPosition) {
+ UpdateColumnPosition update = (UpdateColumnPosition) change;
+ SchemaChange.Move move = update.move();
+ applyMove(newFields, move);
+ } else {
+ throw new UnsupportedOperationException("Unsupported change: "
+ change.getClass());
}
}
+
+ // We change TableSchema to Schema, because we want to deal with
primary-key and
+ // partition in options.
+ Schema newSchema =
+ new Schema(
+ newFields,
+ oldTableSchema.partitionKeys(),
+ applyNotNestedColumnRename(
+ oldTableSchema.primaryKeys(),
+ Iterables.filter(changes, RenameColumn.class)),
+ applySchemaChanges(newOptions, changes),
+ newComment);
+
+ return new TableSchema(
+ oldTableSchema.id() + 1,
+ newSchema.fields(),
+ highestFieldId.get(),
+ newSchema.partitionKeys(),
+ newSchema.primaryKeys(),
+ newSchema.options(),
+ newSchema.comment());
}
public void applyMove(List<DataField> newFields, SchemaChange.Move move) {