This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 072c1efd feat: Wire V3 snapshot producer to row-lineage state (#728)
072c1efd is described below
commit 072c1efd12a1804a37d9e6c6672a64eb50979829
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Wed Feb 18 18:15:55 2026 +0100
feat: Wire V3 snapshot producer to row-lineage state (#728)
This PR completes the previously missing v3 commit-path wiring for row lineage in
iceberg-go (part of #727):
Snapshot lineage fields were validated downstream but were not fully
populated in the snapshot producer commit flow.
### What changed
v3 snapshot producer now:
- sets snapshot first-row-id from table next-row-id,
- computes snapshot added-rows from manifest-list writer assigned row-id
delta (writer.nextRowID - firstRowID),
- keeps v1/v2 flow unchanged.
Added tests for:
- v3 single-commit lineage
- two sequential commits (monotonic, gap-free lineage)
- merge path where assigned delta includes existing rows
- manifest-list writer delta behavior including non-data manifest
exclusion
### Correctness notes
This aligns with the strict lineage advancement in the metadata builder
(next-row-id += snapshot.added-rows).
Behavior is aligned with Iceberg reference implementations’ commit-time
row-lineage model (especially Java writer-delta semantics).
---
manifest_test.go | 43 ++++++++++++
table/metadata.go | 5 ++
table/snapshot_producers.go | 31 +++++++--
table/snapshot_producers_test.go | 138 +++++++++++++++++++++++++++++++++++++++
4 files changed, 212 insertions(+), 5 deletions(-)
diff --git a/manifest_test.go b/manifest_test.go
index 1192474e..87dc4efc 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -1401,10 +1401,53 @@ func (m *ManifestTestSuite)
TestV3ManifestListWriterRowIDTracking() {
// Expected: 5000 + 1500 + 2300 = 8800
expectedNextRowID := firstRowID + 1500 + 2300
m.EqualValues(expectedNextRowID, *writer.NextRowID())
+ // Assigned row-id delta (for snapshot.added-rows) = 1500 + 2300 = 3800
+ m.EqualValues(int64(3800), *writer.NextRowID()-firstRowID)
err = writer.Close()
m.Require().NoError(err)
}
+func (m *ManifestTestSuite) TestV3ManifestListWriterAssignedRowIDDelta() {
+ // Assigned row-id delta = sum of (existing+added) for all data
manifests in list.
+ var buf bytes.Buffer
+ commitSnapID := int64(100)
+ otherSnapID := int64(99)
+ firstRowID := int64(0)
+ sequenceNum := int64(1)
+ writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum,
firstRowID, nil)
+ m.Require().NoError(err)
+ manifests := []ManifestFile{
+ NewManifestFile(3, "current.avro", 100, 1,
commitSnapID).AddedRows(10).ExistingRows(5).Build(),
+ NewManifestFile(3, "carried.avro", 200, 1,
otherSnapID).SequenceNum(0, 0).AddedRows(100).ExistingRows(50).Build(),
+ NewManifestFile(3, "current2.avro", 300, 1,
commitSnapID).AddedRows(20).Build(),
+ }
+ err = writer.AddManifests(manifests)
+ m.Require().NoError(err)
+ // Delta = 15 + 150 + 20 = 185 (all data manifests get row-id range)
+ m.EqualValues(185, *writer.NextRowID()-firstRowID)
+ m.Require().NoError(writer.Close())
+}
+
+func (m *ManifestTestSuite)
TestV3ManifestListWriterDeltaIgnoresNonDataManifests() {
+ // Only data manifests get row-id assignment; delete manifests must not
affect delta.
+ var buf bytes.Buffer
+ commitSnapID := int64(1)
+ firstRowID := int64(100)
+ sequenceNum := int64(1)
+ writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum,
firstRowID, nil)
+ m.Require().NoError(err)
+ manifests := []ManifestFile{
+ NewManifestFile(3, "data.avro", 100, 1,
commitSnapID).AddedRows(10).ExistingRows(5).Build(),
+ NewManifestFile(3, "deletes.avro", 200, 1,
commitSnapID).Content(ManifestContentDeletes).AddedRows(100).Build(),
+ NewManifestFile(3, "data2.avro", 300, 1,
commitSnapID).AddedRows(20).Build(),
+ }
+ err = writer.AddManifests(manifests)
+ m.Require().NoError(err)
+ // Delta = 15 + 20 = 35 (only data manifests; delete manifest ignored)
+ m.EqualValues(35, *writer.NextRowID()-firstRowID)
+ m.Require().NoError(writer.Close())
+}
+
func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
// Test v3writerImpl.prepareEntry sequence number validation logic
v3Writer := v3writerImpl{}
diff --git a/table/metadata.go b/table/metadata.go
index 92b976ca..4df78e32 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -480,6 +480,11 @@ func (b *MetadataBuilder) currentNextRowID() int64 {
return initialRowID
}
+// NextRowID returns the next available row ID (for v3 row lineage). For v1/v2
returns 0.
+func (b *MetadataBuilder) NextRowID() int64 {
+ return b.currentNextRowID()
+}
+
func (b *MetadataBuilder) RemoveSnapshots(snapshotIds []int64) error {
if b.currentSnapshotID != nil && slices.Contains(snapshotIds,
*b.currentSnapshotID) {
return errors.New("current snapshot cannot be removed")
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 8022f5a4..c310499f 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -703,17 +703,34 @@ func (sp *snapshotProducer) commit() (_ []Update, _
[]Requirement, err error) {
parentSnapshot = &sp.parentSnapshotID
}
+ firstRowID := int64(0)
+ var addedRows int64
+
out, err := sp.io.Create(manifestListFilePath)
if err != nil {
return nil, nil, err
}
defer internal.CheckedClose(out, &err)
- // TODO: Implement v3 here
- err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
- sp.snapshotID, parentSnapshot, &nextSequence, 0, newManifests)
- if err != nil {
- return nil, nil, err
+ if sp.txn.meta.formatVersion == 3 {
+ firstRowID = sp.txn.meta.NextRowID()
+ writer, err := iceberg.NewManifestListWriterV3(out,
sp.snapshotID, nextSequence, firstRowID, parentSnapshot)
+ if err != nil {
+ return nil, nil, err
+ }
+ defer internal.CheckedClose(writer, &err)
+ if err = writer.AddManifests(newManifests); err != nil {
+ return nil, nil, err
+ }
+ if writer.NextRowID() != nil {
+ addedRows = *writer.NextRowID() - firstRowID
+ }
+ } else {
+ err = iceberg.WriteManifestList(sp.txn.meta.formatVersion, out,
+ sp.snapshotID, parentSnapshot, &nextSequence,
firstRowID, newManifests)
+ if err != nil {
+ return nil, nil, err
+ }
}
snapshot := Snapshot{
@@ -725,6 +742,10 @@ func (sp *snapshotProducer) commit() (_ []Update, _
[]Requirement, err error) {
SchemaID: &sp.txn.meta.currentSchemaID,
TimestampMs: time.Now().UnixMilli(),
}
+ if sp.txn.meta.formatVersion == 3 {
+ snapshot.FirstRowID = &firstRowID
+ snapshot.AddedRows = &addedRows
+ }
return []Update{
NewAddSnapshotUpdate(&snapshot),
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 1c6922f1..f2e55631 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -97,6 +97,22 @@ func (m *memIO) Remove(name string) error {
return nil
}
+// createTestTransactionWithMemIO creates a transaction using the io package's
mem blob FS
+// so that Create() output is persisted and can be read back (e.g. for
sequential commits).
+func createTestTransactionWithMemIO(t *testing.T, spec iceberg.PartitionSpec)
(*Transaction, iceio.WriteFileIO) {
+ t.Helper()
+ ctx := context.Background()
+ fs, err := iceio.LoadFS(ctx, nil, "mem://default/table-location")
+ require.NoError(t, err, "LoadFS mem")
+ wfs := fs.(iceio.WriteFileIO)
+ schema := simpleSchema()
+ meta, err := NewMetadata(schema, &spec, UnsortedSortOrder,
"mem://default/table-location", nil)
+ require.NoError(t, err, "new metadata")
+ tbl := New(Identifier{"db", "tbl"}, meta, "metadata.json",
func(context.Context) (iceio.IO, error) { return fs, nil }, nil)
+
+ return tbl.NewTransaction(), wfs
+}
+
func manifestHeaderSize(t *testing.T, version int, spec iceberg.PartitionSpec,
schema *iceberg.Schema) int {
t.Helper()
@@ -161,6 +177,128 @@ func createTestTransaction(t *testing.T, io iceio.IO,
spec iceberg.PartitionSpec
return tbl.NewTransaction()
}
+// TestCommitV3RowLineage ensures v3 snapshot commits set FirstRowID and
AddedRows
+// on the snapshot for row lineage, and that applying updates advances
next-row-id correctly.
+func TestCommitV3RowLineage(t *testing.T) {
+ trackIO := newTrackingIO()
+ spec := iceberg.NewPartitionSpec()
+ txn := createTestTransaction(t, trackIO, spec)
+ txn.meta.formatVersion = 3
+
+ // Single data file with record count 1 (newTestDataFile uses 1, 1 for
record count and file size).
+ const expectedAddedRows = 1
+ sp := newFastAppendFilesProducer(OpAppend, txn, trackIO, nil, nil)
+ df := newTestDataFile(t, spec, "file://data.parquet", nil)
+ sp.appendDataFile(df)
+
+ updates, reqs, err := sp.commit()
+ require.NoError(t, err, "commit should succeed")
+ require.Len(t, updates, 2, "expected AddSnapshot and SetSnapshotRef
updates")
+ addSnap, ok := updates[0].(*addSnapshotUpdate)
+ require.True(t, ok, "first update must be AddSnapshot")
+
+ // Exact snapshot lineage: first-row-id 0 for new table, added-rows
matches appended file(s).
+ require.NotNil(t, addSnap.Snapshot.FirstRowID, "v3 snapshot must have
first-row-id")
+ require.NotNil(t, addSnap.Snapshot.AddedRows, "v3 snapshot must have
added-rows")
+ require.Equal(t, int64(0), *addSnap.Snapshot.FirstRowID, "first-row-id
should be table next-row-id at commit")
+ require.Equal(t, int64(expectedAddedRows), *addSnap.Snapshot.AddedRows,
"added-rows should match appended data file record count")
+
+ // Apply updates and verify metadata next-row-id advances monotonically.
+ err = txn.apply(updates, reqs)
+ require.NoError(t, err, "apply should succeed")
+ meta, err := txn.meta.Build()
+ require.NoError(t, err, "build metadata")
+ require.Equal(t, int64(expectedAddedRows), meta.NextRowID(),
"next-row-id should equal first-row-id + added-rows")
+}
+
+// TestCommitV3RowLineageTwoSequentialCommits runs two commits and asserts
monotonic,
+// gap-free first-row-id / next-row-id progression.
+func TestCommitV3RowLineageTwoSequentialCommits(t *testing.T) {
+ spec := iceberg.NewPartitionSpec()
+ ident := Identifier{"db", "tbl"}
+ txn, memIO := createTestTransactionWithMemIO(t, spec)
+ txn.meta.formatVersion = 3
+
+ // First commit: new table, append one file (1 row).
+ sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
+ sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet",
nil))
+ updates1, reqs1, err := sp1.commit()
+ require.NoError(t, err, "first commit should succeed")
+ addSnap1, ok := updates1[0].(*addSnapshotUpdate)
+ require.True(t, ok)
+ require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "first
snapshot first-row-id")
+ require.Equal(t, int64(1), *addSnap1.Snapshot.AddedRows, "first
snapshot added-rows")
+ err = txn.apply(updates1, reqs1)
+ require.NoError(t, err, "first apply should succeed")
+ meta1, err := txn.meta.Build()
+ require.NoError(t, err)
+ require.Equal(t, int64(1), meta1.NextRowID(), "next-row-id after first
commit")
+
+ // Second commit: fast append one more file. Carried manifest already
has first_row_id, so only new manifest gets row IDs; delta = 1.
+ tbl2 := New(ident, meta1, "metadata.json", func(context.Context)
(iceio.IO, error) { return memIO, nil }, nil)
+ txn2 := tbl2.NewTransaction()
+ txn2.meta.formatVersion = 3
+ sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
+ sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet",
nil))
+ updates2, reqs2, err := sp2.commit()
+ require.NoError(t, err, "second commit should succeed")
+ addSnap2, ok := updates2[0].(*addSnapshotUpdate)
+ require.True(t, ok)
+ require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "second
snapshot first-row-id continues from first next-row-id")
+ require.Equal(t, int64(1), *addSnap2.Snapshot.AddedRows, "only new
manifest gets row IDs assigned")
+
+ err = txn2.apply(updates2, reqs2)
+ require.NoError(t, err, "second apply should succeed")
+ meta2, err := txn2.meta.Build()
+ require.NoError(t, err)
+ require.Equal(t, int64(2), meta2.NextRowID(), "next-row-id = 1 + 1
(gap-free)")
+}
+
+// TestCommitV3RowLineageDeltaIncludesExistingRows uses merge append so one
manifest
+// has both existing and added rows; verifies assigned delta includes
ExistingRowsCount
+// and metadata next-row-id matches.
+func TestCommitV3RowLineageDeltaIncludesExistingRows(t *testing.T) {
+ spec := iceberg.NewPartitionSpec()
+ ident := Identifier{"db", "tbl"}
+ txn, memIO := createTestTransactionWithMemIO(t, spec)
+ txn.meta.formatVersion = 3
+
+ // First commit: one file (1 row).
+ sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
+ sp1.appendDataFile(newTestDataFile(t, spec, "file://data-1.parquet",
nil))
+ updates1, reqs1, err := sp1.commit()
+ require.NoError(t, err, "first commit should succeed")
+ err = txn.apply(updates1, reqs1)
+ require.NoError(t, err)
+ meta1, err := txn.meta.Build()
+ require.NoError(t, err)
+ require.Equal(t, int64(1), meta1.NextRowID())
+
+ // Second commit: merge append so the two data manifests (existing +
new) are merged into one with 1 existing + 1 added row.
+ tbl2 := New(ident, meta1, "metadata.json", func(context.Context)
(iceio.IO, error) { return memIO, nil }, nil)
+ txn2 := tbl2.NewTransaction()
+ txn2.meta.formatVersion = 3
+ if txn2.meta.props == nil {
+ txn2.meta.props = make(iceberg.Properties)
+ }
+ txn2.meta.props[ManifestMergeEnabledKey] = "true"
+ txn2.meta.props[ManifestMinMergeCountKey] = "2"
+ sp2 := newMergeAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
+ sp2.appendDataFile(newTestDataFile(t, spec, "file://data-2.parquet",
nil))
+ updates2, reqs2, err := sp2.commit()
+ require.NoError(t, err, "second commit (merge) should succeed")
+ addSnap2, ok := updates2[0].(*addSnapshotUpdate)
+ require.True(t, ok)
+ require.Equal(t, int64(1), *addSnap2.Snapshot.FirstRowID, "first-row-id
continues from first commit")
+ require.Equal(t, int64(2), *addSnap2.Snapshot.AddedRows, "assigned
delta = existing (1) + added (1) in merged manifest")
+
+ err = txn2.apply(updates2, reqs2)
+ require.NoError(t, err)
+ meta2, err := txn2.meta.Build()
+ require.NoError(t, err)
+ require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id =
first-row-id + assigned delta (1+2)")
+}
+
func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
spec := partitionedSpec()
schema := simpleSchema()