This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.1 by this push:
     new 9e715f195a [core] Extract generateTableSchema method to optimize code structure (#5482)
9e715f195a is described below

commit 9e715f195a00e2219fd7561f4e1fc35106b074e2
Author: Xiaohu <[email protected]>
AuthorDate: Thu Apr 17 13:20:53 2025 +0800

    [core] Extract generateTableSchema method to optimize code structure (#5482)
---
 .../org/apache/paimon/schema/SchemaManager.java    | 368 +++++++++++----------
 1 file changed, 185 insertions(+), 183 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index abcc21bb11..351be613ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -283,201 +283,203 @@ public class SchemaManager implements Serializable {
                                             new Catalog.TableNotExistException(
                                                     identifierFromPath(
                                                             
tableRoot.toString(), true, branch)));
-            Map<String, String> oldOptions = new 
HashMap<>(oldTableSchema.options());
-            Map<String, String> newOptions = new 
HashMap<>(oldTableSchema.options());
-            List<DataField> newFields = new 
ArrayList<>(oldTableSchema.fields());
-            AtomicInteger highestFieldId = new 
AtomicInteger(oldTableSchema.highestFieldId());
-            String newComment = oldTableSchema.comment();
-            for (SchemaChange change : changes) {
-                if (change instanceof SetOption) {
-                    SetOption setOption = (SetOption) change;
-                    if (hasSnapshots.get()) {
-                        checkAlterTableOption(
-                                setOption.key(),
-                                oldOptions.get(setOption.key()),
-                                setOption.value(),
-                                false);
-                    }
-                    newOptions.put(setOption.key(), setOption.value());
-                } else if (change instanceof RemoveOption) {
-                    RemoveOption removeOption = (RemoveOption) change;
-                    if (hasSnapshots.get()) {
-                        checkResetTableOption(removeOption.key());
-                    }
-                    newOptions.remove(removeOption.key());
-                } else if (change instanceof UpdateComment) {
-                    UpdateComment updateComment = (UpdateComment) change;
-                    newComment = updateComment.comment();
-                } else if (change instanceof AddColumn) {
-                    AddColumn addColumn = (AddColumn) change;
-                    SchemaChange.Move move = addColumn.move();
-                    Preconditions.checkArgument(
-                            addColumn.dataType().isNullable(),
-                            "Column %s cannot specify NOT NULL in the %s 
table.",
-                            String.join(".", addColumn.fieldNames()),
-                            identifierFromPath(tableRoot.toString(), true, 
branch).getFullName());
-                    int id = highestFieldId.incrementAndGet();
-                    DataType dataType =
-                            ReassignFieldId.reassign(addColumn.dataType(), 
highestFieldId);
-
-                    new NestedColumnModifier(addColumn.fieldNames()) {
-                        @Override
-                        protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
-                                throws Catalog.ColumnAlreadyExistException {
-                            assertColumnNotExists(newFields, fieldName);
-
-                            DataField dataField =
-                                    new DataField(id, fieldName, dataType, 
addColumn.description());
-
-                            // key: name ; value : index
-                            Map<String, Integer> map = new HashMap<>();
-                            for (int i = 0; i < newFields.size(); i++) {
-                                map.put(newFields.get(i).name(), i);
-                            }
+            TableSchema newTableSchema = generateTableSchema(oldTableSchema, 
changes, hasSnapshots);
+            try {
+                boolean success = commit(newTableSchema);
+                if (success) {
+                    return newTableSchema;
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 
-                            if (null != move) {
-                                if 
(move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
-                                    newFields.add(0, dataField);
-                                } else if 
(move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
-                                    int fieldIndex = 
map.get(move.referenceFieldName());
-                                    newFields.add(fieldIndex + 1, dataField);
-                                }
-                            } else {
-                                newFields.add(dataField);
-                            }
-                        }
-                    }.updateIntermediateColumn(newFields, 0);
-                } else if (change instanceof RenameColumn) {
-                    RenameColumn rename = (RenameColumn) change;
-                    assertNotUpdatingPrimaryKeys(oldTableSchema, 
rename.fieldNames(), "rename");
-                    new NestedColumnModifier(rename.fieldNames()) {
-                        @Override
-                        protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
-                                throws Catalog.ColumnNotExistException,
-                                        Catalog.ColumnAlreadyExistException {
-                            assertColumnExists(newFields, fieldName);
-                            assertColumnNotExists(newFields, rename.newName());
-                            for (int i = 0; i < newFields.size(); i++) {
-                                DataField field = newFields.get(i);
-                                if (!field.name().equals(fieldName)) {
-                                    continue;
-                                }
-
-                                DataField newField =
-                                        new DataField(
-                                                field.id(),
-                                                rename.newName(),
-                                                field.type(),
-                                                field.description());
-                                newFields.set(i, newField);
-                                return;
-                            }
+    public TableSchema generateTableSchema(
+            TableSchema oldTableSchema, List<SchemaChange> changes, 
LazyField<Boolean> hasSnapshots)
+            throws Catalog.ColumnAlreadyExistException, 
Catalog.ColumnNotExistException {
+        Map<String, String> oldOptions = new 
HashMap<>(oldTableSchema.options());
+        Map<String, String> newOptions = new 
HashMap<>(oldTableSchema.options());
+        List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
+        AtomicInteger highestFieldId = new 
AtomicInteger(oldTableSchema.highestFieldId());
+        String newComment = oldTableSchema.comment();
+        for (SchemaChange change : changes) {
+            if (change instanceof SetOption) {
+                SetOption setOption = (SetOption) change;
+                if (hasSnapshots.get()) {
+                    checkAlterTableOption(
+                            setOption.key(),
+                            oldOptions.get(setOption.key()),
+                            setOption.value(),
+                            false);
+                }
+                newOptions.put(setOption.key(), setOption.value());
+            } else if (change instanceof RemoveOption) {
+                RemoveOption removeOption = (RemoveOption) change;
+                if (hasSnapshots.get()) {
+                    checkResetTableOption(removeOption.key());
+                }
+                newOptions.remove(removeOption.key());
+            } else if (change instanceof UpdateComment) {
+                UpdateComment updateComment = (UpdateComment) change;
+                newComment = updateComment.comment();
+            } else if (change instanceof AddColumn) {
+                AddColumn addColumn = (AddColumn) change;
+                SchemaChange.Move move = addColumn.move();
+                Preconditions.checkArgument(
+                        addColumn.dataType().isNullable(),
+                        "Column %s cannot specify NOT NULL in the %s table.",
+                        String.join(".", addColumn.fieldNames()),
+                        identifierFromPath(tableRoot.toString(), true, 
branch).getFullName());
+                int id = highestFieldId.incrementAndGet();
+                DataType dataType = 
ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
+
+                new NestedColumnModifier(addColumn.fieldNames()) {
+                    @Override
+                    protected void updateLastColumn(List<DataField> newFields, 
String fieldName)
+                            throws Catalog.ColumnAlreadyExistException {
+                        assertColumnNotExists(newFields, fieldName);
+
+                        DataField dataField =
+                                new DataField(id, fieldName, dataType, 
addColumn.description());
+
+                        // key: name ; value : index
+                        Map<String, Integer> map = new HashMap<>();
+                        for (int i = 0; i < newFields.size(); i++) {
+                            map.put(newFields.get(i).name(), i);
                         }
-                    }.updateIntermediateColumn(newFields, 0);
-                } else if (change instanceof DropColumn) {
-                    DropColumn drop = (DropColumn) change;
-                    dropColumnValidation(oldTableSchema, drop);
-                    new NestedColumnModifier(drop.fieldNames()) {
-                        @Override
-                        protected void updateLastColumn(List<DataField> 
newFields, String fieldName)
-                                throws Catalog.ColumnNotExistException {
-                            assertColumnExists(newFields, fieldName);
-                            newFields.removeIf(f -> 
f.name().equals(fieldName));
-                            if (newFields.isEmpty()) {
-                                throw new IllegalArgumentException(
-                                        "Cannot drop all fields in table");
+
+                        if (null != move) {
+                            if 
(move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
+                                newFields.add(0, dataField);
+                            } else if 
(move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
+                                int fieldIndex = 
map.get(move.referenceFieldName());
+                                newFields.add(fieldIndex + 1, dataField);
                             }
+                        } else {
+                            newFields.add(dataField);
                         }
-                    }.updateIntermediateColumn(newFields, 0);
-                } else if (change instanceof UpdateColumnType) {
-                    UpdateColumnType update = (UpdateColumnType) change;
-                    assertNotUpdatingPrimaryKeys(oldTableSchema, 
update.fieldNames(), "update");
-                    updateNestedColumn(
-                            newFields,
-                            update.fieldNames(),
-                            (field) -> {
-                                DataType targetType = update.newDataType();
-                                if (update.keepNullability()) {
-                                    targetType = 
targetType.copy(field.type().isNullable());
-                                }
-                                checkState(
-                                        
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
-                                                && 
CastExecutors.resolve(field.type(), targetType)
-                                                        != null,
-                                        String.format(
-                                                "Column type %s[%s] cannot be 
converted to %s without loosing information.",
-                                                field.name(), field.type(), 
targetType));
-                                return new DataField(
-                                        field.id(), field.name(), targetType, 
field.description());
-                            });
-                } else if (change instanceof UpdateColumnNullability) {
-                    UpdateColumnNullability update = (UpdateColumnNullability) 
change;
-                    if (update.fieldNames().length == 1
-                            && update.newNullability()
-                            && 
oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
-                        throw new UnsupportedOperationException(
-                                "Cannot change nullability of primary key");
                     }
-                    updateNestedColumn(
-                            newFields,
-                            update.fieldNames(),
-                            (field) ->
-                                    new DataField(
-                                            field.id(),
-                                            field.name(),
-                                            
field.type().copy(update.newNullability()),
-                                            field.description()));
-                } else if (change instanceof UpdateColumnComment) {
-                    UpdateColumnComment update = (UpdateColumnComment) change;
-                    updateNestedColumn(
-                            newFields,
-                            update.fieldNames(),
-                            (field) ->
+                }.updateIntermediateColumn(newFields, 0);
+            } else if (change instanceof RenameColumn) {
+                RenameColumn rename = (RenameColumn) change;
+                assertNotUpdatingPrimaryKeys(oldTableSchema, 
rename.fieldNames(), "rename");
+                new NestedColumnModifier(rename.fieldNames()) {
+                    @Override
+                    protected void updateLastColumn(List<DataField> newFields, 
String fieldName)
+                            throws Catalog.ColumnNotExistException,
+                                    Catalog.ColumnAlreadyExistException {
+                        assertColumnExists(newFields, fieldName);
+                        assertColumnNotExists(newFields, rename.newName());
+                        for (int i = 0; i < newFields.size(); i++) {
+                            DataField field = newFields.get(i);
+                            if (!field.name().equals(fieldName)) {
+                                continue;
+                            }
+
+                            DataField newField =
                                     new DataField(
                                             field.id(),
-                                            field.name(),
+                                            rename.newName(),
                                             field.type(),
-                                            update.newDescription()));
-                } else if (change instanceof UpdateColumnPosition) {
-                    UpdateColumnPosition update = (UpdateColumnPosition) 
change;
-                    SchemaChange.Move move = update.move();
-                    applyMove(newFields, move);
-                } else {
+                                            field.description());
+                            newFields.set(i, newField);
+                            return;
+                        }
+                    }
+                }.updateIntermediateColumn(newFields, 0);
+            } else if (change instanceof DropColumn) {
+                DropColumn drop = (DropColumn) change;
+                dropColumnValidation(oldTableSchema, drop);
+                new NestedColumnModifier(drop.fieldNames()) {
+                    @Override
+                    protected void updateLastColumn(List<DataField> newFields, 
String fieldName)
+                            throws Catalog.ColumnNotExistException {
+                        assertColumnExists(newFields, fieldName);
+                        newFields.removeIf(f -> f.name().equals(fieldName));
+                        if (newFields.isEmpty()) {
+                            throw new IllegalArgumentException("Cannot drop 
all fields in table");
+                        }
+                    }
+                }.updateIntermediateColumn(newFields, 0);
+            } else if (change instanceof UpdateColumnType) {
+                UpdateColumnType update = (UpdateColumnType) change;
+                assertNotUpdatingPrimaryKeys(oldTableSchema, 
update.fieldNames(), "update");
+                updateNestedColumn(
+                        newFields,
+                        update.fieldNames(),
+                        (field) -> {
+                            DataType targetType = update.newDataType();
+                            if (update.keepNullability()) {
+                                targetType = 
targetType.copy(field.type().isNullable());
+                            }
+                            checkState(
+                                    
DataTypeCasts.supportsExplicitCast(field.type(), targetType)
+                                            && 
CastExecutors.resolve(field.type(), targetType)
+                                                    != null,
+                                    String.format(
+                                            "Column type %s[%s] cannot be 
converted to %s without loosing information.",
+                                            field.name(), field.type(), 
targetType));
+                            return new DataField(
+                                    field.id(), field.name(), targetType, 
field.description());
+                        });
+            } else if (change instanceof UpdateColumnNullability) {
+                UpdateColumnNullability update = (UpdateColumnNullability) 
change;
+                if (update.fieldNames().length == 1
+                        && update.newNullability()
+                        && 
oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
                     throw new UnsupportedOperationException(
-                            "Unsupported change: " + change.getClass());
-                }
-            }
-
-            // We change TableSchema to Schema, because we want to deal with 
primary-key and
-            // partition in options.
-            Schema newSchema =
-                    new Schema(
-                            newFields,
-                            oldTableSchema.partitionKeys(),
-                            applyNotNestedColumnRename(
-                                    oldTableSchema.primaryKeys(),
-                                    Iterables.filter(changes, 
RenameColumn.class)),
-                            applySchemaChanges(newOptions, changes),
-                            newComment);
-            TableSchema newTableSchema =
-                    new TableSchema(
-                            oldTableSchema.id() + 1,
-                            newSchema.fields(),
-                            highestFieldId.get(),
-                            newSchema.partitionKeys(),
-                            newSchema.primaryKeys(),
-                            newSchema.options(),
-                            newSchema.comment());
-
-            try {
-                boolean success = commit(newTableSchema);
-                if (success) {
-                    return newTableSchema;
+                            "Cannot change nullability of primary key");
                 }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+                updateNestedColumn(
+                        newFields,
+                        update.fieldNames(),
+                        (field) ->
+                                new DataField(
+                                        field.id(),
+                                        field.name(),
+                                        
field.type().copy(update.newNullability()),
+                                        field.description()));
+            } else if (change instanceof UpdateColumnComment) {
+                UpdateColumnComment update = (UpdateColumnComment) change;
+                updateNestedColumn(
+                        newFields,
+                        update.fieldNames(),
+                        (field) ->
+                                new DataField(
+                                        field.id(),
+                                        field.name(),
+                                        field.type(),
+                                        update.newDescription()));
+            } else if (change instanceof UpdateColumnPosition) {
+                UpdateColumnPosition update = (UpdateColumnPosition) change;
+                SchemaChange.Move move = update.move();
+                applyMove(newFields, move);
+            } else {
+                throw new UnsupportedOperationException("Unsupported change: " 
+ change.getClass());
             }
         }
+
+        // We change TableSchema to Schema, because we want to deal with 
primary-key and
+        // partition in options.
+        Schema newSchema =
+                new Schema(
+                        newFields,
+                        oldTableSchema.partitionKeys(),
+                        applyNotNestedColumnRename(
+                                oldTableSchema.primaryKeys(),
+                                Iterables.filter(changes, RenameColumn.class)),
+                        applySchemaChanges(newOptions, changes),
+                        newComment);
+
+        return new TableSchema(
+                oldTableSchema.id() + 1,
+                newSchema.fields(),
+                highestFieldId.get(),
+                newSchema.partitionKeys(),
+                newSchema.primaryKeys(),
+                newSchema.options(),
+                newSchema.comment());
     }
 
     public void applyMove(List<DataField> newFields, SchemaChange.Move move) {

Reply via email to