This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new beca2e69 fix(manifest): correct v3 manifest-list first row id
assignment for row-lineage (#741)
beca2e69 is described below
commit beca2e693c85ba12b44b4a8561c0f5562ecd0c7a
Author: ferhat elmas <[email protected]>
AuthorDate: Thu Feb 19 20:45:52 2026 +0100
fix(manifest): correct v3 manifest-list first row id assignment for
row-lineage (#741)
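The manifest's FirstRowId pointer aliased the writer's nextRowID counter.
That counter was then incremented before the manifest was encoded, so the
persisted first_row_id was the end of the assigned row-id range instead of
its start. Now that snapshot lineage is wired from the writer delta (#728),
this can cause overlapping or missing row-id ranges between commits.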
Signed-off-by: ferhat elmas <[email protected]>
---
manifest.go | 3 +-
manifest_test.go | 33 ++++++++++++++
table/snapshot_producers_test.go | 94 +++++++++++++++++++++++++++++++++++++++-
3 files changed, 127 insertions(+), 3 deletions(-)
diff --git a/manifest.go b/manifest.go
index 48bae7a6..68a7234a 100644
--- a/manifest.go
+++ b/manifest.go
@@ -1406,7 +1406,8 @@ func (m *ManifestListWriter) AddManifests(files
[]ManifestFile) error {
// Ref:
https://github.com/apache/iceberg/blob/ea2071568dc66148b483a82eefedcd2992b435f7/core/src/main/java/org/apache/iceberg/ManifestListWriter.java#L157-L168
if wrapped.Content == ManifestContentData &&
wrapped.FirstRowId == nil {
if m.nextRowID != nil {
- wrapped.FirstRowId = m.nextRowID
+ firstRowID := *m.nextRowID
+ wrapped.FirstRowId = &firstRowID
*m.nextRowID +=
wrapped.ExistingRowsCount + wrapped.AddedRowsCount
}
}
diff --git a/manifest_test.go b/manifest_test.go
index 87dc4efc..b94b47ae 100644
--- a/manifest_test.go
+++ b/manifest_test.go
@@ -1448,6 +1448,39 @@ func (m *ManifestTestSuite)
TestV3ManifestListWriterDeltaIgnoresNonDataManifests
m.Require().NoError(writer.Close())
}
+func (m *ManifestTestSuite)
TestV3ManifestListWriterPersistsPerManifestFirstRowIDStart() {
+ // Persisted first_row_id per manifest must be the start of each
assigned row-id range.
+ var buf bytes.Buffer
+ commitSnapID := int64(100)
+ firstRowID := int64(5000)
+ sequenceNum := int64(1)
+
+ writer, err := NewManifestListWriterV3(&buf, commitSnapID, sequenceNum,
firstRowID, nil)
+ m.Require().NoError(err)
+
+ manifests := []ManifestFile{
+ NewManifestFile(3, "m1.avro", 10, 1,
commitSnapID).AddedRows(10).ExistingRows(5).Build(), // delta = 15
+ NewManifestFile(3, "m2.avro", 10, 1,
commitSnapID).AddedRows(7).Build(), // delta = 7
+ }
+ m.Require().NoError(writer.AddManifests(manifests))
+ m.Require().NoError(writer.Close())
+
+ list, err := ReadManifestList(bytes.NewReader(buf.Bytes()))
+ m.Require().NoError(err)
+ m.Require().Len(list, 2)
+
+ firstManifest, ok := list[0].(*manifestFile)
+ m.Require().True(ok, "expected v3 manifest file type")
+ secondManifest, ok := list[1].(*manifestFile)
+ m.Require().True(ok, "expected v3 manifest file type")
+ m.Require().NotNil(firstManifest.FirstRowId)
+ m.Require().NotNil(secondManifest.FirstRowId)
+
+ m.EqualValues(5000, *firstManifest.FirstRowId) // start of first range
+ m.EqualValues(5015, *secondManifest.FirstRowId)
+ m.EqualValues(5022, *writer.NextRowID())
+}
+
func (m *ManifestTestSuite) TestV3PrepareEntrySequenceNumberValidation() {
// Test v3writerImpl.prepareEntry sequence number validation logic
v3Writer := v3writerImpl{}
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index f2e55631..025e0b59 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -20,6 +20,7 @@ package table
import (
"bytes"
"context"
+ "encoding/json"
"errors"
"io"
"io/fs"
@@ -135,6 +136,10 @@ func manifestSize(t *testing.T, version int, spec
iceberg.PartitionSpec, schema
}
func newTestDataFile(t *testing.T, spec iceberg.PartitionSpec, path string,
partition map[int]any) iceberg.DataFile {
+ return newTestDataFileWithCount(t, spec, path, partition, 1)
+}
+
+func newTestDataFileWithCount(t *testing.T, spec iceberg.PartitionSpec, path
string, partition map[int]any, count int64) iceberg.DataFile {
t.Helper()
builder, err := iceberg.NewDataFileBuilder(
@@ -145,8 +150,8 @@ func newTestDataFile(t *testing.T, spec
iceberg.PartitionSpec, path string, part
partition,
nil,
nil,
- 1,
- 1,
+ count,
+ count,
)
require.NoError(t, err, "new data file builder")
@@ -299,6 +304,91 @@ func TestCommitV3RowLineageDeltaIncludesExistingRows(t
*testing.T) {
require.Equal(t, int64(3), meta2.NextRowID(), "next-row-id =
first-row-id + assigned delta (1+2)")
}
+func readManifestListFromPath(t *testing.T, fs iceio.IO, path string)
[]iceberg.ManifestFile {
+ t.Helper()
+
+ f, err := fs.Open(path)
+ require.NoError(t, err, "open manifest list: %s", path)
+ defer f.Close()
+
+ list, err := iceberg.ReadManifestList(f)
+ require.NoError(t, err, "read manifest list: %s", path)
+
+ return list
+}
+
+func manifestFirstRowIDForSnapshot(t *testing.T, manifests
[]iceberg.ManifestFile, snapshotID int64) int64 {
+ t.Helper()
+
+ type manifestRowLineage struct {
+ AddedSnapshotID int64 `json:"AddedSnapshotID"`
+ FirstRowID *int64 `json:"FirstRowId"`
+ }
+
+ for _, manifest := range manifests {
+ raw, err := json.Marshal(manifest)
+ require.NoError(t, err, "marshal manifest")
+
+ var decoded manifestRowLineage
+ require.NoError(t, json.Unmarshal(raw, &decoded), "unmarshal
manifest row-lineage fields")
+
+ if decoded.AddedSnapshotID == snapshotID {
+ require.NotNil(t, decoded.FirstRowID, "first_row_id
must be persisted for v3 data manifests")
+
+ return *decoded.FirstRowID
+ }
+ }
+
+ require.Failf(t, "missing manifest for snapshot", "snapshot-id=%d",
snapshotID)
+
+ return 0
+}
+
+// TestCommitV3RowLineagePersistsManifestFirstRowID verifies that snapshot
producer
+// writes first_row_id to manifest list entries using the snapshot's start
row-id.
+func TestCommitV3RowLineagePersistsManifestFirstRowID(t *testing.T) {
+ spec := iceberg.NewPartitionSpec()
+ ident := Identifier{"db", "tbl"}
+ txn, memIO := createTestTransactionWithMemIO(t, spec)
+ txn.meta.formatVersion = 3
+
+ // Use multi-row files to make row-range starts obvious.
+ sp1 := newFastAppendFilesProducer(OpAppend, txn, memIO, nil, nil)
+ sp1.appendDataFile(newTestDataFileWithCount(t, spec,
"file://data-1.parquet", nil, 3))
+ updates1, reqs1, err := sp1.commit()
+ require.NoError(t, err, "first commit should succeed")
+ addSnap1, ok := updates1[0].(*addSnapshotUpdate)
+ require.True(t, ok, "first update must be AddSnapshot")
+ require.Equal(t, int64(0), *addSnap1.Snapshot.FirstRowID, "snapshot
first-row-id for commit 1")
+
+ manifests1 := readManifestListFromPath(t, memIO,
addSnap1.Snapshot.ManifestList)
+ currentManifestFirstRowID1 := manifestFirstRowIDForSnapshot(t,
manifests1, addSnap1.Snapshot.SnapshotID)
+ require.Equal(t, *addSnap1.Snapshot.FirstRowID,
currentManifestFirstRowID1,
+ "persisted manifest first_row_id must match snapshot
first-row-id for current commit")
+
+ err = txn.apply(updates1, reqs1)
+ require.NoError(t, err, "first apply should succeed")
+ meta1, err := txn.meta.Build()
+ require.NoError(t, err)
+ require.Equal(t, int64(3), meta1.NextRowID())
+
+ tbl2 := New(ident, meta1, "metadata.json", func(context.Context)
(iceio.IO, error) { return memIO, nil }, nil)
+ txn2 := tbl2.NewTransaction()
+ txn2.meta.formatVersion = 3
+ sp2 := newFastAppendFilesProducer(OpAppend, txn2, memIO, nil, nil)
+ sp2.appendDataFile(newTestDataFileWithCount(t, spec,
"file://data-2.parquet", nil, 5))
+ updates2, _, err := sp2.commit()
+ require.NoError(t, err, "second commit should succeed")
+ addSnap2, ok := updates2[0].(*addSnapshotUpdate)
+ require.True(t, ok, "first update must be AddSnapshot")
+ require.Equal(t, int64(3), *addSnap2.Snapshot.FirstRowID, "snapshot
first-row-id for commit 2")
+
+ manifests2 := readManifestListFromPath(t, memIO,
addSnap2.Snapshot.ManifestList)
+ currentManifestFirstRowID2 := manifestFirstRowIDForSnapshot(t,
manifests2, addSnap2.Snapshot.SnapshotID)
+ require.Equal(t, *addSnap2.Snapshot.FirstRowID,
currentManifestFirstRowID2,
+ "persisted manifest first_row_id must match snapshot
first-row-id for current commit")
+}
+
func TestSnapshotProducerManifestsClosesWriterOnError(t *testing.T) {
spec := partitionedSpec()
schema := simpleSchema()