This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 072c1efd feat: Wire V3 snapshot producer to row-lineage state (#728)
072c1efd is described below

commit 072c1efd12a1804a37d9e6c6672a64eb50979829
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Wed Feb 18 18:15:55 2026 +0100

    feat: Wire V3 snapshot producer to row-lineage state (#728)
    
    This PR adds the missing v3 commit-path wiring for row lineage in
    iceberg-go (part of #727): snapshot lineage fields were validated
    downstream but were not fully populated in the snapshot producer's
    commit flow.
    
    ### What changed
    
    The v3 snapshot producer now:
    
    - sets the snapshot's first-row-id from the table's next-row-id,
    - computes the snapshot's added-rows as the row-id delta assigned by the
    manifest-list writer (writer.nextRowID - firstRowID),
    - leaves the v1/v2 flow unchanged.
    
    Added tests for:
    
    - v3 single-commit lineage
    - two sequential commits (monotonic, gap-free lineage)
    - merge path where assigned delta includes existing rows
    - manifest-list writer delta behavior, including exclusion of non-data
    manifests
    
    ### Correctness notes
    
    This matches the strict lineage advancement performed by the metadata
    builder (next-row-id += snapshot.added-rows).
    
    The behavior follows the commit-time row-lineage model of the Iceberg
    reference implementations (in particular, the Java writer-delta semantics).
---
 manifest_test.go                 |  43 ++++++++++++
 table/metadata.go                |   5 ++
 table/snapshot_producers.go      |  31 +++++++--
 table/snapshot_producers_test.go | 138 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 212 insertions(+), 5 deletions(-)

diff --git a/manifest_test.go b/manifest_test.go
index 1192474e..87dc4efc 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -1401,10 +1401,53 @@ func (m *ManifestTestSuite) TestV3ManifestListWriterRowIDTracking() {
        // Expected: 5000 + 1500 + 2300 = 8800
        expectedNextRowID := firstRowID + 1500 + 2300
        m.EqualValues(expectedNextRowID, *writer.NextRowID())
+       // Assigned row-id delta (for snapshot.added-rows) = 1500 + 2300 = 3800
+       m.EqualValues(int64(3800), *writer.NextRowID()-firstRowID)
        err = writer.Close()
        m.Require().NoError(err)
 }
 
+func (m *ManifestTestSuite) TestV3ManifestListWriterAssignedRowIDDelta() {
+       // The assigned row-id delta (NextRowID - firstRowID) is the sum of existing+added rows over every data manifest in the list.
+       var buf bytes.Buffer
+       commitSnapID := int64(100)
+       otherSnapID := int64(99)
+       firstRowID := int64(0)
+       sequenceNum := int64(1)
+       writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil)
+       m.Require().NoError(err)
+       manifests := []ManifestFile{
+               NewManifestFile(3, "current.avro", 100, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(),
+               // Written by another snapshot, but no first_row_id is set on it here, so it is still assigned a range.
+               NewManifestFile(3, "carried.avro", 200, 1, otherSnapID).SequenceNum(0, 0).AddedRows(100).ExistingRows(50).Build(),
+               NewManifestFile(3, "current2.avro", 300, 1, commitSnapID).AddedRows(20).Build(),
+       }
+       err = writer.AddManifests(manifests)
+       m.Require().NoError(err)
+       // Delta = (10+5) + (100+50) + 20 = 185: every data manifest gets a row-id range.
+       m.EqualValues(185, *writer.NextRowID()-firstRowID)
+       m.Require().NoError(writer.Close())
+}
+
+func (m *ManifestTestSuite) TestV3ManifestListWriterDeltaIgnoresNonDataManifests() {
+       // Only data manifests are assigned row IDs; a delete manifest in the list must not contribute to the delta.
+       var buf bytes.Buffer
+       commitSnapID := int64(1)
+       firstRowID := int64(100)
+       sequenceNum := int64(1)
+       writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum, firstRowID, nil)
+       m.Require().NoError(err)
+       manifests := []ManifestFile{
+               NewManifestFile(3, "data.avro", 100, 1, commitSnapID).AddedRows(10).ExistingRows(5).Build(),
+               // Delete manifest: its 100 added rows must be ignored by row-id assignment.
+               NewManifestFile(3, "deletes.avro", 200, 1, commitSnapID).Content(ManifestContentDeletes).AddedRows(100).Build(),
+               NewManifestFile(3, "data2.avro", 300, 1, commitSnapID).AddedRows(20).Build(),
+       }
+       err = writer.AddManifests(manifests)
+       m.Require().NoError(err)
+       // Delta = (10+5) + 20 = 35 (only data manifests; the delete manifest is ignored).
+       m.EqualValues(35, *writer.NextRowID()-firstRowID)
+       m.Require().NoError(writer.Close())
+}
+
 func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
        // Test v3writerImpl.prepareEntry sequence number validation logic
        v3Writer := v3writerImpl{}
diff --git a/table/metadata.go b/table/metadata.go
index 92b976ca..4df78e32 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -480,6 +480,11 @@ func (b *MetadataBuilder) currentNextRowID() int64 {
        return initialRowID
 }
 
+// NextRowID returns the next available row ID for v3 row lineage, i.e. the
+// value computed by currentNextRowID. The snapshot producer uses it as the
+// first-row-id of a new v3 snapshot. For v1/v2 tables it returns 0.
+func (b *MetadataBuilder) NextRowID() int64 {
+       return b.currentNextRowID()
+}
+
 func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) error {
        if b.currentSnapshotID != nil && slices.Contains(snapshotIds, 
*b.currentSnapshotID) {
                return errors.New("current snapshot cannot be removed")
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 8022f5a4..c310499f 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -703,17 +703,34 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
                parentSnapshot = &sp.parentSnapshotID
        }
 
+       firstRowID := int64(0)
+       var addedRows int64
+
        out, err := sp.io.Create(manifestListFilePath)
        if err != nil {
                return nil, nil, err
        }
        defer internal.CheckedClose(out, &err)
 
-       // TODO: Implement v3 here
-       err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
-               sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests)
-       if err != nil {
-               return nil, nil, err
+       if sp.txn.meta.formatVersion == 3 {
+               firstRowID = sp.txn.meta.NextRowID()
+               writer, err := iceberg.NewManifestListWriterV3(out, 
sp.snapshotID, nextSequence, firstRowID, parentSnapshot)
+               if err != nil {
+                       return nil, nil, err
+               }
+               defer internal.CheckedClose(writer, &err)
+               if err = writer.AddManifests(newManifests); err != nil {
+                       return nil, nil, err
+               }
+               if writer.NextRowID() != nil {
+                       addedRows = *writer.NextRowID() - firstRowID
+               }
+       } else {
+               err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
+                       sp.snapshotID, parentSnapshot, &nextSequence, 
firstRowID, newManifests)
+               if err != nil {
+                       return nil, nil, err
+               }
        }
 
        snapshot := Snapshot{
@@ -725,6 +742,10 @@ func (sp *snapshotProducer) commit() (_ []Update, _ []Requirement, err error) {
                SchemaID:         &sp.txn.meta.currentSchemaID,
                TimestampMs:      time.Now().UnixMilli(),
        }
+       if sp.txn.meta.formatVersion == 3 {
+               snapshot.FirstRowID = &firstRowID
+               snapshot.AddedRows = &addedRows
+       }
 
        return []Update{
                        NewAddSnapshotUpdate(&snapshot),
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 1c6922f1..f2e55631 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -97,6 +97,22 @@ func (m *memIO) Remove(name string) error {
        return nil
 }
 
+// createTestTransactionWithMemIO creates a transaction backed by the io
+// package's in-memory ("mem://") blob FS, so that files written via Create()
+// are persisted and can be read back (e.g. by sequential commits). It returns
+// the transaction together with the same FS as a WriteFileIO for producers.
+func createTestTransactionWithMemIO(t *testing.T, spec iceberg.PartitionSpec) (*Transaction, iceio.WriteFileIO) {
+       t.Helper()
+       const location = "mem://default/table-location"
+
+       fs, err := iceio.LoadFS(context.Background(), nil, location)
+       require.NoError(t, err, "LoadFS mem")
+       // Comma-ok assertion: fail the test with a message rather than panic if
+       // the mem FS does not implement WriteFileIO.
+       wfs, ok := fs.(iceio.WriteFileIO)
+       require.True(t, ok, "mem FS must implement iceio.WriteFileIO")
+
+       meta, err := NewMetadata(simpleSchema(), &spec, UnsortedSortOrder, location, nil)
+       require.NoError(t, err, "new metadata")
+       tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json",
+               func(context.Context) (iceio.IO, error) { return fs, nil }, nil)
+
+       return tbl.NewTransaction(), wfs
+}
+
 func manifestHeaderSize(t *testing.T, version int, spec iceberg.PartitionSpec, 
schema *iceberg.Schema) int {
        t.Helper()
 
@@ -161,6 +177,128 @@ func createTestTransaction(t *testing.T, io iceio.IO, 
spec iceberg.PartitionSpec
        return tbl.NewTransaction()
 }
 
+// TestCommitV3RowLineage ensures v3 snapshot commits set FirstRowID and AddedRows
+// on the snapshot for row lineage, and that applying the resulting updates
+// advances the table's next-row-id by exactly AddedRows.
+func TestCommitV3RowLineage(t *testing.T) {
+       trackIO := newTrackingIO()
+       spec := iceberg.NewPartitionSpec()
+       txn := createTestTransaction(t, trackIO, spec)
+       txn.meta.formatVersion = 3
+
+       // Single data file with record count 1 (newTestDataFile uses 1, 1 for record count and file size).
+       const expectedAddedRows = 1
+       sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil)
+       df := newTestDataFile(t, spec, "file://data.parquet", nil)
+       sp.appendDataFile(df)
+
+       updates, reqs, err := sp.commit()
+       require.NoError(t, err, "commit should succeed")
+       require.Len(t, updates, 2, "expected AddSnapshot and SetSnapshotRef updates")
+       addSnap, ok := updates[0].(*addSnapshotUpdate)
+       require.True(t, ok, "first update must be AddSnapshot")
+
+       // Exact snapshot lineage: first-row-id 0 for a new table, added-rows matches the appended file(s).
+       require.NotNil(t, addSnap.Snapshot.FirstRowID, "v3 snapshot must have first-row-id")
+       require.NotNil(t, addSnap.Snapshot.AddedRows, "v3 snapshot must have added-rows")
+       require.Equal(t, int64(0), *addSnap.Snapshot.FirstRowID, "first-row-id should be table next-row-id at commit")
+       require.Equal(t, int64(expectedAddedRows), *addSnap.Snapshot.AddedRows, "added-rows should match appended data file record count")
+
+       // Apply updates and verify the metadata next-row-id advances by added-rows.
+       err = txn.apply(updates, reqs)
+       require.NoError(t, err, "apply should succeed")
+       meta, err := txn.meta.Build()
+       require.NoError(t, err, "build metadata")
+       require.Equal(t, int64(expectedAddedRows), meta.NextRowID(), "next-row-id should equal first-row-id + added-rows")
+}
+
+// TestCommitV3RowLineageTwoSequentialCommits runs two fast-append commits and
+// asserts monotonic, gap-free first-row-id / next-row-id progression.
+func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       ident := Identifier{"db", "tbl"}
+       txn, memIO := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       // First commit: new table, append one file (1 row).
+       sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
+       sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil))
+       updates1, reqs1, err := sp1.commit()
+       require.NoError(t, err, "first commit should succeed")
+       // Guard the index and pointer dereferences below so a regression fails
+       // the test with a message instead of panicking.
+       require.NotEmpty(t, updates1, "first commit must produce updates")
+       addSnap1, ok := updates1[0].(*addSnapshotUpdate)
+       require.True(t, ok, "first update must be AddSnapshot")
+       require.NotNil(t, addSnap1.Snapshot.FirstRowID, "v3 snapshot must have first-row-id")
+       require.NotNil(t, addSnap1.Snapshot.AddedRows, "v3 snapshot must have added-rows")
+       require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "first snapshot first-row-id")
+       require.Equal(t, int64(1), *addSnap1.Snapshot.AddedRows, "first snapshot added-rows")
+       err = txn.apply(updates1, reqs1)
+       require.NoError(t, err, "first apply should succeed")
+       meta1, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, int64(1), meta1.NextRowID(), "next-row-id after first commit")
+
+       // Second commit: fast append one more file on top of meta1. The carried
+       // manifest already has a first_row_id, so only the new manifest is
+       // assigned row IDs; delta = 1.
+       tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
+       txn2 := tbl2.NewTransaction()
+       txn2.meta.formatVersion = 3
+       sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
+       sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil))
+       updates2, reqs2, err := sp2.commit()
+       require.NoError(t, err, "second commit should succeed")
+       require.NotEmpty(t, updates2, "second commit must produce updates")
+       addSnap2, ok := updates2[0].(*addSnapshotUpdate)
+       require.True(t, ok, "first update must be AddSnapshot")
+       require.NotNil(t, addSnap2.Snapshot.FirstRowID, "v3 snapshot must have first-row-id")
+       require.NotNil(t, addSnap2.Snapshot.AddedRows, "v3 snapshot must have added-rows")
+       require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "second snapshot first-row-id continues from first next-row-id")
+       require.Equal(t, int64(1), *addSnap2.Snapshot.AddedRows, "only new manifest gets row IDs assigned")
+
+       err = txn2.apply(updates2, reqs2)
+       require.NoError(t, err, "second apply should succeed")
+       meta2, err := txn2.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, int64(2), meta2.NextRowID(), "next-row-id = 1 + 1 (gap-free)")
+}
+
+// TestCommitV3RowLineageDeltaIncludesExistingRows uses merge append so that one
+// manifest holds both existing and added rows, and verifies that the assigned
+// row-id delta includes ExistingRowsCount and that the metadata next-row-id
+// advances by that delta.
+func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       ident := Identifier{"db", "tbl"}
+       txn, memIO := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       // First commit: one file (1 row).
+       sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
+       sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet", nil))
+       updates1, reqs1, err := sp1.commit()
+       require.NoError(t, err, "first commit should succeed")
+       err = txn.apply(updates1, reqs1)
+       require.NoError(t, err, "first apply should succeed")
+       meta1, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, int64(1), meta1.NextRowID(), "next-row-id after first commit")
+
+       // Second commit: merge append so the two data manifests (existing + new)
+       // are merged into one with 1 existing + 1 added row.
+       tbl2 := New(ident, meta1, "metadata.json", func(context.Context) (iceio.IO, error) { return memIO, nil }, nil)
+       txn2 := tbl2.NewTransaction()
+       txn2.meta.formatVersion = 3
+       if txn2.meta.props == nil {
+               txn2.meta.props = make(iceberg.Properties)
+       }
+       txn2.meta.props[ManifestMergeEnabledKey] = "true"
+       txn2.meta.props[ManifestMinMergeCountKey] = "2"
+       sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
+       sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet", nil))
+       updates2, reqs2, err := sp2.commit()
+       require.NoError(t, err, "second commit (merge) should succeed")
+       // Guard the index and pointer dereferences below so a regression fails
+       // the test with a message instead of panicking.
+       require.NotEmpty(t, updates2, "second commit must produce updates")
+       addSnap2, ok := updates2[0].(*addSnapshotUpdate)
+       require.True(t, ok, "first update must be AddSnapshot")
+       require.NotNil(t, addSnap2.Snapshot.FirstRowID, "v3 snapshot must have first-row-id")
+       require.NotNil(t, addSnap2.Snapshot.AddedRows, "v3 snapshot must have added-rows")
+       require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "first-row-id continues from first commit")
+       require.Equal(t, int64(2), *addSnap2.Snapshot.AddedRows, "assigned delta = existing (1) + added (1) in merged manifest")
+
+       err = txn2.apply(updates2, reqs2)
+       require.NoError(t, err, "second apply should succeed")
+       meta2, err := txn2.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id = first-row-id + assigned delta (1+2)")
+}
+
 func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
        spec := partitionedSpec()
        schema := simpleSchema()

Reply via email to