This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new dba25d84 feat(metadata): export UpdateTable & fixes (#580)
dba25d84 is described below
commit dba25d84620964be756187d381f8717f1535688f
Author: Tobias Pütz <[email protected]>
AuthorDate: Mon Oct 6 19:33:56 2025 +0200
feat(metadata): export UpdateTable & fixes (#580)
- exports UpdateTableMetadata from the table package so that outside packages can use it
- adds metadata log maintenance to the builder: when currentFileLocation is
non-empty and the builder has changes, Build appends an entry for that file
to the metadata log and then trims the log to the configured maximum
- ports remaining tests from iceberg-rust and implements fixes for them
---
catalog/glue/schema_test.go | 2 +-
catalog/internal/utils.go | 30 +-
errors.go | 6 +-
table/arrow_utils_internal_test.go | 2 +-
table/metadata.go | 101 +++++--
table/metadata_builder_internal_test.go | 515 +++++++++++++++++++++++++++++++-
table/metadata_internal_test.go | 13 +-
table/partitioned_fanout_writer_test.go | 2 +-
table/table.go | 2 +-
table/table_test.go | 15 +-
table/time_travel_test.go | 2 +-
11 files changed, 591 insertions(+), 99 deletions(-)
diff --git a/catalog/glue/schema_test.go b/catalog/glue/schema_test.go
index 88412db0..04f5dd74 100644
--- a/catalog/glue/schema_test.go
+++ b/catalog/glue/schema_test.go
@@ -437,7 +437,7 @@ func TestSchemasToGlueColumns(t *testing.T) {
metadata, err := table.NewMetadata(schemas[0], nil, table.SortOrder{},
"s3://example/path", nil)
assert.NoError(t, err)
- mb, err := table.MetadataBuilderFromBase(metadata)
+ mb, err := table.MetadataBuilderFromBase(metadata, "")
assert.NoError(t, err)
err = mb.AddSchema(schemas[1])
diff --git a/catalog/internal/utils.go b/catalog/internal/utils.go
index 2cbdc906..99f6bfa5 100644
--- a/catalog/internal/utils.go
+++ b/catalog/internal/utils.go
@@ -70,35 +70,7 @@ func WriteMetadata(ctx context.Context, metadata
table.Metadata, loc string, pro
}
func UpdateTableMetadata(base table.Metadata, updates []table.Update,
metadataLoc string) (table.Metadata, error) {
- bldr, err := table.MetadataBuilderFromBase(base)
- if err != nil {
- return nil, err
- }
-
- for _, update := range updates {
- if err := update.Apply(bldr); err != nil {
- return nil, err
- }
- }
-
- if bldr.HasChanges() {
- if metadataLoc != "" {
- maxMetadataLogEntries := max(1,
- base.Properties().GetInt(
- table.MetadataPreviousVersionsMaxKey,
table.MetadataPreviousVersionsMaxDefault))
-
- bldr.TrimMetadataLogs(maxMetadataLogEntries + 1).
- AppendMetadataLog(table.MetadataLogEntry{
- MetadataFile: metadataLoc,
- TimestampMs: base.LastUpdatedMillis(),
- })
- }
- if base.LastUpdatedMillis() == bldr.LastUpdatedMS() {
- bldr.SetLastUpdatedMS()
- }
- }
-
- return bldr.Build()
+ return table.UpdateTableMetadata(base, updates, metadataLoc)
}
func CreateStagedTable(ctx context.Context, catprops iceberg.Properties,
nspropsFn GetNamespacePropsFn, ident table.Identifier, sc *iceberg.Schema, opts
...catalog.CreateTableOpt) (table.StagedTable, error) {
diff --git a/errors.go b/errors.go
index 9ecc26d5..f1194fc0 100644
--- a/errors.go
+++ b/errors.go
@@ -17,12 +17,16 @@
package iceberg
-import "errors"
+import (
+ "errors"
+ "fmt"
+)
var (
ErrInvalidTypeString = errors.New("invalid type")
ErrNotImplemented = errors.New("not implemented")
ErrInvalidArgument = errors.New("invalid argument")
+ ErrInvalidFormatVersion = fmt.Errorf("%w: invalid format version",
ErrInvalidArgument)
ErrInvalidSchema = errors.New("invalid schema")
ErrInvalidTransform = errors.New("invalid transform syntax")
ErrType = errors.New("type error")
diff --git a/table/arrow_utils_internal_test.go
b/table/arrow_utils_internal_test.go
index 4305642e..d3229822 100644
--- a/table/arrow_utils_internal_test.go
+++ b/table/arrow_utils_internal_test.go
@@ -184,7 +184,7 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta
iceberg.Properties, writeSt
schema := tableMeta.CurrentSchema()
if len(meta) > 0 {
- bldr, err := MetadataBuilderFromBase(tableMeta)
+ bldr, err := MetadataBuilderFromBase(tableMeta, "")
suite.Require().NoError(err)
err = bldr.SetProperties(meta)
suite.Require().NoError(err)
diff --git a/table/metadata.go b/table/metadata.go
index 2240549b..b41d40a5 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -175,6 +175,7 @@ type MetadataBuilder struct {
defaultSortOrderID int
refs map[string]SnapshotRef
+ previousFileEntry *MetadataLogEntry
// >v1 specific
lastSequenceNumber *int64
// update tracking
@@ -183,28 +184,41 @@ type MetadataBuilder struct {
lastAddedSortOrderID *int
}
-func NewMetadataBuilder() (*MetadataBuilder, error) {
+func NewMetadataBuilder(formatVersion int) (*MetadataBuilder, error) {
+ if formatVersion < 1 || formatVersion > supportedTableFormatVersion {
+ return nil, fmt.Errorf("%w: %d",
iceberg.ErrInvalidFormatVersion, formatVersion)
+ }
+
return &MetadataBuilder{
- updates: make([]Update, 0),
- schemaList: make([]*iceberg.Schema, 0),
- specs: make([]iceberg.PartitionSpec, 0),
- props: make(iceberg.Properties),
- snapshotList: make([]Snapshot, 0),
- snapshotLog: make([]SnapshotLogEntry, 0),
- metadataLog: make([]MetadataLogEntry, 0),
- sortOrderList: make([]SortOrder, 0),
- refs: make(map[string]SnapshotRef),
+ updates: make([]Update, 0),
+ schemaList: make([]*iceberg.Schema, 0),
+ specs: make([]iceberg.PartitionSpec, 0),
+ props: make(iceberg.Properties),
+ snapshotList: make([]Snapshot, 0),
+ snapshotLog: make([]SnapshotLogEntry, 0),
+ metadataLog: make([]MetadataLogEntry, 0),
+ sortOrderList: make([]SortOrder, 0),
+ refs: make(map[string]SnapshotRef),
+ currentSchemaID: -1,
+ defaultSortOrderID: -1,
+ defaultSpecID: -1,
+ lastColumnId: -1,
+ formatVersion: formatVersion,
}, nil
}
-func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) {
+// MetadataBuilderFromBase creates a MetadataBuilder from an existing Metadata
object.
+// currentFileLocation is the location where the current version of the
metadata
+// file is stored. This is used to update the metadata log. If
currentFileLocation is
+// empty, the metadata log will not be updated; pass an empty location only
when stage-creating tables.
+func MetadataBuilderFromBase(metadata Metadata, currentFileLocation string)
(*MetadataBuilder, error) {
b := &MetadataBuilder{}
b.base = metadata
b.formatVersion = metadata.Version()
b.uuid = metadata.TableUUID()
b.loc = metadata.Location()
- b.lastUpdatedMS = metadata.LastUpdatedMillis()
+ b.lastUpdatedMS = 0
b.lastColumnId = metadata.LastColumnID()
b.schemaList = slices.Clone(metadata.Schemas())
b.currentSchemaID = metadata.CurrentSchema().ID
@@ -229,6 +243,13 @@ func MetadataBuilderFromBase(metadata Metadata)
(*MetadataBuilder, error) {
b.snapshotLog = slices.Collect(metadata.SnapshotLogs())
b.metadataLog = slices.Collect(metadata.PreviousFiles())
+ if currentFileLocation != "" {
+ b.previousFileEntry = &MetadataLogEntry{
+ MetadataFile: currentFileLocation,
+ TimestampMs: metadata.LastUpdatedMillis(),
+ }
+ }
+
return b, nil
}
@@ -451,6 +472,7 @@ func (b *MetadataBuilder) AddSortOrder(sortOrder
*SortOrder) error {
}
b.sortOrderList = append(sortOrders, *sortOrder)
+ b.lastAddedSortOrderID = &sortOrder.orderID
b.updates = append(b.updates, NewAddSortOrderUpdate(sortOrder))
return nil
@@ -474,8 +496,7 @@ func (b *MetadataBuilder) RemoveProperties(keys []string)
error {
}
func (b *MetadataBuilder) SetCurrentSchemaID(currentSchemaID int) error {
- takeLast := currentSchemaID == -1
- if takeLast {
+ if currentSchemaID == -1 {
if b.lastAddedSchemaID == nil {
return errors.New("can't set current schema to last
added schema, no schema has been added")
}
@@ -491,7 +512,7 @@ func (b *MetadataBuilder)
SetCurrentSchemaID(currentSchemaID int) error {
return fmt.Errorf("can't set current schema to schema with id
%d: %w", currentSchemaID, err)
}
- if takeLast {
+ if b.lastAddedSchemaID != nil && currentSchemaID ==
*b.lastAddedSchemaID {
b.updates = append(b.updates, NewSetCurrentSchemaUpdate(-1))
} else {
b.updates = append(b.updates,
NewSetCurrentSchemaUpdate(currentSchemaID))
@@ -503,14 +524,10 @@ func (b *MetadataBuilder)
SetCurrentSchemaID(currentSchemaID int) error {
func (b *MetadataBuilder) SetDefaultSortOrderID(defaultSortOrderID int) error {
if defaultSortOrderID == -1 {
- defaultSortOrderID = maxBy(b.sortOrderList, func(s SortOrder)
int {
- return s.OrderID()
- })
- if !slices.ContainsFunc(b.updates, func(u Update) bool {
- return u.Action() == UpdateAddSortOrder &&
u.(*addSortOrderUpdate).SortOrder.OrderID() == defaultSortOrderID
- }) {
+ if b.lastAddedSortOrderID == nil {
return errors.New("can't set default sort order to last
added with no added sort orders")
}
+ defaultSortOrderID = *b.lastAddedSortOrderID
}
if defaultSortOrderID == b.defaultSortOrderID {
@@ -521,17 +538,20 @@ func (b *MetadataBuilder)
SetDefaultSortOrderID(defaultSortOrderID int) error {
return fmt.Errorf("can't set default sort order to sort order
with id %d: %w", defaultSortOrderID, err)
}
- b.updates = append(b.updates,
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+ if b.lastAddedSortOrderID != nil && defaultSortOrderID ==
*b.lastAddedSortOrderID {
+ b.updates = append(b.updates, NewSetDefaultSortOrderUpdate(-1))
+ } else {
+ b.updates = append(b.updates,
NewSetDefaultSortOrderUpdate(defaultSortOrderID))
+ }
+
b.defaultSortOrderID = defaultSortOrderID
return nil
}
func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID int) error {
- lastUsed := false
if defaultSpecID == -1 {
if b.lastAddedPartitionID != nil {
- lastUsed = true
defaultSpecID = *b.lastAddedPartitionID
} else {
return errors.New("can't set default spec to last added
with no added partition specs")
@@ -546,7 +566,7 @@ func (b *MetadataBuilder) SetDefaultSpecID(defaultSpecID
int) error {
return fmt.Errorf("can't set default spec to spec with id %d:
%w", defaultSpecID, err)
}
- if lastUsed {
+ if b.lastAddedPartitionID != nil && defaultSpecID ==
*b.lastAddedPartitionID {
b.updates = append(b.updates, NewSetDefaultSpecUpdate(-1))
} else {
b.updates = append(b.updates,
NewSetDefaultSpecUpdate(defaultSpecID))
@@ -563,7 +583,7 @@ func (b *MetadataBuilder) SetFormatVersion(formatVersion
int) error {
}
if formatVersion > supportedTableFormatVersion {
- return fmt.Errorf("unsupported format version %d",
formatVersion)
+ return fmt.Errorf("%w: %d", iceberg.ErrInvalidFormatVersion,
formatVersion)
}
if formatVersion == b.formatVersion {
@@ -757,6 +777,14 @@ func (b *MetadataBuilder) buildCommonMetadata()
(*commonMetadata, error) {
b.lastUpdatedMS = time.Now().UnixMilli()
}
+ if b.previousFileEntry != nil && b.HasChanges() {
+ maxMetadataLogEntries := max(1,
+ b.base.Properties().GetInt(
+ MetadataPreviousVersionsMaxKey,
MetadataPreviousVersionsMaxDefault))
+ b.AppendMetadataLog(*b.previousFileEntry)
+ b.TrimMetadataLogs(maxMetadataLogEntries)
+ }
+
return &commonMetadata{
FormatVersion: b.formatVersion,
UUID: b.uuid,
@@ -1702,15 +1730,11 @@ func NewMetadataWithUUID(sc *iceberg.Schema, partitions
*iceberg.PartitionSpec,
return nil, err
}
- builder, err := NewMetadataBuilder()
+ builder, err := NewMetadataBuilder(formatVersion)
if err != nil {
return nil, err
}
- if err = builder.SetFormatVersion(formatVersion); err != nil {
- return nil, err
- }
-
if err = builder.SetUUID(tableUuid); err != nil {
return nil, err
}
@@ -1791,3 +1815,18 @@ func reassignIDs(sc *iceberg.Schema, partitions
*iceberg.PartitionSpec, sortOrde
sortOrder: freshSortOrder,
}, nil
}
+
+func UpdateTableMetadata(base Metadata, updates []Update, metadataLoc string)
(Metadata, error) {
+ bldr, err := MetadataBuilderFromBase(base, metadataLoc)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, update := range updates {
+ if err := update.Apply(bldr); err != nil {
+ return nil, err
+ }
+ }
+
+ return bldr.Build()
+}
diff --git a/table/metadata_builder_internal_test.go
b/table/metadata_builder_internal_test.go
index addb182f..01034817 100644
--- a/table/metadata_builder_internal_test.go
+++ b/table/metadata_builder_internal_test.go
@@ -20,6 +20,7 @@ package table
import (
"fmt"
"testing"
+ "time"
"github.com/apache/iceberg-go"
"github.com/davecgh/go-spew/spew"
@@ -70,12 +71,11 @@ func builderWithoutChanges(formatVersion int)
MetadataBuilder {
partitionSpec := partitionSpec()
sortOrder := sortOrder()
- builder, err := NewMetadataBuilder()
+ builder, err := NewMetadataBuilder(formatVersion)
if err != nil {
panic(err)
}
- if err = builder.SetFormatVersion(formatVersion); err != nil {
- panic(err)
+ if err = builder.SetLoc("s3://bucket/test/location"); err != nil {
}
if err = builder.AddSchema(&tableSchema); err != nil {
panic(err)
@@ -100,7 +100,7 @@ func builderWithoutChanges(formatVersion int)
MetadataBuilder {
if err != nil {
panic(err)
}
- builder, err = MetadataBuilderFromBase(meta)
+ builder, err = MetadataBuilderFromBase(meta,
"s3://bucket/test/location/metadata/metadata1.json")
if err != nil {
panic(err)
}
@@ -108,6 +108,162 @@ func builderWithoutChanges(formatVersion int)
MetadataBuilder {
return *builder
}
+func TestBuildUnpartitionedUnsorted(t *testing.T) {
+ TestLocation := "file:///tmp/iceberg-test"
+ tableSchema := schema()
+ partitionSpec := iceberg.NewPartitionSpecID(0)
+
+ builder, err := NewMetadataBuilder(2)
+ require.NoError(t, err)
+ require.NoError(t, builder.SetFormatVersion(2))
+ require.NoError(t, builder.AddSchema(&tableSchema))
+ require.NoError(t, builder.SetCurrentSchemaID(-1))
+ require.NoError(t, builder.AddSortOrder(&UnsortedSortOrder))
+ require.NoError(t, builder.SetDefaultSortOrderID(-1))
+ require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true))
+ require.NoError(t, builder.SetDefaultSpecID(-1))
+ require.NoError(t, builder.SetLoc(TestLocation))
+
+ meta, err := builder.Build()
+
+ require.NoError(t, err)
+ require.NotNil(t, meta)
+
+ require.Equal(t, 2, meta.Version())
+ require.Equal(t, TestLocation, meta.Location())
+ require.Equal(t, 0, meta.CurrentSchema().ID)
+ require.Equal(t, 0, meta.DefaultPartitionSpec())
+ require.Equal(t, 0, meta.DefaultSortOrder())
+ require.Equal(t, 999, *meta.LastPartitionSpecID())
+ require.Equal(t, 3, meta.LastColumnID())
+ require.Equal(t, 0, len(meta.Snapshots()))
+ require.Nil(t, meta.CurrentSnapshot())
+ for range meta.Refs() {
+ t.Fatalf("refs should be empty.")
+ }
+ require.Equal(t, len(meta.Properties()), 0)
+ for range meta.PreviousFiles() {
+ t.Fatalf("metadata log should be empty.")
+ }
+ require.Equal(t, meta.LastSequenceNumber(), int64(0))
+ require.Equal(t, meta.LastColumnID(), 3)
+}
+
+func TestReassignIds(t *testing.T) {
+ TestLocation := "file:///tmp/iceberg-test"
+ schema := iceberg.NewSchema(10, iceberg.NestedField{
+ ID: 11,
+ Name: "a",
+ Type: iceberg.PrimitiveTypes.Int64,
+ Required: true,
+ }, iceberg.NestedField{
+ ID: 12,
+ Name: "b",
+ Type: iceberg.PrimitiveTypes.Int64,
+ Required: true,
+ }, iceberg.NestedField{
+ ID: 13,
+ Name: "struct",
+ Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {
+ Type: iceberg.PrimitiveTypes.Int64,
+ ID: 14,
+ Name: "nested",
+ Required: true,
+ },
+ },
+ },
+ Required: true,
+ },
+ iceberg.NestedField{
+ ID: 15,
+ Name: "c",
+ Type: iceberg.PrimitiveTypes.Int64,
+ Required: true,
+ })
+
+ spec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(20),
+ iceberg.AddPartitionFieldByName("a", "a",
iceberg.IdentityTransform{}, schema, nil),
+ iceberg.AddPartitionFieldByName("struct.nested",
"nested_partition", iceberg.IdentityTransform{}, schema, nil))
+
+ require.NoError(t, err)
+
+ sortOrder, err := NewSortOrder(10, []SortField{
+ {
+ SourceID: 11,
+ Transform: iceberg.IdentityTransform{},
+ Direction: SortASC,
+ NullOrder: NullsFirst,
+ },
+ })
+ require.NoError(t, err)
+ meta, err := NewMetadata(
+ schema,
+ &spec,
+ sortOrder,
+ TestLocation,
+ map[string]string{})
+
+ require.NoError(t, err)
+ require.NotNil(t, meta)
+
+ expectedSchema := iceberg.NewSchema(0, iceberg.NestedField{
+ ID: 1,
+ Name: "a",
+ Type: iceberg.PrimitiveTypes.Int64,
+ Required: true,
+ }, iceberg.NestedField{
+ ID: 2,
+ Name: "b",
+ Type: iceberg.PrimitiveTypes.Int64,
+ Required: true,
+ }, iceberg.NestedField{
+ ID: 3,
+ Name: "struct",
+ Type: &iceberg.StructType{
+ FieldList: []iceberg.NestedField{
+ {
+ Type: iceberg.PrimitiveTypes.Int64,
+ // TODO: this is a discrepancy with the Rust
impl, where the ID is 5
+ ID: 4,
+ Name: "nested",
+ Required: true,
+ },
+ },
+ },
+ Required: true,
+ },
+ iceberg.NestedField{
+ // TODO: this is a discrepancy with the Rust impl, where the ID is
4
+ ID: 5,
+ Name: "c",
+ Type: iceberg.PrimitiveTypes.Int64,
+ Required: true,
+ })
+ id := 1000
+ fieldID := 1001
+ expectedSpec, err := iceberg.NewPartitionSpecOpts(iceberg.WithSpecID(0),
+ iceberg.AddPartitionFieldByName("a", "a",
iceberg.IdentityTransform{}, expectedSchema, &id),
+ iceberg.AddPartitionFieldByName("struct.nested",
"nested_partition", iceberg.IdentityTransform{}, expectedSchema, &fieldID))
+
+ require.NoError(t, err)
+
+ expectedSortOrder, err := NewSortOrder(1, []SortField{
+ {
+ SourceID: 1,
+ Transform: iceberg.IdentityTransform{},
+ Direction: SortASC,
+ NullOrder: NullsFirst,
+ },
+ })
+ require.NoError(t, err)
+
+ require.True(t, expectedSchema.Equals(meta.Schemas()[0]))
+ require.True(t, expectedSpec.Equals(meta.PartitionSpecs()[0]))
+ require.True(t, expectedSortOrder.Equals(meta.SortOrders()[0]))
+}
+
func TestAddRemovePartitionSpec(t *testing.T) {
builder := builderWithoutChanges(2)
builderRef := &builder
@@ -141,7 +297,7 @@ func TestAddRemovePartitionSpec(t *testing.T) {
}
require.True(t, found, "expected partition spec to be added")
- newBuilder, err := MetadataBuilderFromBase(metadata)
+ newBuilder, err := MetadataBuilderFromBase(metadata, "")
require.NoError(t, err)
// Remove the spec
require.NoError(t, newBuilder.RemovePartitionSpecs([]int{1}))
@@ -214,7 +370,7 @@ func TestSetExistingDefaultPartitionSpec(t *testing.T) {
require.True(t, expectedSpec.Equals(metadata.PartitionSpec()),
"expected partition spec to match added spec")
- newBuilder, err := MetadataBuilderFromBase(metadata)
+ newBuilder, err := MetadataBuilderFromBase(metadata, "")
require.NoError(t, err)
require.NotNil(t, newBuilder)
@@ -269,7 +425,7 @@ func TestSetRef(t *testing.T) {
SnapshotID: 1,
ParentSnapshotID: nil,
SequenceNumber: 0,
- TimestampMs: builder.lastUpdatedMS + 1,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
ManifestList: "/snap-1.avro",
Summary: &Summary{
Operation: OpAppend,
@@ -381,7 +537,7 @@ func TestSetBranchSnapshotCreatesBranchIfNotExists(t
*testing.T) {
SnapshotID: 2,
ParentSnapshotID: nil,
SequenceNumber: 0,
- TimestampMs: builder.lastUpdatedMS + 1,
+ TimestampMs: builder.base.LastUpdatedMillis(),
ManifestList: "/snap-1.avro",
Summary: &Summary{
Operation: OpAppend,
@@ -419,7 +575,7 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) {
SnapshotID: 2,
ParentSnapshotID: nil,
SequenceNumber: 0,
- TimestampMs: builder.lastUpdatedMS + 1,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
ManifestList: "/snap-1.avro",
Summary: &Summary{
Operation: OpAppend,
@@ -449,7 +605,7 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) {
require.Equal(t, BranchRef,
builder.updates[1].(*setSnapshotRefUpdate).RefType)
require.Equal(t, int64(2),
builder.updates[1].(*setSnapshotRefUpdate).SnapshotID)
- newBuilder, err := MetadataBuilderFromBase(meta)
+ newBuilder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
require.NoError(t,
newBuilder.RemoveSnapshots([]int64{snapshot.SnapshotID}))
newMeta, err := newBuilder.Build()
@@ -464,6 +620,81 @@ func TestRemoveSnapshotRemovesBranch(t *testing.T) {
}
}
+func TestExpireMetadataLog(t *testing.T) {
+ builder1 := builderWithoutChanges(2)
+ meta, err := builder1.Build()
+ require.NoError(t, err)
+ builder, err := MetadataBuilderFromBase(meta, "s3://bla")
+ require.NoError(t, err)
+ err = builder.SetProperties(map[string]string{
+ MetadataPreviousVersionsMaxKey: "2",
+ })
+ require.NoError(t, err)
+ meta, err = builder.Build()
+ require.NoError(t, err)
+ require.Len(t, meta.(*metadataV2).MetadataLog, 1)
+
+ location := "p"
+ newBuilder, err := MetadataBuilderFromBase(meta, location)
+ require.NoError(t, err)
+ err = newBuilder.SetProperties(map[string]string{
+ "change_nr": "1",
+ })
+ require.NoError(t, err)
+ meta, err = newBuilder.Build()
+ require.NoError(t, err)
+ require.Len(t, meta.(*metadataV2).MetadataLog, 2)
+
+ newBuilder, err = MetadataBuilderFromBase(meta, location)
+ require.NoError(t, err)
+ err = newBuilder.SetProperties(map[string]string{
+ "change_nr": "2",
+ })
+ require.NoError(t, err)
+ meta, err = newBuilder.Build()
+ require.NoError(t, err)
+ require.Len(t, meta.(*metadataV2).MetadataLog, 2)
+}
+
+func TestV2SequenceNumberCannotDecrease(t *testing.T) {
+ builder := builderWithoutChanges(2)
+ schemaID := 0
+ snapshot1 := Snapshot{
+ SnapshotID: 1,
+ ParentSnapshotID: nil,
+ SequenceNumber: 1,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
+ ManifestList: "/snap-1.avro",
+ Summary: &Summary{
+ Operation: OpAppend,
+ Properties: map[string]string{},
+ },
+ SchemaID: &schemaID,
+ }
+
+ err := builder.AddSnapshot(&snapshot1)
+ require.NoError(t, err)
+
+ err = builder.SetSnapshotRef(MainBranch, 1, BranchRef,
WithMinSnapshotsToKeep(10))
+ require.NoError(t, err)
+
+ parentSnapshotID := int64(1)
+ snapshot2 := Snapshot{
+ SnapshotID: 2,
+ ParentSnapshotID: &parentSnapshotID,
+ SequenceNumber: 0, // Lower sequence number than previous
+ TimestampMs: builder.lastUpdatedMS + 1,
+ ManifestList: "/snap-0.avro",
+ Summary: &Summary{
+ Operation: OpAppend,
+ Properties: map[string]string{},
+ },
+ SchemaID: &schemaID,
+ }
+ err = builder.AddSnapshot(&snapshot2)
+ require.ErrorContains(t, err, "can't add snapshot with sequence number
0, must be > than last sequence number 1")
+}
+
func TestCannotAddDuplicateSnapshotID(t *testing.T) {
builder := builderWithoutChanges(2)
schemaID := 0
@@ -471,7 +702,7 @@ func TestCannotAddDuplicateSnapshotID(t *testing.T) {
SnapshotID: 2,
ParentSnapshotID: nil,
SequenceNumber: 0,
- TimestampMs: builder.lastUpdatedMS + 1,
+ TimestampMs: builder.base.LastUpdatedMillis() + 1,
ManifestList: "/snap-1.avro",
Summary: &Summary{
Operation: OpAppend,
@@ -488,6 +719,31 @@ func TestCannotAddDuplicateSnapshotID(t *testing.T) {
require.ErrorContains(t, builder.AddSnapshot(&snapshot), "can't add
snapshot with id 2, already exists")
}
+func TestLastUpdateIncreasedForPropertyOnlyUpdate(t *testing.T) {
+ builder := builderWithoutChanges(2)
+ meta, err := builder.Build()
+ require.NoError(t, err)
+ lastUpdatedMS := builder.lastUpdatedMS
+ time.Sleep(5 * time.Millisecond)
+ // Set a property
+
+ location := "some-location"
+
+ newBuilder, err := MetadataBuilderFromBase(meta, location)
+ require.NoError(t, err)
+
+ err = newBuilder.SetProperties(map[string]string{
+ "foo": "bar",
+ })
+ require.NoError(t, err)
+ newMeta, err := newBuilder.Build()
+ require.NoError(t, err)
+ require.NotNil(t, newMeta)
+
+ // Check that the last updated timestamp has increased
+ require.Greater(t, newMeta.LastUpdatedMillis(), lastUpdatedMS,
"expected last updated timestamp to increase after property update")
+}
+
func TestAddSnapshotRejectsInvalidTimestamp(t *testing.T) {
builder := builderWithoutChanges(2)
schemaID := 0
@@ -542,7 +798,7 @@ func TestConstructDefaultMainBranch(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, meta)
- builder, err := MetadataBuilderFromBase(meta)
+ builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
meta, err = builder.Build()
@@ -568,7 +824,7 @@ func TestRemoveMainSnapshotRef(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, meta)
require.NotNil(t, meta.CurrentSnapshot())
- builder, err := MetadataBuilderFromBase(meta)
+ builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
require.NotNil(t, builder.currentSnapshotID)
if _, ok := builder.refs[MainBranch]; !ok {
@@ -592,7 +848,7 @@ func TestRemoveSchemas(t *testing.T) {
meta, err := getTestTableMetadata("TableMetadataV2Valid.json")
require.NoError(t, err)
require.Len(t, meta.Schemas(), 2, "expected 2 schemas in the metadata")
- builder, err := MetadataBuilderFromBase(meta)
+ builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
err = builder.RemoveSchemas([]int{0})
require.NoError(t, err, "expected to remove schema with ID 1")
@@ -634,7 +890,7 @@ func TestUpdateSchema(t *testing.T) {
iceberg.NestedField{ID: 2, Name: "x", Type:
iceberg.PrimitiveTypes.String, Required: true},
)
- builder, err := MetadataBuilderFromBase(meta)
+ builder, err := MetadataBuilderFromBase(meta, "")
require.NoError(t, err)
err = builder.AddSchema(schema2)
@@ -715,3 +971,232 @@ func TestRemoveReservedPropertiesFails(t *testing.T) {
require.NoError(t, err)
require.True(t, builder.HasChanges())
}
+
+func TestIdsAreReassignedForNewMetadata(t *testing.T) {
+ // Create schema with ID 10 (should be reassigned to 0)
+ tableSchema := iceberg.NewSchema(
+ 10,
+ iceberg.NestedField{ID: 1, Name: "x", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "y", Type:
iceberg.PrimitiveTypes.Int64, Required: true, Doc: "comment"},
+ iceberg.NestedField{ID: 3, Name: "z", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ )
+ partitionSpec := partitionSpec()
+ sortOrder := sortOrder()
+
+ metadata, err := NewMetadata(
+ tableSchema,
+ &partitionSpec,
+ sortOrder,
+ "file:///tmp/iceberg-test",
+ map[string]string{},
+ )
+
+ require.NoError(t, err)
+ require.NotNil(t, metadata)
+
+ require.Equal(t, 0, metadata.CurrentSchema().ID)
+ require.Equal(t, 0, metadata.(*metadataV2).CurrentSchemaID)
+}
+
+func TestNewMetadataChanges(t *testing.T) {
+ tableSchema := schema()
+ partitionSpec := partitionSpec()
+ sortOrder := sortOrder()
+ properties := map[string]string{
+ "property 1": "value 1",
+ }
+
+ builder, err := NewMetadataBuilder(1)
+ require.NoError(t, err)
+ require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test"))
+ require.NoError(t, builder.AddSchema(&tableSchema))
+ require.NoError(t, builder.SetCurrentSchemaID(-1))
+ require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true))
+ require.NoError(t, builder.SetDefaultSpecID(-1))
+ require.NoError(t, builder.AddSortOrder(&sortOrder))
+ require.NoError(t, builder.SetDefaultSortOrderID(-1))
+ require.NoError(t, builder.SetProperties(properties))
+
+ _, err = builder.Build()
+ require.NoError(t, err)
+
+ require.Len(t, builder.updates, 8)
+
+ require.IsType(t, &setLocationUpdate{}, builder.updates[0])
+ require.Equal(t, "file:///tmp/iceberg-test",
builder.updates[0].(*setLocationUpdate).Location)
+
+ require.IsType(t, &addSchemaUpdate{}, builder.updates[1])
+ require.True(t,
builder.updates[1].(*addSchemaUpdate).Schema.Equals(&tableSchema))
+
+ require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[2])
+ require.Equal(t, -1,
builder.updates[2].(*setCurrentSchemaUpdate).SchemaID)
+
+ require.IsType(t, &addPartitionSpecUpdate{}, builder.updates[3])
+ // For new tables, field IDs should be assigned (1000 for first
partition field)
+ addedSpec := builder.updates[3].(*addPartitionSpecUpdate).Spec
+ require.Equal(t, 0, addedSpec.ID())
+ require.Equal(t, 1, addedSpec.Len())
+ require.Equal(t, 1000, addedSpec.Field(0).FieldID)
+
+ require.IsType(t, &setDefaultSpecUpdate{}, builder.updates[4])
+ require.Equal(t, -1, builder.updates[4].(*setDefaultSpecUpdate).SpecID)
+
+ require.IsType(t, &addSortOrderUpdate{}, builder.updates[5])
+ require.True(t,
builder.updates[5].(*addSortOrderUpdate).SortOrder.Equals(sortOrder))
+
+ require.IsType(t, &setDefaultSortOrderUpdate{}, builder.updates[6])
+ require.Equal(t, -1,
builder.updates[6].(*setDefaultSortOrderUpdate).SortOrderID)
+
+ require.IsType(t, &setPropertiesUpdate{}, builder.updates[7])
+ require.Equal(t, iceberg.Properties{"property 1": "value 1"},
builder.updates[7].(*setPropertiesUpdate).Updates)
+}
+
+func TestNewMetadataChangesUnpartitionedUnsorted(t *testing.T) {
+ tableSchema := *iceberg.NewSchema(0)
+ partitionSpec := *iceberg.UnpartitionedSpec
+ sortOrder := UnsortedSortOrder
+
+ builder, err := NewMetadataBuilder(1)
+ require.NoError(t, err)
+ require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test"))
+ require.NoError(t, builder.AddSchema(&tableSchema))
+ require.NoError(t, builder.SetCurrentSchemaID(-1))
+ require.NoError(t, builder.AddPartitionSpec(&partitionSpec, true))
+ require.NoError(t, builder.SetDefaultSpecID(-1))
+ require.NoError(t, builder.AddSortOrder(&sortOrder))
+ require.NoError(t, builder.SetDefaultSortOrderID(-1))
+
+ _, err = builder.Build()
+ require.NoError(t, err)
+
+ // Verify the expected updates were created (7 updates, no properties)
+ require.Len(t, builder.updates, 7)
+
+ // Check each update type in order
+ require.IsType(t, &setLocationUpdate{}, builder.updates[0])
+ require.Equal(t, "file:///tmp/iceberg-test",
builder.updates[0].(*setLocationUpdate).Location)
+
+ require.IsType(t, &addSchemaUpdate{}, builder.updates[1])
+ require.True(t,
builder.updates[1].(*addSchemaUpdate).Schema.Equals(&tableSchema))
+
+ require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[2])
+ require.Equal(t, -1,
builder.updates[2].(*setCurrentSchemaUpdate).SchemaID)
+
+ require.IsType(t, &addPartitionSpecUpdate{}, builder.updates[3])
+
+ addedSpec := builder.updates[3].(*addPartitionSpecUpdate).Spec
+ require.Equal(t, 0, addedSpec.ID())
+ require.Equal(t, 0, addedSpec.Len()) // Unpartitioned = no fields
+
+ require.IsType(t, &setDefaultSpecUpdate{}, builder.updates[4])
+ require.Equal(t, -1, builder.updates[4].(*setDefaultSpecUpdate).SpecID)
+
+ require.IsType(t, &addSortOrderUpdate{}, builder.updates[5])
+ require.True(t,
builder.updates[5].(*addSortOrderUpdate).SortOrder.Equals(sortOrder))
+
+ require.IsType(t, &setDefaultSortOrderUpdate{}, builder.updates[6])
+ require.Equal(t, -1,
builder.updates[6].(*setDefaultSortOrderUpdate).SortOrderID)
+}
+
+func TestSetCurrentSchemaChangeIsMinusOneIfSchemaWasAddedInThisChange(t
*testing.T) {
+ builder := builderWithoutChanges(2)
+
+ addedSchema := iceberg.NewSchema(
+ 1,
+ iceberg.NestedField{ID: 1, Name: "x", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "y", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 3, Name: "z", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 4, Name: "a", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ )
+
+ err := builder.AddSchema(addedSchema)
+ require.NoError(t, err)
+
+ err = builder.SetCurrentSchemaID(1)
+ require.NoError(t, err)
+
+ _, err = builder.Build()
+ require.NoError(t, err)
+
+ // Should have 2 updates
+ require.Len(t, builder.updates, 2)
+
+ // First update should be AddSchema
+ require.IsType(t, &addSchemaUpdate{}, builder.updates[0])
+ require.True(t,
builder.updates[0].(*addSchemaUpdate).Schema.Equals(addedSchema))
+
+ // Second update should be SetCurrentSchema with schema_id = -1
(indicates last added)
+ require.IsType(t, &setCurrentSchemaUpdate{}, builder.updates[1])
+ require.Equal(t, -1,
builder.updates[1].(*setCurrentSchemaUpdate).SchemaID)
+}
+
+func TestNoMetadataLogForCreateTable(t *testing.T) {
+ tableSchema := schema()
+ partitionSpec := partitionSpec()
+ sortOrder := sortOrder()
+
+ metadata, err := NewMetadata(
+ &tableSchema,
+ &partitionSpec,
+ sortOrder,
+ "file:///tmp/iceberg-test",
+ map[string]string{},
+ )
+
+ require.NoError(t, err)
+ require.NotNil(t, metadata)
+
+ require.Len(t, metadata.(*metadataV2).MetadataLog, 0)
+}
+
+func TestNoMetadataLogEntryForNoPreviousLocation(t *testing.T) {
+ builder := builderWithoutChanges(2)
+ require.NoError(t, builder.SetLoc("file:///tmp/iceberg-test"))
+ metadata, err := builder.Build()
+ require.NoError(t, err)
+ require.NotNil(t, metadata)
+ require.Len(t, metadata.(*metadataV2).MetadataLog, 1)
+
+ newBuilder, err := MetadataBuilderFromBase(metadata, "")
+ require.NoError(t, err)
+
+ err = newBuilder.SetProperties(map[string]string{
+ "foo": "bar",
+ })
+ require.NoError(t, err)
+
+ newMetadata, err := newBuilder.Build()
+ require.NoError(t, err)
+
+ require.Len(t, newMetadata.(*metadataV2).MetadataLog, 1)
+}
+
+func TestFromMetadataGeneratesMetadataLog(t *testing.T) {
+ metadataPath := "s3://bucket/test/location/metadata/metadata1.json"
+
+ tableSchema := schema()
+ partitionSpec := partitionSpec()
+ sortOrder := sortOrder()
+
+ metadata, err := NewMetadata(
+ &tableSchema,
+ &partitionSpec,
+ sortOrder,
+ "file:///tmp/iceberg-test",
+ map[string]string{},
+ )
+ require.NoError(t, err)
+ require.NotNil(t, metadata)
+
+ builder, err := MetadataBuilderFromBase(metadata, metadataPath)
+ require.NoError(t, err)
+
+ err = builder.AddSortOrder(&UnsortedSortOrder)
+ require.NoError(t, err)
+
+ newMetadata, err := builder.Build()
+ require.NoError(t, err)
+
+ require.Len(t, newMetadata.(*metadataV2).MetadataLog, 1)
+ require.Equal(t, metadataPath,
newMetadata.(*metadataV2).MetadataLog[0].MetadataFile)
+}
diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go
index 7b58eb01..3401780f 100644
--- a/table/metadata_internal_test.go
+++ b/table/metadata_internal_test.go
@@ -846,7 +846,7 @@ func TestMetadataV2Serialize(t *testing.T) {
}
func TestMetadataBuilderSetDefaultSpecIDLastPartition(t *testing.T) {
- builder, err := NewMetadataBuilder()
+ builder, err := NewMetadataBuilder(2)
assert.NoError(t, err)
schema := schema()
assert.NoError(t, builder.AddSchema(&schema))
@@ -860,9 +860,8 @@ func TestMetadataBuilderSetDefaultSpecIDLastPartition(t *testing.T) {
}
func TestMetadataBuilderSetLastAddedSchema(t *testing.T) {
- builder, err := NewMetadataBuilder()
+ builder, err := NewMetadataBuilder(2)
assert.NoError(t, err)
- assert.NoError(t, builder.SetFormatVersion(2))
schema := iceberg.NewSchema(1,
iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true},
)
@@ -874,6 +873,10 @@ func TestMetadataBuilderSetLastAddedSchema(t *testing.T) {
assert.NoError(t, builder.SetDefaultSpecID(-1))
+ unsorted := UnsortedSortOrder
+ require.NoError(t, builder.AddSortOrder(&unsorted))
+ require.NoError(t, builder.SetDefaultSortOrderID(-1))
+
meta, err := builder.Build()
assert.NoError(t, err)
assert.Equal(t, schema.ID, meta.CurrentSchema().ID)
@@ -881,7 +884,7 @@ func TestMetadataBuilderSetLastAddedSchema(t *testing.T) {
}
func TestMetadataBuilderSchemaIncreasingNumbering(t *testing.T) {
- builder, err := NewMetadataBuilder()
+ builder, err := NewMetadataBuilder(2)
assert.NoError(t, err)
assert.NoError(t, builder.SetFormatVersion(2))
schema := iceberg.NewSchema(1,
@@ -905,7 +908,7 @@ func TestMetadataBuilderSchemaIncreasingNumbering(t *testing.T) {
}
func TestMetadataBuilderReuseSchema(t *testing.T) {
- builder, err := NewMetadataBuilder()
+ builder, err := NewMetadataBuilder(2)
assert.NoError(t, err)
schema := iceberg.NewSchema(1,
iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.StringType{}, Required: true},
diff --git a/table/partitioned_fanout_writer_test.go b/table/partitioned_fanout_writer_test.go
index 09c172e7..b7e0f6d8 100644
--- a/table/partitioned_fanout_writer_test.go
+++ b/table/partitioned_fanout_writer_test.go
@@ -106,7 +106,7 @@ func (s *FanoutWriterTestSuite) testTransformPartition(transform iceberg.Transfo
meta, err := NewMetadata(icebergSchema, &spec, UnsortedSortOrder, loc, iceberg.Properties{})
s.Require().NoError(err)
- metaBuilder, err := MetadataBuilderFromBase(meta)
+ metaBuilder, err := MetadataBuilderFromBase(meta, "")
s.Require().NoError(err)
args := recordWritingArgs{
diff --git a/table/table.go b/table/table.go
index 04a0803b..bd2ab8cb 100644
--- a/table/table.go
+++ b/table/table.go
@@ -83,7 +83,7 @@ func (t Table) LocationProvider() (LocationProvider, error) {
}
func (t Table) NewTransaction() *Transaction {
- meta, _ := MetadataBuilderFromBase(t.metadata)
+ meta, _ := MetadataBuilderFromBase(t.metadata, t.metadataLocation)
return &Transaction{
tbl: &t,
diff --git a/table/table_test.go b/table/table_test.go
index 01eff6b2..47fbe966 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -789,18 +789,7 @@ func (m *mockedCatalog) LoadTable(ctx context.Context, ident table.Identifier) (
}
func (m *mockedCatalog) CommitTable(ctx context.Context, ident table.Identifier, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
- bldr, err := table.MetadataBuilderFromBase(m.metadata)
- if err != nil {
- return nil, "", err
- }
-
- for _, u := range updates {
- if err := u.Apply(bldr); err != nil {
- return nil, "", err
- }
- }
-
- meta, err := bldr.Build()
+ meta, err := table.UpdateTableMetadata(m.metadata, updates, "")
if err != nil {
return nil, "", err
}
@@ -1336,7 +1325,7 @@ func (m *DeleteOldMetadataMockedCatalog) LoadTable(ctx context.Context, ident ta
}
func (m *DeleteOldMetadataMockedCatalog) CommitTable(ctx context.Context, ident table.Identifier, reqs []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
- bldr, err := table.MetadataBuilderFromBase(m.metadata)
+ bldr, err := table.MetadataBuilderFromBase(m.metadata, "")
if err != nil {
return nil, "", err
}
diff --git a/table/time_travel_test.go b/table/time_travel_test.go
index dcc23805..d20548d1 100644
--- a/table/time_travel_test.go
+++ b/table/time_travel_test.go
@@ -280,7 +280,7 @@ func createTestMetadata(snapshots []Snapshot, snapshotLog []SnapshotLogEntry) (M
// If we have custom snapshots or logs, we need to modify the metadata
if len(snapshots) > 0 || len(snapshotLog) > 0 {
- builder, err := MetadataBuilderFromBase(meta)
+ builder, err := MetadataBuilderFromBase(meta, "")
if err != nil {
return nil, err
}