This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new dba25d84 feat(metadata): export UpdateTable & fixes (#580)
dba25d84 is described below

commit dba25d84620964be756187d381f8717f1535688f
Author: Tobias Pütz <[email protected]>
AuthorDate: Mon Oct 6 19:33:56 2025 +0200

    feat(metadata): export UpdateTable & fixes (#580)
    
    - exports UpdateTableMetadata from the table package so that outside packages can use it
    - adds metadata log maintenance to the builder: when a currentFileLocation
    is given and the builder has changes, Build appends an entry for it to
    the metadata log and then trims the log to the configured maximum
    - ports remaining tests from iceberg-rust and implements fixes for them
---
 catalog/glue/schema_test.go             |   2 +-
 catalog/internal/utils.go               |  30 +-
 errors.go                               |   6 +-
 table/arrow_utils_internal_test.go      |   2 +-
 table/metadata.go                       | 101 +++++--
 table/metadata_builder_internal_test.go | 515 +++++++++++++++++++++++++++++++-
 table/metadata_internal_test.go         |  13 +-
 table/partitioned_fanout_writer_test.go |   2 +-
 table/table.go                          |   2 +-
 table/table_test.go                     |  15 +-
 table/time_travel_test.go               |   2 +-
 11 files changed, 591 insertions(+), 99 deletions(-)

diff --git a/catalog/glue/schema_test.go b/catalog/glue/schema_test.go
index 88412db0..04f5dd74 100644
--- a/catalog/glue/schema_test.go
+++ b/catalog/glue/schema_test.go
@@ -437,7 +437,7 @@ func TestSchemasToGlueColumns(t *testing.T) {
        metadata, err := table.NewMetadata(schemas[0], nil, table.SortOrder{}, 
"s3://example/path", nil)
        assert.NoError(t, err)
 
-       mb, err := table.MetadataBuilderFromBase(metadata)
+       mb, err := table.MetadataBuilderFromBase(metadata, "")
        assert.NoError(t, err)
 
        err = mb.AddSchema(schemas[1])
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index 2cbdc906..99f6bfa5 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -70,35 +70,7 @@ func WriteMetadata(ctx context.Context, metadata 
table.Metadata, loc string, pro
 }
 
 func UpdateTableMetadata(base table.Metadata, updates []table.Update, 
metadataLoc string) (table.Metadata, error) {
-       bldr, err := table.MetadataBuilderFromBase(base)
-       if err != nil {
-               return nil, err
-       }
-
-       for _, update := range updates {
-               if err := update.Apply(bldr); err != nil {
-                       return nil, err
-               }
-       }
-
-       if bldr.HasChanges() {
-               if metadataLoc != "" {
-                       maxMetadataLogEntries := max(1,
-                               base.Properties().GetInt(
-                                       table.MetadataPreviousVersionsMaxKey, 
table.MetadataPreviousVersionsMaxDefault))
-
-                       bldr.TrimMetadataLogs(maxMetadataLogEntries + 1).
-                               AppendMetadataLog(table.MetadataLogEntry{
-                                       MetadataFile: metadataLoc,
-                                       TimestampMs:  base.LastUpdatedMillis(),
-                               })
-               }
-               if base.LastUpdatedMillis() == bldr.LastUpdatedMS() {
-                       bldr.SetLastUpdatedMS()
-               }
-       }
-
-       return bldr.Build()
+       return table.UpdateTableMetadata(base, updates, metadataLoc)
 }
 
 func CreateStagedTable(ctx context.Context, catprops iceberg.Properties, 
nspropsFn GetNamespacePropsFn, ident table.Identifier, sc *iceberg.Schema, opts 
...catalog.CreateTableOpt) (table.StagedTable, error) {
diff --git a/errors.go b/errors.go
index 9ecc26d5..f1194fc0 100644
--- a/errors.go
+++ b/errors.go
@@ -17,12 +17,16 @@
 
 package iceberg
 
-import "errors"
+import (
+       "errors"
+       "fmt"
+)
 
 var (
        ErrInvalidTypeString       = errors.New("invalid type")
        ErrNotImplemented          = errors.New("not implemented")
        ErrInvalidArgument         = errors.New("invalid argument")
+       ErrInvalidFormatVersion    = fmt.Errorf("%w: invalid format version", 
ErrInvalidArgument)
        ErrInvalidSchema           = errors.New("invalid schema")
        ErrInvalidTransform        = errors.New("invalid transform syntax")
        ErrType                    = errors.New("type error")
diff --git a/table/arrow_utils_internal_test.go 
b/table/arrow_utils_internal_test.go
index 4305642e..d3229822 100644
--- a/table/arrow_utils_internal_test.go
+++ b/table/arrow_utils_internal_test.go
@@ -184,7 +184,7 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta 
iceberg.Properties, writeSt
 
        schema := tableMeta.CurrentSchema()
        if len(meta) > 0 {
-               bldr, err := MetadataBuilderFromBase(tableMeta)
+               bldr, err := MetadataBuilderFromBase(tableMeta, "")
                suite.Require().NoError(err)
                err = bldr.SetProperties(meta)
                suite.Require().NoError(err)
diff --git a/table/metadata.go b/table/metadata.go
index 2240549b..b41d40a5 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -175,6 +175,7 @@ type MetadataBuilder struct {
        defaultSortOrderID int
        refs               map[string]SnapshotRef
 
+       previousFileEntry *MetadataLogEntry
        // >v1 specific
        lastSequenceNumber *int64
        // update tracking
@@ -183,28 +184,41 @@ type MetadataBuilder struct {
        lastAddedSortOrderID *int
 }
 
-func NewMetadataBuilder() (*MetadataBuilder, error) {
+func NewMetadataBuilder(formatVersion int) (*MetadataBuilder, error) {
+       if formatVersion < 1 || formatVersion > supportedTableFormatVersion {
+               return nil, fmt.Errorf("%w: %d", 
iceberg.ErrInvalidFormatVersion, formatVersion)
+       }
+
        return &MetadataBuilder{
-               updates:       make([]Update, 0),
-               schemaList:    make([]*iceberg.Schema, 0),
-               specs:         make([]iceberg.PartitionSpec, 0),
-               props:         make(iceberg.Properties),
-               snapshotList:  make([]Snapshot, 0),
-               snapshotLog:   make([]SnapshotLogEntry, 0),
-               metadataLog:   make([]MetadataLogEntry, 0),
-               sortOrderList: make([]SortOrder, 0),
-               refs:          make(map[string]SnapshotRef),
+               updates:            make([]Update, 0),
+               schemaList:         make([]*iceberg.Schema, 0),
+               specs:              make([]iceberg.PartitionSpec, 0),
+               props:              make(iceberg.Properties),
+               snapshotList:       make([]Snapshot, 0),
+               snapshotLog:        make([]SnapshotLogEntry, 0),
+               metadataLog:        make([]MetadataLogEntry, 0),
+               sortOrderList:      make([]SortOrder, 0),
+               refs:               make(map[string]SnapshotRef),
+               currentSchemaID:    -1,
+               defaultSortOrderID: -1,
+               defaultSpecID:      -1,
+               lastColumnId:       -1,
+               formatVersion:      formatVersion,
        }, nil
 }
 
-func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+// MetadataBuilderFromBase creates a MetadataBuilder from an existing Metadata 
object.
+// currentFileLocation is the location where the current version of the 
metadata
+// file is stored. This is used to update the metadata log. If 
currentFileLocation is
+// empty, the metadata log will not be updated. An empty location should only 
be used to stage-create tables.
+func MetadataBuilderFromBase(metadata Metadata, currentFileLocation string) 
(*MetadataBuilder, error) {
        b := &MetadataBuilder{}
        b.base = metadata
 
        b.formatVersion = metadata.Version()
        b.uuid = metadata.TableUUID()
        b.loc = metadata.Location()
-       b.lastUpdatedMS = metadata.LastUpdatedMillis()
+       b.lastUpdatedMS = 0
        b.lastColumnId = metadata.LastColumnID()
        b.schemaList = slices.Clone(metadata.Schemas())
        b.currentSchemaID = metadata.CurrentSchema().ID
@@ -229,6 +243,13 @@ func MetadataBuilderFromBase(metadata Metadata) 
(*MetadataBuilder, error) {
        b.snapshotLog = slices.Collect(metadata.SnapshotLogs())
        b.metadataLog = slices.Collect(metadata.PreviousFiles())
 
+       if currentFileLocation != "" {
+               b.previousFileEntry = &MetadataLogEntry{
+                       MetadataFile: currentFileLocation,
+                       TimestampMs:  metadata.LastUpdatedMillis(),
+               }
+       }
+
        return b, nil
 }
 
@@ -451,6 +472,7 @@ func (b *MetadataBuilder) AddSortOrder(sortOrder 
*SortOrder) error {
        }
 
        b.sortOrderList = append(sortOrders, *sortOrder)
+       b.lastAddedSortOrderID = &sortOrder.orderID
        b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder))
 
        return nil
@@ -474,8 +496,7 @@ func (b *MetadataBuilder) RemoveProperties(keys []string) 
error {
 }
 
 func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) error {
-       takeLast := currentSchemaID == -1
-       if takeLast {
+       if currentSchemaID == -1 {
                if b.lastAddedSchemaID == nil {
                        return errors.New("can't set current schema to last 
added schema, no schema has been added")
                }
@@ -491,7 +512,7 @@ func (b *MetadataBuilder) 
SetCurrentSchemaID(currentSchemaID int) error {
                return fmt.Errorf("can't set current schema to schema with id 
%d: %w", currentSchemaID, err)
        }
 
-       if takeLast {
+       if b.lastAddedSchemaID != nil && currentSchemaID == 
*b.lastAddedSchemaID {
                b.updates = append(b.updates, NewSetCurrentSchemaUpdate(-1))
        } else {
                b.updates = append(b.updates, 
NewSetCurrentSchemaUpdate(currentSchemaID))
@@ -503,14 +524,10 @@ func (b *MetadataBuilder) 
SetCurrentSchemaID(currentSchemaID int) error {
 
 func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) error {
        if defaultSortOrderID == -1 {
-               defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder) 
int {
-                       return s.OrderID()
-               })
-               if !slices.ContainsFunc(b.updates, func(u Update) bool {
-                       return u.Action() == UpdateAddSortOrder && 
u.(*addSortOrderUpdate).SortOrder.OrderID() == defaultSortOrderID
-               }) {
+               if b.lastAddedSortOrderID == nil {
                        return errors.New("can't set default sort order to last 
added with no added sort orders")
                }
+               defaultSortOrderID = *b.lastAddedSortOrderID
        }
 
        if defaultSortOrderID == b.defaultSortOrderID {
@@ -521,17 +538,20 @@ func (b *MetadataBuilder) 
SetDefaultSortOrderID(defaultSortOrderID int) error {
                return fmt.Errorf("can't set default sort order to sort order 
with id %d: %w", defaultSortOrderID, err)
        }
 
-       b.updates = append(b.updates, 
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+       if b.lastAddedSortOrderID != nil && defaultSortOrderID == 
*b.lastAddedSortOrderID {
+               b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(-1))
+       } else {
+               b.updates = append(b.updates, 
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+       }
+
        b.defaultSortOrderID = defaultSortOrderID
 
        return nil
 }
 
 func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) error {
-       lastUsed := false
        if defaultSpecID == -1 {
                if b.lastAddedPartitionID != nil {
-                       lastUsed = true
                        defaultSpecID = *b.lastAddedPartitionID
                } else {
                        return errors.New("can't set default spec to last added 
with no added partition specs")
@@ -546,7 +566,7 @@ func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID 
int) error {
                return fmt.Errorf("can't set default spec to spec with id %d: 
%w", defaultSpecID, err)
        }
 
-       if lastUsed {
+       if b.lastAddedPartitionID != nil && defaultSpecID == 
*b.lastAddedPartitionID {
                b.updates = append(b.updates, NewSetDefaultSpecUpdate(-1))
        } else {
                b.updates = append(b.updates, 
NewSetDefaultSpecUpdate(defaultSpecID))
@@ -563,7 +583,7 @@ func (b *MetadataBuilder) SetFormatVersion(formatVersion 
int) error {
        }
 
        if formatVersion > supportedTableFormatVersion {
-               return fmt.Errorf("unsupported format version %d", 
formatVersion)
+               return fmt.Errorf("%w: %d", iceberg.ErrInvalidFormatVersion, 
formatVersion)
        }
 
        if formatVersion == b.formatVersion {
@@ -757,6 +777,14 @@ func (b *MetadataBuilder) buildCommonMetadata() 
(*commonMetadata, error) {
                b.lastUpdatedMS = time.Now().UnixMilli()
        }
 
+       if b.previousFileEntry != nil && b.HasChanges() {
+               maxMetadataLogEntries := max(1,
+                       b.base.Properties().GetInt(
+                               MetadataPreviousVersionsMaxKey, 
MetadataPreviousVersionsMaxDefault))
+               b.AppendMetadataLog(*b.previousFileEntry)
+               b.TrimMetadataLogs(maxMetadataLogEntries)
+       }
+
        return &commonMetadata{
                FormatVersion:      b.formatVersion,
                UUID:               b.uuid,
@@ -1702,15 +1730,11 @@ func NewMetadataWithUUID(sc *iceberg.Schema, partitions 
*iceberg.PartitionSpec,
                return nil, err
        }
 
-       builder, err := NewMetadataBuilder()
+       builder, err := NewMetadataBuilder(formatVersion)
        if err != nil {
                return nil, err
        }
 
-       if err = builder.SetFormatVersion(formatVersion); err != nil {
-               return nil, err
-       }
-
        if err = builder.SetUUID(tableUuid); err != nil {
                return nil, err
        }
@@ -1791,3 +1815,18 @@ func reassignIDs(sc *iceberg.Schema, partitions 
*iceberg.PartitionSpec, sortOrde
                sortOrder:     freshSortOrder,
        }, nil
 }
+
+func UpdateTableMetadata(base Metadata, updates []Update, metadataLoc string) 
(Metadata, error) {
+       bldr, err := MetadataBuilderFromBase(base, metadataLoc)
+       if err != nil {
+               return nil, err
+       }
+
+       for _, update := range updates {
+               if err := update.Apply(bldr); err != nil {
+                       return nil, err
+               }
+       }
+
+       return bldr.Build()
+}
diff --git a/table/metadata_builder_internal_test.go 
b/table/metadata_builder_internal_test.go
index addb182f..01034817 100644
--- a/table/metadata_builder_internal_test.go
+++ b/table/metadata_builder_internal_test.go
@@ -20,6 +20,7 @@ package table
 import (
        "fmt"
        "testing"
+       "time"
 
        "github.com/apache/iceberg-go"
        "github.com/davecgh/go-spew/spew"
@@ -70,12 +71,11 @@ func builderWithoutChanges(formatVersion int) 
MetadataBuilder {
        partitionSpec := partitionSpec()
        sortOrder := sortOrder()
 
-       builder, err := NewMetadataBuilder()
+       builder, err := NewMetadataBuilder(formatVersion)
        if err != nil {
                panic(err)
        }
-       if err = builder.SetFormatVersion(formatVersion); err != nil {
-               panic(err)
+       if err = builder.SetLoc("s3://bucket/test/location"); err != nil {
        }
        if err = builder.AddSchema(&tableSchema); err != nil {
                panic(err)
@@ -100,7 +100,7 @@ func builderWithoutChanges(formatVersion int) 
MetadataBuilder {
        if err != nil {
                panic(err)
        }
-       builder, err = MetadataBuilderFromBase(meta)
+       builder, err = MetadataBuilderFromBase(meta, 
"s3://bucket/test/location/metadata/metadata1.json")
        if err != nil {
                panic(err)
        }
@@ -108,6 +108,162 @@ func builderWithoutChanges(formatVersion int) 
MetadataBuilder {
        return *builder
 }
 
+func TestBuildUnpartitionedUnsorted(t *testing.T) {
+       TestLocation := "file:///tmp/iceberg-test"
+       tableSchema := schema()
+       partitionSpec := iceberg.NewPartitionSpecID(0)
+
+       builder, err := NewMetadataBuilder(2)
+       require.NoError(t, err)
+       require.NoError(t, builder.SetFormatVersion(2))
+       require.NoError(t, builder.AddSchema(&tableSchema))
+       require.NoError(t, builder.SetCurrentSchemaID(-1))
+       require.NoError(t, builder.AddSortOrder(&UnsortedSortOrder))
+       require.NoError(t, builder.SetDefaultSortOrderID(-1))
+       require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true))
+       require.NoError(t, builder.SetDefaultSpecID(-1))
+       require.NoError(t, builder.SetLoc(TestLocation))
+
+       meta, err := builder.Build()
+
+       require.NoError(t, err)
+       require.NotNil(t, meta)
+
+       require.Equal(t, 2, meta.Version())
+       require.Equal(t, TestLocation, meta.Location())
+       require.Equal(t, 0, meta.CurrentSchema().ID)
+       require.Equal(t, 0, meta.DefaultPartitionSpec())
+       require.Equal(t, 0, meta.DefaultSortOrder())
+       require.Equal(t, 999, *meta.LastPartitionSpecID())
+       require.Equal(t, 3, meta.LastColumnID())
+       require.Equal(t, 0, len(meta.Snapshots()))
+       require.Nil(t, meta.CurrentSnapshot())
+       for range meta.Refs() {
+               t.Fatalf("refs should be empty.")
+       }
+       require.Equal(t, len(meta.Properties()), 0)
+       for range meta.PreviousFiles() {
+               t.Fatalf("metadata log should be empty.")
+       }
+       require.Equal(t, meta.LastSequenceNumber(), int64(0))
+       require.Equal(t, meta.LastColumnID(), 3)
+}
+
+func TestReassignIds(t *testing.T) {
+       TestLocation := "file:///tmp/iceberg-test"
+       schema := iceberg.NewSchema(10, iceberg.NestedField{
+               ID:       11,
+               Name:     "a",
+               Type:     iceberg.PrimitiveTypes.Int64,
+               Required: true,
+       }, iceberg.NestedField{
+               ID:       12,
+               Name:     "b",
+               Type:     iceberg.PrimitiveTypes.Int64,
+               Required: true,
+       }, iceberg.NestedField{
+               ID:   13,
+               Name: "struct",
+               Type: &iceberg.StructType{
+                       FieldList: []iceberg.NestedField{
+                               {
+                                       Type:     iceberg.PrimitiveTypes.Int64,
+                                       ID:       14,
+                                       Name:     "nested",
+                                       Required: true,
+                               },
+                       },
+               },
+               Required: true,
+       },
+               iceberg.NestedField{
+                       ID:       15,
+                       Name:     "c",
+                       Type:     iceberg.PrimitiveTypes.Int64,
+                       Required: true,
+               })
+
+       spec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(20),
+               iceberg.AddPartitionFieldByName("a", "a", 
iceberg.IdentityTransform{}, schema, nil),
+               iceberg.AddPartitionFieldByName("struct.nested", 
"nested_partition", iceberg.IdentityTransform{}, schema, nil))
+
+       require.NoError(t, err)
+
+       sortOrder, err := NewSortOrder(10, []SortField{
+               {
+                       SourceID:  11,
+                       Transform: iceberg.IdentityTransform{},
+                       Direction: SortASC,
+                       NullOrder: NullsFirst,
+               },
+       })
+       require.NoError(t, err)
+       meta, err := NewMetadata(
+               schema,
+               &spec,
+               sortOrder,
+               TestLocation,
+               map[string]string{})
+
+       require.NoError(t, err)
+       require.NotNil(t, meta)
+
+       expectedSchema := iceberg.NewSchema(0, iceberg.NestedField{
+               ID:       1,
+               Name:     "a",
+               Type:     iceberg.PrimitiveTypes.Int64,
+               Required: true,
+       }, iceberg.NestedField{
+               ID:       2,
+               Name:     "b",
+               Type:     iceberg.PrimitiveTypes.Int64,
+               Required: true,
+       }, iceberg.NestedField{
+               ID:   3,
+               Name: "struct",
+               Type: &iceberg.StructType{
+                       FieldList: []iceberg.NestedField{
+                               {
+                                       Type: iceberg.PrimitiveTypes.Int64,
+                                       // TODO: this is a discrepancy with the 
rust impl, where the ID is 5
+                                       ID:       4,
+                                       Name:     "nested",
+                                       Required: true,
+                               },
+                       },
+               },
+               Required: true,
+       },
+               iceberg.NestedField{
+                       // TODO: this is a discrepancy with the rust impl, 
where the ID is 4
+                       ID:       5,
+                       Name:     "c",
+                       Type:     iceberg.PrimitiveTypes.Int64,
+                       Required: true,
+               })
+       id := 1000
+       fieldID := 1001
+       expectedSpec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(0),
+               iceberg.AddPartitionFieldByName("a", "a", 
iceberg.IdentityTransform{}, expectedSchema, &id),
+               iceberg.AddPartitionFieldByName("struct.nested", 
"nested_partition", iceberg.IdentityTransform{}, expectedSchema, &fieldID))
+
+       require.NoError(t, err)
+
+       expectedSortOrder, err := NewSortOrder(1, []SortField{
+               {
+                       SourceID:  1,
+                       Transform: iceberg.IdentityTransform{},
+                       Direction: SortASC,
+                       NullOrder: NullsFirst,
+               },
+       })
+       require.NoError(t, err)
+
+       require.True(t, expectedSchema.Equals(meta.Schemas()[0]))
+       require.True(t, expectedSpec.Equals(meta.PartitionSpecs()[0]))
+       require.True(t, expectedSortOrder.Equals(meta.SortOrders()[0]))
+}
+
 func TestAddRemovePartitionSpec(t *testing.T) {
        builder := builderWithoutChanges(2)
        builderRef := &builder
@@ -141,7 +297,7 @@ func TestAddRemovePartitionSpec(t *testing.T) {
        }
        require.True(t, found, "expected partition spec to be added")
 
-       newBuilder, err := MetadataBuilderFromBase(metadata)
+       newBuilder, err := MetadataBuilderFromBase(metadata, "")
        require.NoError(t, err)
        // Remove the spec
        require.NoError(t, newBuilder.RemovePartitionSpecs([]int{1}))
@@ -214,7 +370,7 @@ func TestSetExistingDefaultPartitionSpec(t *testing.T) {
 
        require.True(t, expectedSpec.Equals(metadata.PartitionSpec()), 
"expected partition spec to match added spec")
 
-       newBuilder, err := MetadataBuilderFromBase(metadata)
+       newBuilder, err := MetadataBuilderFromBase(metadata, "")
        require.NoError(t, err)
        require.NotNil(t, newBuilder)
 
@@ -269,7 +425,7 @@ func TestSetRef(t *testing.T) {
                SnapshotID:       1,
                ParentSnapshotID: nil,
                SequenceNumber:   0,
-               TimestampMs:      builder.lastUpdatedMS + 1,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
                ManifestList:     "/snap-1.avro",
                Summary: &Summary{
                        Operation: OpAppend,
@@ -381,7 +537,7 @@ func TestSetBranchSnapshotCreatesBranchIfNotExists(t 
*testing.T) {
                SnapshotID:       2,
                ParentSnapshotID: nil,
                SequenceNumber:   0,
-               TimestampMs:      builder.lastUpdatedMS + 1,
+               TimestampMs:      builder.base.LastUpdatedMillis(),
                ManifestList:     "/snap-1.avro",
                Summary: &Summary{
                        Operation: OpAppend,
@@ -419,7 +575,7 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) {
                SnapshotID:       2,
                ParentSnapshotID: nil,
                SequenceNumber:   0,
-               TimestampMs:      builder.lastUpdatedMS + 1,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
                ManifestList:     "/snap-1.avro",
                Summary: &Summary{
                        Operation: OpAppend,
@@ -449,7 +605,7 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) {
        require.Equal(t, BranchRef, 
builder.updates[1].(*setSnapshotRefUpdate).RefType)
        require.Equal(t, int64(2), 
builder.updates[1].(*setSnapshotRefUpdate).SnapshotID)
 
-       newBuilder, err := MetadataBuilderFromBase(meta)
+       newBuilder, err := MetadataBuilderFromBase(meta, "")
        require.NoError(t, err)
        require.NoError(t, 
newBuilder.RemoveSnapshots([]int64{snapshot.SnapshotID}))
        newMeta, err := newBuilder.Build()
@@ -464,6 +620,81 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) {
        }
 }
 
+func TestExpireMetadataLog(t *testing.T) {
+       builder1 := builderWithoutChanges(2)
+       meta, err := builder1.Build()
+       require.NoError(t, err)
+       builder, err := MetadataBuilderFromBase(meta, "s3://bla")
+       require.NoError(t, err)
+       err = builder.SetProperties(map[string]string{
+               MetadataPreviousVersionsMaxKey: "2",
+       })
+       require.NoError(t, err)
+       meta, err = builder.Build()
+       require.NoError(t, err)
+       require.Len(t, meta.(*metadataV2).MetadataLog, 1)
+
+       location := "p"
+       newBuilder, err := MetadataBuilderFromBase(meta, location)
+       require.NoError(t, err)
+       err = newBuilder.SetProperties(map[string]string{
+               "change_nr": "1",
+       })
+       require.NoError(t, err)
+       meta, err = newBuilder.Build()
+       require.NoError(t, err)
+       require.Len(t, meta.(*metadataV2).MetadataLog, 2)
+
+       newBuilder, err = MetadataBuilderFromBase(meta, location)
+       require.NoError(t, err)
+       err = newBuilder.SetProperties(map[string]string{
+               "change_nr": "2",
+       })
+       require.NoError(t, err)
+       meta, err = newBuilder.Build()
+       require.NoError(t, err)
+       require.Len(t, meta.(*metadataV2).MetadataLog, 2)
+}
+
+func TestV2SequenceNumberCannotDecrease(t *testing.T) {
+       builder := builderWithoutChanges(2)
+       schemaID := 0
+       snapshot1 := Snapshot{
+               SnapshotID:       1,
+               ParentSnapshotID: nil,
+               SequenceNumber:   1,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
+               ManifestList:     "/snap-1.avro",
+               Summary: &Summary{
+                       Operation:  OpAppend,
+                       Properties: map[string]string{},
+               },
+               SchemaID: &schemaID,
+       }
+
+       err := builder.AddSnapshot(&snapshot1)
+       require.NoError(t, err)
+
+       err = builder.SetSnapshotRef(MainBranch, 1, BranchRef, 
WithMinSnapshotsToKeep(10))
+       require.NoError(t, err)
+
+       parentSnapshotID := int64(1)
+       snapshot2 := Snapshot{
+               SnapshotID:       2,
+               ParentSnapshotID: &parentSnapshotID,
+               SequenceNumber:   0, // Lower sequence number than previous
+               TimestampMs:      builder.lastUpdatedMS + 1,
+               ManifestList:     "/snap-0.avro",
+               Summary: &Summary{
+                       Operation:  OpAppend,
+                       Properties: map[string]string{},
+               },
+               SchemaID: &schemaID,
+       }
+       err = builder.AddSnapshot(&snapshot2)
+       require.ErrorContains(t, err, "can't add snapshot with sequence number 
0, must be > than last sequence number 1")
+}
+
 func TestCannotAddDuplicateSnapshotID(t *testing.T) {
        builder := builderWithoutChanges(2)
        schemaID := 0
@@ -471,7 +702,7 @@ func TestCannotAddDuplicateSnapshotID(t *testing.T) {
                SnapshotID:       2,
                ParentSnapshotID: nil,
                SequenceNumber:   0,
-               TimestampMs:      builder.lastUpdatedMS + 1,
+               TimestampMs:      builder.base.LastUpdatedMillis() + 1,
                ManifestList:     "/snap-1.avro",
                Summary: &Summary{
                        Operation: OpAppend,
@@ -488,6 +719,31 @@ func TestCannotAddDuplicateSnapshotID(t *testing.T) {
        require.ErrorContains(t, builder.AddSnapshot(&snapshot), "can't add 
snapshot with id 2, already exists")
 }
 
+func TestLastUpdateIncreasedForPropertyOnlyUpdate(t *testing.T) {
+       builder := builderWithoutChanges(2)
+       meta, err := builder.Build()
+       require.NoError(t, err)
+       lastUpdatedMS := builder.lastUpdatedMS
+       time.Sleep(5 * time.Millisecond)
+       // Set a property
+
+       location := "some-location"
+
+       newBuilder, err := MetadataBuilderFromBase(meta, location)
+       require.NoError(t, err)
+
+       err = newBuilder.SetProperties(map[string]string{
+               "foo": "bar",
+       })
+       require.NoError(t, err)
+       newMeta, err := newBuilder.Build()
+       require.NoError(t, err)
+       require.NotNil(t, newMeta)
+
+       // Check that the last updated timestamp has increased
+       require.Greater(t, newMeta.LastUpdatedMillis(), lastUpdatedMS, 
"expected last updated timestamp to increase after property update")
+}
+
 func TestAddSnapshotRejectsInvalidTimestamp(t *testing.T) {
        builder := builderWithoutChanges(2)
        schemaID := 0
@@ -542,7 +798,7 @@ func TestConstructDefaultMainBranch(t *testing.T) {
        require.NoError(t, err)
        require.NotNil(t, meta)
 
-       builder, err := MetadataBuilderFromBase(meta)
+       builder, err := MetadataBuilderFromBase(meta, "")
        require.NoError(t, err)
 
        meta, err = builder.Build()
@@ -568,7 +824,7 @@ func TestRemoveMainSnapshotRef(t *testing.T) {
        require.NoError(t, err)
        require.NotNil(t, meta)
        require.NotNil(t, meta.CurrentSnapshot())
-       builder, err := MetadataBuilderFromBase(meta)
+       builder, err := MetadataBuilderFromBase(meta, "")
        require.NoError(t, err)
        require.NotNil(t, builder.currentSnapshotID)
        if _, ok := builder.refs[MainBranch]; !ok {
@@ -592,7 +848,7 @@ func TestRemoveSchemas(t *testing.T) {
        meta, err := getTestTableMetadata("TableMetadataV2Valid.json")
        require.NoError(t, err)
        require.Len(t, meta.Schemas(), 2, "expected 2 schemas in the metadata")
-       builder, err := MetadataBuilderFromBase(meta)
+       builder, err := MetadataBuilderFromBase(meta, "")
        require.NoError(t, err)
        err = builder.RemoveSchemas([]int{0})
        require.NoError(t, err, "expected to remove schema with ID 1")
@@ -634,7 +890,7 @@ func TestUpdateSchema(t *testing.T) {
                iceberg.NestedField{ID: 2, Name: "x", Type: 
iceberg.PrimitiveTypes.String, Required: true},
        )
 
-       builder, err := MetadataBuilderFromBase(meta)
+       builder, err := MetadataBuilderFromBase(meta, "")
        require.NoError(t, err)
 
        err = builder.AddSchema(schema2)
@@ -715,3 +971,232 @@ func TestRemoveReservedPropertiesFails(t *testing.T) {
        require.NoError(t, err)
        require.True(t, builder.HasChanges())
 }
+
+func TestIdsAreReassignedForNewMetadata(t *testing.T) { // checks that NewMetadata reports schema ID 0 for a schema built with ID 10
+       // Build the schema with a deliberately non-zero ID (10) so that a reassignment to 0 is observable.
+       tableSchema := iceberg.NewSchema(
+               10,
+               iceberg.NestedField{ID: 1, Name: "x", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "y", Type: 
iceberg.PrimitiveTypes.Int64, Required: true, Doc: "comment"},
+               iceberg.NestedField{ID: 3, Name: "z", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       partitionSpec := partitionSpec() // the local shadows the partitionSpec() function from here on
+       sortOrder := sortOrder() // likewise shadows the sortOrder() function
+
+       metadata, err := NewMetadata(
+               tableSchema,
+               &partitionSpec,
+               sortOrder,
+               "file:///tmp/iceberg-test",
+               map[string]string{},
+       )
+
+       require.NoError(t, err)
+       require.NotNil(t, metadata)
+
+       require.Equal(t, 0, metadata.CurrentSchema().ID) // ID carried by the current schema object
+       require.Equal(t, 0, metadata.(*metadataV2).CurrentSchemaID) // unchecked type assertion: panics unless metadata is a *metadataV2
+}
+
+func TestNewMetadataChanges(t *testing.T) { // checks that the builder records one update per call below, in call order
+       tableSchema := schema()
+       partitionSpec := partitionSpec() // the local shadows the partitionSpec() function from here on
+       sortOrder := sortOrder() // likewise shadows the sortOrder() function
+       properties := map[string]string{
+               "property 1": "value 1",
+       }
+
+       builder, err := NewMetadataBuilder(1) // NOTE(review): the argument is presumably the format version - confirm
+       require.NoError(t, err)
+       require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test"))
+       require.NoError(t, builder.AddSchema(&tableSchema))
+       require.NoError(t, builder.SetCurrentSchemaID(-1)) // the recorded update is asserted below to carry -1 as well
+       require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true))
+       require.NoError(t, builder.SetDefaultSpecID(-1))
+       require.NoError(t, builder.AddSortOrder(&sortOrder))
+       require.NoError(t, builder.SetDefaultSortOrderID(-1))
+       require.NoError(t, builder.SetProperties(properties))
+
+       _, err = builder.Build() // only the error is checked; the built metadata is discarded
+       require.NoError(t, err)
+
+       require.Len(t, builder.updates, 8) // 8 = one per builder call above, SetLoc through SetProperties
+
+       require.IsType(t, &setLocationUpdate{}, builder.updates[0])
+       require.Equal(t, "file:///tmp/iceberg-test", 
builder.updates[0].(*setLocationUpdate).Location)
+
+       require.IsType(t, &addSchemaUpdate{}, builder.updates[1])
+       require.True(t, 
builder.updates[1].(*addSchemaUpdate).Schema.Equals(&tableSchema))
+
+       require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[2])
+       require.Equal(t, -1, 
builder.updates[2].(*setCurrentSchemaUpdate).SchemaID)
+
+       require.IsType(t, &addPartitionSpecUpdate{}, builder.updates[3])
+       // For new tables, field IDs should be assigned (1000 for first 
partition field)
+       addedSpec := builder.updates[3].(*addPartitionSpecUpdate).Spec
+       require.Equal(t, 0, addedSpec.ID())
+       require.Equal(t, 1, addedSpec.Len())
+       require.Equal(t, 1000, addedSpec.Field(0).FieldID)
+
+       require.IsType(t, &setDefaultSpecUpdate{}, builder.updates[4])
+       require.Equal(t, -1, builder.updates[4].(*setDefaultSpecUpdate).SpecID)
+
+       require.IsType(t, &addSortOrderUpdate{}, builder.updates[5])
+       require.True(t, 
builder.updates[5].(*addSortOrderUpdate).SortOrder.Equals(sortOrder))
+
+       require.IsType(t, &setDefaultSortOrderUpdate{}, builder.updates[6])
+       require.Equal(t, -1, 
builder.updates[6].(*setDefaultSortOrderUpdate).SortOrderID)
+
+       require.IsType(t, &setPropertiesUpdate{}, builder.updates[7])
+       require.Equal(t, iceberg.Properties{"property 1": "value 1"}, 
builder.updates[7].(*setPropertiesUpdate).Updates)
+}
+
+func TestNewMetadataChangesUnpartitionedUnsorted(t *testing.T) { // same update-order check, using a field-less schema, the unpartitioned spec and the unsorted order
+       tableSchema := *iceberg.NewSchema(0) // schema ID 0 with no fields
+       partitionSpec := *iceberg.UnpartitionedSpec // dereferenced into a local value; its address is passed below
+       sortOrder := UnsortedSortOrder // local copy; its address is passed below, not the package variable's
+
+       builder, err := NewMetadataBuilder(1) // NOTE(review): the argument is presumably the format version - confirm
+       require.NoError(t, err)
+       require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test"))
+       require.NoError(t, builder.AddSchema(&tableSchema))
+       require.NoError(t, builder.SetCurrentSchemaID(-1)) // the recorded update is asserted below to carry -1 as well
+       require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true))
+       require.NoError(t, builder.SetDefaultSpecID(-1))
+       require.NoError(t, builder.AddSortOrder(&sortOrder))
+       require.NoError(t, builder.SetDefaultSortOrderID(-1))
+
+       _, err = builder.Build() // only the error is checked; the built metadata is discarded
+       require.NoError(t, err)
+
+       // Expect 7 updates, one per builder call above; SetProperties is never called in this test.
+       require.Len(t, builder.updates, 7)
+
+       // Check each update's concrete type and payload, in call order.
+       require.IsType(t, &setLocationUpdate{}, builder.updates[0])
+       require.Equal(t, "file:///tmp/iceberg-test", 
builder.updates[0].(*setLocationUpdate).Location)
+
+       require.IsType(t, &addSchemaUpdate{}, builder.updates[1])
+       require.True(t, 
builder.updates[1].(*addSchemaUpdate).Schema.Equals(&tableSchema))
+
+       require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[2])
+       require.Equal(t, -1, 
builder.updates[2].(*setCurrentSchemaUpdate).SchemaID)
+
+       require.IsType(t, &addPartitionSpecUpdate{}, builder.updates[3])
+
+       addedSpec := builder.updates[3].(*addPartitionSpecUpdate).Spec
+       require.Equal(t, 0, addedSpec.ID())
+       require.Equal(t, 0, addedSpec.Len()) // Unpartitioned = no fields
+
+       require.IsType(t, &setDefaultSpecUpdate{}, builder.updates[4])
+       require.Equal(t, -1, builder.updates[4].(*setDefaultSpecUpdate).SpecID)
+
+       require.IsType(t, &addSortOrderUpdate{}, builder.updates[5])
+       require.True(t, 
builder.updates[5].(*addSortOrderUpdate).SortOrder.Equals(sortOrder))
+
+       require.IsType(t, &setDefaultSortOrderUpdate{}, builder.updates[6])
+       require.Equal(t, -1, 
builder.updates[6].(*setDefaultSortOrderUpdate).SortOrderID)
+}
+
+func TestSetCurrentSchemaChangeIsMinusOneIfSchemaWasAddedInThisChange(t 
*testing.T) { // checks that SetCurrentSchemaID(1) is recorded as -1 when schema 1 was added on the same builder
+       builder := builderWithoutChanges(2) // NOTE(review): presumably a fixture builder with no pending updates - confirm
+
+       addedSchema := iceberg.NewSchema(
+               1,
+               iceberg.NestedField{ID: 1, Name: "x", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "y", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 3, Name: "z", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 4, Name: "a", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+
+       err := builder.AddSchema(addedSchema)
+       require.NoError(t, err)
+
+       err = builder.SetCurrentSchemaID(1) // explicit ID of addedSchema, not -1
+       require.NoError(t, err)
+
+       _, err = builder.Build() // only the error is checked; the built metadata is discarded
+       require.NoError(t, err)
+
+       // Expect exactly 2 updates: the AddSchema and the SetCurrentSchemaID calls above.
+       require.Len(t, builder.updates, 2)
+
+       // First update should be AddSchema, carrying a schema equal to addedSchema
+       require.IsType(t, &addSchemaUpdate{}, builder.updates[0])
+       require.True(t, 
builder.updates[0].(*addSchemaUpdate).Schema.Equals(addedSchema))
+
+       // Second update should be SetCurrentSchema with schema_id = -1 
(indicates last added)
+       require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[1])
+       require.Equal(t, -1, 
builder.updates[1].(*setCurrentSchemaUpdate).SchemaID)
+}
+
+func TestNoMetadataLogForCreateTable(t *testing.T) { // checks that metadata returned by NewMetadata has an empty MetadataLog
+       tableSchema := schema()
+       partitionSpec := partitionSpec() // the local shadows the partitionSpec() function from here on
+       sortOrder := sortOrder() // likewise shadows the sortOrder() function
+
+       metadata, err := NewMetadata(
+               &tableSchema,
+               &partitionSpec,
+               sortOrder,
+               "file:///tmp/iceberg-test",
+               map[string]string{},
+       )
+
+       require.NoError(t, err)
+       require.NotNil(t, metadata)
+
+       require.Len(t, metadata.(*metadataV2).MetadataLog, 0) // unchecked type assertion: panics unless metadata is a *metadataV2
+}
+
+func TestNoMetadataLogEntryForNoPreviousLocation(t *testing.T) { // checks that rebuilding from a base with an empty location argument leaves the log length unchanged
+       builder := builderWithoutChanges(2) // NOTE(review): presumably a fixture builder with no pending updates - confirm
+       require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test"))
+       metadata, err := builder.Build()
+       require.NoError(t, err)
+       require.NotNil(t, metadata)
+       require.Len(t, metadata.(*metadataV2).MetadataLog, 1) // baseline; NOTE(review): this entry presumably comes from the builderWithoutChanges fixture - confirm
+
+       newBuilder, err := MetadataBuilderFromBase(metadata, "") // empty second argument; NOTE(review): presumably the current metadata file location - confirm
+       require.NoError(t, err)
+
+       err = newBuilder.SetProperties(map[string]string{
+               "foo": "bar",
+       })
+       require.NoError(t, err)
+
+       newMetadata, err := newBuilder.Build()
+       require.NoError(t, err)
+
+       require.Len(t, newMetadata.(*metadataV2).MetadataLog, 1) // same length as the baseline asserted above
+}
+
+func TestFromMetadataGeneratesMetadataLog(t *testing.T) { // checks that the path given to MetadataBuilderFromBase becomes the only MetadataLog entry after Build
+       metadataPath := "s3://bucket/test/location/metadata/metadata1.json"
+
+       tableSchema := schema()
+       partitionSpec := partitionSpec() // the local shadows the partitionSpec() function from here on
+       sortOrder := sortOrder() // likewise shadows the sortOrder() function
+
+       metadata, err := NewMetadata(
+               &tableSchema,
+               &partitionSpec,
+               sortOrder,
+               "file:///tmp/iceberg-test",
+               map[string]string{},
+       )
+       require.NoError(t, err)
+       require.NotNil(t, metadata)
+
+       builder, err := MetadataBuilderFromBase(metadata, metadataPath)
+       require.NoError(t, err)
+
+       err = builder.AddSortOrder(&UnsortedSortOrder) // NOTE(review): passes the package-level variable's address; other tests in this change pass a local copy - confirm AddSortOrder does not mutate its argument
+       require.NoError(t, err)
+
+       newMetadata, err := builder.Build()
+       require.NoError(t, err)
+
+       require.Len(t, newMetadata.(*metadataV2).MetadataLog, 1) // unchecked type assertion: panics unless newMetadata is a *metadataV2
+       require.Equal(t, metadataPath, 
newMetadata.(*metadataV2).MetadataLog[0].MetadataFile)
+}
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index 7b58eb01..3401780f 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -846,7 +846,7 @@ func TestMetadataV2Serialize(t *testing.T) {
 }
 
 func TestMetadataBuilderSetDefaultSpecIDLastPartition(t *testing.T) {
-       builder, err := NewMetadataBuilder()
+       builder, err := NewMetadataBuilder(2)
        assert.NoError(t, err)
        schema := schema()
        assert.NoError(t, builder.AddSchema(&schema))
@@ -860,9 +860,8 @@ func TestMetadataBuilderSetDefaultSpecIDLastPartition(t 
*testing.T) {
 }
 
 func TestMetadataBuilderSetLastAddedSchema(t *testing.T) {
-       builder, err := NewMetadataBuilder()
+       builder, err := NewMetadataBuilder(2)
        assert.NoError(t, err)
-       assert.NoError(t, builder.SetFormatVersion(2))
        schema := iceberg.NewSchema(1,
                iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.StringType{}, Required: true},
        )
@@ -874,6 +873,10 @@ func TestMetadataBuilderSetLastAddedSchema(t *testing.T) {
 
        assert.NoError(t, builder.SetDefaultSpecID(-1))
 
+       unsorted := UnsortedSortOrder
+       require.NoError(t, builder.AddSortOrder(&unsorted))
+       require.NoError(t, builder.SetDefaultSortOrderID(-1))
+
        meta, err := builder.Build()
        assert.NoError(t, err)
        assert.Equal(t, schema.ID, meta.CurrentSchema().ID)
@@ -881,7 +884,7 @@ func TestMetadataBuilderSetLastAddedSchema(t *testing.T) {
 }
 
 func TestMetadataBuilderSchemaIncreasingNumbering(t *testing.T) {
-       builder, err := NewMetadataBuilder()
+       builder, err := NewMetadataBuilder(2)
        assert.NoError(t, err)
        assert.NoError(t, builder.SetFormatVersion(2))
        schema := iceberg.NewSchema(1,
@@ -905,7 +908,7 @@ func TestMetadataBuilderSchemaIncreasingNumbering(t 
*testing.T) {
 }
 
 func TestMetadataBuilderReuseSchema(t *testing.T) {
-       builder, err := NewMetadataBuilder()
+       builder, err := NewMetadataBuilder(2)
        assert.NoError(t, err)
        schema := iceberg.NewSchema(1,
                iceberg.NestedField{ID: 1, Name: "foo", Type: 
iceberg.StringType{}, Required: true},
diff --git a/table/partitioned_fanout_writer_test.go 
b/table/partitioned_fanout_writer_test.go
index 09c172e7..b7e0f6d8 100644
--- a/table/partitioned_fanout_writer_test.go
+++ b/table/partitioned_fanout_writer_test.go
@@ -106,7 +106,7 @@ func (s *FanoutWriterTestSuite) 
testTransformPartition(transform iceberg.Transfo
        meta, err := NewMetadata(icebergSchema, &spec, UnsortedSortOrder, loc, 
iceberg.Properties{})
        s.Require().NoError(err)
 
-       metaBuilder, err := MetadataBuilderFromBase(meta)
+       metaBuilder, err := MetadataBuilderFromBase(meta, "")
        s.Require().NoError(err)
 
        args := recordWritingArgs{
diff --git a/table/table.go b/table/table.go
index 04a0803b..bd2ab8cb 100644
--- a/table/table.go
+++ b/table/table.go
@@ -83,7 +83,7 @@ func (t Table) LocationProvider() (LocationProvider, error) {
 }
 
 func (t Table) NewTransaction() *Transaction {
-       meta, _ := MetadataBuilderFromBase(t.metadata)
+       meta, _ := MetadataBuilderFromBase(t.metadata, t.metadataLocation)
 
        return &Transaction{
                tbl:  &t,
diff --git a/table/table_test.go b/table/table_test.go
index 01eff6b2..47fbe966 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -789,18 +789,7 @@ func (m *mockedCatalog) LoadTable(ctx context.Context, 
ident table.Identifier) (
 }
 
 func (m *mockedCatalog) CommitTable(ctx context.Context, ident 
table.Identifier, reqs []table.Requirement, updates []table.Update) 
(table.Metadata, string, error) {
-       bldr, err := table.MetadataBuilderFromBase(m.metadata)
-       if err != nil {
-               return nil, "", err
-       }
-
-       for _, u := range updates {
-               if err := u.Apply(bldr); err != nil {
-                       return nil, "", err
-               }
-       }
-
-       meta, err := bldr.Build()
+       meta, err := table.UpdateTableMetadata(m.metadata, updates, "")
        if err != nil {
                return nil, "", err
        }
@@ -1336,7 +1325,7 @@ func (m *DeleteOldMetadataMockedCatalog) LoadTable(ctx 
context.Context, ident ta
 }
 
 func (m *DeleteOldMetadataMockedCatalog) CommitTable(ctx context.Context, 
ident table.Identifier, reqs []table.Requirement, updates []table.Update) 
(table.Metadata, string, error) {
-       bldr, err := table.MetadataBuilderFromBase(m.metadata)
+       bldr, err := table.MetadataBuilderFromBase(m.metadata, "")
        if err != nil {
                return nil, "", err
        }
diff --git a/table/time_travel_test.go b/table/time_travel_test.go
index dcc23805..d20548d1 100644
--- a/table/time_travel_test.go
+++ b/table/time_travel_test.go
@@ -280,7 +280,7 @@ func createTestMetadata(snapshots []Snapshot, snapshotLog 
[]SnapshotLogEntry) (M
 
        // If we have custom snapshots or logs, we need to modify the metadata
        if len(snapshots) > 0 || len(snapshotLog) > 0 {
-               builder, err := MetadataBuilderFromBase(meta)
+               builder, err := MetadataBuilderFromBase(meta, "")
                if err != nil {
                        return nil, err
                }


Reply via email to