This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch fix-manifest-entry-partition-schema
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git

commit 40e63d590f68068d444d0bdc13db330c873fa720
Author: Arnaud Briche <briche.arn...@gmail.com>
AuthorDate: Thu Feb 20 11:41:01 2025 +0100

    - add new WriteManifestEntries to write manifest files with proper
      metadata: https://iceberg.apache.org/spec/#manifests
    - fix a few bugs in the metadata builder that trigger on table creation
---
 manifest.go           | 83 +++++++++++++++++++++++++++++++++++++++++++++++++--
 schema_conversions.go |  2 +-
 table/metadata.go     | 23 ++++++++++----
 utils.go              |  7 ++---
 4 files changed, 102 insertions(+), 13 deletions(-)

diff --git a/manifest.go b/manifest.go
index 1e2ea76..c1c77e7 100644
--- a/manifest.go
+++ b/manifest.go
@@ -18,8 +18,10 @@
 package iceberg
 
 import (
+       "encoding/json"
        "fmt"
        "io"
+       "strconv"
        "sync"
        "time"
 
@@ -40,6 +42,17 @@ const (
        ManifestContentDeletes ManifestContent = 1
 )

+// String returns "data" or "deletes" for the known content types, "UNKNOWN" otherwise.
+func (m ManifestContent) String() string {
+       switch m {
+       case ManifestContentData:
+               return "data"
+       case ManifestContentDeletes:
+               return "deletes"
+       }
+       return "UNKNOWN"
+}
+
 type FieldSummary struct {
        ContainsNull bool    `avro:"contains_null"`
        ContainsNaN  *bool   `avro:"contains_nan"`
@@ -643,7 +656,69 @@ func WriteManifestList(out io.Writer, files 
[]ManifestFile) error {
                return fmt.Errorf("%w: non-recognized version %d", 
ErrInvalidArgument, version)
        }

-       return avroEncode(sch, version, files, out)
+       return avroEncode(
+               sch,
+               files,
+               map[string][]byte{
+                       "format-version": []byte(strconv.Itoa(version)),
+               },
+               out,
+       )
+}
+
+// WriteManifestEntries writes entries to out as an Avro manifest file, recording
+// the header metadata the Iceberg spec requires (schema, partition spec, their IDs,
+// format version and content). Only format versions 1 and 2 are supported.
+func WriteManifestEntries(
+       out io.Writer,
+       content ManifestContent,
+       partitionSpec PartitionSpec,
+       tableSchema *Schema,
+       entries []ManifestEntry,
+       version int,
+) error {
+       partSchema, err := TypeToAvroSchema("r102", partitionSpec.PartitionType(tableSchema))
+       if err != nil {
+               return err
+       }
+
+       var manSchema avro.Schema
+       switch version {
+       case 1:
+               manSchema = internal.MustNewManifestEntryV1Schema(partSchema)
+       case 2:
+               manSchema = internal.MustNewManifestEntryV2Schema(partSchema)
+       default:
+               return fmt.Errorf("%w: non-recognized version %d", ErrInvalidArgument, version)
+       }
+
+       // Per the Iceberg spec, "partition-spec" holds the JSON of the spec's partition
+       // fields, not the Avro record fields of the partition tuple schema.
+       specFields := partitionSpec.fields
+       if specFields == nil {
+               specFields = []PartitionField{} // marshal as [] rather than null
+       }
+
+       specFieldsJSON, err := json.Marshal(specFields)
+       if err != nil {
+               return err
+       }
+
+       tableSchemaJSON, err := json.Marshal(tableSchema)
+       if err != nil {
+               return err
+       }
+
+       md := map[string][]byte{
+               "schema":            tableSchemaJSON,
+               "schema-id":         []byte(strconv.Itoa(tableSchema.ID)),
+               "partition-spec":    specFieldsJSON,
+               "partition-spec-id": []byte(strconv.Itoa(partitionSpec.ID())),
+               "format-version":    []byte(strconv.Itoa(version)),
+               "content":           []byte(content.String()),
+       }
+
+       return avroEncode(manSchema, entries, md, out)
 }

 func writeManifestEntries(out io.Writer, partitionType *StructType, entries 
[]ManifestEntry, version int) error {
@@ -663,7 +738,11 @@ func writeManifestEntries(out io.Writer, partitionType 
*StructType, entries []Ma
                return fmt.Errorf("%w: non-recognized version %d", 
ErrInvalidArgument, version)
        }

-       return avroEncode(sch, version, entries, out)
+       // Only the format version is recorded: this function receives no table
+       // schema or partition spec ID to put in the file header.
+       return avroEncode(sch, entries, map[string][]byte{
+               "format-version": []byte(strconv.Itoa(version)),
+       }, out)
 }

 // ManifestEntryStatus defines constants for the entry status of
diff --git a/schema_conversions.go b/schema_conversions.go
index 5d18734..f59d60b 100644
--- a/schema_conversions.go
+++ b/schema_conversions.go
@@ -303,7 +303,7 @@ func TypeToAvroSchema(recordName string, t Type, opts 
...avro.SchemaOption) (avr
                )

        case *StructType:
-               var aFields []*avro.Field
+               aFields := []*avro.Field{} // non-nil even when the struct has no fields

                for _, field := range t.Fields() {
                        aField, err := NestedFieldToAvroField(field)
diff --git a/table/metadata.go b/table/metadata.go
index bd4896d..43a348c 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -176,10 +176,13 @@ func MetadataBuilderFromBase(metadata Metadata) 
(*MetadataBuilder, error) {
        b.lastPartitionID = metadata.LastPartitionSpecID()
        b.props = metadata.Properties()
        b.snapshotList = metadata.Snapshots()
-       b.currentSnapshotID = &metadata.CurrentSnapshot().SnapshotID
        b.sortOrderList = metadata.SortOrders()
        b.defaultSortOrderID = metadata.DefaultSortOrder()

+       if current := metadata.CurrentSnapshot(); current != nil { // may be nil; leave the ID unset then
+               b.currentSnapshotID = &current.SnapshotID
+       }
+
        b.refs = make(map[string]SnapshotRef)
        for name, ref := range metadata.Refs() {
                b.refs[name] = ref
@@ -262,8 +265,8 @@ func (b *MetadataBuilder) AddSnapshot(snapshot *Snapshot) 
(*MetadataBuilder, err
                return nil, fmt.Errorf("can't add snapshot with id %d, already 
exists", snapshot.SnapshotID)
        } else if b.formatVersion == 2 &&
                snapshot.SequenceNumber > 0 &&
-               snapshot.SequenceNumber <= *b.lastSequenceNumber &&
-               snapshot.ParentSnapshotID != nil {
+               snapshot.ParentSnapshotID != nil &&
+               b.lastSequenceNumber != nil && snapshot.SequenceNumber <= *b.lastSequenceNumber { // lastSequenceNumber may be nil; guard the dereference
                return nil, fmt.Errorf("can't add snapshot with sequence number 
%d, must be > than last sequence number %d",
-                       snapshot.SequenceNumber, b.lastSequenceNumber)
+                       snapshot.SequenceNumber, *b.lastSequenceNumber)
        }
@@ -490,7 +493,7 @@ func (b *MetadataBuilder) SetSnapshotRef(
                return nil, fmt.Errorf("can't set snapshot ref %s to unknown 
snapshot %d: %w", name, snapshotID, err)
        }

-       if refType == MainBranch {
+       if MainBranch == name { // the ref's name, not its type, identifies the main branch
                b.updates = append(b.updates, NewSetSnapshotRefUpdate(name, 
snapshotID, refType, maxRefAgeMs, maxSnapshotAgeMs, minSnapshotsToKeep))
                b.currentSnapshotID = &snapshotID
                b.snapshotLog = append(b.snapshotLog, SnapshotLogEntry{
@@ -521,6 +524,10 @@ func (b *MetadataBuilder) SetUUID(uuid uuid.UUID) 
(*MetadataBuilder, error) {
 }

 func (b *MetadataBuilder) buildCommonMetadata() *commonMetadata {
+       if b.lastUpdatedMS == 0 {
+               // no timestamp was set yet; stamp the builder with the current time
+               b.lastUpdatedMS = time.Now().UnixMilli()
+       }
        return &commonMetadata{
                FormatVersion:      b.formatVersion,
                UUID:               b.uuid,
@@ -609,8 +616,14 @@ func (b *MetadataBuilder) Build() (Metadata, error) {
                }, nil

        case 2:
+               // an unset last sequence number is written as 0
+               lastSequenceNumber := int64(0)
+               if seq := b.lastSequenceNumber; seq != nil {
+                       lastSequenceNumber = *seq
+               }
+
                return &metadataV2{
-                       LastSequenceNumber: *b.lastSequenceNumber,
+                       LastSequenceNumber: lastSequenceNumber,
                        commonMetadata:     *common,
                }, nil

diff --git a/utils.go b/utils.go
index dd6abf3..017ec5b 100644
--- a/utils.go
+++ b/utils.go
@@ -24,7 +24,6 @@ import (
        "io"
        "maps"
        "runtime/debug"
-       "strconv"
        "strings"
 
        "github.com/hamba/avro/v2"
@@ -218,13 +217,11 @@ func Difference(a, b []string) []string {
        return diff
 }

-func avroEncode[T any](sch avro.Schema, version int, vals []T, out io.Writer) 
error {
+func avroEncode[T any](sch avro.Schema, vals []T, md map[string][]byte, out 
io.Writer) error {
        enc, err := ocf.NewEncoderWithSchema(
                sch,
                out,
-               ocf.WithMetadata(map[string][]byte{
-                       "format-version": []byte(strconv.Itoa(version)),
-               }),
+               ocf.WithMetadata(md), // callers now supply every header key, including "format-version"
                ocf.WithCodec(ocf.Deflate),
        )
        if err != nil {

Reply via email to