This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e18bd43 feat(table): add schema evolution support  (#596)
3e18bd43 is described below

commit 3e18bd4366819724f51d98e66ab8522c74cea12f
Author: Blue Li <[email protected]>
AuthorDate: Fri Oct 24 01:19:17 2025 +0800

    feat(table): add schema evolution support  (#596)
    
    ### Summary
    
    This PR introduces initial schema evolution capabilities, allowing users
    to modify table schemas through a fluent API. This work was originally
    started by @Shreyas220 in PR #431 , but due to time constraints, I'm
    continuing the implementation.
    
    ### Key Features
    
    - **Add Columns**: Support for adding primitive types, structs, lists,
    and maps
    - **Delete Columns**: Remove fields from the schema
    - **Update Columns**: Modify field properties including name, type,
    nullability, and documentation
    - **Move Columns**: Reorder fields with operations like `MoveFirst`,
    `MoveBefore`, and `MoveAfter`
    - **Rename Columns**: Change field names
    - **Basic Validation**: Type checking and compatibility validation
    
    ### API Usage
    
    ```go
    // Create a transaction and update schema
    txn := table.NewTransaction()
    updateSchema := txn.UpdateSchema(true, false) // caseSensitive=true, 
allowIncompatibleChanges=false
    
    // Add a new column
    updateSchema.AddColumn([]string{"gender"}, iceberg.PrimitiveTypes.String, 
"User gender", false, nil)
    
    // Add nested column in struct
    updateSchema.AddColumn([]string{"address", "country"}, 
iceberg.PrimitiveTypes.String, "Country code", false, 
iceberg.StringLiteral("US"))
    
    // Add complex nested structure
    updateSchema.AddColumn([]string{"profile"}, &iceberg.StructType{
        FieldList: []iceberg.NestedField{
            {Name: "bio", Type: iceberg.PrimitiveTypes.String, Required: false},
            {Name: "avatar", Type: iceberg.PrimitiveTypes.String, Required: 
false},
        },
    }, "User profile", false, nil)
    
    // Add list type
    updateSchema.AddColumn([]string{"tags"}, &iceberg.ListType{
        Element: iceberg.PrimitiveTypes.String,
        ElementRequired: false,
    }, "User tags", false, nil)
    
    // Rename a column
    updateSchema.RenameColumn([]string{"age"}, "user_age")
    
    // Update column properties
    updateSchema.UpdateColumn([]string{"name"}, ColumnUpdate{
        Required: iceberg.Optional[bool]{Valid: true, Val: true},
        Doc: iceberg.Optional[string]{Valid: true, Val: "User's full name"},
    })
    
    // Move column to first position
    updateSchema.MoveFirst([]string{"id"})
    
    // Move column before another column
    updateSchema.MoveBefore([]string{"name"}, []string{"age"})
    
    // Delete a column
    updateSchema.DeleteColumn([]string{"old_field"})
    
    // Chain multiple operations
    updateSchema.
        AddColumn([]string{"email"}, iceberg.PrimitiveTypes.String, "Email 
address", true, nil).
        RenameColumn([]string{"phone"}, "phone_number").
        MoveAfter([]string{"email"}, []string{"name"}).
        DeleteColumn([]string{"deprecated_field"})
    
    // Preview changes without committing
    newSchema, err := updateSchema.Apply()
    if err != nil {
        // Handle validation errors
        return err
    }
    
    // Commit the changes
    err := updateSchema.Commit()
    if err != nil {
        return err
    }
    
    // Commit the entire transaction
    updatedTable, err := txn.Commit(ctx)
    ```
---
 table/transaction.go        |  17 +
 table/update_schema.go      | 944 ++++++++++++++++++++++++++++++++++++++++++++
 table/update_schema_test.go | 881 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 1842 insertions(+)

diff --git a/table/transaction.go b/table/transaction.go
index 09f0fa91..d949df3a 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -147,6 +147,23 @@ func (t *Transaction) UpdateSpec(caseSensitive bool) 
*UpdateSpec {
        return NewUpdateSpec(t, caseSensitive)
 }
 
+// UpdateSchema creates a new UpdateSchema instance for managing schema changes
+// within this transaction.
+//
+// Parameters:
+//   - caseSensitive: If true, field name lookups are case-sensitive; if false,
+//     field names are matched case-insensitively.
+//   - allowIncompatibleChanges: If true, allows schema changes that would 
normally
+//     be rejected for being incompatible (e.g., adding required fields without
+//     default values, changing field types in non-promotable ways, or changing
+//     column nullability from optional to required).
+//   - opts: Optional configuration functions to customize the UpdateSchema 
behavior.
+//
+// Returns an UpdateSchema instance that can be used to build and apply schema 
changes.
+func (t *Transaction) UpdateSchema(caseSensitive bool, 
allowIncompatibleChanges bool, opts ...UpdateSchemaOption) *UpdateSchema {
+       return NewUpdateSchema(t, caseSensitive, allowIncompatibleChanges, 
opts...)
+}
+
 type expireSnapshotsCfg struct {
        minSnapshotsToKeep *int
        maxSnapshotAgeMs   *int64
diff --git a/table/update_schema.go b/table/update_schema.go
new file mode 100644
index 00000000..0492f085
--- /dev/null
+++ b/table/update_schema.go
@@ -0,0 +1,944 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "errors"
+       "fmt"
+       "maps"
+       "slices"
+       "strings"
+
+       "github.com/apache/iceberg-go"
+)
+
+const TableRootID = -1
+
+type MoveOp string
+
+const (
+       MoveOpFirst  MoveOp = "first"
+       MoveOpBefore MoveOp = "before"
+       MoveOpAfter  MoveOp = "after"
+)
+
+type move struct {
+       FieldID    int
+       RelativeTo int
+       Op         MoveOp
+}
+
+// UpdateSchema manages schema evolution operations within a transaction.
+// It supports adding, deleting, renaming, updating, and reordering columns,
+// and ensures all changes are validated before being committed.
+//
+// Operations can be chained together and are applied in the order they are 
called.
+// Changes are not persisted until Commit() is called.
+//
+// Basic Usage:
+//
+//     txn := table.NewTransaction()
+//     updateSchema := txn.UpdateSchema(true, false)
+//
+//     // Add a new column
+//     updateSchema.AddColumn([]string{"email"}, 
iceberg.PrimitiveTypes.String, "Email address", false, nil)
+//
+//     // Commit changes
+//     if err := updateSchema.Commit(); err != nil {
+//         return err
+//     }
+//     if _, err := txn.Commit(ctx); err != nil {
+//         return err
+//     }
+//
+// Chaining Operations:
+//
+//     updateSchema.
+//         AddColumn([]string{"age"}, iceberg.PrimitiveTypes.Int, "User age", 
false, nil).
+//         RenameColumn([]string{"name"}, "full_name").
+//         MoveFirst([]string{"id"}).
+//         Commit()
+//
+// Adding Nested Columns:
+//
+//     // Add a column to a struct field
+//     updateSchema.AddColumn([]string{"address", "country"}, 
iceberg.PrimitiveTypes.String, "Country code", false, 
iceberg.StringLiteral("US"))
+//
+//     // Commit the schema update
+//     if err := updateSchema.Commit(); err != nil {
+//         return err
+//     }
+//     if _, err := txn.Commit(ctx); err != nil {
+//         return err
+//     }
+type UpdateSchema struct {
+       txn          *Transaction
+       schema       *iceberg.Schema
+       lastColumnID int
+
+       deletes map[int]struct{}
+       updates map[int]map[int]iceberg.NestedField
+       adds    map[int][]iceberg.NestedField
+       moves   map[int][]move
+
+       identifierFieldNames map[string]struct{}
+       parentID             map[int]int
+
+       addedNameToID            map[string]int
+       allowIncompatibleChanges bool
+       caseSensitive            bool
+       nameMapping              iceberg.NameMapping
+       ops                      []func() error
+}
+
+// UpdateSchemaOption is a functional option for configuring UpdateSchema.
+type UpdateSchemaOption func(*UpdateSchema)
+
+// WithNameMapping configures the UpdateSchema to use the provided name mapping
+// for tracking field name changes and ensuring consistency during schema 
evolution.
+func WithNameMapping(nameMapping iceberg.NameMapping) UpdateSchemaOption {
+       return func(u *UpdateSchema) {
+               u.nameMapping = nameMapping
+       }
+}
+
+// NewUpdateSchema creates a new UpdateSchema instance for managing schema 
changes
+// within a transaction.
+//
+// Parameters:
+//   - txn: The transaction that this schema update will be applied to.
+//   - caseSensitive: If true, field name lookups are case-sensitive; if false,
+//     field names are matched case-insensitively.
+//   - allowIncompatibleChanges: If true, allows schema changes that would 
normally
+//     be rejected for being incompatible (e.g., adding required fields without
+//     default values, changing field types in non-promotable ways, or changing
+//     column nullability from optional to required).
+//   - opts: Optional configuration functions to customize the UpdateSchema 
behavior.
+//
+// Returns an UpdateSchema instance that can be used to build and apply schema 
changes.
+func NewUpdateSchema(txn *Transaction, caseSensitive bool, 
allowIncompatibleChanges bool, opts ...UpdateSchemaOption) *UpdateSchema {
+       u := &UpdateSchema{
+               txn:          txn,
+               schema:       nil,
+               lastColumnID: txn.meta.CurrentSchema().HighestFieldID(),
+
+               deletes: make(map[int]struct{}),
+               updates: make(map[int]map[int]iceberg.NestedField),
+               adds:    make(map[int][]iceberg.NestedField),
+               moves:   make(map[int][]move),
+
+               identifierFieldNames: nil,
+               parentID:             make(map[int]int),
+
+               addedNameToID:            make(map[string]int),
+               allowIncompatibleChanges: allowIncompatibleChanges,
+               caseSensitive:            caseSensitive,
+               nameMapping:              nil,
+               ops:                      make([]func() error, 0),
+       }
+
+       for _, opt := range opts {
+               opt(u)
+       }
+
+       return u
+}
+
+func (u *UpdateSchema) init() error {
+       if u.txn == nil {
+               return errors.New("transaction is nil")
+       }
+       if u.txn.meta == nil {
+               return errors.New("transaction meta is nil")
+       }
+
+       u.schema = u.txn.meta.CurrentSchema()
+       if u.schema == nil {
+               return errors.New("current schema is nil")
+       }
+
+       if err := u.initIdentifierFieldNames(); err != nil {
+               return err
+       }
+
+       if err := u.initParentID(); err != nil {
+               return err
+       }
+
+       return nil
+}
+
+func (u *UpdateSchema) initIdentifierFieldNames() error {
+       if u.identifierFieldNames != nil {
+               return nil
+       }
+
+       identifierFieldNames := make(map[string]struct{})
+       for _, id := range u.schema.IdentifierFieldIDs {
+               name, ok := u.schema.FindColumnName(id)
+               if !ok {
+                       return fmt.Errorf("identifier field %d not found", id)
+               }
+               identifierFieldNames[name] = struct{}{}
+       }
+
+       u.identifierFieldNames = identifierFieldNames
+
+       return nil
+}
+
+func (u *UpdateSchema) initParentID() error {
+       parents, err := iceberg.IndexParents(u.schema)
+       if err != nil {
+               return err
+       }
+
+       maps.Copy(u.parentID, parents)
+
+       return nil
+}
+
+func (u *UpdateSchema) assignNewColumnID() int {
+       u.lastColumnID++
+
+       return u.lastColumnID
+}
+
+func (u *UpdateSchema) findField(name string) (iceberg.NestedField, bool) {
+       if u.caseSensitive {
+               return u.schema.FindFieldByName(name)
+       } else {
+               return u.schema.FindFieldByNameCaseInsensitive(name)
+       }
+}
+
+func (u *UpdateSchema) isDeleted(fieldID int) bool {
+       _, ok := u.deletes[fieldID]
+
+       return ok
+}
+
+func (u *UpdateSchema) findParentID(fieldID int) int {
+       parentID, ok := u.parentID[fieldID]
+       if !ok {
+               return TableRootID
+       }
+
+       return parentID
+}
+
+func (u *UpdateSchema) AddColumn(path []string, fieldType iceberg.Type, doc 
string, required bool, defaultValue iceberg.Literal) *UpdateSchema {
+       u.ops = append(u.ops, func() error {
+               return u.addColumn(path, fieldType, doc, required, defaultValue)
+       })
+
+       return u
+}
+
+func (u *UpdateSchema) addColumn(path []string, fieldType iceberg.Type, doc 
string, required bool, defaultValue iceberg.Literal) error {
+       if len(path) == 0 {
+               return errors.New("path is empty")
+       }
+
+       fullName := strings.Join(path, ".")
+
+       switch t := fieldType.(type) {
+       case *iceberg.ListType, *iceberg.MapType, *iceberg.StructType:
+               if defaultValue != nil {
+                       return fmt.Errorf("default values are not supported for 
%s", t.String())
+               }
+       case iceberg.PrimitiveType:
+               if required && defaultValue == nil && 
!u.allowIncompatibleChanges {
+                       return fmt.Errorf("required field %s has no default 
value", fullName)
+               }
+               if defaultValue != nil && !defaultValue.Type().Equals(t) {
+                       return fmt.Errorf("default value type mismatch: %s != 
%s", defaultValue.Type(), t)
+               }
+       default:
+               return fmt.Errorf("invalid field type: %T", t)
+       }
+
+       parent := path[:len(path)-1]
+       parentID := TableRootID
+
+       if len(parent) > 0 {
+               parentFullPath := strings.Join(parent, ".")
+               parentField, ok := u.findField(parentFullPath)
+               if !ok {
+                       return fmt.Errorf("parent field not found: %s", 
parentFullPath)
+               }
+
+               switch parentType := parentField.Type.(type) {
+               case *iceberg.ListType:
+                       f := parentType.ElementField()
+                       parentField = f
+               case *iceberg.MapType:
+                       f := parentType.ValueField()
+                       parentField = f
+               }
+
+               if _, ok := parentField.Type.(*iceberg.StructType); !ok {
+                       return fmt.Errorf("cannot add field to non-struct type: 
%s", parentFullPath)
+               }
+
+               parentID = parentField.ID
+       }
+
+       name := path[len(path)-1]
+       for _, add := range u.adds[parentID] {
+               if add.Name == name {
+                       return fmt.Errorf("field already exists in adds: %s", 
fullName)
+               }
+       }
+
+       // support add field with the same name as deleted field and renamed 
field
+       if field, ok := u.findField(fullName); ok {
+               if !u.isDeleted(field.ID) {
+                       for _, upd := range u.updates[parentID] {
+                               if upd.Name == name {
+                                       return fmt.Errorf("field already 
exists: %s", fullName)
+                               }
+                       }
+               }
+       }
+
+       field := iceberg.NestedField{
+               Name:     name,
+               Type:     fieldType,
+               Required: required,
+               Doc:      doc,
+       }
+       if defaultValue != nil {
+               field.InitialDefault = defaultValue.Any()
+               field.WriteDefault = defaultValue.Any()
+       }
+
+       sch, err := iceberg.AssignFreshSchemaIDs(iceberg.NewSchema(0, field), 
u.assignNewColumnID)
+       if err != nil {
+               return fmt.Errorf("failed to assign field id: %w", err)
+       }
+       u.adds[parentID] = append(u.adds[parentID], sch.Field(0))
+       u.addedNameToID[fullName] = sch.Field(0).ID
+
+       return nil
+}
+
+func (u *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+       u.ops = append(u.ops, func() error {
+               return u.deleteColumn(path)
+       })
+
+       return u
+}
+
+func (u *UpdateSchema) deleteColumn(path []string) error {
+       fullName := strings.Join(path, ".")
+       field, ok := u.findField(fullName)
+       if !ok {
+               return fmt.Errorf("field not found: %s", fullName)
+       }
+
+       if _, ok := u.adds[field.ID]; ok {
+               return fmt.Errorf("field that has additions cannot be deleted: 
%s", fullName)
+       }
+
+       if _, ok := u.updates[field.ID]; ok {
+               return fmt.Errorf("field that has updates cannot be deleted: 
%s", fullName)
+       }
+
+       delete(u.identifierFieldNames, fullName)
+
+       u.deletes[field.ID] = struct{}{}
+
+       return nil
+}
+
+type ColumnUpdate struct {
+       Name         iceberg.Optional[string]
+       FieldType    iceberg.Optional[iceberg.Type]
+       Required     iceberg.Optional[bool]
+       WriteDefault iceberg.Optional[iceberg.Literal]
+       Doc          iceberg.Optional[string]
+}
+
+func (u *UpdateSchema) UpdateColumn(path []string, update ColumnUpdate) 
*UpdateSchema {
+       u.ops = append(u.ops, func() error {
+               return u.updateColumn(path, update)
+       })
+
+       return u
+}
+
+func (u *UpdateSchema) updateColumn(path []string, update ColumnUpdate) error {
+       if !update.Name.Valid &&
+               !update.FieldType.Valid &&
+               !update.Required.Valid &&
+               !update.WriteDefault.Valid &&
+               !update.Doc.Valid {
+               return nil
+       }
+
+       fullName := strings.Join(path, ".")
+
+       field, ok := u.findField(fullName)
+       if !ok {
+               return fmt.Errorf("field not found: %s", fullName)
+       }
+
+       if u.isDeleted(field.ID) {
+               return fmt.Errorf("field that has been deleted cannot be 
updated: %s", fullName)
+       }
+
+       parentID := u.findParentID(field.ID)
+
+       if update.Name.Valid {
+               if update.Name.Val == "" {
+                       return fmt.Errorf("cannot rename field to empty name: 
%s", fullName)
+               }
+               if field.Name == update.Name.Val {
+                       return fmt.Errorf("cannot rename field to the same 
name: %s", fullName)
+               }
+
+               newFullName := strings.Join(append(path[:len(path)-1], 
update.Name.Val), ".")
+               if existingField, ok := u.findField(newFullName); ok {
+                       if !u.isDeleted(existingField.ID) {
+                               return fmt.Errorf("field already exists: %s", 
newFullName)
+                       }
+               }
+
+               for _, add := range u.adds[parentID] {
+                       if add.Name == update.Name.Val {
+                               return fmt.Errorf("cannot rename field to added 
field: %s", newFullName)
+                       }
+               }
+
+               for _, upd := range u.updates[parentID] {
+                       if upd.Name == update.Name.Val && upd.ID != field.ID {
+                               return fmt.Errorf("cannot rename field to 
renamed field: %s", newFullName)
+                       }
+               }
+
+               if _, ok := u.identifierFieldNames[fullName]; ok {
+                       delete(u.identifierFieldNames, fullName)
+                       u.identifierFieldNames[newFullName] = struct{}{}
+               }
+       }
+
+       if update.FieldType.Valid {
+               if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+                       return fmt.Errorf("cannot update field type for 
non-primitive type: %s", fullName)
+               }
+               if !update.FieldType.Val.Equals(field.Type) && 
!u.allowIncompatibleChanges {
+                       fieldType, err := iceberg.PromoteType(field.Type, 
update.FieldType.Val)
+                       if err != nil {
+                               return err
+                       }
+                       update.FieldType.Val = fieldType
+               }
+       }
+
+       if update.Required.Valid {
+               if field.Required != update.Required.Val {
+                       if !u.allowIncompatibleChanges && update.Required.Val {
+                               return fmt.Errorf("cannot change column 
nullability from optional to required: %s", fullName)
+                       }
+               }
+       }
+
+       if update.WriteDefault.Valid {
+               if update.WriteDefault.Val == nil {
+                       if field.Required && !u.allowIncompatibleChanges {
+                               return fmt.Errorf("cannot change default value 
of required column to nil: %s", fullName)
+                       }
+               }
+       }
+
+       if _, ok := u.updates[parentID]; !ok {
+               u.updates[parentID] = make(map[int]iceberg.NestedField)
+       }
+
+       updatedField, ok := u.updates[parentID][field.ID]
+       if !ok {
+               updatedField = field
+       }
+       if update.Name.Valid {
+               updatedField.Name = update.Name.Val
+       }
+       if update.FieldType.Valid {
+               updatedField.Type = update.FieldType.Val
+       }
+       if update.Required.Valid {
+               updatedField.Required = update.Required.Val
+       }
+       if update.WriteDefault.Valid {
+               updatedField.WriteDefault = update.WriteDefault.Val.Any()
+       }
+       if update.Doc.Valid {
+               updatedField.Doc = update.Doc.Val
+       }
+       u.updates[parentID][field.ID] = updatedField
+
+       return nil
+}
+
+func (u *UpdateSchema) RenameColumn(path []string, newName string) 
*UpdateSchema {
+       u.ops = append(u.ops, func() error {
+               return u.updateColumn(path, ColumnUpdate{
+                       Name: iceberg.Optional[string]{
+                               Valid: true,
+                               Val:   newName,
+                       },
+               })
+       })
+
+       return u
+}
+
+func (u *UpdateSchema) MoveColumn(op MoveOp, path, relativeTo []string) 
*UpdateSchema {
+       u.ops = append(u.ops, func() error {
+               return u.moveColumn(op, path, relativeTo)
+       })
+
+       return u
+}
+
+func (u *UpdateSchema) MoveFirst(path []string) *UpdateSchema {
+       u.ops = append(u.ops, func() error {
+               return u.moveColumn(MoveOpFirst, path, nil)
+       })
+
+       return u
+}
+
+func (u *UpdateSchema) MoveBefore(path, relativeTo []string) *UpdateSchema {
+       u.ops = append(u.ops, func() error {
+               return u.moveColumn(MoveOpBefore, path, relativeTo)
+       })
+
+       return u
+}
+
+func (u *UpdateSchema) MoveAfter(path, relativeTo []string) *UpdateSchema {
+       u.ops = append(u.ops, func() error {
+               return u.moveColumn(MoveOpAfter, path, relativeTo)
+       })
+
+       return u
+}
+
+func (u *UpdateSchema) findFieldForMove(name string) (int, bool) {
+       field, ok := u.findField(name)
+       if ok {
+               return field.ID, true
+       }
+       id, ok := u.addedNameToID[name]
+
+       return id, ok
+}
+
+func (u *UpdateSchema) moveColumn(op MoveOp, path []string, relativeTo 
[]string) error {
+       fullName := strings.Join(path, ".")
+       fieldID, ok := u.findFieldForMove(fullName)
+       if !ok {
+               return fmt.Errorf("field not found: %s", fullName)
+       }
+
+       if u.isDeleted(fieldID) {
+               return fmt.Errorf("field that has been deleted cannot be moved: 
%s", fullName)
+       }
+
+       parentID := u.findParentID(fieldID)
+
+       switch op {
+       case MoveOpFirst:
+               u.moves[parentID] = append(u.moves[parentID], move{
+                       FieldID:    fieldID,
+                       RelativeTo: -1,
+                       Op:         op,
+               })
+
+               return nil
+       case MoveOpBefore, MoveOpAfter:
+               relativeToFullName := strings.Join(relativeTo, ".")
+               relativeToFieldID, ok := u.findFieldForMove(relativeToFullName)
+               if !ok {
+                       return fmt.Errorf("relative to field not found: %s", 
relativeToFullName)
+               }
+
+               if relativeToFieldID == fieldID {
+                       return fmt.Errorf("cannot move a field to itself: %s", 
fullName)
+               }
+
+               if u.findParentID(relativeToFieldID) != parentID {
+                       return fmt.Errorf("relative to field is not a child of 
the parent: %s", relativeToFullName)
+               }
+               u.moves[parentID] = append(u.moves[parentID], move{
+                       FieldID:    fieldID,
+                       RelativeTo: relativeToFieldID,
+                       Op:         op,
+               })
+
+               return nil
+       default:
+
+               return fmt.Errorf("invalid move operation: %s", op)
+       }
+}
+
+func (u *UpdateSchema) SetIdentifierField(paths [][]string) *UpdateSchema {
+       identifierFieldNames := make(map[string]struct{})
+       for _, path := range paths {
+               identifierFieldNames[strings.Join(path, ".")] = struct{}{}
+       }
+       u.identifierFieldNames = identifierFieldNames
+
+       return u
+}
+
+func (u *UpdateSchema) BuildUpdates() ([]Update, []Requirement, error) {
+       newSchema, err := u.Apply()
+       if err != nil {
+               return nil, nil, err
+       }
+
+       existingSchemaID := -1
+       for _, schema := range u.txn.meta.schemaList {
+               if newSchema.Equals(schema) {
+                       existingSchemaID = schema.ID
+
+                       break
+               }
+       }
+
+       requirements := make([]Requirement, 0)
+       updates := make([]Update, 0)
+
+       if existingSchemaID != u.schema.ID {
+               requirements = append(requirements, 
AssertCurrentSchemaID(u.schema.ID))
+               if existingSchemaID == -1 {
+                       updates = append(
+                               updates,
+                               NewAddSchemaUpdate(newSchema),
+                               NewSetCurrentSchemaUpdate(newSchema.ID),
+                       )
+               } else {
+                       updates = append(updates,
+                               NewSetCurrentSchemaUpdate(newSchema.ID),
+                       )
+               }
+
+               if u.nameMapping != nil {
+                       updatesMap := make(map[int]iceberg.NestedField)
+                       for _, upds := range u.updates {
+                               maps.Copy(updatesMap, upds)
+                       }
+                       updatedNameMapping, err := 
iceberg.UpdateNameMapping(u.nameMapping, updatesMap, u.adds)
+                       if err != nil {
+                               return nil, nil, err
+                       }
+                       updates = append(updates, 
NewSetPropertiesUpdate(iceberg.Properties{
+                               DefaultNameMappingKey: 
updatedNameMapping.String(),
+                       }))
+               }
+       }
+
+       return updates, requirements, nil
+}
+
+func (u *UpdateSchema) Apply() (*iceberg.Schema, error) {
+       if err := u.init(); err != nil {
+               return nil, err
+       }
+
+       for _, op := range u.ops {
+               if err := op(); err != nil {
+                       return nil, err
+               }
+       }
+
+       updates := make(map[int]iceberg.NestedField)
+       for _, upds := range u.updates {
+               maps.Copy(updates, upds)
+       }
+       st, err := iceberg.Visit(u.schema, &applyChanges{
+               adds:    u.adds,
+               updates: updates,
+               deletes: u.deletes,
+               moves:   u.moves,
+       })
+       if err != nil {
+               return nil, fmt.Errorf("error applying schema changes: %w", err)
+       }
+
+       identifierFieldIDs := make([]int, 0)
+       newSchema := iceberg.NewSchema(0, st.(*iceberg.StructType).FieldList...)
+       for name := range u.identifierFieldNames {
+               var field iceberg.NestedField
+               var ok bool
+               if u.caseSensitive {
+                       field, ok = newSchema.FindFieldByName(name)
+               } else {
+                       field, ok = 
newSchema.FindFieldByNameCaseInsensitive(name)
+               }
+               if !ok {
+                       return nil, fmt.Errorf("identifier field not found: 
%s", name)
+               }
+               identifierFieldIDs = append(identifierFieldIDs, field.ID)
+       }
+
+       nextSchemaID := 1
+       if len(u.txn.meta.schemaList) > 0 {
+               nextSchemaID = 1 + slices.MaxFunc(u.txn.meta.schemaList, 
func(a, b *iceberg.Schema) int {
+                       return a.ID - b.ID
+               }).ID
+       }
+
+       return iceberg.NewSchemaWithIdentifiers(nextSchemaID, 
identifierFieldIDs, st.(*iceberg.StructType).FieldList...), nil
+}
+
+func (u *UpdateSchema) Commit() error {
+       updates, requirements, err := u.BuildUpdates()
+       if err != nil {
+               return err
+       }
+       if len(updates) == 0 {
+               return nil
+       }
+
+       return u.txn.apply(updates, requirements)
+}
+
+type applyChanges struct {
+       adds    map[int][]iceberg.NestedField
+       updates map[int]iceberg.NestedField
+       deletes map[int]struct{}
+       moves   map[int][]move
+}
+
+func (a *applyChanges) Schema(schema *iceberg.Schema, structResult 
iceberg.Type) iceberg.Type {
+       added := a.adds[TableRootID]
+       moves := a.moves[TableRootID]
+
+       if len(added) > 0 || len(moves) > 0 {
+               if newFields := 
addAndMoveFields(structResult.(*iceberg.StructType).Fields(), added, moves); 
newFields != nil {
+                       return &iceberg.StructType{FieldList: newFields}
+               }
+       }
+
+       return structResult
+}
+
+func (a *applyChanges) Struct(structType iceberg.StructType, fieldResults 
[]iceberg.Type) iceberg.Type {
+       hasChanges := false
+       newFields := make([]iceberg.NestedField, 0)
+
+       for i, resultType := range fieldResults {
+               if resultType == nil {
+                       hasChanges = true
+
+                       continue
+               }
+
+               field := structType.Fields()[i]
+
+               name := field.Name
+               doc := field.Doc
+               required := field.Required
+               writeDefault := field.WriteDefault
+
+               if update, ok := a.updates[field.ID]; ok {
+                       name = update.Name
+                       doc = update.Doc
+                       required = update.Required
+                       writeDefault = update.WriteDefault
+               }
+               if field.Name == name &&
+                       field.Type.Equals(resultType) &&
+                       field.Required == required &&
+                       field.Doc == doc &&
+                       field.WriteDefault == writeDefault {
+                       newFields = append(newFields, field)
+               } else {
+                       hasChanges = true
+                       newFields = append(newFields, iceberg.NestedField{
+                               ID:             field.ID,
+                               Name:           name,
+                               Type:           resultType,
+                               Required:       required,
+                               Doc:            doc,
+                               InitialDefault: field.InitialDefault,
+                               WriteDefault:   writeDefault,
+                       })
+               }
+       }
+
+       if hasChanges {
+               return &iceberg.StructType{FieldList: newFields}
+       }
+
+       return &structType
+}
+
+func (a *applyChanges) Field(field iceberg.NestedField, fieldResult 
iceberg.Type) iceberg.Type {
+       if _, ok := a.deletes[field.ID]; ok {
+               return nil
+       }
+
+       if update, ok := a.updates[field.ID]; ok && 
!field.Type.Equals(update.Type) {
+               return update.Type
+       }
+
+       st, ok := fieldResult.(*iceberg.StructType)
+       if !ok {
+               return fieldResult
+       }
+
+       added := a.adds[field.ID]
+       moves := a.moves[field.ID]
+       if len(added) > 0 || len(moves) > 0 {
+               newFields := addAndMoveFields(st.FieldList, added, moves)
+               if len(newFields) > 0 {
+                       return &iceberg.StructType{FieldList: newFields}
+               }
+       }
+
+       return fieldResult
+}
+
+func (a *applyChanges) List(listType iceberg.ListType, elementResult 
iceberg.Type) iceberg.Type {
+       elementType := a.Field(listType.ElementField(), elementResult)
+       if elementType == nil {
+               panic(fmt.Sprintf("cannot delete element type from list: %s", 
elementResult))
+       }
+
+       return &iceberg.ListType{
+               ElementID:       listType.ElementID,
+               Element:         elementType,
+               ElementRequired: listType.ElementRequired,
+       }
+}
+
+func (a *applyChanges) Map(mapType iceberg.MapType, keyResult, valueResult 
iceberg.Type) iceberg.Type {
+       keyID := mapType.KeyID
+       if _, ok := a.deletes[keyID]; ok {
+               panic(fmt.Errorf("cannot delete map keys: %s", 
mapType.String()))
+       }
+
+       if _, ok := a.updates[keyID]; ok {
+               panic(fmt.Errorf("cannot update map keys: %s", 
mapType.String()))
+       }
+
+       if _, ok := a.adds[keyID]; ok {
+               panic(fmt.Errorf("cannot add fields to map keys: %s", 
mapType.String()))
+       }
+
+       if !mapType.KeyType.Equals(keyResult) {
+               panic(fmt.Errorf("cannot alter map keys: %s", mapType.String()))
+       }
+
+       valueField := mapType.ValueField()
+       valueType := a.Field(valueField, valueResult)
+
+       if valueType == nil {
+               panic(fmt.Errorf("cannot delete value type from map: %s", 
mapType.String()))
+       }
+
+       return &iceberg.MapType{
+               KeyID:         mapType.KeyID,
+               KeyType:       mapType.KeyType,
+               ValueID:       mapType.ValueID,
+               ValueType:     valueType,
+               ValueRequired: mapType.ValueRequired,
+       }
+}
+
+func (a *applyChanges) Primitive(primitive iceberg.PrimitiveType) iceberg.Type 
{
+       return primitive
+}
+
+func addFields(fields []iceberg.NestedField, adds []iceberg.NestedField) 
[]iceberg.NestedField {
+       return append(fields, adds...)
+}
+
+func moveFields(fields []iceberg.NestedField, moves []move) 
[]iceberg.NestedField {
+       reordered := slices.Clone(fields)
+       for _, move := range moves {
+               var fieldToMove iceberg.NestedField
+               var fieldIndex int
+               found := false
+               for i, field := range reordered {
+                       if field.ID == move.FieldID {
+                               fieldToMove = field
+                               fieldIndex = i
+                               found = true
+
+                               break
+                       }
+               }
+               if !found {
+                       continue
+               }
+
+               reordered = append(reordered[:fieldIndex], 
reordered[fieldIndex+1:]...)
+
+               switch move.Op {
+               case MoveOpFirst:
+                       reordered = append([]iceberg.NestedField{fieldToMove}, 
reordered...)
+               case MoveOpBefore, MoveOpAfter:
+                       var relativeIndex int
+                       found = false
+                       for i, field := range reordered {
+                               if field.ID == move.RelativeTo {
+                                       relativeIndex = i
+                                       found = true
+
+                                       break
+                               }
+                       }
+                       if !found {
+                               continue
+                       }
+
+                       if move.Op == MoveOpBefore {
+                               reordered = append(reordered[:relativeIndex], 
append([]iceberg.NestedField{fieldToMove}, reordered[relativeIndex:]...)...)
+                       } else {
+                               reordered = append(reordered[:relativeIndex+1], 
append([]iceberg.NestedField{fieldToMove}, reordered[relativeIndex+1:]...)...)
+                       }
+               }
+       }
+
+       return reordered
+}
+
+func addAndMoveFields(fields []iceberg.NestedField, adds 
[]iceberg.NestedField, moves []move) []iceberg.NestedField {
+       if len(adds) > 0 {
+               added := addFields(fields, adds)
+               if len(moves) > 0 {
+                       return moveFields(added, moves)
+               } else {
+                       return added
+               }
+       } else if len(moves) > 0 {
+               return moveFields(fields, moves)
+       }
+       if len(adds) == 0 {
+               return nil
+       }
+
+       return fields
+}
diff --git a/table/update_schema_test.go b/table/update_schema_test.go
new file mode 100644
index 00000000..8f35363d
--- /dev/null
+++ b/table/update_schema_test.go
@@ -0,0 +1,881 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/stretchr/testify/assert"
+)
+
+var originalSchema = iceberg.NewSchema(1,
+       iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int32, Required: true, Doc: ""},
+       iceberg.NestedField{ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+       iceberg.NestedField{ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+       iceberg.NestedField{ID: 4, Name: "address", Type: &iceberg.StructType{
+               FieldList: []iceberg.NestedField{
+                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+               },
+       }, Required: false, Doc: ""},
+       iceberg.NestedField{ID: 7, Name: "tags", Type: &iceberg.ListType{
+               ElementID:       8,
+               Element:         iceberg.PrimitiveTypes.String,
+               ElementRequired: false,
+       }, Required: false, Doc: ""},
+       iceberg.NestedField{ID: 9, Name: "properties", Type: &iceberg.MapType{
+               KeyID:         10,
+               KeyType:       iceberg.PrimitiveTypes.String,
+               ValueID:       11,
+               ValueType:     iceberg.PrimitiveTypes.String,
+               ValueRequired: false,
+       }, Required: false, Doc: ""},
+)
+
+var testMetadata, _ = NewMetadata(originalSchema, nil, UnsortedSortOrder, "", 
nil)
+
+func TestAddColumn(t *testing.T) {
+       t.Run("test update schema with add primitive type on top level", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).AddColumn([]string{"gender"}, iceberg.PrimitiveTypes.String, "", false, 
iceberg.StringLiteral("male")).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+                       {ID: 4, Name: "address", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               },
+                       }, Required: false, Doc: ""},
+                       {ID: 7, Name: "tags", Type: &iceberg.ListType{
+                               ElementID:       8,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 9, Name: "properties", Type: &iceberg.MapType{
+                               KeyID:         10,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       11,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 12, Name: "gender", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+               }, newSchema.Fields())
+       })
+
+       t.Run("test update schema with add list of primitive type on top 
level", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).AddColumn([]string{"files"}, &iceberg.ListType{
+                       Element:         iceberg.PrimitiveTypes.String,
+                       ElementRequired: false,
+               }, "", false, nil).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+                       {ID: 4, Name: "address", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               },
+                       }, Required: false, Doc: ""},
+                       {ID: 7, Name: "tags", Type: &iceberg.ListType{
+                               ElementID:       8,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 9, Name: "properties", Type: &iceberg.MapType{
+                               KeyID:         10,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       11,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 12, Name: "files", Type: &iceberg.ListType{
+                               ElementID:       13,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+               }, newSchema.Fields())
+       })
+
+       t.Run("test update schema with add map of primitive type on top level", 
func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).AddColumn([]string{"files"}, &iceberg.MapType{
+                       KeyType:       iceberg.PrimitiveTypes.String,
+                       ValueType:     iceberg.PrimitiveTypes.String,
+                       ValueRequired: false,
+               }, "", false, nil).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+                       {ID: 4, Name: "address", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               },
+                       }, Required: false, Doc: ""},
+                       {ID: 7, Name: "tags", Type: &iceberg.ListType{
+                               ElementID:       8,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 9, Name: "properties", Type: &iceberg.MapType{
+                               KeyID:         10,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       11,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 12, Name: "files", Type: &iceberg.MapType{
+                               KeyID:         13,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       14,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+               }, newSchema.Fields())
+       })
+
+       t.Run("test update schema with add struct type on top level", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).AddColumn([]string{"files"}, &iceberg.StructType{
+                       FieldList: []iceberg.NestedField{
+                               {ID: 5, Name: "id", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               {ID: 6, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       },
+               }, "", false, nil).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+                       {ID: 4, Name: "address", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               },
+                       }, Required: false, Doc: ""},
+                       {ID: 7, Name: "tags", Type: &iceberg.ListType{
+                               ElementID:       8,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 9, Name: "properties", Type: &iceberg.MapType{
+                               KeyID:         10,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       11,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 12, Name: "files", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 13, Name: "id", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 14, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               },
+                       }, Required: false, Doc: ""},
+               }, newSchema.Fields())
+       })
+
+       t.Run("test update schema with add primitive in struct", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).AddColumn([]string{"address", "code"}, iceberg.PrimitiveTypes.String, "", 
false, nil).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+                       {ID: 4, Name: "address", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 12, Name: "code", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               },
+                       }, Required: false, Doc: ""},
+                       {ID: 7, Name: "tags", Type: &iceberg.ListType{
+                               ElementID:       8,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 9, Name: "properties", Type: &iceberg.MapType{
+                               KeyID:         10,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       11,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+               }, newSchema.Fields())
+       })
+
+       t.Run("test update schema with add struct in struct", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).AddColumn([]string{"address", "code"}, &iceberg.StructType{
+                       FieldList: []iceberg.NestedField{
+                               {ID: 5, Name: "code-1", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               {ID: 6, Name: "code-2", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       },
+               }, "", false, nil).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+                       {ID: 4, Name: "address", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 12, Name: "code", Type: 
&iceberg.StructType{
+                                               FieldList: 
[]iceberg.NestedField{
+                                                       {ID: 13, Name: 
"code-1", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                                       {ID: 14, Name: 
"code-2", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                               },
+                                       }, Required: false, Doc: ""},
+                               },
+                       }},
+                       {ID: 7, Name: "tags", Type: &iceberg.ListType{
+                               ElementID:       8,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 9, Name: "properties", Type: &iceberg.MapType{
+                               KeyID:         10,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       11,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+               }, newSchema.Fields())
+       })
+
+       t.Run("test update schema with multiple adds", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).AddColumn([]string{"address", "code"}, &iceberg.StructType{
+                       FieldList: []iceberg.NestedField{
+                               {ID: 5, Name: "code-1", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               {ID: 6, Name: "code-2", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       },
+               }, "", false, nil).AddColumn([]string{"gender"}, 
iceberg.PrimitiveTypes.String, "", false, nil).AddColumn([]string{"files"}, 
&iceberg.ListType{
+                       Element:         iceberg.PrimitiveTypes.String,
+                       ElementRequired: false,
+               }, "", false, nil).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+                       {ID: 4, Name: "address", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 12, Name: "code", Type: 
&iceberg.StructType{
+                                               FieldList: 
[]iceberg.NestedField{
+                                                       {ID: 13, Name: 
"code-1", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                                       {ID: 14, Name: 
"code-2", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                               },
+                                       }, Required: false, Doc: ""},
+                               },
+                       }, Required: false, Doc: ""},
+                       {ID: 7, Name: "tags", Type: &iceberg.ListType{
+                               ElementID:       8,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 9, Name: "properties", Type: &iceberg.MapType{
+                               KeyID:         10,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       11,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 15, Name: "gender", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       {ID: 16, Name: "files", Type: &iceberg.ListType{
+                               ElementID:       17,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }},
+               }, newSchema.Fields())
+       })
+}
+
+func TestApplyChanges(t *testing.T) {
+       t.Run("test apply changes on schema", func(t *testing.T) {
+               deletes := map[int]struct{}{
+                       2: {},
+               }
+               updates := map[int]iceberg.NestedField{
+                       3: {Name: "age", Type: iceberg.PrimitiveTypes.Int64, 
Required: true, Doc: ""},
+               }
+               adds := map[int][]iceberg.NestedField{
+                       -1: {
+                               {ID: 12, Name: "gender", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       },
+               }
+               moves := map[int][]move{
+                       4: {
+                               {FieldID: 6, RelativeTo: 5, Op: MoveOpBefore},
+                       },
+               }
+
+               st, err := iceberg.Visit(originalSchema, &applyChanges{
+                       deletes: deletes,
+                       updates: updates,
+                       adds:    adds,
+                       moves:   moves,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, st)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int64, Required: true, Doc: ""},
+                       {ID: 4, Name: "address", Type: &iceberg.StructType{
+                               FieldList: []iceberg.NestedField{
+                                       {ID: 6, Name: "zip", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                                       {ID: 5, Name: "city", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                               },
+                       }, Required: false, Doc: ""},
+                       {ID: 7, Name: "tags", Type: &iceberg.ListType{
+                               ElementID:       8,
+                               Element:         iceberg.PrimitiveTypes.String,
+                               ElementRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 9, Name: "properties", Type: &iceberg.MapType{
+                               KeyID:         10,
+                               KeyType:       iceberg.PrimitiveTypes.String,
+                               ValueID:       11,
+                               ValueType:     iceberg.PrimitiveTypes.String,
+                               ValueRequired: false,
+                       }, Required: false, Doc: ""},
+                       {ID: 12, Name: "gender", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+               }, st.(*iceberg.StructType).Fields())
+       })
+
+       t.Run("test apply changes on add field that delete in same time", 
func(t *testing.T) {
+               originalSchema := iceberg.NewSchema(1,
+                       iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int32, Required: true, Doc: ""},
+                       iceberg.NestedField{ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       iceberg.NestedField{ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+               )
+               deletes := map[int]struct{}{
+                       2: {},
+               }
+               adds := map[int][]iceberg.NestedField{
+                       -1: {
+                               {ID: 4, Name: "name", Type: 
iceberg.PrimitiveTypes.UUID, Required: false, Doc: ""},
+                       },
+               }
+
+               st, err := iceberg.Visit(originalSchema, &applyChanges{
+                       deletes: deletes,
+                       adds:    adds,
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, st)
+
+               assert.Equal(t, []iceberg.NestedField{
+                       {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32, 
Required: true, Doc: ""},
+                       {ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+                       {ID: 4, Name: "name", Type: 
iceberg.PrimitiveTypes.UUID, Required: false, Doc: ""},
+               }, st.(*iceberg.StructType).Fields())
+       })
+}
+
+func TestDeleteColumn(t *testing.T) {
+       t.Run("test delete top level column", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).DeleteColumn([]string{"name"}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               fields := newSchema.Fields()
+               assert.Len(t, fields, 5)
+
+               fieldNames := make([]string, len(fields))
+               for i, field := range fields {
+                       fieldNames[i] = field.Name
+               }
+               assert.Contains(t, fieldNames, "id")
+               assert.Contains(t, fieldNames, "age")
+               assert.Contains(t, fieldNames, "address")
+               assert.NotContains(t, fieldNames, "name")
+       })
+
+       t.Run("test delete nested column", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).DeleteColumn([]string{"address", "city"}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               addressField, ok := newSchema.FindFieldByName("address")
+               assert.True(t, ok)
+
+               structType, ok := addressField.Type.(*iceberg.StructType)
+               assert.True(t, ok)
+               assert.Len(t, structType.Fields(), 1)
+               assert.Equal(t, "zip", structType.Fields()[0].Name)
+       })
+
+       t.Run("test delete non-existent column", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
true).DeleteColumn([]string{"non_existent"}).Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "field not found")
+       })
+}
+
+func TestUpdateColumn(t *testing.T) {
+       t.Run("test update column type", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).UpdateColumn([]string{"age"}, ColumnUpdate{
+                       FieldType: iceberg.Optional[iceberg.Type]{Valid: true, 
Val: iceberg.PrimitiveTypes.Int64},
+               }).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               ageField, ok := newSchema.FindFieldByName("age")
+               assert.True(t, ok)
+               assert.Equal(t, iceberg.PrimitiveTypes.Int64, ageField.Type)
+       })
+
+       t.Run("test update column required", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).UpdateColumn([]string{"name"}, ColumnUpdate{
+                       Required: iceberg.Optional[bool]{Valid: true, Val: 
true},
+               }).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               nameField, ok := newSchema.FindFieldByName("name")
+               assert.True(t, ok)
+               assert.True(t, nameField.Required)
+       })
+
+       t.Run("test update column doc", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).UpdateColumn([]string{"age"}, ColumnUpdate{
+                       Doc: iceberg.Optional[string]{Valid: true, Val: "User's 
age in years"},
+               }).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               ageField, ok := newSchema.FindFieldByName("age")
+               assert.True(t, ok)
+               assert.Equal(t, "User's age in years", ageField.Doc)
+       })
+
+       t.Run("test update non-existent column", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
true).UpdateColumn([]string{"non_existent"}, ColumnUpdate{
+                       Doc: iceberg.Optional[string]{Valid: true, Val: "test"},
+               }).Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "field not found")
+       })
+}
+
+func TestRenameColumn(t *testing.T) {
+       t.Run("test rename top level column", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).RenameColumn([]string{"name"}, "full_name").Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               _, ok := newSchema.FindFieldByName("name")
+               assert.False(t, ok)
+
+               field, ok := newSchema.FindFieldByName("full_name")
+               assert.True(t, ok)
+               assert.Equal(t, iceberg.PrimitiveTypes.String, field.Type)
+       })
+
+       t.Run("test rename nested column", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).RenameColumn([]string{"address", "city"}, "city_name").Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               addressField, ok := newSchema.FindFieldByName("address")
+               assert.True(t, ok)
+
+               structType, ok := addressField.Type.(*iceberg.StructType)
+               assert.True(t, ok)
+
+               fieldNames := make([]string, len(structType.Fields()))
+               for i, field := range structType.Fields() {
+                       fieldNames[i] = field.Name
+               }
+               assert.Contains(t, fieldNames, "city_name")
+               assert.NotContains(t, fieldNames, "city")
+       })
+
+       t.Run("test rename to existing name", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
true).RenameColumn([]string{"name"}, "age").Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "field already exists")
+       })
+}
+
+func TestMoveColumn(t *testing.T) {
+       t.Run("test move column to first", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).MoveFirst([]string{"age"}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               fields := newSchema.Fields()
+               assert.Equal(t, "age", fields[0].Name)
+               assert.Equal(t, "id", fields[1].Name)
+       })
+
+       t.Run("test move column before", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).MoveBefore([]string{"age"}, []string{"name"}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               fields := newSchema.Fields()
+               fieldNames := make([]string, len(fields))
+               for i, field := range fields {
+                       fieldNames[i] = field.Name
+               }
+
+               ageIndex := -1
+               nameIndex := -1
+               for i, name := range fieldNames {
+                       if name == "age" {
+                               ageIndex = i
+                       }
+                       if name == "name" {
+                               nameIndex = i
+                       }
+               }
+
+               assert.True(t, ageIndex < nameIndex, "age should come before 
name")
+       })
+
+       t.Run("test move column after", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).MoveAfter([]string{"name"}, []string{"age"}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               fields := newSchema.Fields()
+               fieldNames := make([]string, len(fields))
+               for i, field := range fields {
+                       fieldNames[i] = field.Name
+               }
+
+               ageIndex := -1
+               nameIndex := -1
+               for i, name := range fieldNames {
+                       if name == "age" {
+                               ageIndex = i
+                       }
+                       if name == "name" {
+                               nameIndex = i
+                       }
+               }
+
+               assert.True(t, nameIndex > ageIndex, "name should come after 
age")
+       })
+
+       t.Run("test move non-existent column", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
true).MoveFirst([]string{"non_existent"}).Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "field not found")
+       })
+}
+
+func TestChainedOperations(t *testing.T) {
+       t.Run("test multiple operations in chain", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, true).
+                       AddColumn([]string{"email"}, 
iceberg.PrimitiveTypes.String, "Email address", false, nil).
+                       RenameColumn([]string{"name"}, "full_name").
+                       UpdateColumn([]string{"age"}, ColumnUpdate{
+                               Required: iceberg.Optional[bool]{Valid: true, 
Val: true},
+                       }).
+                       MoveFirst([]string{"email"}).
+                       DeleteColumn([]string{"tags"}).
+                       Apply()
+
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               fields := newSchema.Fields()
+               assert.Len(t, fields, 6)
+
+               assert.Equal(t, "email", fields[0].Name)
+
+               _, ok := newSchema.FindFieldByName("name")
+               assert.False(t, ok)
+               _, ok = newSchema.FindFieldByName("full_name")
+               assert.True(t, ok)
+
+               ageField, ok := newSchema.FindFieldByName("age")
+               assert.True(t, ok)
+               assert.True(t, ageField.Required)
+
+               _, ok = newSchema.FindFieldByName("tags")
+               assert.False(t, ok)
+       })
+}
+
+func TestSetIdentifierField(t *testing.T) {
+       t.Run("test set identifier field with single top-level field", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               // Test that SetIdentifierField returns the same UpdateSchema 
instance
+               updateSchema := NewUpdateSchema(txn, true, true)
+               updatedSchema := 
updateSchema.SetIdentifierField([][]string{{"id"}})
+               assert.Equal(t, updateSchema, updatedSchema) // Should return 
the same instance
+
+               newSchema, err := updatedSchema.Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               // Check that the schema has identifier field IDs set
+               assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+               if len(newSchema.IdentifierFieldIDs) > 0 {
+                       assert.Equal(t, 1, newSchema.IdentifierFieldIDs[0]) // 
id field has ID 1
+               }
+       })
+
+       t.Run("test set identifier field with multiple fields", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).SetIdentifierField([][]string{{"id"}, {"name"}}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               // Check that the schema has identifier field IDs set
+               assert.Len(t, newSchema.IdentifierFieldIDs, 2)
+               assert.Contains(t, newSchema.IdentifierFieldIDs, 1) // id field
+               assert.Contains(t, newSchema.IdentifierFieldIDs, 2) // name 
field
+       })
+
+       t.Run("test set identifier field with case sensitive matching", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
true).SetIdentifierField([][]string{{"ID"}}).Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "identifier field not found: 
ID")
+       })
+
+       t.Run("test set identifier field with case insensitive matching", 
func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, false, 
true).SetIdentifierField([][]string{{"ID"}}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               // Check that the schema has identifier field IDs set
+               assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+               assert.Equal(t, 1, newSchema.IdentifierFieldIDs[0]) // id field 
(case insensitive match)
+       })
+
+       t.Run("test set identifier field with non-existent field", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
true).SetIdentifierField([][]string{{"non_existent"}}).Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "identifier field not found: 
non_existent")
+       })
+
+       t.Run("test set identifier field with empty paths", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).SetIdentifierField([][]string{}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               // Check that the schema has no identifier field IDs
+               assert.Len(t, newSchema.IdentifierFieldIDs, 0)
+       })
+
+       t.Run("test set identifier field chained with other operations", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, true).
+                       AddColumn([]string{"email"}, 
iceberg.PrimitiveTypes.String, "", false, nil).
+                       SetIdentifierField([][]string{{"id"}, {"email"}}).
+                       Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               // Check that the schema has identifier field IDs set
+               assert.Len(t, newSchema.IdentifierFieldIDs, 2)
+               assert.Contains(t, newSchema.IdentifierFieldIDs, 1)  // id field
+               assert.Contains(t, newSchema.IdentifierFieldIDs, 12) // email 
field (newly added)
+       })
+
+       t.Run("test set identifier field with duplicate field paths", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               newSchema, err := NewUpdateSchema(txn, true, 
true).SetIdentifierField([][]string{{"id"}, {"id"}}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               // Check that the schema has identifier field IDs set 
(duplicates should be deduplicated)
+               assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+               assert.Equal(t, 1, newSchema.IdentifierFieldIDs[0]) // id field
+       })
+
+       t.Run("test set identifier field replaces existing identifier fields", 
func(t *testing.T) {
+               // Create a schema with existing identifier fields
+               schemaWithIdentifiers := iceberg.NewSchemaWithIdentifiers(1, 
[]int{1}, // id is initially an identifier
+                       iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int32, Required: true, Doc: ""},
+                       iceberg.NestedField{ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+                       iceberg.NestedField{ID: 3, Name: "age", Type: 
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+               )
+               metadata, _ := NewMetadata(schemaWithIdentifiers, nil, 
UnsortedSortOrder, "", nil)
+               table := New([]string{"id"}, metadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               // Set identifier fields to name instead of id
+               newSchema, err := NewUpdateSchema(txn, true, 
true).SetIdentifierField([][]string{{"name"}}).Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               // Check that only name is now an identifier field
+               assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+               assert.Equal(t, 2, newSchema.IdentifierFieldIDs[0]) // name 
field has ID 2
+       })
+
+       t.Run("test set identifier field multiple times", func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               // Set identifier fields multiple times - last one should win
+               newSchema, err := NewUpdateSchema(txn, true, true).
+                       SetIdentifierField([][]string{{"id"}}).
+                       SetIdentifierField([][]string{{"name"}}).
+                       Apply()
+               assert.NoError(t, err)
+               assert.NotNil(t, newSchema)
+
+               // Check that only the last SetIdentifierField call is applied
+               assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+               assert.Equal(t, 2, newSchema.IdentifierFieldIDs[0]) // name 
field has ID 2
+       })
+}
+
+func TestErrorHandling(t *testing.T) {
+       t.Run("test incompatible changes without allowIncompatibleChanges", 
func(t *testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
false).UpdateColumn([]string{"name"}, ColumnUpdate{
+                       Required: iceberg.Optional[bool]{Valid: true, Val: 
true},
+               }).Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "cannot change column 
nullability from optional to required")
+       })
+
+       t.Run("test add required field without default value", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
false).AddColumn([]string{"required_field"}, iceberg.PrimitiveTypes.String, "", 
true, nil).Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "required field required_field 
has no default value")
+       })
+
+       t.Run("test add field with incompatible default value", func(t 
*testing.T) {
+               table := New([]string{"id"}, testMetadata, "", nil, nil)
+               txn := table.NewTransaction()
+
+               _, err := NewUpdateSchema(txn, true, 
true).AddColumn([]string{"age_field"}, iceberg.PrimitiveTypes.String, "", 
false, iceberg.Int32Literal(25)).Apply()
+               assert.Error(t, err)
+               assert.Contains(t, err.Error(), "default value type mismatch")
+       })
+}

Reply via email to