This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 3e18bd43 feat(table): add schema evolution support (#596)
3e18bd43 is described below
commit 3e18bd4366819724f51d98e66ab8522c74cea12f
Author: Blue Li <[email protected]>
AuthorDate: Fri Oct 24 01:19:17 2025 +0800
feat(table): add schema evolution support (#596)
### Summary
This PR introduces initial schema evolution capabilities, allowing users
to modify table schemas through a fluent API. This work was originally
started by @Shreyas220 in PR #431, but due to time constraints, I'm
continuing the implementation.
### Key Features
- **Add Columns**: Support for adding primitive types, structs, lists,
and maps
- **Delete Columns**: Remove fields from the schema
- **Update Columns**: Modify field properties including name, type,
nullability, and documentation
- **Move Columns**: Reorder fields with operations like `MoveFirst`,
`MoveBefore`, and `MoveAfter`
- **Rename Columns**: Change field names
- **Basic Validation**: Type checking and compatibility validation
### API Usage
```go
// Create a transaction and update schema
txn := table.NewTransaction()
updateSchema := txn.UpdateSchema(true, false) // caseSensitive=true, allowIncompatibleChanges=false
// Add a new column
updateSchema.AddColumn([]string{"gender"}, iceberg.PrimitiveTypes.String,
"User gender", false, nil)
// Add nested column in struct
updateSchema.AddColumn([]string{"address", "country"},
iceberg.PrimitiveTypes.String, "Country code", false,
iceberg.StringLiteral("US"))
// Add complex nested structure
updateSchema.AddColumn([]string{"profile"}, &iceberg.StructType{
FieldList: []iceberg.NestedField{
{Name: "bio", Type: iceberg.PrimitiveTypes.String, Required: false},
{Name: "avatar", Type: iceberg.PrimitiveTypes.String, Required:
false},
},
}, "User profile", false, nil)
// Add list type
updateSchema.AddColumn([]string{"tags"}, &iceberg.ListType{
Element: iceberg.PrimitiveTypes.String,
ElementRequired: false,
}, "User tags", false, nil)
// Rename a column
updateSchema.RenameColumn([]string{"age"}, "user_age")
// Update column properties
updateSchema.UpdateColumn([]string{"name"}, ColumnUpdate{
Required: iceberg.Optional[bool]{Valid: true, Val: true},
Doc: iceberg.Optional[string]{Valid: true, Val: "User's full name"},
})
// Move column to first position
updateSchema.MoveFirst([]string{"id"})
// Move column before another column
updateSchema.MoveBefore([]string{"name"}, []string{"age"})
// Delete a column
updateSchema.DeleteColumn([]string{"old_field"})
// Chain multiple operations
updateSchema.
AddColumn([]string{"email"}, iceberg.PrimitiveTypes.String, "Email address", false, nil).
RenameColumn([]string{"phone"}, "phone_number").
MoveAfter([]string{"email"}, []string{"name"}).
DeleteColumn([]string{"deprecated_field"})
// Preview changes without committing
newSchema, err := updateSchema.Apply()
if err != nil {
// Handle validation errors
return err
}
fmt.Println("new schema:", newSchema)
// Commit the changes to the transaction
if err := updateSchema.Commit(); err != nil {
return err
}
// Commit the entire transaction
updatedTable, err := txn.Commit(ctx)
```
---
table/transaction.go | 17 +
table/update_schema.go | 944 ++++++++++++++++++++++++++++++++++++++++++++
table/update_schema_test.go | 881 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 1842 insertions(+)
diff --git a/table/transaction.go b/table/transaction.go
index 09f0fa91..d949df3a 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -147,6 +147,23 @@ func (t *Transaction) UpdateSpec(caseSensitive bool)
*UpdateSpec {
return NewUpdateSpec(t, caseSensitive)
}
+// UpdateSchema returns a builder for evolving the table schema as part of
+// this transaction. It is a thin wrapper around NewUpdateSchema.
+//
+// Parameters:
+//   - caseSensitive: when true, column paths are matched case-sensitively;
+//     when false, they are matched case-insensitively.
+//   - allowIncompatibleChanges: when true, changes that would otherwise be
+//     rejected as incompatible are permitted (for example adding a required
+//     column without a default value, changing a column type in a way that is
+//     not a valid promotion, or making an optional column required).
+//   - opts: optional settings applied to the builder, such as WithNameMapping.
+//
+// The queued changes are recorded on the transaction when the returned
+// builder's Commit method is called.
+func (t *Transaction) UpdateSchema(caseSensitive bool, allowIncompatibleChanges bool, opts ...UpdateSchemaOption) *UpdateSchema {
+	builder := NewUpdateSchema(t, caseSensitive, allowIncompatibleChanges, opts...)
+
+	return builder
+}
+
type expireSnapshotsCfg struct {
minSnapshotsToKeep *int
maxSnapshotAgeMs *int64
diff --git a/table/update_schema.go b/table/update_schema.go
new file mode 100644
index 00000000..0492f085
--- /dev/null
+++ b/table/update_schema.go
@@ -0,0 +1,944 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "errors"
+ "fmt"
+ "maps"
+ "slices"
+ "strings"
+
+ "github.com/apache/iceberg-go"
+)
+
+// TableRootID is the pseudo parent ID used for columns that sit directly in
+// the table's top-level struct rather than inside a nested column.
+const TableRootID = -1
+
+// MoveOp identifies how a column is repositioned within its parent.
+type MoveOp string
+
+const (
+	// MoveOpFirst moves a column to the first position within its parent.
+	MoveOpFirst  MoveOp = "first"
+	// MoveOpBefore moves a column directly before a sibling column.
+	MoveOpBefore MoveOp = "before"
+	// MoveOpAfter moves a column directly after a sibling column.
+	MoveOpAfter  MoveOp = "after"
+)
+
+// move is a pending reorder of one column within its parent.
+type move struct {
+	FieldID    int    // ID of the column being moved
+	RelativeTo int    // ID of the sibling it is placed relative to; -1 for MoveOpFirst
+	Op         MoveOp // kind of move
+}
+
+// UpdateSchema manages schema evolution operations within a transaction.
+// It supports adding, deleting, renaming, updating, and reordering columns,
+// and validates all changes before they are committed.
+//
+// Operations can be chained together; they are queued and run in the order
+// they were called each time Apply runs. Changes are recorded on the
+// transaction only when Commit() is called, and reach the table when the
+// transaction itself is committed.
+//
+// Basic Usage:
+//
+//	txn := table.NewTransaction()
+//	updateSchema := txn.UpdateSchema(true, false)
+//
+//	// Add a new column
+//	updateSchema.AddColumn([]string{"email"}, iceberg.PrimitiveTypes.String, "Email address", false, nil)
+//
+//	// Commit changes
+//	if err := updateSchema.Commit(); err != nil {
+//		return err
+//	}
+//	if _, err := txn.Commit(ctx); err != nil {
+//		return err
+//	}
+//
+// Chaining Operations:
+//
+//	updateSchema.
+//		AddColumn([]string{"age"}, iceberg.PrimitiveTypes.Int, "User age", false, nil).
+//		RenameColumn([]string{"name"}, "full_name").
+//		MoveFirst([]string{"id"}).
+//		Commit()
+//
+// Adding Nested Columns:
+//
+//	// Add a column to a struct field
+//	updateSchema.AddColumn([]string{"address", "country"}, iceberg.PrimitiveTypes.String, "Country code", false, iceberg.StringLiteral("US"))
+//
+//	// Commit the schema update
+//	if err := updateSchema.Commit(); err != nil {
+//		return err
+//	}
+//	if _, err := txn.Commit(ctx); err != nil {
+//		return err
+//	}
+type UpdateSchema struct {
+	// txn is the transaction the update is recorded on by Commit.
+	txn          *Transaction
+	// schema is the schema being evolved; init loads it from txn.
+	schema       *iceberg.Schema
+	// lastColumnID is the most recently assigned column ID; new columns get the next one.
+	lastColumnID int
+
+	// deletes holds the IDs of columns queued for removal.
+	deletes map[int]struct{}
+	// updates holds pending column updates keyed by parent ID, then column ID.
+	updates map[int]map[int]iceberg.NestedField
+	// adds holds new columns keyed by parent ID (TableRootID for top level).
+	adds    map[int][]iceberg.NestedField
+	// moves holds pending reorders keyed by parent ID.
+	moves   map[int][]move
+
+	// identifierFieldNames holds the full names of the identifier columns of the result.
+	identifierFieldNames map[string]struct{}
+	// parentID maps a column ID to the ID of the column that contains it.
+	parentID             map[int]int
+
+	// addedNameToID maps the full name of each added column to its new ID.
+	addedNameToID            map[string]int
+	allowIncompatibleChanges bool
+	caseSensitive            bool
+	// nameMapping, when set, is updated and stored in table properties by BuildUpdates.
+	nameMapping              iceberg.NameMapping
+	// ops are the queued operations that Apply runs in order.
+	ops                      []func() error
+}
+
+// UpdateSchemaOption customizes an UpdateSchema when passed to
+// NewUpdateSchema or Transaction.UpdateSchema.
+type UpdateSchemaOption func(*UpdateSchema)
+
+// WithNameMapping supplies the table's current name mapping. When it is set,
+// BuildUpdates derives an updated mapping from the pending updates and
+// additions and writes it to the table properties under
+// DefaultNameMappingKey.
+func WithNameMapping(nameMapping iceberg.NameMapping) UpdateSchemaOption {
+	setMapping := func(target *UpdateSchema) {
+		target.nameMapping = nameMapping
+	}
+
+	return setMapping
+}
+
+// NewUpdateSchema creates an UpdateSchema that stages schema changes for txn.
+//
+// Parameters:
+//   - txn: the transaction that Commit records the schema update on.
+//   - caseSensitive: if true, field name lookups are case-sensitive; if false,
+//     field names are matched case-insensitively.
+//   - allowIncompatibleChanges: if true, allows schema changes that would
+//     normally be rejected as incompatible (e.g. adding required fields without
+//     default values, changing field types in non-promotable ways, or changing
+//     column nullability from optional to required).
+//   - opts: optional configuration functions applied after the defaults.
+//
+// A nil txn, or one without metadata or a current schema, does not panic
+// here; the problem is reported as an error by Apply (and therefore by
+// BuildUpdates and Commit).
+func NewUpdateSchema(txn *Transaction, caseSensitive bool, allowIncompatibleChanges bool, opts ...UpdateSchemaOption) *UpdateSchema {
+	// Guard the lookup so construction never dereferences a nil transaction;
+	// init performs the authoritative checks and reports them as errors.
+	lastColumnID := 0
+	if txn != nil && txn.meta != nil {
+		if current := txn.meta.CurrentSchema(); current != nil {
+			lastColumnID = current.HighestFieldID()
+		}
+	}
+
+	u := &UpdateSchema{
+		txn:          txn,
+		lastColumnID: lastColumnID,
+
+		deletes:  make(map[int]struct{}),
+		updates:  make(map[int]map[int]iceberg.NestedField),
+		adds:     make(map[int][]iceberg.NestedField),
+		moves:    make(map[int][]move),
+		parentID: make(map[int]int),
+
+		addedNameToID:            make(map[string]int),
+		allowIncompatibleChanges: allowIncompatibleChanges,
+		caseSensitive:            caseSensitive,
+		ops:                      make([]func() error, 0),
+	}
+
+	for _, opt := range opts {
+		opt(u)
+	}
+
+	return u
+}
+
+// init prepares the update for a run of the queued operations. It loads the
+// transaction's current schema and resets every piece of state the
+// operations accumulate. This lets Apply be called more than once (for
+// example Apply to preview, then Commit, which applies again) without the
+// second run tripping over the first run's adds, deletes, updates and moves,
+// or assigning different column IDs.
+//
+// Identifier field names are the exception: they are derived from the schema
+// only when unset, so names chosen via SetIdentifierField survive. The
+// operations that modify them (delete and rename) give the same result when
+// replayed.
+func (u *UpdateSchema) init() error {
+	if u.txn == nil {
+		return errors.New("transaction is nil")
+	}
+	if u.txn.meta == nil {
+		return errors.New("transaction meta is nil")
+	}
+
+	u.schema = u.txn.meta.CurrentSchema()
+	if u.schema == nil {
+		return errors.New("current schema is nil")
+	}
+
+	// NOTE(review): new column IDs continue from the current schema's highest
+	// field ID. The Iceberg spec assigns them from the table's last-column-id,
+	// which can be higher when columns were dropped in earlier schemas;
+	// confirm whether the metadata builder exposes that value and use it.
+	u.lastColumnID = u.schema.HighestFieldID()
+	u.deletes = make(map[int]struct{})
+	u.updates = make(map[int]map[int]iceberg.NestedField)
+	u.adds = make(map[int][]iceberg.NestedField)
+	u.moves = make(map[int][]move)
+	u.parentID = make(map[int]int)
+	u.addedNameToID = make(map[string]int)
+
+	if err := u.initIdentifierFieldNames(); err != nil {
+		return err
+	}
+
+	return u.initParentID()
+}
+
+// initIdentifierFieldNames seeds u.identifierFieldNames with the full names of
+// the schema's identifier fields. It does nothing if the set is already
+// populated, e.g. by SetIdentifierField or an earlier Apply.
+func (u *UpdateSchema) initIdentifierFieldNames() error {
+	if u.identifierFieldNames != nil {
+		return nil
+	}
+
+	names := make(map[string]struct{}, len(u.schema.IdentifierFieldIDs))
+	for _, fieldID := range u.schema.IdentifierFieldIDs {
+		colName, found := u.schema.FindColumnName(fieldID)
+		if !found {
+			return fmt.Errorf("identifier field %d not found", fieldID)
+		}
+		names[colName] = struct{}{}
+	}
+
+	u.identifierFieldNames = names
+
+	return nil
+}
+
+// initParentID copies the schema's child-to-parent column index (from
+// iceberg.IndexParents) into u.parentID.
+func (u *UpdateSchema) initParentID() error {
+	parents, err := iceberg.IndexParents(u.schema)
+	if err == nil {
+		maps.Copy(u.parentID, parents)
+	}
+
+	return err
+}
+
+// assignNewColumnID reserves and returns the next unused column ID.
+func (u *UpdateSchema) assignNewColumnID() int {
+	next := u.lastColumnID + 1
+	u.lastColumnID = next
+
+	return next
+}
+
+// findField looks up a column of the schema being evolved by its full dotted
+// name, honoring the configured case sensitivity.
+func (u *UpdateSchema) findField(name string) (iceberg.NestedField, bool) {
+	if !u.caseSensitive {
+		return u.schema.FindFieldByNameCaseInsensitive(name)
+	}
+
+	return u.schema.FindFieldByName(name)
+}
+
+// isDeleted reports whether the column with fieldID is queued for deletion.
+func (u *UpdateSchema) isDeleted(fieldID int) bool {
+	if _, deleted := u.deletes[fieldID]; deleted {
+		return true
+	}
+
+	return false
+}
+
+// findParentID returns the ID of the column containing fieldID, or
+// TableRootID when fieldID has no entry in the parent index (top-level columns).
+func (u *UpdateSchema) findParentID(fieldID int) int {
+	if parent, ok := u.parentID[fieldID]; ok {
+		return parent
+	}
+
+	return TableRootID
+}
+
+// AddColumn queues the addition of a column at path. The last element of
+// path is the new column's name; any preceding elements name its parent,
+// which must be a struct (directly, or as a list element or map value).
+// doc is the column documentation, required marks it non-nullable, and a
+// non-nil defaultValue becomes both its initial and write default.
+// Validation happens when the queued operation runs in Apply.
+func (u *UpdateSchema) AddColumn(path []string, fieldType iceberg.Type, doc string, required bool, defaultValue iceberg.Literal) *UpdateSchema {
+	queued := func() error {
+		return u.addColumn(path, fieldType, doc, required, defaultValue)
+	}
+	u.ops = append(u.ops, queued)
+
+	return u
+}
+
+// addColumn validates and queues the addition of one column. The last element
+// of path is the new column's name; any preceding elements name the parent,
+// which must resolve to a struct directly or through a list element or map
+// value.
+//
+// New column IDs (including those of nested children) are taken from
+// u.assignNewColumnID. A column may reuse the name of an existing column only
+// if that column is being deleted or renamed away in this same update.
+func (u *UpdateSchema) addColumn(path []string, fieldType iceberg.Type, doc string, required bool, defaultValue iceberg.Literal) error {
+	if len(path) == 0 {
+		return errors.New("path is empty")
+	}
+
+	fullName := strings.Join(path, ".")
+
+	switch t := fieldType.(type) {
+	case *iceberg.ListType, *iceberg.MapType, *iceberg.StructType:
+		if defaultValue != nil {
+			return fmt.Errorf("default values are not supported for %s", t.String())
+		}
+	case iceberg.PrimitiveType:
+		if defaultValue != nil && !defaultValue.Type().Equals(t) {
+			return fmt.Errorf("default value type mismatch: %s != %s", defaultValue.Type(), t)
+		}
+	default:
+		return fmt.Errorf("invalid field type: %T", t)
+	}
+
+	// A required column without a default is an incompatible change for every
+	// type, not only primitives (nested types cannot carry a default at all).
+	if required && defaultValue == nil && !u.allowIncompatibleChanges {
+		return fmt.Errorf("required field %s has no default value", fullName)
+	}
+
+	parent := path[:len(path)-1]
+	parentID := TableRootID
+
+	if len(parent) > 0 {
+		parentFullPath := strings.Join(parent, ".")
+		parentField, ok := u.findField(parentFullPath)
+		if !ok {
+			return fmt.Errorf("parent field not found: %s", parentFullPath)
+		}
+
+		// Adding "into" a list or map targets its element or value struct.
+		switch parentType := parentField.Type.(type) {
+		case *iceberg.ListType:
+			parentField = parentType.ElementField()
+		case *iceberg.MapType:
+			parentField = parentType.ValueField()
+		}
+
+		if _, ok := parentField.Type.(*iceberg.StructType); !ok {
+			return fmt.Errorf("cannot add field to non-struct type: %s", parentFullPath)
+		}
+
+		parentID = parentField.ID
+	}
+
+	name := path[len(path)-1]
+	sameName := func(a, b string) bool {
+		if u.caseSensitive {
+			return a == b
+		}
+
+		return strings.EqualFold(a, b)
+	}
+
+	for _, add := range u.adds[parentID] {
+		if sameName(add.Name, name) {
+			return fmt.Errorf("field already exists in adds: %s", fullName)
+		}
+	}
+
+	// The name of a live existing column may be reused only when that column
+	// is being renamed to something else in this update.
+	if existing, ok := u.findField(fullName); ok && !u.isDeleted(existing.ID) {
+		pending, updated := u.updates[u.findParentID(existing.ID)][existing.ID]
+		if !updated || sameName(pending.Name, name) {
+			return fmt.Errorf("field already exists: %s", fullName)
+		}
+	}
+
+	// Another column may not be renamed to the name being added.
+	for id, pending := range u.updates[parentID] {
+		if !u.isDeleted(id) && sameName(pending.Name, name) {
+			return fmt.Errorf("field already exists: %s", fullName)
+		}
+	}
+
+	field := iceberg.NestedField{
+		Name:     name,
+		Type:     fieldType,
+		Required: required,
+		Doc:      doc,
+	}
+	if defaultValue != nil {
+		field.InitialDefault = defaultValue.Any()
+		field.WriteDefault = defaultValue.Any()
+	}
+
+	sch, err := iceberg.AssignFreshSchemaIDs(iceberg.NewSchema(0, field), u.assignNewColumnID)
+	if err != nil {
+		return fmt.Errorf("failed to assign field id: %w", err)
+	}
+
+	added := sch.Field(0)
+	u.adds[parentID] = append(u.adds[parentID], added)
+	u.addedNameToID[fullName] = added.ID
+	// Record the parent so moves involving the new column resolve to the
+	// correct parent instead of defaulting to the table root.
+	u.parentID[added.ID] = parentID
+
+	return nil
+}
+
+// DeleteColumn queues the removal of the column at path. Validation happens
+// when the queued operation runs in Apply.
+func (u *UpdateSchema) DeleteColumn(path []string) *UpdateSchema {
+	queued := func() error {
+		return u.deleteColumn(path)
+	}
+	u.ops = append(u.ops, queued)
+
+	return u
+}
+
+func (u *UpdateSchema) deleteColumn(path []string) error {
+ fullName := strings.Join(path, ".")
+ field, ok := u.findField(fullName)
+ if !ok {
+ return fmt.Errorf("field not found: %s", fullName)
+ }
+
+ if _, ok := u.adds[field.ID]; ok {
+ return fmt.Errorf("field that has additions cannot be deleted:
%s", fullName)
+ }
+
+ if _, ok := u.updates[field.ID]; ok {
+ return fmt.Errorf("field that has updates cannot be deleted:
%s", fullName)
+ }
+
+ delete(u.identifierFieldNames, fullName)
+
+ u.deletes[field.ID] = struct{}{}
+
+ return nil
+}
+
+// ColumnUpdate describes changes to an existing column. Only fields whose
+// Valid flag is set are applied; an update with no valid fields is a no-op.
+type ColumnUpdate struct {
+	// Name renames the column; it must be non-empty and differ from the current name.
+	Name         iceberg.Optional[string]
+	// FieldType changes a primitive column's type; unless incompatible changes
+	// are allowed it must be a valid promotion of the current type.
+	FieldType    iceberg.Optional[iceberg.Type]
+	// Required changes nullability; optional to required needs incompatible changes allowed.
+	Required     iceberg.Optional[bool]
+	// WriteDefault sets the column's write default.
+	WriteDefault iceberg.Optional[iceberg.Literal]
+	// Doc replaces the column documentation.
+	Doc          iceberg.Optional[string]
+}
+
+// UpdateColumn queues the changes described by update for the column at
+// path. Validation happens when the queued operation runs in Apply.
+func (u *UpdateSchema) UpdateColumn(path []string, update ColumnUpdate) *UpdateSchema {
+	queued := func() error {
+		return u.updateColumn(path, update)
+	}
+	u.ops = append(u.ops, queued)
+
+	return u
+}
+
+func (u *UpdateSchema) updateColumn(path []string, update ColumnUpdate) error {
+ if !update.Name.Valid &&
+ !update.FieldType.Valid &&
+ !update.Required.Valid &&
+ !update.WriteDefault.Valid &&
+ !update.Doc.Valid {
+ return nil
+ }
+
+ fullName := strings.Join(path, ".")
+
+ field, ok := u.findField(fullName)
+ if !ok {
+ return fmt.Errorf("field not found: %s", fullName)
+ }
+
+ if u.isDeleted(field.ID) {
+ return fmt.Errorf("field that has been deleted cannot be
updated: %s", fullName)
+ }
+
+ parentID := u.findParentID(field.ID)
+
+ if update.Name.Valid {
+ if update.Name.Val == "" {
+ return fmt.Errorf("cannot rename field to empty name:
%s", fullName)
+ }
+ if field.Name == update.Name.Val {
+ return fmt.Errorf("cannot rename field to the same
name: %s", fullName)
+ }
+
+ newFullName := strings.Join(append(path[:len(path)-1],
update.Name.Val), ".")
+ if existingField, ok := u.findField(newFullName); ok {
+ if !u.isDeleted(existingField.ID) {
+ return fmt.Errorf("field already exists: %s",
newFullName)
+ }
+ }
+
+ for _, add := range u.adds[parentID] {
+ if add.Name == update.Name.Val {
+ return fmt.Errorf("cannot rename field to added
field: %s", newFullName)
+ }
+ }
+
+ for _, upd := range u.updates[parentID] {
+ if upd.Name == update.Name.Val && upd.ID != field.ID {
+ return fmt.Errorf("cannot rename field to
renamed field: %s", newFullName)
+ }
+ }
+
+ if _, ok := u.identifierFieldNames[fullName]; ok {
+ delete(u.identifierFieldNames, fullName)
+ u.identifierFieldNames[newFullName] = struct{}{}
+ }
+ }
+
+ if update.FieldType.Valid {
+ if _, ok := field.Type.(iceberg.PrimitiveType); !ok {
+ return fmt.Errorf("cannot update field type for
non-primitive type: %s", fullName)
+ }
+ if !update.FieldType.Val.Equals(field.Type) &&
!u.allowIncompatibleChanges {
+ fieldType, err := iceberg.PromoteType(field.Type,
update.FieldType.Val)
+ if err != nil {
+ return err
+ }
+ update.FieldType.Val = fieldType
+ }
+ }
+
+ if update.Required.Valid {
+ if field.Required != update.Required.Val {
+ if !u.allowIncompatibleChanges && update.Required.Val {
+ return fmt.Errorf("cannot change column
nullability from optional to required: %s", fullName)
+ }
+ }
+ }
+
+ if update.WriteDefault.Valid {
+ if update.WriteDefault.Val == nil {
+ if field.Required && !u.allowIncompatibleChanges {
+ return fmt.Errorf("cannot change default value
of required column to nil: %s", fullName)
+ }
+ }
+ }
+
+ if _, ok := u.updates[parentID]; !ok {
+ u.updates[parentID] = make(map[int]iceberg.NestedField)
+ }
+
+ updatedField, ok := u.updates[parentID][field.ID]
+ if !ok {
+ updatedField = field
+ }
+ if update.Name.Valid {
+ updatedField.Name = update.Name.Val
+ }
+ if update.FieldType.Valid {
+ updatedField.Type = update.FieldType.Val
+ }
+ if update.Required.Valid {
+ updatedField.Required = update.Required.Val
+ }
+ if update.WriteDefault.Valid {
+ updatedField.WriteDefault = update.WriteDefault.Val.Any()
+ }
+ if update.Doc.Valid {
+ updatedField.Doc = update.Doc.Val
+ }
+ u.updates[parentID][field.ID] = updatedField
+
+ return nil
+}
+
+// RenameColumn queues renaming the column at path to newName. It is
+// shorthand for UpdateColumn with only the Name set.
+func (u *UpdateSchema) RenameColumn(path []string, newName string) *UpdateSchema {
+	rename := ColumnUpdate{
+		Name: iceberg.Optional[string]{Valid: true, Val: newName},
+	}
+
+	return u.UpdateColumn(path, rename)
+}
+
+// MoveColumn queues a move of the column at path. For MoveOpBefore and
+// MoveOpAfter the column is placed relative to the sibling at relativeTo;
+// for MoveOpFirst relativeTo is ignored. Validation happens when the queued
+// operation runs in Apply.
+func (u *UpdateSchema) MoveColumn(op MoveOp, path, relativeTo []string) *UpdateSchema {
+	queued := func() error {
+		return u.moveColumn(op, path, relativeTo)
+	}
+	u.ops = append(u.ops, queued)
+
+	return u
+}
+
+// MoveFirst queues a move of the column at path to the first position
+// within its parent.
+func (u *UpdateSchema) MoveFirst(path []string) *UpdateSchema {
+	return u.MoveColumn(MoveOpFirst, path, nil)
+}
+
+// MoveBefore queues a move of the column at path to directly before the
+// sibling column at relativeTo.
+func (u *UpdateSchema) MoveBefore(path, relativeTo []string) *UpdateSchema {
+	return u.MoveColumn(MoveOpBefore, path, relativeTo)
+}
+
+// MoveAfter queues a move of the column at path to directly after the
+// sibling column at relativeTo.
+func (u *UpdateSchema) MoveAfter(path, relativeTo []string) *UpdateSchema {
+	return u.MoveColumn(MoveOpAfter, path, relativeTo)
+}
+
+// findFieldForMove resolves a full column name to an ID for a move. Columns
+// added in this update take precedence over existing columns: a new column
+// may reuse the name of a column that is being deleted, and the move must
+// target the new column rather than the deleted one.
+func (u *UpdateSchema) findFieldForMove(name string) (int, bool) {
+	if id, ok := u.addedNameToID[name]; ok {
+		return id, true
+	}
+
+	if field, ok := u.findField(name); ok {
+		return field.ID, true
+	}
+
+	return 0, false
+}
+
+func (u *UpdateSchema) moveColumn(op MoveOp, path []string, relativeTo
[]string) error {
+ fullName := strings.Join(path, ".")
+ fieldID, ok := u.findFieldForMove(fullName)
+ if !ok {
+ return fmt.Errorf("field not found: %s", fullName)
+ }
+
+ if u.isDeleted(fieldID) {
+ return fmt.Errorf("field that has been deleted cannot be moved:
%s", fullName)
+ }
+
+ parentID := u.findParentID(fieldID)
+
+ switch op {
+ case MoveOpFirst:
+ u.moves[parentID] = append(u.moves[parentID], move{
+ FieldID: fieldID,
+ RelativeTo: -1,
+ Op: op,
+ })
+
+ return nil
+ case MoveOpBefore, MoveOpAfter:
+ relativeToFullName := strings.Join(relativeTo, ".")
+ relativeToFieldID, ok := u.findFieldForMove(relativeToFullName)
+ if !ok {
+ return fmt.Errorf("relative to field not found: %s",
relativeToFullName)
+ }
+
+ if relativeToFieldID == fieldID {
+ return fmt.Errorf("cannot move a field to itself: %s",
fullName)
+ }
+
+ if u.findParentID(relativeToFieldID) != parentID {
+ return fmt.Errorf("relative to field is not a child of
the parent: %s", relativeToFullName)
+ }
+ u.moves[parentID] = append(u.moves[parentID], move{
+ FieldID: fieldID,
+ RelativeTo: relativeToFieldID,
+ Op: op,
+ })
+
+ return nil
+ default:
+
+ return fmt.Errorf("invalid move operation: %s", op)
+ }
+}
+
+// SetIdentifierField replaces the identifier columns of the resulting schema
+// with the given column paths. Unlike the other operations this takes effect
+// immediately rather than being queued; the names are resolved to IDs in
+// Apply.
+func (u *UpdateSchema) SetIdentifierField(paths [][]string) *UpdateSchema {
+	names := make(map[string]struct{}, len(paths))
+	for i := range paths {
+		names[strings.Join(paths[i], ".")] = struct{}{}
+	}
+	u.identifierFieldNames = names
+
+	return u
+}
+
+// BuildUpdates applies the queued changes and returns the metadata updates
+// and requirements needed to make the result the table's current schema.
+//
+// If the result is identical to the current schema, no updates or
+// requirements are returned. If it is identical to another schema already in
+// the metadata, that schema is made current instead of adding a duplicate.
+// When a name mapping was supplied via WithNameMapping, an updated mapping
+// is written to the table properties under DefaultNameMappingKey.
+func (u *UpdateSchema) BuildUpdates() ([]Update, []Requirement, error) {
+	newSchema, err := u.Apply()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	existingSchemaID := -1
+	for _, schema := range u.txn.meta.schemaList {
+		if newSchema.Equals(schema) {
+			existingSchemaID = schema.ID
+
+			break
+		}
+	}
+
+	requirements := make([]Requirement, 0)
+	updates := make([]Update, 0)
+
+	if existingSchemaID == u.schema.ID {
+		return updates, requirements, nil
+	}
+
+	requirements = append(requirements, AssertCurrentSchemaID(u.schema.ID))
+	if existingSchemaID == -1 {
+		updates = append(updates,
+			NewAddSchemaUpdate(newSchema),
+			NewSetCurrentSchemaUpdate(newSchema.ID),
+		)
+	} else {
+		// newSchema carries a freshly allocated ID that is not in the
+		// metadata; point the current schema at the matching existing one.
+		updates = append(updates, NewSetCurrentSchemaUpdate(existingSchemaID))
+	}
+
+	if u.nameMapping != nil {
+		updatesMap := make(map[int]iceberg.NestedField)
+		for _, upds := range u.updates {
+			maps.Copy(updatesMap, upds)
+		}
+		updatedNameMapping, err := iceberg.UpdateNameMapping(u.nameMapping, updatesMap, u.adds)
+		if err != nil {
+			return nil, nil, err
+		}
+		updates = append(updates, NewSetPropertiesUpdate(iceberg.Properties{
+			DefaultNameMappingKey: updatedNameMapping.String(),
+		}))
+	}
+
+	return updates, requirements, nil
+}
+
+// Apply runs every queued operation against the transaction's current
+// schema and returns the resulting schema without recording anything on the
+// transaction. The first validation error from any operation is returned.
+//
+// The returned schema's ID is one greater than the highest schema ID in the
+// table metadata (or 1 if there are none). Its identifier fields are the
+// tracked identifier names resolved in the new schema, sorted by ID.
+func (u *UpdateSchema) Apply() (*iceberg.Schema, error) {
+	if err := u.init(); err != nil {
+		return nil, err
+	}
+
+	for _, op := range u.ops {
+		if err := op(); err != nil {
+			return nil, err
+		}
+	}
+
+	// applyChanges looks updates up by column ID alone, so flatten the
+	// per-parent map.
+	updates := make(map[int]iceberg.NestedField)
+	for _, upds := range u.updates {
+		maps.Copy(updates, upds)
+	}
+	st, err := iceberg.Visit(u.schema, &applyChanges{
+		adds:    u.adds,
+		updates: updates,
+		deletes: u.deletes,
+		moves:   u.moves,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("error applying schema changes: %w", err)
+	}
+
+	newStruct, ok := st.(*iceberg.StructType)
+	if !ok {
+		return nil, fmt.Errorf("error applying schema changes: unexpected result type %T", st)
+	}
+
+	// Resolve identifier names against the evolved columns; the schema ID
+	// used for this lookup is irrelevant.
+	lookup := iceberg.NewSchema(0, newStruct.FieldList...)
+	identifierFieldIDs := make([]int, 0, len(u.identifierFieldNames))
+	for name := range u.identifierFieldNames {
+		var (
+			field iceberg.NestedField
+			found bool
+		)
+		if u.caseSensitive {
+			field, found = lookup.FindFieldByName(name)
+		} else {
+			field, found = lookup.FindFieldByNameCaseInsensitive(name)
+		}
+		if !found {
+			return nil, fmt.Errorf("identifier field not found: %s", name)
+		}
+		identifierFieldIDs = append(identifierFieldIDs, field.ID)
+	}
+	// Map iteration order is random; sort so the identifier ID order of the
+	// result does not change from one call to the next.
+	slices.Sort(identifierFieldIDs)
+
+	nextSchemaID := 1
+	if len(u.txn.meta.schemaList) > 0 {
+		nextSchemaID = 1 + slices.MaxFunc(u.txn.meta.schemaList, func(a, b *iceberg.Schema) int {
+			return a.ID - b.ID
+		}).ID
+	}
+
+	return iceberg.NewSchemaWithIdentifiers(nextSchemaID, identifierFieldIDs, newStruct.FieldList...), nil
+}
+
+// Commit builds the schema updates and records them, together with their
+// requirements, on the transaction. It does nothing when the result matches
+// the current schema. The table itself changes only when the transaction is
+// committed.
+func (u *UpdateSchema) Commit() error {
+	updates, requirements, err := u.BuildUpdates()
+
+	switch {
+	case err != nil:
+		return err
+	case len(updates) == 0:
+		return nil
+	default:
+		return u.txn.apply(updates, requirements)
+	}
+}
+
+// applyChanges is the schema visitor that Apply uses to build the evolved
+// struct from the queued changes.
+type applyChanges struct {
+	// adds maps a parent ID (TableRootID for top level) to columns appended to it.
+	adds    map[int][]iceberg.NestedField
+	// updates maps a column ID to its updated definition.
+	updates map[int]iceberg.NestedField
+	// deletes holds the IDs of columns to drop.
+	deletes map[int]struct{}
+	// moves maps a parent ID to the reorders applied to its children.
+	moves   map[int][]move
+}
+
+// Schema applies the top-level adds and moves (keyed by TableRootID) to the
+// already-visited root struct.
+func (a *applyChanges) Schema(schema *iceberg.Schema, structResult iceberg.Type) iceberg.Type {
+	added, moves := a.adds[TableRootID], a.moves[TableRootID]
+	if len(added) == 0 && len(moves) == 0 {
+		return structResult
+	}
+
+	newFields := addAndMoveFields(structResult.(*iceberg.StructType).Fields(), added, moves)
+	if newFields == nil {
+		return structResult
+	}
+
+	return &iceberg.StructType{FieldList: newFields}
+}
+
+// Struct rebuilds a struct from the visited results of its fields. A nil
+// result means the field was deleted, and it is dropped. A field with no
+// pending update and an unchanged type is kept as-is. Otherwise the field is
+// copied with the visited type and, if an update exists, the updated name,
+// doc, required flag and write default. The original *structType is
+// returned when nothing changed.
+func (a *applyChanges) Struct(structType iceberg.StructType, fieldResults []iceberg.Type) iceberg.Type {
+	fields := structType.Fields()
+	newFields := make([]iceberg.NestedField, 0, len(fieldResults))
+	hasChanges := false
+
+	for i, resultType := range fieldResults {
+		if resultType == nil {
+			hasChanges = true
+
+			continue
+		}
+
+		field := fields[i]
+		update, updated := a.updates[field.ID]
+		// Decide on the presence of an update rather than comparing write
+		// defaults with ==: they are stored as `any`, and == panics when the
+		// dynamic type is not comparable (for example a slice).
+		if !updated && field.Type.Equals(resultType) {
+			newFields = append(newFields, field)
+
+			continue
+		}
+
+		hasChanges = true
+		// Start from a copy so attributes not touched here are preserved.
+		newField := field
+		newField.Type = resultType
+		if updated {
+			newField.Name = update.Name
+			newField.Doc = update.Doc
+			newField.Required = update.Required
+			newField.WriteDefault = update.WriteDefault
+		}
+		newFields = append(newFields, newField)
+	}
+
+	if !hasChanges {
+		return &structType
+	}
+
+	return &iceberg.StructType{FieldList: newFields}
+}
+
+// Field returns the resulting type of one field. It returns nil when the
+// field is deleted, and the updated type when a pending update changes it.
+// For struct-typed fields it returns the struct with any adds and moves
+// queued under the field applied. Otherwise it returns the visited type
+// unchanged.
+func (a *applyChanges) Field(field iceberg.NestedField, fieldResult iceberg.Type) iceberg.Type {
+	if _, deleted := a.deletes[field.ID]; deleted {
+		return nil
+	}
+
+	if update, ok := a.updates[field.ID]; ok && !field.Type.Equals(update.Type) {
+		return update.Type
+	}
+
+	st, isStruct := fieldResult.(*iceberg.StructType)
+	added, moves := a.adds[field.ID], a.moves[field.ID]
+	if !isStruct || (len(added) == 0 && len(moves) == 0) {
+		return fieldResult
+	}
+
+	if newFields := addAndMoveFields(st.FieldList, added, moves); len(newFields) > 0 {
+		return &iceberg.StructType{FieldList: newFields}
+	}
+
+	return fieldResult
+}
+
+// List rebuilds a list type around the processed element. The element field
+// goes through Field like any other column; deleting it panics.
+func (a *applyChanges) List(listType iceberg.ListType, elementResult iceberg.Type) iceberg.Type {
+	newElement := a.Field(listType.ElementField(), elementResult)
+	if newElement != nil {
+		return &iceberg.ListType{
+			ElementID:       listType.ElementID,
+			Element:         newElement,
+			ElementRequired: listType.ElementRequired,
+		}
+	}
+
+	panic(fmt.Sprintf("cannot delete element type from list: %s", elementResult))
+}
+
+// Map rebuilds a map type. The key may not be deleted, updated, extended or
+// otherwise altered, and any such change panics with an error value. The
+// value field goes through Field like any other column and may not be
+// deleted.
+func (a *applyChanges) Map(mapType iceberg.MapType, keyResult, valueResult iceberg.Type) iceberg.Type {
+	keyID := mapType.KeyID
+	_, keyDeleted := a.deletes[keyID]
+	_, keyUpdated := a.updates[keyID]
+	_, keyExtended := a.adds[keyID]
+
+	switch {
+	case keyDeleted:
+		panic(fmt.Errorf("cannot delete map keys: %s", mapType.String()))
+	case keyUpdated:
+		panic(fmt.Errorf("cannot update map keys: %s", mapType.String()))
+	case keyExtended:
+		panic(fmt.Errorf("cannot add fields to map keys: %s", mapType.String()))
+	case !mapType.KeyType.Equals(keyResult):
+		panic(fmt.Errorf("cannot alter map keys: %s", mapType.String()))
+	}
+
+	newValue := a.Field(mapType.ValueField(), valueResult)
+	if newValue == nil {
+		panic(fmt.Errorf("cannot delete value type from map: %s", mapType.String()))
+	}
+
+	return &iceberg.MapType{
+		KeyID:         mapType.KeyID,
+		KeyType:       mapType.KeyType,
+		ValueID:       mapType.ValueID,
+		ValueType:     newValue,
+		ValueRequired: mapType.ValueRequired,
+	}
+}
+
+// Primitive leaves primitive types untouched; type changes are applied in
+// Field.
+func (a *applyChanges) Primitive(p iceberg.PrimitiveType) iceberg.Type {
+	return p
+}
+
+// addFields returns fields followed by adds in a newly allocated slice.
+// Capacity is clipped before appending so that append never writes into
+// spare capacity of the caller's backing array, which may be shared with the
+// source schema's field list.
+func addFields(fields []iceberg.NestedField, adds []iceberg.NestedField) []iceberg.NestedField {
+	return append(fields[:len(fields):len(fields)], adds...)
+}
+
+func moveFields(fields []iceberg.NestedField, moves []move)
[]iceberg.NestedField {
+ reordered := slices.Clone(fields)
+ for _, move := range moves {
+ var fieldToMove iceberg.NestedField
+ var fieldIndex int
+ found := false
+ for i, field := range reordered {
+ if field.ID == move.FieldID {
+ fieldToMove = field
+ fieldIndex = i
+ found = true
+
+ break
+ }
+ }
+ if !found {
+ continue
+ }
+
+ reordered = append(reordered[:fieldIndex],
reordered[fieldIndex+1:]...)
+
+ switch move.Op {
+ case MoveOpFirst:
+ reordered = append([]iceberg.NestedField{fieldToMove},
reordered...)
+ case MoveOpBefore, MoveOpAfter:
+ var relativeIndex int
+ found = false
+ for i, field := range reordered {
+ if field.ID == move.RelativeTo {
+ relativeIndex = i
+ found = true
+
+ break
+ }
+ }
+ if !found {
+ continue
+ }
+
+ if move.Op == MoveOpBefore {
+ reordered = append(reordered[:relativeIndex],
append([]iceberg.NestedField{fieldToMove}, reordered[relativeIndex:]...)...)
+ } else {
+ reordered = append(reordered[:relativeIndex+1],
append([]iceberg.NestedField{fieldToMove}, reordered[relativeIndex+1:]...)...)
+ }
+ }
+ }
+
+ return reordered
+}
+
+// addAndMoveFields appends adds to fields and then applies moves. It returns
+// nil when there is nothing to add or move, so callers can keep the original
+// type. (The previous trailing fallback to `return fields` was unreachable.)
+func addAndMoveFields(fields []iceberg.NestedField, adds []iceberg.NestedField, moves []move) []iceberg.NestedField {
+	switch {
+	case len(adds) > 0 && len(moves) > 0:
+		return moveFields(addFields(fields, adds), moves)
+	case len(adds) > 0:
+		return addFields(fields, adds)
+	case len(moves) > 0:
+		return moveFields(fields, moves)
+	default:
+		return nil
+	}
+}
diff --git a/table/update_schema_test.go b/table/update_schema_test.go
new file mode 100644
index 00000000..8f35363d
--- /dev/null
+++ b/table/update_schema_test.go
@@ -0,0 +1,881 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/stretchr/testify/assert"
+)
+
+var originalSchema = iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true, Doc: ""},
+ iceberg.NestedField{ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ iceberg.NestedField{ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ iceberg.NestedField{ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ iceberg.NestedField{ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ iceberg.NestedField{ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+)
+
+var testMetadata, _ = NewMetadata(originalSchema, nil, UnsortedSortOrder, "",
nil)
+
+func TestAddColumn(t *testing.T) {
+ t.Run("test update schema with add primitive type on top level", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).AddColumn([]string{"gender"}, iceberg.PrimitiveTypes.String, "", false,
iceberg.StringLiteral("male")).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ {ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ {ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 12, Name: "gender", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ }, newSchema.Fields())
+ })
+
+ t.Run("test update schema with add list of primitive type on top
level", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).AddColumn([]string{"files"}, &iceberg.ListType{
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, "", false, nil).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ {ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ {ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 12, Name: "files", Type: &iceberg.ListType{
+ ElementID: 13,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ }, newSchema.Fields())
+ })
+
+ t.Run("test update schema with add map of primitive type on top level",
func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).AddColumn([]string{"files"}, &iceberg.MapType{
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, "", false, nil).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ {ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ {ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 12, Name: "files", Type: &iceberg.MapType{
+ KeyID: 13,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 14,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ }, newSchema.Fields())
+ })
+
+ t.Run("test update schema with add struct type on top level", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).AddColumn([]string{"files"}, &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "id", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, "", false, nil).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ {ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ {ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 12, Name: "files", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 13, Name: "id", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 14, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ }, newSchema.Fields())
+ })
+
+ t.Run("test update schema with add primitive in struct", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).AddColumn([]string{"address", "code"}, iceberg.PrimitiveTypes.String, "",
false, nil).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ {ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 12, Name: "code", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ {ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ }, newSchema.Fields())
+ })
+
+ t.Run("test update schema with add struct in struct", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).AddColumn([]string{"address", "code"}, &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "code-1", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "code-2", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, "", false, nil).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ {ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 12, Name: "code", Type:
&iceberg.StructType{
+ FieldList:
[]iceberg.NestedField{
+ {ID: 13, Name:
"code-1", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 14, Name:
"code-2", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ },
+ }},
+ {ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ }, newSchema.Fields())
+ })
+
+ t.Run("test update schema with multiple adds", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).AddColumn([]string{"address", "code"}, &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "code-1", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "code-2", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, "", false, nil).AddColumn([]string{"gender"},
iceberg.PrimitiveTypes.String, "", false, nil).AddColumn([]string{"files"},
&iceberg.ListType{
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, "", false, nil).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ {ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 12, Name: "code", Type:
&iceberg.StructType{
+ FieldList:
[]iceberg.NestedField{
+ {ID: 13, Name:
"code-1", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 14, Name:
"code-2", Type: iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ {ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 15, Name: "gender", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 16, Name: "files", Type: &iceberg.ListType{
+ ElementID: 17,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }},
+ }, newSchema.Fields())
+ })
+}
+
+func TestApplyChanges(t *testing.T) {
+ t.Run("test apply changes on schema", func(t *testing.T) {
+ deletes := map[int]struct{}{
+ 2: {},
+ }
+ updates := map[int]iceberg.NestedField{
+ 3: {Name: "age", Type: iceberg.PrimitiveTypes.Int64,
Required: true, Doc: ""},
+ }
+ adds := map[int][]iceberg.NestedField{
+ -1: {
+ {ID: 12, Name: "gender", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }
+ moves := map[int][]move{
+ 4: {
+ {FieldID: 6, RelativeTo: 5, Op: MoveOpBefore},
+ },
+ }
+
+ st, err := iceberg.Visit(originalSchema, &applyChanges{
+ deletes: deletes,
+ updates: updates,
+ adds: adds,
+ moves: moves,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, st)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int64, Required: true, Doc: ""},
+ {ID: 4, Name: "address", Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {ID: 6, Name: "zip", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ {ID: 5, Name: "city", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ },
+ }, Required: false, Doc: ""},
+ {ID: 7, Name: "tags", Type: &iceberg.ListType{
+ ElementID: 8,
+ Element: iceberg.PrimitiveTypes.String,
+ ElementRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 9, Name: "properties", Type: &iceberg.MapType{
+ KeyID: 10,
+ KeyType: iceberg.PrimitiveTypes.String,
+ ValueID: 11,
+ ValueType: iceberg.PrimitiveTypes.String,
+ ValueRequired: false,
+ }, Required: false, Doc: ""},
+ {ID: 12, Name: "gender", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ }, st.(*iceberg.StructType).Fields())
+ })
+
+ t.Run("test apply changes on add field that delete in same time",
func(t *testing.T) {
+ originalSchema := iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true, Doc: ""},
+ iceberg.NestedField{ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ iceberg.NestedField{ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ )
+ deletes := map[int]struct{}{
+ 2: {},
+ }
+ adds := map[int][]iceberg.NestedField{
+ -1: {
+ {ID: 4, Name: "name", Type:
iceberg.PrimitiveTypes.UUID, Required: false, Doc: ""},
+ },
+ }
+
+ st, err := iceberg.Visit(originalSchema, &applyChanges{
+ deletes: deletes,
+ adds: adds,
+ })
+ assert.NoError(t, err)
+ assert.NotNil(t, st)
+
+ assert.Equal(t, []iceberg.NestedField{
+ {ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32,
Required: true, Doc: ""},
+ {ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ {ID: 4, Name: "name", Type:
iceberg.PrimitiveTypes.UUID, Required: false, Doc: ""},
+ }, st.(*iceberg.StructType).Fields())
+ })
+}
+
+func TestDeleteColumn(t *testing.T) {
+ t.Run("test delete top level column", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).DeleteColumn([]string{"name"}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ fields := newSchema.Fields()
+ assert.Len(t, fields, 5)
+
+ fieldNames := make([]string, len(fields))
+ for i, field := range fields {
+ fieldNames[i] = field.Name
+ }
+ assert.Contains(t, fieldNames, "id")
+ assert.Contains(t, fieldNames, "age")
+ assert.Contains(t, fieldNames, "address")
+ assert.NotContains(t, fieldNames, "name")
+ })
+
+ t.Run("test delete nested column", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).DeleteColumn([]string{"address", "city"}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ addressField, ok := newSchema.FindFieldByName("address")
+ assert.True(t, ok)
+
+ structType, ok := addressField.Type.(*iceberg.StructType)
+ assert.True(t, ok)
+ assert.Len(t, structType.Fields(), 1)
+ assert.Equal(t, "zip", structType.Fields()[0].Name)
+ })
+
+ t.Run("test delete non-existent column", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
true).DeleteColumn([]string{"non_existent"}).Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "field not found")
+ })
+}
+
+func TestUpdateColumn(t *testing.T) {
+ t.Run("test update column type", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).UpdateColumn([]string{"age"}, ColumnUpdate{
+ FieldType: iceberg.Optional[iceberg.Type]{Valid: true,
Val: iceberg.PrimitiveTypes.Int64},
+ }).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ ageField, ok := newSchema.FindFieldByName("age")
+ assert.True(t, ok)
+ assert.Equal(t, iceberg.PrimitiveTypes.Int64, ageField.Type)
+ })
+
+ t.Run("test update column required", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).UpdateColumn([]string{"name"}, ColumnUpdate{
+ Required: iceberg.Optional[bool]{Valid: true, Val:
true},
+ }).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ nameField, ok := newSchema.FindFieldByName("name")
+ assert.True(t, ok)
+ assert.True(t, nameField.Required)
+ })
+
+ t.Run("test update column doc", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).UpdateColumn([]string{"age"}, ColumnUpdate{
+ Doc: iceberg.Optional[string]{Valid: true, Val: "User's
age in years"},
+ }).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ ageField, ok := newSchema.FindFieldByName("age")
+ assert.True(t, ok)
+ assert.Equal(t, "User's age in years", ageField.Doc)
+ })
+
+ t.Run("test update non-existent column", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
true).UpdateColumn([]string{"non_existent"}, ColumnUpdate{
+ Doc: iceberg.Optional[string]{Valid: true, Val: "test"},
+ }).Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "field not found")
+ })
+}
+
+func TestRenameColumn(t *testing.T) {
+ t.Run("test rename top level column", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).RenameColumn([]string{"name"}, "full_name").Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ _, ok := newSchema.FindFieldByName("name")
+ assert.False(t, ok)
+
+ field, ok := newSchema.FindFieldByName("full_name")
+ assert.True(t, ok)
+ assert.Equal(t, iceberg.PrimitiveTypes.String, field.Type)
+ })
+
+ t.Run("test rename nested column", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).RenameColumn([]string{"address", "city"}, "city_name").Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ addressField, ok := newSchema.FindFieldByName("address")
+ assert.True(t, ok)
+
+ structType, ok := addressField.Type.(*iceberg.StructType)
+ assert.True(t, ok)
+
+ fieldNames := make([]string, len(structType.Fields()))
+ for i, field := range structType.Fields() {
+ fieldNames[i] = field.Name
+ }
+ assert.Contains(t, fieldNames, "city_name")
+ assert.NotContains(t, fieldNames, "city")
+ })
+
+ t.Run("test rename to existing name", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
true).RenameColumn([]string{"name"}, "age").Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "field already exists")
+ })
+}
+
+func TestMoveColumn(t *testing.T) {
+ t.Run("test move column to first", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).MoveFirst([]string{"age"}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ fields := newSchema.Fields()
+ assert.Equal(t, "age", fields[0].Name)
+ assert.Equal(t, "id", fields[1].Name)
+ })
+
+ t.Run("test move column before", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).MoveBefore([]string{"age"}, []string{"name"}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ fields := newSchema.Fields()
+ fieldNames := make([]string, len(fields))
+ for i, field := range fields {
+ fieldNames[i] = field.Name
+ }
+
+ ageIndex := -1
+ nameIndex := -1
+ for i, name := range fieldNames {
+ if name == "age" {
+ ageIndex = i
+ }
+ if name == "name" {
+ nameIndex = i
+ }
+ }
+
+ assert.True(t, ageIndex < nameIndex, "age should come before
name")
+ })
+
+ t.Run("test move column after", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).MoveAfter([]string{"name"}, []string{"age"}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ fields := newSchema.Fields()
+ fieldNames := make([]string, len(fields))
+ for i, field := range fields {
+ fieldNames[i] = field.Name
+ }
+
+ ageIndex := -1
+ nameIndex := -1
+ for i, name := range fieldNames {
+ if name == "age" {
+ ageIndex = i
+ }
+ if name == "name" {
+ nameIndex = i
+ }
+ }
+
+ assert.True(t, nameIndex > ageIndex, "name should come after
age")
+ })
+
+ t.Run("test move non-existent column", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
true).MoveFirst([]string{"non_existent"}).Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "field not found")
+ })
+}
+
+func TestChainedOperations(t *testing.T) {
+ t.Run("test multiple operations in chain", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true, true).
+ AddColumn([]string{"email"},
iceberg.PrimitiveTypes.String, "Email address", false, nil).
+ RenameColumn([]string{"name"}, "full_name").
+ UpdateColumn([]string{"age"}, ColumnUpdate{
+ Required: iceberg.Optional[bool]{Valid: true,
Val: true},
+ }).
+ MoveFirst([]string{"email"}).
+ DeleteColumn([]string{"tags"}).
+ Apply()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ fields := newSchema.Fields()
+ assert.Len(t, fields, 6)
+
+ assert.Equal(t, "email", fields[0].Name)
+
+ _, ok := newSchema.FindFieldByName("name")
+ assert.False(t, ok)
+ _, ok = newSchema.FindFieldByName("full_name")
+ assert.True(t, ok)
+
+ ageField, ok := newSchema.FindFieldByName("age")
+ assert.True(t, ok)
+ assert.True(t, ageField.Required)
+
+ _, ok = newSchema.FindFieldByName("tags")
+ assert.False(t, ok)
+ })
+}
+
+func TestSetIdentifierField(t *testing.T) {
+ t.Run("test set identifier field with single top-level field", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ // Test that SetIdentifierField returns the same UpdateSchema
instance
+ updateSchema := NewUpdateSchema(txn, true, true)
+ updatedSchema :=
updateSchema.SetIdentifierField([][]string{{"id"}})
+ assert.Equal(t, updateSchema, updatedSchema) // Should return
the same instance
+
+ newSchema, err := updatedSchema.Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ // Check that the schema has identifier field IDs set
+ assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+ if len(newSchema.IdentifierFieldIDs) > 0 {
+ assert.Equal(t, 1, newSchema.IdentifierFieldIDs[0]) //
id field has ID 1
+ }
+ })
+
+ t.Run("test set identifier field with multiple fields", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).SetIdentifierField([][]string{{"id"}, {"name"}}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ // Check that the schema has identifier field IDs set
+ assert.Len(t, newSchema.IdentifierFieldIDs, 2)
+ assert.Contains(t, newSchema.IdentifierFieldIDs, 1) // id field
+ assert.Contains(t, newSchema.IdentifierFieldIDs, 2) // name
field
+ })
+
+ t.Run("test set identifier field with case sensitive matching", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
true).SetIdentifierField([][]string{{"ID"}}).Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "identifier field not found:
ID")
+ })
+
+ t.Run("test set identifier field with case insensitive matching",
func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, false,
true).SetIdentifierField([][]string{{"ID"}}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ // Check that the schema has identifier field IDs set
+ assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+ assert.Equal(t, 1, newSchema.IdentifierFieldIDs[0]) // id field
(case insensitive match)
+ })
+
+ t.Run("test set identifier field with non-existent field", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
true).SetIdentifierField([][]string{{"non_existent"}}).Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "identifier field not found:
non_existent")
+ })
+
+ t.Run("test set identifier field with empty paths", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).SetIdentifierField([][]string{}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ // Check that the schema has no identifier field IDs
+ assert.Len(t, newSchema.IdentifierFieldIDs, 0)
+ })
+
+ t.Run("test set identifier field chained with other operations", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true, true).
+ AddColumn([]string{"email"},
iceberg.PrimitiveTypes.String, "", false, nil).
+ SetIdentifierField([][]string{{"id"}, {"email"}}).
+ Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ // Check that the schema has identifier field IDs set
+ assert.Len(t, newSchema.IdentifierFieldIDs, 2)
+ assert.Contains(t, newSchema.IdentifierFieldIDs, 1) // id field
+ assert.Contains(t, newSchema.IdentifierFieldIDs, 12) // email
field (newly added)
+ })
+
+ t.Run("test set identifier field with duplicate field paths", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ newSchema, err := NewUpdateSchema(txn, true,
true).SetIdentifierField([][]string{{"id"}, {"id"}}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ // Check that the schema has identifier field IDs set
(duplicates should be deduplicated)
+ assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+ assert.Equal(t, 1, newSchema.IdentifierFieldIDs[0]) // id field
+ })
+
+ t.Run("test set identifier field replaces existing identifier fields",
func(t *testing.T) {
+ // Create a schema with existing identifier fields
+ schemaWithIdentifiers := iceberg.NewSchemaWithIdentifiers(1,
[]int{1}, // id is initially an identifier
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true, Doc: ""},
+ iceberg.NestedField{ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false, Doc: ""},
+ iceberg.NestedField{ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32, Required: false, Doc: ""},
+ )
+ metadata, _ := NewMetadata(schemaWithIdentifiers, nil,
UnsortedSortOrder, "", nil)
+ table := New([]string{"id"}, metadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ // Set identifier fields to name instead of id
+ newSchema, err := NewUpdateSchema(txn, true,
true).SetIdentifierField([][]string{{"name"}}).Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ // Check that only name is now an identifier field
+ assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+ assert.Equal(t, 2, newSchema.IdentifierFieldIDs[0]) // name
field has ID 2
+ })
+
+ t.Run("test set identifier field multiple times", func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ // Set identifier fields multiple times - last one should win
+ newSchema, err := NewUpdateSchema(txn, true, true).
+ SetIdentifierField([][]string{{"id"}}).
+ SetIdentifierField([][]string{{"name"}}).
+ Apply()
+ assert.NoError(t, err)
+ assert.NotNil(t, newSchema)
+
+ // Check that only the last SetIdentifierField call is applied
+ assert.Len(t, newSchema.IdentifierFieldIDs, 1)
+ assert.Equal(t, 2, newSchema.IdentifierFieldIDs[0]) // name
field has ID 2
+ })
+}
+
+func TestErrorHandling(t *testing.T) {
+ t.Run("test incompatible changes without allowIncompatibleChanges",
func(t *testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
false).UpdateColumn([]string{"name"}, ColumnUpdate{
+ Required: iceberg.Optional[bool]{Valid: true, Val:
true},
+ }).Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "cannot change column
nullability from optional to required")
+ })
+
+ t.Run("test add required field without default value", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
false).AddColumn([]string{"required_field"}, iceberg.PrimitiveTypes.String, "",
true, nil).Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "required field required_field
has no default value")
+ })
+
+ t.Run("test add field with incompatible default value", func(t
*testing.T) {
+ table := New([]string{"id"}, testMetadata, "", nil, nil)
+ txn := table.NewTransaction()
+
+ _, err := NewUpdateSchema(txn, true,
true).AddColumn([]string{"age_field"}, iceberg.PrimitiveTypes.String, "",
false, iceberg.Int32Literal(25)).Apply()
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "default value type mismatch")
+ })
+}