This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch fix-manifest-entry-partition-schema in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
commit 40e63d590f68068d444d0bdc13db330c873fa720 Author: Arnaud Briche <briche.arn...@gmail.com> AuthorDate: Thu Feb 20 11:41:01 2025 +0100 - add new WriteManifestEntries to write Manifest files with proper metadata: https://iceberg.apache.org/spec/#manifests - fix a few bugs in Metadata builder that trigger on table creation --- manifest.go | 83 +++++++++++++++++++++++++++++++++++++++++++++++++-- schema_conversions.go | 2 +- table/metadata.go | 23 ++++++++++---- utils.go | 7 ++--- 4 files changed, 102 insertions(+), 13 deletions(-) diff --git a/manifest.go b/manifest.go index 1e2ea76..c1c77e7 100644 --- a/manifest.go +++ b/manifest.go @@ -18,8 +18,10 @@ package iceberg import ( + "encoding/json" "fmt" "io" + "strconv" "sync" "time" @@ -40,6 +42,17 @@ const ( ManifestContentDeletes ManifestContent = 1 ) +func (m ManifestContent) String() string { + switch m { + case ManifestContentData: + return "data" + case ManifestContentDeletes: + return "deletes" + default: + return "UNKNOWN" + } +} + type FieldSummary struct { ContainsNull bool `avro:"contains_null"` ContainsNaN *bool `avro:"contains_nan"` @@ -643,7 +656,69 @@ func WriteManifestList(out io.Writer, files []ManifestFile) error { return fmt.Errorf("%w: non-recognized version %d", ErrInvalidArgument, version) } - return avroEncode(sch, version, files, out) + return avroEncode( + sch, + files, + map[string][]byte{ + "format-version": []byte(strconv.Itoa(version)), + }, + out, + ) +} + +// WriteManifestEntries writes a list of manifest entries to an avro file. 
+func WriteManifestEntries( + out io.Writer, + content ManifestContent, + partitionSpec PartitionSpec, + tableSchema *Schema, + entries []ManifestEntry, + version int, +) error { + var partSchema, err = TypeToAvroSchema("r102", partitionSpec.PartitionType(tableSchema)) + + if err != nil { + return err + } + + var manSchema avro.Schema + switch version { + case 1: + manSchema = internal.MustNewManifestEntryV1Schema(partSchema) + case 2: + manSchema = internal.MustNewManifestEntryV2Schema(partSchema) + default: + return fmt.Errorf("%w: non-recognized version %d", ErrInvalidArgument, version) + } + + var partSpecFields = partSchema.(*avro.RecordSchema).Fields() + + if partSpecFields == nil { + partSpecFields = []*avro.Field{} + } + + partSpecFieldsJson, err := json.Marshal(partSpecFields) + + if err != nil { + return err + } + + tableSchemaJson, err := json.Marshal(tableSchema) + + if err != nil { + return err + } + + var md = map[string][]byte{ + "schema": tableSchemaJson, + "schema-id": []byte(strconv.Itoa(tableSchema.ID)), + "partition-spec": []byte(partSpecFieldsJson), + "partition-spec-id": []byte(strconv.Itoa(partitionSpec.ID())), + "format-version": []byte(strconv.Itoa(version)), + "content": []byte(content.String()), + } + + return avroEncode(manSchema, entries, md, out) } func writeManifestEntries(out io.Writer, partitionType *StructType, entries []ManifestEntry, version int) error { @@ -663,7 +738,11 @@ func writeManifestEntries(out io.Writer, partitionType *StructType, entries []Ma return fmt.Errorf("%w: non-recognized version %d", ErrInvalidArgument, version) } - return avroEncode(sch, version, entries, out) + var md = map[string][]byte{ + "format-version": []byte(strconv.Itoa(version)), + } + + return avroEncode(sch, entries, md, out) } // ManifestEntryStatus defines constants for the entry status of diff --git a/schema_conversions.go b/schema_conversions.go index 5d18734..f59d60b 100644 --- a/schema_conversions.go +++ b/schema_conversions.go @@ -303,7 
+303,7 @@ func TypeToAvroSchema(recordName string, t Type, opts ...avro.SchemaOption) (avr ) case *StructType: - var aFields []*avro.Field + var aFields = make([]*avro.Field, 0) for _, field := range t.Fields() { aField, err := NestedFieldToAvroField(field) diff --git a/table/metadata.go b/table/metadata.go index bd4896d..43a348c 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -176,10 +176,13 @@ func MetadataBuilderFromBase(metadata Metadata) (*MetadataBuilder, error) { b.lastPartitionID = metadata.LastPartitionSpecID() b.props = metadata.Properties() b.snapshotList = metadata.Snapshots() - b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID b.sortOrderList = metadata.SortOrders() b.defaultSortOrderID = metadata.DefaultSortOrder() + if metadata.CurrentSnapshot() != nil { + b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID + } + b.refs = make(map[string]SnapshotRef) for name, ref := range metadata.Refs() { b.refs[name] = ref @@ -262,8 +265,8 @@ func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) (*MetadataBuilder, err return nil, fmt.Errorf("can't add snapshot with id %d, already exists", snapshot.SnapshotID) } else if b.formatVersion == 2 && snapshot.SequenceNumber > 0 && - snapshot.SequenceNumber <= *b.lastSequenceNumber && - snapshot.ParentSnapshotID != nil { + snapshot.ParentSnapshotID != nil && + snapshot.SequenceNumber <= *b.lastSequenceNumber { return nil, fmt.Errorf("can't add snapshot with sequence number %d, must be > than last sequence number %d", snapshot.SequenceNumber, b.lastSequenceNumber) } @@ -490,7 +493,7 @@ func (b *MetadataBuilder) SetSnapshotRef( return nil, fmt.Errorf("can't set snapshot ref %s to unknown snapshot %d: %w", name, snapshotID, err) } - if refType == MainBranch { + if name == MainBranch { b.updates = append(b.updates, NewSetSnapshotRefUpdate(name, snapshotID, refType, maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep)) b.currentSnapshotID = &snapshotID b.snapshotLog = append(b.snapshotLog, 
SnapshotLogEntry{ @@ -521,6 +524,10 @@ func (b *MetadataBuilder) SetUUID(uuid uuid.UUID) (*MetadataBuilder, error) { } func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata { + if b.lastUpdatedMS == 0 { + b.lastUpdatedMS = time.Now().UnixMilli() + } + return &commonMetadata{ FormatVersion: b.formatVersion, UUID: b.uuid, @@ -609,8 +616,14 @@ func (b *MetadataBuilder) Build() (Metadata, error) { }, nil case 2: + var lastSequenceNumber int64 + + if b.lastSequenceNumber != nil { + lastSequenceNumber = *b.lastSequenceNumber + } + return &metadataV2{ - LastSequenceNumber: *b.lastSequenceNumber, + LastSequenceNumber: lastSequenceNumber, commonMetadata: *common, }, nil diff --git a/utils.go b/utils.go index dd6abf3..017ec5b 100644 --- a/utils.go +++ b/utils.go @@ -24,7 +24,6 @@ import ( "io" "maps" "runtime/debug" - "strconv" "strings" "github.com/hamba/avro/v2" @@ -218,13 +217,11 @@ func Difference(a, b []string) []string { return diff } -func avroEncode[T any](sch avro.Schema, version int, vals []T, out io.Writer) error { +func avroEncode[T any](sch avro.Schema, vals []T, md map[string][]byte, out io.Writer) error { enc, err := ocf.NewEncoderWithSchema( sch, out, - ocf.WithMetadata(map[string][]byte{ - "format-version": []byte(strconv.Itoa(version)), - }), + ocf.WithMetadata(md), ocf.WithCodec(ocf.Deflate), ) if err != nil {