This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 123a5583 feat(table): preserve row lineage through copy-on-write
rewrites (#1099)
123a5583 is described below
commit 123a558339d5b83b0142a1305429a5dabf703d91
Author: Tanmay Rauth <[email protected]>
AuthorDate: Sat May 23 16:20:40 2026 +0530
feat(table): preserve row lineage through copy-on-write rewrites (#1099)
Parent: #999
Preserves `_row_id` through CoW rewrites and compaction on v3 tables.
The scanner synthesizes row IDs from file metadata, the filter is
applied post-synthesis to preserve correct positions, and the writer
stores `_row_id` explicitly in output Parquet.
`_last_updated_sequence_number` is left null to inherit from the new
file's `data_sequence_number`.
Also documents that `next-row-id` accounting intentionally overcounts
(matching Java's `ManifestListWriter.V3Writer`) and removes the old
`Projection()` synthesis helpers replaced by the explicit
`WithRowLineage()` scan option.
## Test plan
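- `TestCoWRewritePreservesRowID`: verifies surviving rows keep their
original `_row_id` after a CoW delete
- `TestCoWRewriteRowIDNextRowIDAccounting`: validates that `next-row-id`
advances correctly
- `TestExecuteCompactionGroupPreservesRowID`: verifies every row keeps
its `_row_id` through compaction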
---
metadata_columns.go | 47 +++-
table/arrow_utils.go | 3 +-
table/rewrite_data_files.go | 67 ++++++
table/row_lineage_rewrite_test.go | 294 ++++++++++++++++++++++++
table/scanner.go | 187 +++++++--------
table/scanner_internal_test.go | 470 ++++++++++++++------------------------
table/snapshot_producers.go | 5 +
table/table.go | 11 +
table/transaction.go | 295 ++++++++++++++++++++----
table/write_records.go | 61 ++++-
10 files changed, 992 insertions(+), 448 deletions(-)
diff --git a/metadata_columns.go b/metadata_columns.go
index efe568f7..fa9e2db4 100644
--- a/metadata_columns.go
+++ b/metadata_columns.go
@@ -17,13 +17,16 @@
package iceberg
-// Row lineage metadata column field IDs (v3+). Reserved IDs are
Integer.MAX_VALUE - 107 and 108
-// per the Iceberg spec (Metadata Columns / Row Lineage).
+import "slices"
+
+// Row lineage metadata column field IDs (v3+). Reserved IDs are
Integer.MAX_VALUE - 107
+// and Integer.MAX_VALUE - 108 per the Iceberg spec (Metadata Columns / Row
Lineage).
const (
// RowIDFieldID is the field ID for _row_id (optional long). A unique
long identifier for every row.
+ // Reserved as Integer.MAX_VALUE - 107.
RowIDFieldID = 2147483540
// LastUpdatedSequenceNumberFieldID is the field ID for
_last_updated_sequence_number (optional long).
- // The sequence number of the commit that last updated the row.
+ // The sequence number of the commit that last updated the row.
Reserved as Integer.MAX_VALUE - 108.
LastUpdatedSequenceNumberFieldID = 2147483539
)
@@ -59,3 +62,41 @@ func LastUpdatedSequenceNumber() NestedField {
func IsMetadataColumn(fieldID int) bool {
return fieldID == RowIDFieldID || fieldID ==
LastUpdatedSequenceNumberFieldID
}
+
+// SchemaWithRowLineage returns a new schema with the row-lineage metadata
columns
+// (_row_id, _last_updated_sequence_number) appended to the given schema's
fields.
+// Used when reading source files during a CoW rewrite or compaction so that
row
+// identity and per-row update sequence are preserved in the output.
+//
+// Idempotent: if a row-lineage column is already present (by reserved field
ID),
+// it is not appended again. The returned schema always allocates a fresh field
+// slice so it cannot alias the input schema's backing array.
+func SchemaWithRowLineage(s *Schema) *Schema {
+ if s == nil {
+ return nil
+ }
+ // Clone the field slice up front so we never share a backing array
with the
+ // caller's schema — append-with-spare-capacity could otherwise mutate
the
+ // source schema's fields when the caller next mutates either side.
+ fields := slices.Clone(s.Fields())
+
+ hasRowID := false
+ hasSeqNum := false
+ for _, f := range fields {
+ switch f.ID {
+ case RowIDFieldID:
+ hasRowID = true
+ case LastUpdatedSequenceNumberFieldID:
+ hasSeqNum = true
+ }
+ }
+
+ if !hasRowID {
+ fields = append(fields, RowID())
+ }
+ if !hasSeqNum {
+ fields = append(fields, LastUpdatedSequenceNumber())
+ }
+
+ return NewSchemaWithIdentifiers(s.ID, s.IdentifierFieldIDs, fields...)
+}
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 48c04c74..55507c92 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1516,6 +1516,7 @@ type recordWritingArgs struct {
counter iter.Seq[int]
maxWriteWorkers int
clustered bool
+ factoryOpts []writerFactoryOption
}
func recordsToDataFiles(ctx context.Context, rootLocation string, meta
*MetadataBuilder, args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile,
error]) {
@@ -1554,7 +1555,7 @@ func recordsToDataFiles(ctx context.Context, rootLocation
string, meta *Metadata
}
}
- factory, err := newWriterFactory(rootLocation, args, meta, taskSchema,
targetFileSize)
+ factory, err := newWriterFactory(rootLocation, args, meta, taskSchema,
targetFileSize, args.factoryOpts...)
if err != nil {
panic(err)
}
diff --git a/table/rewrite_data_files.go b/table/rewrite_data_files.go
index e2cee62f..d0d1b875 100644
--- a/table/rewrite_data_files.go
+++ b/table/rewrite_data_files.go
@@ -20,6 +20,7 @@ package table
import (
"context"
"fmt"
+ "log/slog"
"maps"
"github.com/apache/iceberg-go"
@@ -289,6 +290,40 @@ func ExecuteCompactionGroup(ctx context.Context, tbl
*Table, group CompactionTas
scanOpts = append(scanOpts,
WitMaxConcurrency(cfg.scanConcurrency))
}
+ // Preserve row lineage only when every source file in the group carries
+ // it. A mixed group (some files with FirstRowID, some without — e.g.
+ // legacy files on a v3 table) would otherwise produce one output where
+ // post-lineage rows have explicit _row_id values and pre-lineage rows
+ // have nulls, which violates the per-file uniqueness/coverage
+ // invariant the v3 spec requires. Splitting mixed groups into separate
+ // outputs is a larger refactor and is left as a follow-up; for now we
+ // degrade gracefully (the rewrite still succeeds, but lineage is not
+ // preserved for the surviving rows).
+ preserveLineage := tbl.metadata.Version() >= 3 &&
allTasksHaveRowLineage(group.Tasks)
+ if preserveLineage {
+ scanOpts = append(scanOpts, WithRowLineage())
+ } else if tbl.metadata.Version() >= 3 {
+ // Mixed group on a v3 table — at least one source file lacks
+ // FirstRowID so we drop lineage on the surviving rows. This is
the
+ // common case during a v1/v2→v3 migration; surface it so
operators
+ // can detect silent lineage loss instead of having to diff
+ // metadata before/after.
+ var lineageFiles, legacyFiles int
+ for _, t := range group.Tasks {
+ if t.FirstRowID != nil {
+ lineageFiles++
+ } else {
+ legacyFiles++
+ }
+ }
+ if lineageFiles > 0 {
+ slog.Warn("compaction group has mixed row lineage;
dropping _row_id on output",
+ "partition_key", group.PartitionKey,
+ "lineage_files", lineageFiles,
+ "legacy_files", legacyFiles)
+ }
+ }
+
arrowSchema, records, err := tbl.Scan(scanOpts...).ReadTasks(ctx,
group.Tasks)
if err != nil {
return CompactionGroupResult{}, fmt.Errorf("read tasks for
compaction group %q: %w", group.PartitionKey, err)
@@ -300,6 +335,20 @@ func ExecuteCompactionGroup(ctx context.Context, tbl
*Table, group CompactionTas
if cfg.targetFileSize > 0 {
writeOpts = append(writeOpts,
WithTargetFileSize(cfg.targetFileSize))
}
+ if preserveLineage {
+ // Rebuild the arrow schema from the projected iceberg schema
so the
+ // reserved row-lineage field IDs (_row_id,
_last_updated_sequence_number)
+ // are attached as Arrow field metadata. ArrowSchemaToIceberg
prefers
+ // embedded field IDs when present and otherwise falls back to
the
+ // table's name mapping — which doesn't (and cannot) contain the
+ // reserved metadata column names, so the fallback path panics.
+ projectedSchema := iceberg.SchemaWithRowLineage(tbl.Schema())
+ arrowSchema, err = SchemaToArrowSchema(projectedSchema, nil,
true, false)
+ if err != nil {
+ return CompactionGroupResult{}, fmt.Errorf("build arrow
schema for lineage write in group %q: %w", group.PartitionKey, err)
+ }
+ writeOpts = append(writeOpts,
WithPreserveRowLineage(projectedSchema))
+ }
var (
newFiles []iceberg.DataFile
@@ -328,6 +377,24 @@ func ExecuteCompactionGroup(ctx context.Context, tbl
*Table, group CompactionTas
}, nil
}
+// allTasksHaveRowLineage returns true iff every task in the group has a
+// non-nil FirstRowID — i.e. every source file already carries v3 row lineage.
+// Used to gate the preservation path on compaction: mixed groups (some
+// lineage, some legacy) would otherwise produce per-file invariant
+// violations, so the gate is conservative.
+func allTasksHaveRowLineage(tasks []FileScanTask) bool {
+ if len(tasks) == 0 {
+ return false
+ }
+ for _, t := range tasks {
+ if t.FirstRowID == nil {
+ return false
+ }
+ }
+
+ return true
+}
+
// rewriteDataFilesPartial stages each group as its own rewrite
// snapshot via [Transaction.ReplaceFiles] directly. Per-group staging
// lets a mid-loop write failure leave already-staged groups on the
diff --git a/table/row_lineage_rewrite_test.go
b/table/row_lineage_rewrite_test.go
new file mode 100644
index 00000000..9b1fc193
--- /dev/null
+++ b/table/row_lineage_rewrite_test.go
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "context"
+ "path/filepath"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func newV3RowLineageTestTable(t *testing.T) *table.Table {
+ t.Helper()
+
+ location := filepath.ToSlash(t.TempDir())
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "data", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+ meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
table.UnsortedSortOrder, location,
+ iceberg.Properties{table.PropertyFormatVersion: "3"})
+ require.NoError(t, err)
+
+ metaLoc := location + "/metadata/v1.metadata.json"
+ fsF := func(context.Context) (iceio.IO, error) { return
iceio.LocalFS{}, nil }
+ cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF:
fsF}
+
+ return table.New(table.Identifier{"db", "row_lineage_test"}, meta,
metaLoc, fsF, cat)
+}
+
+// TestCoWRewritePreservesRowID verifies that a copy-on-write overwrite with a
+// row filter preserves the original _row_id and _last_updated_sequence_number
+// values in the rewritten file. Surviving rows must keep both values from the
+// pre-rewrite snapshot — the rewrite is "physically rewritten", not "logically
+// updated", per the v3 spec.
+func TestCoWRewritePreservesRowID(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+
+ tbl := newV3RowLineageTestTable(t)
+
+ // Append 3 rows: id=1,2,3
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+
+ initialData, err := array.TableFromJSON(mem, arrowSchema, []string{
+ `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}, {"id": 3,
"data": "c"}]`,
+ })
+ require.NoError(t, err)
+ defer initialData.Release()
+
+ tbl, err = tbl.Append(ctx, array.NewTableReader(initialData, -1), nil)
+ require.NoError(t, err)
+
+ // Verify the append created a valid v3 snapshot with row lineage.
+ snap := tbl.CurrentSnapshot()
+ require.NotNil(t, snap)
+ require.NotNil(t, snap.FirstRowID, "v3 snapshot must have first-row-id")
+ require.NotNil(t, snap.AddedRows, "v3 snapshot must have added-rows")
+ assert.Equal(t, int64(0), *snap.FirstRowID)
+ assert.Equal(t, int64(3), *snap.AddedRows)
+
+ // Capture the snapshot's sequence number so we can assert preservation
+ // after the rewrite. After the append, every row's effective
+ // _last_updated_sequence_number should be this value.
+ createSeq := snap.SequenceNumber
+
+ // Scan with row lineage to see synthesized _row_id values before the
rewrite.
+ lineageScan := tbl.Scan(table.WithRowLineage())
+ schema, itr, err := lineageScan.ToArrowRecords(ctx)
+ require.NoError(t, err)
+
+ rowIDIdx := -1
+ for i, f := range schema.Fields() {
+ if f.Name == iceberg.RowIDColumnName {
+ rowIDIdx = i
+
+ break
+ }
+ }
+ require.GreaterOrEqual(t, rowIDIdx, 0, "_row_id should be in scan
projection")
+
+ var originalRowIDs []int64
+ for rec, err := range itr {
+ require.NoError(t, err)
+ col := rec.Column(rowIDIdx).(*array.Int64)
+ for i := 0; i < col.Len(); i++ {
+ originalRowIDs = append(originalRowIDs, col.Value(i))
+ }
+ rec.Release()
+ }
+ require.Equal(t, []int64{0, 1, 2}, originalRowIDs, "initial _row_id
should be 0,1,2")
+
+ // CoW overwrite: delete the row where id=2, preserving id=1 and id=3.
+ filter := iceberg.EqualTo(iceberg.Reference("id"), int64(2))
+ tbl, err = tbl.Delete(ctx, filter, nil)
+ require.NoError(t, err)
+
+ snap = tbl.CurrentSnapshot()
+ require.NotNil(t, snap)
+ require.Greater(t, snap.SequenceNumber, createSeq,
+ "sanity: rewrite snapshot must have a higher sequence number
than the create snapshot")
+
+ // Scan the result with row lineage. The surviving rows should preserve
their
+ // original _row_id values: 0 and 2. Their
_last_updated_sequence_number must
+ // also still report the create snapshot's seq, NOT the rewrite
snapshot's
+ // seq — the rewrite is physical only, not a logical update.
+ lineageScan = tbl.Scan(table.WithRowLineage())
+ _, itr, err = lineageScan.ToArrowRecords(ctx)
+ require.NoError(t, err)
+
+ var afterRowIDs []int64
+ var afterIDs []int64
+ var afterSeq []int64
+ for rec, err := range itr {
+ require.NoError(t, err)
+ idIdx := rec.Schema().FieldIndices("id")
+ require.NotEmpty(t, idIdx)
+ rowIDIndices :=
rec.Schema().FieldIndices(iceberg.RowIDColumnName)
+ require.NotEmpty(t, rowIDIndices)
+ seqIndices :=
rec.Schema().FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ require.NotEmpty(t, seqIndices, "_last_updated_sequence_number
must be in projection")
+
+ idCol := rec.Column(idIdx[0]).(*array.Int64)
+ rowIDCol := rec.Column(rowIDIndices[0]).(*array.Int64)
+ seqCol := rec.Column(seqIndices[0]).(*array.Int64)
+ for i := 0; i < int(rec.NumRows()); i++ {
+ afterIDs = append(afterIDs, idCol.Value(i))
+ afterRowIDs = append(afterRowIDs, rowIDCol.Value(i))
+ require.False(t, seqCol.IsNull(i),
+ "row %d must have a non-null
_last_updated_sequence_number after CoW rewrite", i)
+ afterSeq = append(afterSeq, seqCol.Value(i))
+ }
+ rec.Release()
+ }
+
+ assert.Equal(t, []int64{1, 3}, afterIDs, "remaining rows should be
id=1,3")
+ assert.Equal(t, []int64{0, 2}, afterRowIDs,
+ "_row_id must be preserved through CoW rewrite: row with id=1
keeps _row_id=0, row with id=3 keeps _row_id=2")
+ assert.Equal(t, []int64{createSeq, createSeq}, afterSeq,
+ "_last_updated_sequence_number must report the original
creation snapshot's sequence number, not the rewrite's")
+}
+
+// TestCoWRewriteRowIDNextRowIDAccounting verifies that row-id accounting
remains
+// correct after a CoW rewrite. The overcounting (where next-row-id advances by
+// the full manifest row count including preserved survivors) is intentional
and
+// matches Java's ManifestListWriter.V3Writer behavior.
+func TestCoWRewriteRowIDNextRowIDAccounting(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+
+ tbl := newV3RowLineageTestTable(t)
+
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+
+ data, err := array.TableFromJSON(mem, arrowSchema, []string{
+ `[{"id": 10, "data": "x"}, {"id": 20, "data": "y"}, {"id": 30,
"data": "z"}]`,
+ })
+ require.NoError(t, err)
+ defer data.Release()
+
+ tbl, err = tbl.Append(ctx, array.NewTableReader(data, -1), nil)
+ require.NoError(t, err)
+
+ // next-row-id should be 3 after appending 3 rows.
+ assert.Equal(t, int64(3), tbl.Metadata().NextRowID())
+
+ // Delete one row via CoW.
+ filter := iceberg.EqualTo(iceberg.Reference("id"), int64(20))
+ tbl, err = tbl.Delete(ctx, filter, nil)
+ require.NoError(t, err)
+
+ // next-row-id advances by the new manifest's added-rows count (2 here),
+ // even though the surviving rows preserve their old IDs. Going from 3
+ // (after the initial append) to 5 (= prior NextRowID + manifest's added
+ // rows). This "wastes" ID space but doesn't violate uniqueness — actual
+ // row IDs come from the explicit Parquet column, not the global
counter.
+ // Mirrors Java's ManifestListWriter.V3Writer.prepare(); the Go
+ // counterpart lives in table/snapshot_producers.go.
+ assert.Equal(t, int64(5), tbl.Metadata().NextRowID(),
+ "next-row-id should advance from 3 by the rewrite manifest's 2
added rows")
+}
+
+// TestExecuteCompactionGroupPreservesRowID verifies that
+// ExecuteCompactionGroup preserves _row_id values through compaction on a v3
+// table. After compaction every row should retain its original _row_id even
+// though the underlying file paths have changed.
+func TestExecuteCompactionGroupPreservesRowID(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+
+ tbl := newV3RowLineageTestTable(t)
+
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+
+ // Two appends produce two data files in two snapshots; row IDs should
+ // span both files (0..1 in the first, 2..3 in the second).
+ for i, payload := range []string{
+ `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}]`,
+ `[{"id": 3, "data": "c"}, {"id": 4, "data": "d"}]`,
+ } {
+ data, err := array.TableFromJSON(mem, arrowSchema,
[]string{payload})
+ require.NoError(t, err, "append %d", i)
+ t.Cleanup(data.Release)
+
+ tbl, err = tbl.Append(ctx, array.NewTableReader(data, -1), nil)
+ require.NoError(t, err, "append %d", i)
+ }
+
+ tasks, err := tbl.Scan().PlanFiles(ctx)
+ require.NoError(t, err)
+ require.Len(t, tasks, 2, "two source files for compaction")
+
+ scanTasks := make([]table.FileScanTask, len(tasks))
+ var totalSize int64
+ for i, st := range tasks {
+ scanTasks[i] = st
+ totalSize += st.File.FileSizeBytes()
+ }
+ group := table.CompactionTaskGroup{
+ PartitionKey: "single",
+ Tasks: scanTasks,
+ TotalSizeBytes: totalSize,
+ }
+
+ gr, err := table.ExecuteCompactionGroup(ctx, tbl, group)
+ require.NoError(t, err)
+ require.Equal(t, 2, len(gr.OldDataFiles), "both source files should be
replaced")
+ require.GreaterOrEqual(t, len(gr.NewDataFiles), 1, "compaction should
produce at least one output file")
+
+ tx := tbl.NewTransaction()
+ rewrite := tx.NewRewrite(nil)
+ rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+ require.NoError(t, rewrite.Commit(ctx))
+ tbl, err = tx.Commit(ctx)
+ require.NoError(t, err)
+
+ // Read back with row lineage projected; every row should retain its
+ // pre-compaction _row_id (0,1,2,3).
+ _, itr, err := tbl.Scan(table.WithRowLineage()).ToArrowRecords(ctx)
+ require.NoError(t, err)
+
+ got := map[int64]int64{}
+ for rec, err := range itr {
+ require.NoError(t, err)
+ idIdx := rec.Schema().FieldIndices("id")
+ require.NotEmpty(t, idIdx)
+ rowIDIdx := rec.Schema().FieldIndices(iceberg.RowIDColumnName)
+ require.NotEmpty(t, rowIDIdx, "_row_id must be projected")
+
+ idCol := rec.Column(idIdx[0]).(*array.Int64)
+ rowIDCol := rec.Column(rowIDIdx[0]).(*array.Int64)
+ for i := 0; i < int(rec.NumRows()); i++ {
+ got[idCol.Value(i)] = rowIDCol.Value(i)
+ }
+ rec.Release()
+ }
+
+ assert.Equal(t,
+ map[int64]int64{1: 0, 2: 1, 3: 2, 4: 3},
+ got,
+ "compaction must preserve every row's _row_id")
+}
diff --git a/table/scanner.go b/table/scanner.go
index 3e9c6e0e..8e94088f 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -194,6 +194,8 @@ type Scan struct {
options iceberg.Properties
limit int64
+ includeRowLineage bool
+
partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
concurrency int
}
@@ -243,7 +245,6 @@ func (scan *Scan) Snapshot() *Snapshot {
func (scan *Scan) Projection() (*iceberg.Schema, error) {
curSchema := scan.metadata.CurrentSchema()
curVersion := scan.metadata.Version()
- caseSensitive := scan.caseSensitive
if scan.snapshotID != nil {
snap := scan.metadata.SnapshotByID(*scan.snapshotID)
if snap == nil {
@@ -261,26 +262,100 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
}
}
- if slices.Contains(scan.selectedFields, "*") {
- return curSchema, nil
+ if scan.includeRowLineage && curVersion < minFormatVersionRowLineage {
+ return nil, fmt.Errorf("%w: row lineage requires format version
%d, table is v%d",
+ ErrInvalidOperation, minFormatVersionRowLineage,
curVersion)
}
- selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields,
caseSensitive)
- schemaMeta := metaFieldsFromSchema(curSchema)
- synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
- if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+ var schema *iceberg.Schema
+ if slices.Contains(scan.selectedFields, "*") {
+ schema = curSchema
+ } else {
+ // Intercept row-lineage metadata column names (_row_id,
+ // _last_updated_sequence_number) before calling Select: they
are
+ // reserved and never appear in the user schema's fields, so
+ // Select would fail with "could not find column" on v3 tables
+ // where they are otherwise legal to project. The scanner reads
+ // them from file metadata (or synthesizes them) at scan time;
+ // here we just need to ensure they survive into the projection.
+ userFields, lineageFields :=
splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive)
+ if len(lineageFields) > 0 && curVersion <
minFormatVersionRowLineage {
+ // Reject explicitly so the contract lives in the code
rather
+ // than emerging from Select's "could not find column"
path —
+ // a future v2 schema field literally named _row_id
should not
+ // silently succeed here.
+ return nil, fmt.Errorf("%w: row lineage column %q
requires format version %d, table is v%d",
+ ErrInvalidOperation, lineageFields[0].Name,
minFormatVersionRowLineage, curVersion)
+ }
- // synthesis path
- removedMetaSlice, missingMetaFields :=
removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
- sch, err := curSchema.Select(scan.caseSensitive,
removedMetaSlice...)
+ var err error
+ schema, err = curSchema.Select(scan.caseSensitive,
userFields...)
if err != nil {
return nil, err
}
+ // Skip the per-name append when scan.includeRowLineage is set:
the
+ // SchemaWithRowLineage call below adds both lineage columns
+ // unconditionally, and appendMissingLineageFields would just be
+ // redundant work whose result is overwritten.
+ if len(lineageFields) > 0 && !scan.includeRowLineage {
+ schema = appendMissingLineageFields(schema,
lineageFields)
+ }
+ }
+
+ if scan.includeRowLineage {
+ schema = iceberg.SchemaWithRowLineage(schema)
+ }
+
+ return schema, nil
+}
+
+// splitLineageMetadataFields partitions selectedFields into user fields and
+// row-lineage metadata fields (_row_id, _last_updated_sequence_number). The
+// returned lineage slice contains the canonical NestedField for each
+// metadata column name found, in the order encountered.
+func splitLineageMetadataFields(selectedFields []string, caseSensitive bool)
(userFields []string, lineageFields []iceberg.NestedField) {
+ matches := func(field, target string) bool {
+ if caseSensitive {
+ return field == target
+ }
+
+ return strings.EqualFold(field, target)
+ }
+
+ userFields = make([]string, 0, len(selectedFields))
+ for _, field := range selectedFields {
+ switch {
+ case matches(field, iceberg.RowIDColumnName):
+ lineageFields = append(lineageFields, iceberg.RowID())
+ case matches(field,
iceberg.LastUpdatedSequenceNumberColumnName):
+ lineageFields = append(lineageFields,
iceberg.LastUpdatedSequenceNumber())
+ default:
+ userFields = append(userFields, field)
+ }
+ }
+
+ return userFields, lineageFields
+}
- return iceberg.NewSchemaWithIdentifiers(sch.ID,
sch.IdentifierFieldIDs, append(sch.Fields(), missingMetaFields...)...), nil
+// appendMissingLineageFields returns a new schema with each lineage field
+// appended only if no field with that ID is already present. Idempotent so
+// callers can pass schemas that already declare the reserved fields.
+func appendMissingLineageFields(s *iceberg.Schema, lineageFields
[]iceberg.NestedField) *iceberg.Schema {
+ existing := make(map[int]struct{}, len(s.Fields()))
+ for _, f := range s.Fields() {
+ existing[f.ID] = struct{}{}
}
- return curSchema.Select(scan.caseSensitive, scan.selectedFields...)
+ fields := slices.Clone(s.Fields())
+ for _, f := range lineageFields {
+ if _, ok := existing[f.ID]; ok {
+ continue
+ }
+ fields = append(fields, f)
+ existing[f.ID] = struct{}{}
+ }
+
+ return iceberg.NewSchemaWithIdentifiers(s.ID, s.IdentifierFieldIDs,
fields...)
}
func (scan *Scan) buildPartitionProjection(specID int)
(iceberg.BooleanExpression, error) {
@@ -621,10 +696,15 @@ func (scan *Scan) PlanFiles(ctx context.Context)
([]FileScanTask, error) {
Length: e.DataFile().FileSizeBytes(),
}
// Row lineage constants: readers use these to synthesize
_row_id and
- // _last_updated_sequence_number when requested.
+ // _last_updated_sequence_number when requested. Per spec the
+ // synthesized _last_updated_sequence_number is the manifest
entry's
+ // data sequence number (field id 3), not file sequence number
+ // (field id 4); back-dated EXISTING entries can have the two
+ // diverge and Java/iceberg-rust use the data sequence number.
task.FirstRowID = e.DataFile().FirstRowID()
- if fseq := e.FileSequenceNum(); fseq != nil {
- task.DataSequenceNumber = fseq
+ if seq := e.SequenceNum(); seq >= 0 {
+ s := seq
+ task.DataSequenceNumber = &s
}
results = append(results, task)
}
@@ -721,80 +801,3 @@ func (scan *Scan) ToArrowTable(ctx context.Context)
(arrow.Table, error) {
return array.NewTableFromRecords(schema, records), nil
}
-
-// Removes metaFields from selectedField if it exists. Returns a []string
representing the filtered selectedFields
-// and an iceberg.NestedField[] representing the removed metadata. Note that
metaFields is passed in
-// after being validated from metaFieldsFromSelectedFields.
-func removeMetadataFromSelectedFields(selectedFields []string, metaFields
[]string) ([]string, []iceberg.NestedField) {
- filteredFields := []string{}
- meta := []iceberg.NestedField{}
-
- for _, field := range selectedFields {
- if slices.Contains(metaFields, strings.ToLower(field)) {
-
- switch strings.ToLower(field) {
- case iceberg.LastUpdatedSequenceNumberColumnName:
- meta = append(meta,
iceberg.LastUpdatedSequenceNumber())
- case iceberg.RowIDColumnName:
- meta = append(meta, iceberg.RowID())
- }
-
- continue
- }
-
- filteredFields = append(filteredFields, field)
- }
-
- return filteredFields, meta
-}
-
-func metaFieldsFromSelectedFields(selectedFields []string, caseSensitive bool)
[]string {
- meta := []string{}
- if !caseSensitive {
- for _, field := range selectedFields {
- if strings.EqualFold(field, iceberg.RowIDColumnName) ||
strings.EqualFold(field, iceberg.LastUpdatedSequenceNumberColumnName) {
- meta = append(meta, strings.ToLower(field))
- }
- }
-
- return meta
- }
-
- for _, field := range selectedFields {
- if field == iceberg.RowIDColumnName || field ==
iceberg.LastUpdatedSequenceNumberColumnName {
- meta = append(meta, strings.ToLower(field))
- }
- }
-
- return meta
-}
-
-// Takes in a *iceberg.Schema and returns a []string representing the row
lineage metadata present
-// in the schema.
-func metaFieldsFromSchema(sch *iceberg.Schema) []string {
- meta := []string{}
- _, hasRowIDMeta := sch.FindFieldByName(iceberg.RowIDColumnName)
- _, hasSeqMeta :=
sch.FindFieldByName(iceberg.LastUpdatedSequenceNumberColumnName)
-
- if hasRowIDMeta {
- meta = append(meta, iceberg.RowIDColumnName)
- }
- if hasSeqMeta {
- meta = append(meta, iceberg.LastUpdatedSequenceNumberColumnName)
- }
-
- return meta
-}
-
-// Any metadata which is in selectedFieldsMeta and not in schemaMeta is a
synthesis meta
-func synthesizeMeta(selectedFieldsMeta []string, schemaMeta []string) []string
{
- synthesis := []string{}
-
- for _, f := range selectedFieldsMeta {
- if !slices.Contains(schemaMeta, f) {
- synthesis = append(synthesis, f)
- }
- }
-
- return synthesis
-}
diff --git a/table/scanner_internal_test.go b/table/scanner_internal_test.go
index 789d9906..077f315b 100644
--- a/table/scanner_internal_test.go
+++ b/table/scanner_internal_test.go
@@ -19,8 +19,6 @@ package table
import (
"runtime"
- "slices"
- "strconv"
"sync"
"sync/atomic"
"testing"
@@ -238,98 +236,146 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t
*testing.T) {
assert.ErrorContains(t, err, "id 999")
}
-// TestProjectionV3PreLineageFile verifies that Projection() succeeds and
returns
-// _row_id and _last_updated_sequence_number as nullable (all-null-capable)
fields when
-// the table is v3 with next-row-id set but the data file predates row lineage
(those
-// columns are absent from the schema).
-func TestProjectionV3PreLineageFile(t *testing.T) {
- schema := iceberg.NewSchema(
- 1,
- iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
- iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
- )
+// TestSynthesizeRowLineageColumns verifies that _row_id and
_last_updated_sequence_number
+// are filled from task constants when those columns are present and null.
+func TestSynthesizeRowLineageColumns(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ ctx := compute.WithAllocator(t.Context(), mem)
+ defer mem.AssertSize(t, 0)
+ firstRowID := int64(1000)
+ dataSeqNum := int64(5)
+ task := FileScanTask{FirstRowID: &firstRowID, DataSequenceNumber:
&dataSeqNum}
+ rowOffset := int64(0)
- metadata, err := NewMetadata(
- schema,
- iceberg.UnpartitionedSpec,
- UnsortedSortOrder,
- "s3://test-bucket/test_table",
- iceberg.Properties{"format-version": "3"},
+ // Build a batch with a data column plus _row_id and
_last_updated_sequence_number (all nulls).
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable:
true},
+ {Name: iceberg.RowIDColumnName, Type:
arrow.PrimitiveTypes.Int64, Nullable: true},
+ {Name: iceberg.LastUpdatedSequenceNumberColumnName,
Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+ },
+ nil,
)
+ const nrows = 3
+ xBldr := array.NewInt64Builder(mem)
+ defer xBldr.Release()
+ xBldr.AppendValues([]int64{1, 2, 3}, nil)
+ rowIDBldr := array.NewInt64Builder(mem)
+ defer rowIDBldr.Release()
+ rowIDBldr.AppendNulls(nrows)
+ seqBldr := array.NewInt64Builder(mem)
+ defer seqBldr.Release()
+ seqBldr.AppendNulls(nrows)
+
+ xArr := xBldr.NewArray()
+ rowIDArr := rowIDBldr.NewArray()
+ seqArr := seqBldr.NewArray()
+ batch := array.NewRecordBatch(schema, []arrow.Array{xArr, rowIDArr,
seqArr}, nrows)
+ xArr.Release()
+ rowIDArr.Release()
+ seqArr.Release()
+ defer batch.Release()
+
+ out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch)
require.NoError(t, err)
- assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+ defer out.Release()
- // Request the two user columns plus both row-lineage metadata columns.
- // These metadata columns do NOT exist in the physical schema of a
pre-lineage file.
- scan := &Scan{
- metadata: metadata,
- selectedFields: []string{"id", "payload",
iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName},
- caseSensitive: true,
+ // _row_id should be 1000, 1001, 1002
+ rowIDCol := out.Column(1).(*array.Int64)
+ require.Equal(t, nrows, rowIDCol.Len())
+ for i := 0; i < nrows; i++ {
+ assert.False(t, rowIDCol.IsNull(i), "row %d", i)
+ assert.EqualValues(t, 1000+int64(i), rowIDCol.Value(i), "row
%d", i)
}
+ // _last_updated_sequence_number should be 5 for all
+ seqCol := out.Column(2).(*array.Int64)
+ for i := 0; i < nrows; i++ {
+ assert.False(t, seqCol.IsNull(i), "row %d", i)
+ assert.EqualValues(t, 5, seqCol.Value(i), "row %d", i)
+ }
+ assert.EqualValues(t, 3, rowOffset)
+}
- proj, err := scan.Projection()
- require.NoError(t, err, "Projection must not error for pre-lineage
metadata columns")
- require.NotNil(t, proj)
-
- fields := proj.Fields()
- require.Len(t, fields, 4, "projected schema must contain all four
requested fields")
+// TestSynthesizeRowLineageColumnsPreservesExplicit covers the spec's null-
+// coalescing rule: if a row already has an explicit (non-null) value in the
+// source file, that value MUST be preserved; only null entries inherit the
+// file-level constants. This is the case that arises when rewriting a file
+// that already carries explicit lineage from a prior rewrite.
+func TestSynthesizeRowLineageColumnsPreservesExplicit(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ ctx := compute.WithAllocator(t.Context(), mem)
+ defer mem.AssertSize(t, 0)
- fieldByName := make(map[string]iceberg.NestedField, len(fields))
- for _, f := range fields {
- fieldByName[f.Name] = f
- }
+ firstRowID := int64(1000)
+ dataSeqNum := int64(5)
+ task := FileScanTask{FirstRowID: &firstRowID, DataSequenceNumber:
&dataSeqNum}
+ rowOffset := int64(0)
- // Regular columns must survive unchanged.
- idField, ok := fieldByName["id"]
- require.True(t, ok, "id must be in projection")
- assert.Equal(t, 1, idField.ID)
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable:
true},
+ {Name: iceberg.RowIDColumnName, Type:
arrow.PrimitiveTypes.Int64, Nullable: true},
+ {Name: iceberg.LastUpdatedSequenceNumberColumnName,
Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+ },
+ nil,
+ )
+ const nrows = 3
- payloadField, ok := fieldByName["payload"]
- require.True(t, ok, "payload must be in projection")
- assert.Equal(t, 2, payloadField.ID)
+ xBldr := array.NewInt64Builder(mem)
+ defer xBldr.Release()
+ xBldr.AppendValues([]int64{1, 2, 3}, nil)
- // Row lineage columns must be present as optional (nullable) fields —
the scanner
- // will return all-nulls for any data file that was written before row
lineage existed.
- rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
- require.True(t, ok, "_row_id must be in projection")
- assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
- assert.False(t, rowIDField.Required, "_row_id must be optional
(nullable) for pre-lineage files")
+ // Mixed _row_id: explicit 42, null, explicit 99. Non-null values must
+ // survive untouched. Null gets firstRowID + row position = 1000 + 1.
+ rowIDBldr := array.NewInt64Builder(mem)
+ defer rowIDBldr.Release()
+ rowIDBldr.AppendValues([]int64{42, 0, 99}, []bool{true, false, true})
- seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
- require.True(t, ok, "_last_updated_sequence_number must be in
projection")
- assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID,
"_last_updated_sequence_number field ID")
- assert.False(t, seqField.Required, "_last_updated_sequence_number must
be optional (nullable) for pre-lineage files")
-}
+ // Mixed _last_updated_sequence_number: explicit 7, null, explicit 9.
+ // Null gets task.DataSequenceNumber = 5; others survive.
+ seqBldr := array.NewInt64Builder(mem)
+ defer seqBldr.Release()
+ seqBldr.AppendValues([]int64{7, 0, 9}, []bool{true, false, true})
-func TestProjectionV3PreLineageFileCaseSensitive(t *testing.T) {
- schema := iceberg.NewSchema(
- 1,
- iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
- iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
- )
+ xArr := xBldr.NewArray()
+ rowIDArr := rowIDBldr.NewArray()
+ seqArr := seqBldr.NewArray()
+ batch := array.NewRecordBatch(schema, []arrow.Array{xArr, rowIDArr,
seqArr}, nrows)
+ xArr.Release()
+ rowIDArr.Release()
+ seqArr.Release()
+ defer batch.Release()
- metadata, err := NewMetadata(
- schema,
- iceberg.UnpartitionedSpec,
- UnsortedSortOrder,
- "s3://test-bucket/test_table",
- iceberg.Properties{"format-version": "3"},
- )
+ out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch)
require.NoError(t, err)
- assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+ defer out.Release()
- scan := &Scan{
- metadata: metadata,
- selectedFields: []string{"id", "payload", "_Row_Id"},
- caseSensitive: true,
- }
+ rowIDCol := out.Column(1).(*array.Int64)
+ require.Equal(t, nrows, rowIDCol.Len())
+ assert.False(t, rowIDCol.IsNull(0))
+ assert.EqualValues(t, 42, rowIDCol.Value(0), "explicit _row_id must
survive")
+ assert.False(t, rowIDCol.IsNull(1))
+ assert.EqualValues(t, 1001, rowIDCol.Value(1), "null _row_id must
inherit firstRowID + position")
+ assert.False(t, rowIDCol.IsNull(2))
+ assert.EqualValues(t, 99, rowIDCol.Value(2), "explicit _row_id must
survive even with a different value")
- _, err = scan.Projection()
- require.Error(t, err)
- require.ErrorContains(t, err, "could not find column _Row_Id")
+ seqCol := out.Column(2).(*array.Int64)
+ require.Equal(t, nrows, seqCol.Len())
+ assert.False(t, seqCol.IsNull(0))
+ assert.EqualValues(t, 7, seqCol.Value(0), "explicit seq must survive")
+ assert.False(t, seqCol.IsNull(1))
+ assert.EqualValues(t, 5, seqCol.Value(1), "null seq must inherit
task.DataSequenceNumber")
+ assert.False(t, seqCol.IsNull(2))
+ assert.EqualValues(t, 9, seqCol.Value(2), "explicit seq must survive
even with a different value")
+
+ assert.EqualValues(t, 3, rowOffset)
}
-func TestProjectionV3PreLineageFileCaseInsensitive(t *testing.T) {
+// TestProjectionV3SelectRowLineageColumns verifies that explicitly selecting
+// _row_id (and _last_updated_sequence_number) on a v3 table yields a
projection
+// containing those metadata columns, even though they are not declared in the
+// user schema. This is the legacy "Select(_row_id)" escape hatch.
+func TestProjectionV3SelectRowLineageColumns(t *testing.T) {
schema := iceberg.NewSchema(
1,
iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
@@ -348,44 +394,37 @@ func TestProjectionV3PreLineageFileCaseInsensitive(t
*testing.T) {
scan := &Scan{
metadata: metadata,
- selectedFields: []string{"id", "payload", "_Row_Id",
"_Last_Updated_SEQUENCE_number"},
- caseSensitive: false,
+ selectedFields: []string{"id", "payload",
iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName},
+ caseSensitive: true,
}
proj, err := scan.Projection()
- require.NoError(t, err)
+ require.NoError(t, err, "Projection must accept lineage column names on
v3")
require.NotNil(t, proj)
fields := proj.Fields()
- require.Len(t, fields, 4, "projected schema must contain all four
requested fields")
+ require.Len(t, fields, 4, "projected schema must contain id, payload,
_row_id, _last_updated_sequence_number")
fieldByName := make(map[string]iceberg.NestedField, len(fields))
for _, f := range fields {
fieldByName[f.Name] = f
}
- idField, ok := fieldByName["id"]
- require.True(t, ok, "id must be in projection")
- assert.Equal(t, 1, idField.ID)
-
- payloadField, ok := fieldByName["payload"]
- require.True(t, ok, "payload must be in projection")
- assert.Equal(t, 2, payloadField.ID)
-
rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
require.True(t, ok, "_row_id must be in projection")
- assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
- assert.False(t, rowIDField.Required, "_row_id must be optional
(nullable) for pre-lineage files")
+ assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID)
+ assert.False(t, rowIDField.Required, "_row_id must be optional")
seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
require.True(t, ok, "_last_updated_sequence_number must be in
projection")
- assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID,
"_last_updated_sequence_number field ID")
- assert.False(t, seqField.Required, "_last_updated_sequence_number must
be optional (nullable) for pre-lineage files")
+ assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID)
+ assert.False(t, seqField.Required, "_last_updated_sequence_number must
be optional")
}
-// TestProjectionV2RowLineage asserts that requesting row-lineage metadata
columns on a v1 or v2
-// table does not use the v3-only synthesis path
-func TestProjectionV2RowLineage(t *testing.T) {
+// TestProjectionRowLineageRejectedOnV1V2 asserts that requesting a row-lineage
+// metadata column via Select on a v1 or v2 table is an error (those format
+// versions do not support row lineage).
+func TestProjectionRowLineageRejectedOnV1V2(t *testing.T) {
schema := iceberg.NewSchema(
1,
iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
@@ -394,10 +433,10 @@ func TestProjectionV2RowLineage(t *testing.T) {
for _, tc := range []struct {
name string
- ver int
+ ver string
}{
- {name: "v1", ver: 1},
- {name: "v2", ver: 2},
+ {name: "v1", ver: "1"},
+ {name: "v2", ver: "2"},
} {
t.Run(tc.name, func(t *testing.T) {
metadata, err := NewMetadata(
@@ -405,10 +444,9 @@ func TestProjectionV2RowLineage(t *testing.T) {
iceberg.UnpartitionedSpec,
UnsortedSortOrder,
"s3://test-bucket/test_table",
- iceberg.Properties{PropertyFormatVersion:
strconv.Itoa(tc.ver)},
+ iceberg.Properties{PropertyFormatVersion:
tc.ver},
)
require.NoError(t, err)
- assert.Equal(t, tc.ver, metadata.Version(), "sanity:
metadata format version")
scan := &Scan{
metadata: metadata,
@@ -417,21 +455,20 @@ func TestProjectionV2RowLineage(t *testing.T) {
}
_, err = scan.Projection()
- require.Error(t, err)
- assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
+ require.Error(t, err, "Projection must reject lineage
columns on pre-v3 tables")
+ assert.ErrorIs(t, err, ErrInvalidOperation)
assert.ErrorContains(t, err, iceberg.RowIDColumnName)
+ assert.ErrorContains(t, err, "format version")
})
}
}
-// TestProjectionV3SchemaWithRowIDOnly covers a v3 table whose schema
-// already declares _row_id (reserved field id) but does not declare
_last_updated_sequence_number.
-func TestProjectionV3SchemaWithRowIDOnly(t *testing.T) {
+// TestProjectionWithRowLineageRequiresV3 asserts that the WithRowLineage scan
+// option errors out on Projection() when the table format version is below 3.
+func TestProjectionWithRowLineageRequiresV3(t *testing.T) {
schema := iceberg.NewSchema(
1,
iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
- iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
- iceberg.RowID(),
)
metadata, err := NewMetadata(
@@ -439,67 +476,31 @@ func TestProjectionV3SchemaWithRowIDOnly(t *testing.T) {
iceberg.UnpartitionedSpec,
UnsortedSortOrder,
"s3://test-bucket/test_table",
- iceberg.Properties{"format-version": "3"},
+ iceberg.Properties{PropertyFormatVersion: "2"},
)
require.NoError(t, err)
- assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
scan := &Scan{
- metadata: metadata,
- selectedFields: []string{
- "id", "payload",
- iceberg.RowIDColumnName,
- iceberg.LastUpdatedSequenceNumberColumnName,
- },
- caseSensitive: true,
+ metadata: metadata,
+ selectedFields: []string{"*"},
+ caseSensitive: true,
+ includeRowLineage: true,
}
- var proj *iceberg.Schema
- require.NotPanics(t, func() {
- var perr error
- proj, perr = scan.Projection()
- require.NoError(t, perr)
- })
- require.NotNil(t, proj)
-
- fields := proj.Fields()
- require.Len(t, fields, 4, "projection must include id, payload,
_row_id, _last_updated_sequence_number")
-
- fieldByName := make(map[string]iceberg.NestedField, len(fields))
- idsSeen := make(map[int]string, len(fields))
- for _, f := range fields {
- if prev, dup := idsSeen[f.ID]; dup {
- t.Fatalf("duplicate field id %d: %q and %q", f.ID,
prev, f.Name)
- }
- idsSeen[f.ID] = f.Name
- fieldByName[f.Name] = f
- }
-
- idField, ok := fieldByName["id"]
- require.True(t, ok)
- assert.Equal(t, 1, idField.ID)
-
- payloadField, ok := fieldByName["payload"]
- require.True(t, ok)
- assert.Equal(t, 2, payloadField.ID)
-
- rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
- require.True(t, ok)
- assert.NotEqual(t, iceberg.RowIDFieldID, rowIDField.ID) // NewMetadata
reorders schema field numbers
- assert.False(t, rowIDField.Required)
-
- seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
- require.True(t, ok)
- assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID)
- assert.False(t, seqField.Required)
+ _, err = scan.Projection()
+ require.Error(t, err, "WithRowLineage on a v2 table must be an explicit
error")
+ assert.ErrorContains(t, err, "row lineage")
}
-func TestProjectionV3SchemaWithLastUpdatedSequenceNumberOnly(t *testing.T) {
+// TestProjectionV3SchemaAlreadyHasRowID covers the case where the user schema
+// already declares _row_id (a reserved field id, but legal in v3). The
+// projection helper must be idempotent and not panic on the duplicate ID.
+func TestProjectionV3SchemaAlreadyHasRowID(t *testing.T) {
schema := iceberg.NewSchema(
1,
iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
- iceberg.LastUpdatedSequenceNumber(),
+ iceberg.RowID(),
)
metadata, err := NewMetadata(
@@ -513,154 +514,23 @@ func
TestProjectionV3SchemaWithLastUpdatedSequenceNumberOnly(t *testing.T) {
assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
scan := &Scan{
- metadata: metadata,
- selectedFields: []string{
- "id", "payload",
- iceberg.RowIDColumnName,
- iceberg.LastUpdatedSequenceNumberColumnName,
- },
- caseSensitive: true,
+ metadata: metadata,
+ selectedFields: []string{"*"},
+ caseSensitive: true,
+ includeRowLineage: true,
}
- var proj *iceberg.Schema
require.NotPanics(t, func() {
- var perr error
- proj, perr = scan.Projection()
+ proj, perr := scan.Projection()
require.NoError(t, perr)
- })
- require.NotNil(t, proj)
+ require.NotNil(t, proj)
- fields := proj.Fields()
- require.Len(t, fields, 4, "projection must include id, payload,
_row_id, _last_updated_sequence_number")
-
- fieldByName := make(map[string]iceberg.NestedField, len(fields))
- idsSeen := make(map[int]string, len(fields))
- for _, f := range fields {
- if prev, dup := idsSeen[f.ID]; dup {
- t.Fatalf("duplicate field id %d: %q and %q", f.ID,
prev, f.Name)
+ seen := make(map[int]string, len(proj.Fields()))
+ for _, f := range proj.Fields() {
+ if prev, dup := seen[f.ID]; dup {
+ t.Fatalf("duplicate field id %d: %q and %q",
f.ID, prev, f.Name)
+ }
+ seen[f.ID] = f.Name
}
- idsSeen[f.ID] = f.Name
- fieldByName[f.Name] = f
- }
-
- idField, ok := fieldByName["id"]
- require.True(t, ok)
- assert.Equal(t, 1, idField.ID)
-
- payloadField, ok := fieldByName["payload"]
- require.True(t, ok)
- assert.Equal(t, 2, payloadField.ID)
-
- rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
- require.True(t, ok)
- assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID)
- assert.False(t, rowIDField.Required)
-
- seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
- require.True(t, ok)
- assert.NotEqual(t, iceberg.LastUpdatedSequenceNumberFieldID,
seqField.ID) // NewMetadata reorders schema field numbers
- assert.False(t, seqField.Required)
-}
-
-// TestSynthesizeRowLineageColumns verifies that _row_id and
_last_updated_sequence_number
-// are filled from task constants when those columns are present and null.
-func TestSynthesizeRowLineageColumns(t *testing.T) {
- mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
- ctx := compute.WithAllocator(t.Context(), mem)
- defer mem.AssertSize(t, 0)
- firstRowID := int64(1000)
- dataSeqNum := int64(5)
- task := FileScanTask{FirstRowID: &firstRowID, DataSequenceNumber:
&dataSeqNum}
- rowOffset := int64(0)
-
- // Build a batch with a data column plus _row_id and
_last_updated_sequence_number (all nulls).
- schema := arrow.NewSchema(
- []arrow.Field{
- {Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable:
true},
- {Name: iceberg.RowIDColumnName, Type:
arrow.PrimitiveTypes.Int64, Nullable: true},
- {Name: iceberg.LastUpdatedSequenceNumberColumnName,
Type: arrow.PrimitiveTypes.Int64, Nullable: true},
- },
- nil,
- )
- const nrows = 3
- xBldr := array.NewInt64Builder(mem)
- defer xBldr.Release()
- xBldr.AppendValues([]int64{1, 2, 3}, nil)
- rowIDBldr := array.NewInt64Builder(mem)
- defer rowIDBldr.Release()
- rowIDBldr.AppendNulls(nrows)
- seqBldr := array.NewInt64Builder(mem)
- defer seqBldr.Release()
- seqBldr.AppendNulls(nrows)
-
- xArr := xBldr.NewArray()
- rowIDArr := rowIDBldr.NewArray()
- seqArr := seqBldr.NewArray()
- batch := array.NewRecordBatch(schema, []arrow.Array{xArr, rowIDArr,
seqArr}, nrows)
- xArr.Release()
- rowIDArr.Release()
- seqArr.Release()
- defer batch.Release()
-
- out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch)
- require.NoError(t, err)
- defer out.Release()
-
- // _row_id should be 1000, 1001, 1002
- rowIDCol := out.Column(1).(*array.Int64)
- require.Equal(t, nrows, rowIDCol.Len())
- for i := 0; i < nrows; i++ {
- assert.False(t, rowIDCol.IsNull(i), "row %d", i)
- assert.EqualValues(t, 1000+int64(i), rowIDCol.Value(i), "row
%d", i)
- }
- // _last_updated_sequence_number should be 5 for all
- seqCol := out.Column(2).(*array.Int64)
- for i := 0; i < nrows; i++ {
- assert.False(t, seqCol.IsNull(i), "row %d", i)
- assert.EqualValues(t, 5, seqCol.Value(i), "row %d", i)
- }
- assert.EqualValues(t, 3, rowOffset)
-}
-
-func TestRemoveMetadataFromSelectedFields(t *testing.T) {
- selectedFields := []string{
- "id",
- "payload",
- }
-
- metaFields := []string{
- "_row_id",
- }
-
- sf, mf := removeMetadataFromSelectedFields(selectedFields, metaFields)
-
- assert.Equal(t, 2, len(sf))
- assert.Equal(t, 0, len(mf))
-
- assert.True(t, slices.Contains(sf, "id"))
- assert.True(t, slices.Contains(sf, "payload"))
-}
-
-func TestRemoveMetadataFromSelectedFieldsCasing(t *testing.T) {
- selectedFields := []string{
- "id",
- "payload",
- "_ROW_Id",
- "lastupdatedsequence_number",
- }
-
- metaFields := []string{
- "_row_id",
- "_last_updated_sequence_number",
- }
-
- sf, mf := removeMetadataFromSelectedFields(selectedFields, metaFields)
-
- assert.Equal(t, 3, len(sf))
- assert.Equal(t, 1, len(mf))
-
- assert.True(t, slices.Contains(sf, "id"))
- assert.True(t, slices.Contains(sf, "payload"))
- assert.True(t, slices.Contains(sf, "lastupdatedsequence_number"))
- assert.True(t, slices.Contains(mf, iceberg.RowID()))
+ })
}
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 4e98a822..05430710 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -917,6 +917,11 @@ func (sp *snapshotProducer) commit(ctx context.Context) (_
[]Update, _ []Require
return nil, nil, err
}
if writer.NextRowID() != nil {
+ // addedRows counts ALL rows in new manifests (existing
+ added), even
+ // for rewrites where survivors preserve old _row_id
values. This
+ // "wastes" ID space but doesn't violate uniqueness:
actual row IDs come
+ // from the explicit Parquet column, not the global
counter. Java's
+ // ManifestListWriter.V3Writer uses the same accounting.
addedRows = *writer.NextRowID() - firstRowID
}
} else {
diff --git a/table/table.go b/table/table.go
index 3dd78057..a9a725af 100644
--- a/table/table.go
+++ b/table/table.go
@@ -816,6 +816,17 @@ func WithOptions(opts iceberg.Properties) ScanOption {
}
}
+// WithRowLineage projects the row-lineage metadata columns (_row_id and
+// _last_updated_sequence_number) so that row identity and per-row update
+// sequence are preserved through rewrites and compactions. Requires a v3
+// table — calling Scan.Projection on a v1/v2 table after applying this
+// option returns an error.
+func WithRowLineage() ScanOption {
+ return func(scan *Scan) {
+ scan.includeRowLineage = true
+ }
+}
+
func (t Table) Scan(opts ...ScanOption) *Scan {
s := &Scan{
metadata: t.metadata,
diff --git a/table/transaction.go b/table/transaction.go
index 72d887dd..526bed6c 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -523,7 +523,15 @@ func validateDataFilePartitionData(df iceberg.DataFile,
spec *iceberg.PartitionS
// validateDataFilesToAdd performs metadata-only validation for caller-provided
// DataFiles and returns a set of paths that passed validation.
-func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile,
operation string) (map[string]struct{}, error) {
+//
+// requireFirstRowID gates the v3 first_row_id check: callers handing in
+// externally-written parquet files (e.g. [Transaction.AddDataFiles]) must set
+// first_row_id explicitly because the library cannot fabricate row IDs for
+// files it did not write. Library-internal rewrite paths (compaction via
+// [withRewriteSemantics]) pass false because the manifest-list writer assigns
+// first_row_id via inheritance at write time and the output files carry
+// explicit _row_id columns that win on read regardless.
+func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile,
operation string, requireFirstRowID bool) (map[string]struct{}, error) {
currentSpec, err := t.meta.CurrentSpec()
if err != nil {
return nil, fmt.Errorf("could not get current partition spec:
%w", err)
@@ -569,7 +577,7 @@ func (t *Transaction) validateDataFilesToAdd(dataFiles
[]iceberg.DataFile, opera
return nil, fmt.Errorf("data file %s has invalid
partition data for %s: %w", path, operation, err)
}
- if t.meta.formatVersion >= 3 && df.FirstRowID() == nil {
+ if requireFirstRowID && t.meta.formatVersion >= 3 &&
df.FirstRowID() == nil {
return nil, fmt.Errorf(
"data file %s is missing first_row_id which is
required for v3 tables for %s: use DataFileBuilder.FirstRowID() to set it
explicitly",
path, operation)
@@ -668,7 +676,7 @@ func (t *Transaction) AddDataFiles(ctx context.Context,
dataFiles []iceberg.Data
o(&cfg)
}
- setToAdd, err := t.validateDataFilesToAdd(dataFiles, "AddDataFiles")
+ setToAdd, err := t.validateDataFilesToAdd(dataFiles, "AddDataFiles",
true)
if err != nil {
return err
}
@@ -752,7 +760,7 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx
context.Context, filesTo
o(&cfg)
}
- setToAdd, err := t.validateDataFilesToAdd(filesToAdd,
"ReplaceDataFilesWithDataFiles")
+ setToAdd, err := t.validateDataFilesToAdd(filesToAdd,
"ReplaceDataFilesWithDataFiles", !cfg.rewriteSemantics)
if err != nil {
return err
}
@@ -852,7 +860,7 @@ func (t *Transaction) ReplaceFiles(ctx context.Context,
dataFilesToDelete, dataF
o(&cfg)
}
- setToAdd, err := t.validateDataFilesToAdd(dataFilesToAdd,
"ReplaceFiles")
+ setToAdd, err := t.validateDataFilesToAdd(dataFilesToAdd,
"ReplaceFiles", !cfg.rewriteSemantics)
if err != nil {
return err
}
@@ -1215,7 +1223,7 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx
context.Context, operation
commitUUID := uuid.New()
updater := t.updateSnapshot(fs, snapshotProps,
operation).mergeOverwrite(&commitUUID, filter)
- filesToDelete, filesToRewrite, err := t.classifyFilesForDeletions(ctx,
fs, filter, caseSensitive, concurrency)
+ filesToDelete, filesToRewrite, fileSeqByPath, err :=
t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency)
if err != nil {
return nil, err
}
@@ -1225,7 +1233,7 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx
context.Context, operation
}
if len(filesToRewrite) > 0 {
- if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, filter, caseSensitive, concurrency); err != nil {
+ if err := t.rewriteFilesWithFilter(ctx, fs, updater,
filesToRewrite, fileSeqByPath, filter, caseSensitive, concurrency); err != nil {
return nil, err
}
}
@@ -1254,7 +1262,7 @@ func (t *Transaction) performMergeOnReadDeletion(ctx
context.Context, snapshotPr
commitUUID := uuid.New()
updater := t.updateSnapshot(fs, snapshotProps,
OpDelete).mergeOverwrite(&commitUUID, filter)
- filesToDelete, withPartialDeletions, err :=
t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency)
+ filesToDelete, withPartialDeletions, _, err :=
t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency)
if err != nil {
return nil, err
}
@@ -1355,24 +1363,26 @@ func (t *Transaction) Delete(ctx context.Context,
filter iceberg.BooleanExpressi
}
// classifyFilesForDeletions classifies existing data files based on the
provided filter.
-// Returns files to delete completely, files to rewrite partially, and any
error.
-func (t *Transaction) classifyFilesForDeletions(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int)
(filesToDelete, filesWithPartialDeletions []iceberg.DataFile, err error) {
+// Returns files to delete completely, files to rewrite partially, the
+// per-file data sequence number map for rewrite candidates (used to
+// synthesize row-lineage columns when reading), and any error.
+func (t *Transaction) classifyFilesForDeletions(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int)
(filesToDelete, filesWithPartialDeletions []iceberg.DataFile, fileSeqByPath
map[string]*int64, err error) {
s := t.meta.currentSnapshot()
if s == nil {
- return nil, nil, nil
+ return nil, nil, nil, nil
}
if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
for df, err := range s.dataFiles(fs, nil) {
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
if df.ContentType() == iceberg.EntryContentData {
filesToDelete = append(filesToDelete, df)
}
}
- return filesToDelete, filesWithPartialDeletions, nil
+ return filesToDelete, filesWithPartialDeletions, nil, nil
}
return t.classifyFilesForFilteredDeletions(ctx, fs, filter,
caseSensitive, concurrency)
@@ -1405,22 +1415,23 @@ func (t *fileClassificationTask)
buildPartitionProjection(specID int) (iceberg.B
}
// classifyFilesForFilteredDeletions classifies files for filtered overwrite
operations.
-// Returns files to delete completely, files to rewrite partially, and any
error.
-func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency
int) (filesToDelete, filesWithPartialDeletes []iceberg.DataFile, err error) {
+// Returns files to delete completely, files to rewrite partially, the
+// per-file data sequence number map for rewrite candidates, and any error.
+func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency
int) (filesToDelete, filesWithPartialDeletes []iceberg.DataFile, fileSeqByPath
map[string]*int64, err error) {
schema := t.meta.CurrentSchema()
meta, err := t.meta.Build()
if err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
caseSensitive, false)
if err != nil {
- return nil, nil, fmt.Errorf("failed to create inclusive metrics
evaluator: %w", err)
+ return nil, nil, nil, fmt.Errorf("failed to create inclusive
metrics evaluator: %w", err)
}
strictEvaluator, err := newStrictMetricsEvaluator(schema, filter,
caseSensitive, false)
if err != nil {
- return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
+ return nil, nil, nil, fmt.Errorf("failed to create strict
metrics evaluator: %w", err)
}
classificationTask := newFileClassificationTask(meta, filter,
caseSensitive)
@@ -1431,11 +1442,12 @@ func (t *Transaction)
classifyFilesForFilteredDeletions(ctx context.Context, fs
if s != nil {
manifests, err = s.Manifests(fs)
if err != nil {
- return nil, nil, fmt.Errorf("failed to get manifests:
%w", err)
+ return nil, nil, nil, fmt.Errorf("failed to get
manifests: %w", err)
}
}
var mu sync.Mutex
+ fileSeqByPath = make(map[string]*int64)
g, _ := errgroup.WithContext(ctx)
g.SetLimit(min(concurrency, len(manifests)))
@@ -1456,6 +1468,7 @@ func (t *Transaction)
classifyFilesForFilteredDeletions(ctx context.Context, fs
localDelete := make([]iceberg.DataFile, 0)
localRewrite := make([]iceberg.DataFile, 0)
+ localSeqByPath := make(map[string]*int64)
for entry, err := range manifest.Entries(fs, false) {
if err != nil {
@@ -1488,6 +1501,19 @@ func (t *Transaction)
classifyFilesForFilteredDeletions(ctx context.Context, fs
localDelete = append(localDelete, df)
} else {
localRewrite = append(localRewrite, df)
+ // Capture the entry's data sequence
number so the
+ // rewrite path can synthesize
+ // _last_updated_sequence_number for
source rows that
+ // have a null value (or no column) in
the file. Per
+ // spec the synthesized value is the
manifest entry's
+ // sequence_number (field id 3, the
data sequence
+ // number) — not file_sequence_number
(field id 4) —
+ // so back-dated EXISTING entries
assign the correct
+ // value to surviving rows.
+ if seq := entry.SequenceNum(); seq >= 0
{
+ s := seq
+ localSeqByPath[df.FilePath()] =
&s
+ }
}
}
@@ -1495,6 +1521,9 @@ func (t *Transaction)
classifyFilesForFilteredDeletions(ctx context.Context, fs
mu.Lock()
filesToDelete = append(filesToDelete,
localDelete...)
filesWithPartialDeletes =
append(filesWithPartialDeletes, localRewrite...)
+ for k, v := range localSeqByPath {
+ fileSeqByPath[k] = v
+ }
mu.Unlock()
}
@@ -1503,20 +1532,42 @@ func (t *Transaction)
classifyFilesForFilteredDeletions(ctx context.Context, fs
}
if err := g.Wait(); err != nil {
- return nil, nil, err
+ return nil, nil, nil, err
}
- return filesToDelete, filesWithPartialDeletes, nil
+ return filesToDelete, filesWithPartialDeletes, fileSeqByPath, nil
}
-// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter
-func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression, caseSensitive bool, concurrency int) error {
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter.
+// fileSeqByPath maps each file's path to its data sequence number from the
source manifest entry,
+// used to synthesize _last_updated_sequence_number when the source row's
value is null. The
+// filter binding and substrait conversion are computed once here and reused
across every file.
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO,
updater *snapshotProducer, files []iceberg.DataFile, fileSeqByPath
map[string]*int64, filter iceberg.BooleanExpression, caseSensitive bool,
concurrency int) error {
complementFilter := iceberg.NewNot(filter)
+ // Bind + convert the complement filter once for the whole rewrite. The
+ // per-batch filter function is reused across every record batch from
+ // every file in this rewrite — substrait conversion in particular is
+ // non-trivial so doing it inside the iterator loop wastes work.
+ postFilter, err := prepareBatchFilter(complementFilter,
t.meta.CurrentSchema(), caseSensitive)
+ if err != nil {
+ return err
+ }
+
for _, originalFile := range files {
// Use a separate UUID for rewrite operations to avoid filename
collisions with new data files
rewriteUUID := uuid.New()
- rewrittenFiles, err := t.rewriteSingleFile(ctx, fs,
originalFile, complementFilter, caseSensitive, rewriteUUID, concurrency)
+ args := rewriteSingleFileArgs{
+ fs: fs,
+ originalFile: originalFile,
+ fileSeqNum: fileSeqByPath[originalFile.FilePath()],
+ filter: complementFilter,
+ postFilter: postFilter,
+ caseSensitive: caseSensitive,
+ commitUUID: rewriteUUID,
+ concurrency: concurrency,
+ }
+ rewrittenFiles, err := t.rewriteSingleFile(ctx, args)
if err != nil {
return fmt.Errorf("failed to rewrite file %s: %w",
originalFile.FilePath(), err)
}
@@ -1530,17 +1581,82 @@ func (t *Transaction) rewriteFilesWithFilter(ctx
context.Context, fs io.IO, upda
return nil
}
-// rewriteSingleFile reads a single data file, applies the filter, and writes
new files with filtered data
-func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO,
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive
bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+// rewriteSingleFileArgs bundles the parameters for
[Transaction.rewriteSingleFile]
+// — the list is long (two forms of the filter, a sequence number, a UUID and an
+// int), so named fields keep call sites readable and hard to misorder.
+type rewriteSingleFileArgs struct {
+ fs io.IO
+ originalFile iceberg.DataFile
+ // fileSeqNum is the source file's data sequence number from its
+ // manifest entry; required to synthesize _last_updated_sequence_number
+ // for rows whose value is null in the source file.
+ fileSeqNum *int64
+ // filter is the per-row complement (rows that survive the rewrite).
+ filter iceberg.BooleanExpression
+ // postFilter is the pre-bound, pre-substrait-converted version of
+ // filter. When preserveRowLineage applies it is the only place the
+ // filter is evaluated; the scanner's row filter is bypassed so
+ // _row_id positions stay correct.
+ postFilter func(context.Context, arrow.RecordBatch)
(arrow.RecordBatch, error)
+ caseSensitive bool
+ commitUUID uuid.UUID
+ concurrency int
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes
+// new files with filtered data. See [rewriteSingleFileArgs] for parameter
+// semantics.
+func (t *Transaction) rewriteSingleFile(ctx context.Context, args
rewriteSingleFileArgs) ([]iceberg.DataFile, error) {
scanTask := &FileScanTask{
- File: originalFile,
+ File: args.originalFile,
Start: 0,
- Length: originalFile.FileSizeBytes(),
- }
-
- boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter,
caseSensitive)
- if err != nil {
- return nil, fmt.Errorf("failed to bind filter: %w", err)
+ Length: args.originalFile.FileSizeBytes(),
+ }
+
+ // Preserve row lineage for v3 tables: include _row_id and
+ // _last_updated_sequence_number in the scan projection so they are read
+ // from the source file (or synthesized from file metadata when null)
and
+ // written explicitly into the output Parquet. The Iceberg v3 spec says
+ // _last_updated_sequence_number on a data file row is the sequence
+ // number of the snapshot that last *logically* updated the row — for a
+ // pure CoW rewrite the surviving rows are unchanged, so they must keep
+ // their original value rather than inheriting the new commit's seq.
+ //
+ // When preserving lineage, the row filter must NOT be applied inside
the
+ // scanner because _row_id synthesis depends on original file position.
The
+ // filter is applied post-synthesis in the record iterator instead.
+ //
+ // TODO(perf): disabling the scan-time row filter forfeits both
manifest-
+ // and file-level pushdown for the rewrite read. For selective deletes
on
+ // wide partitions this means re-reading every survivor candidate
end-to-
+ // end. File-level skipping (against the bound filter's residual on file
+ // stats) could be re-enabled here without breaking _row_id synthesis,
+ // since synthesis depends only on within-file row positions.
+ preserveRowLineage := t.meta.formatVersion >= 3 &&
args.originalFile.FirstRowID() != nil
+ projectedSchema := t.meta.CurrentSchema()
+ var factoryOpts []writerFactoryOption
+ if preserveRowLineage {
+ projectedSchema = iceberg.SchemaWithRowLineage(projectedSchema)
+ factoryOpts = append(factoryOpts,
withFactoryFileSchema(projectedSchema))
+ scanTask.FirstRowID = args.originalFile.FirstRowID()
+ // fileSeqNum drives _last_updated_sequence_number synthesis for
+ // source rows whose value is null in the file (or for source
+ // files written before row lineage existed). When the column
+ // is non-null the existing per-row value wins, preserving the
+ // original update sequence across the rewrite.
+ scanTask.DataSequenceNumber = args.fileSeqNum
+ }
+
+ // When preserving row lineage, the bound scan-time filter is replaced
+ // with AlwaysTrue and the rebound version inside prepareBatchFilter is
+ // the one actually applied. Skip the wasted BindExpr in that case.
+ scanFilter := iceberg.BooleanExpression(iceberg.AlwaysTrue{})
+ if !preserveRowLineage {
+ boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(),
args.filter, args.caseSensitive)
+ if err != nil {
+ return nil, fmt.Errorf("failed to bind filter: %w", err)
+ }
+ scanFilter = boundFilter
}
meta, err := t.meta.Build()
@@ -1550,12 +1666,12 @@ func (t *Transaction) rewriteSingleFile(ctx
context.Context, fs io.IO, originalF
scanner := &arrowScan{
metadata: meta,
- fs: fs,
- projectedSchema: t.meta.CurrentSchema(),
- boundRowFilter: boundFilter,
- caseSensitive: caseSensitive,
+ fs: args.fs,
+ projectedSchema: projectedSchema,
+ boundRowFilter: scanFilter,
+ caseSensitive: args.caseSensitive,
rowLimit: -1, // No limit
- concurrency: concurrency,
+ concurrency: args.concurrency,
}
arrowSchema, recordIter, err := scanner.GetRecords(ctx,
[]FileScanTask{*scanTask})
@@ -1563,7 +1679,19 @@ func (t *Transaction) rewriteSingleFile(ctx
context.Context, fs io.IO, originalF
return nil, fmt.Errorf("failed to get records from original
file: %w", err)
}
- // Wrap the iterator to release records after consumption
+ // When preserving row lineage, use an Arrow schema with field IDs so
that
+ // recordsToDataFiles can resolve _row_id via field ID rather than the
name
+ // mapping (which doesn't include metadata columns).
+ if preserveRowLineage {
+ arrowSchema, err = SchemaToArrowSchema(projectedSchema, nil,
true, false)
+ if err != nil {
+ return nil, fmt.Errorf("failed to build arrow schema
with field IDs: %w", err)
+ }
+ }
+
+ // Wrap the iterator to release records after consumption.
+ // When preserving row lineage, also apply the (pre-bound) filter here
+ // post-synthesis so _row_id positions stay correct.
releaseIter := func(yield func(arrow.RecordBatch, error) bool) {
for rec, err := range recordIter {
if err != nil {
@@ -1571,6 +1699,21 @@ func (t *Transaction) rewriteSingleFile(ctx
context.Context, fs io.IO, originalF
return
}
+
+ if preserveRowLineage {
+ rec, err = args.postFilter(ctx, rec)
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+ if rec.NumRows() == 0 {
+ rec.Release()
+
+ continue
+ }
+ }
+
if !yield(rec, nil) {
rec.Release()
@@ -1582,10 +1725,11 @@ func (t *Transaction) rewriteSingleFile(ctx
context.Context, fs io.IO, originalF
var result []iceberg.DataFile
itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta,
recordWritingArgs{
- sc: arrowSchema,
- itr: releaseIter,
- fs: fs.(io.WriteFileIO),
- writeUUID: &commitUUID,
+ sc: arrowSchema,
+ itr: releaseIter,
+ fs: args.fs.(io.WriteFileIO),
+ writeUUID: &args.commitUUID,
+ factoryOpts: factoryOpts,
})
for df, err := range itr {
@@ -1798,3 +1942,68 @@ func (s *StagedTable) Refresh(ctx context.Context)
(*Table, error) {
func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
}
+
+// prepareBatchFilter binds the given Iceberg filter against schema and
converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes a per-call ctx so allocator/deadline values from
+// the call-site context propagate into compute (the extension-ID set is
+// attached internally on each call rather than baked into a captured ctx).
+//
+// Ownership: the returned function takes ownership of the input batch — the
+// AlwaysFalse fast-path and the bound-filter path both Release rec internally.
+// Callers must not Release the input batch separately. The function returns a
+// possibly-new batch with one reference owned by the caller; the caller is
+// responsible for releasing it.
+//
+// The bound substrait filter is evaluated against the input record's column
+// indices, so each batch must follow the column order of schema — extra
+// trailing columns (e.g. row-lineage metadata) are tolerated only because
+// they live at indices past the filter's referenced fields.
+func prepareBatchFilter(filter iceberg.BooleanExpression, schema
*iceberg.Schema, caseSensitive bool) (func(context.Context, arrow.RecordBatch)
(arrow.RecordBatch, error), error) {
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ return func(_ context.Context, rec arrow.RecordBatch)
(arrow.RecordBatch, error) {
+ return rec, nil
+ }, nil
+ }
+
+ bound, err := iceberg.BindExpr(schema, filter, caseSensitive)
+ if err != nil {
+ return nil, fmt.Errorf("prepareBatchFilter: bind expression:
%w", err)
+ }
+
+ if bound == nil {
+ return func(_ context.Context, rec arrow.RecordBatch)
(arrow.RecordBatch, error) {
+ return rec, nil
+ }, nil
+ }
+
+ if bound.Equals(iceberg.AlwaysFalse{}) {
+ // Return a record with the same schema and zero rows.
NewSlice(0, 0)
+ // preserves the per-field arrays so downstream code that calls
+ // rec.Column(i) does not panic on the empty result.
+ return func(_ context.Context, rec arrow.RecordBatch)
(arrow.RecordBatch, error) {
+ defer rec.Release()
+
+ return rec.NewSlice(0, 0), nil
+ }, nil
+ }
+
+ extSet, substraitFilter, err := substrait.ConvertExpr(schema, bound,
caseSensitive)
+ if err != nil {
+ return nil, fmt.Errorf("prepareBatchFilter: convert expression:
%w", err)
+ }
+
+ return func(ctx context.Context, rec arrow.RecordBatch)
(arrow.RecordBatch, error) {
+ // Attach a fresh extension-ID set to the call-site ctx so
allocator
+ // and deadline values flow through to compute, while the
(immutable)
+ // extSet computed during prepare is reused across every batch.
+ ctx = exprs.WithExtensionIDSet(ctx,
exprs.NewExtensionSetDefault(*extSet))
+
+ return filterRecords(ctx, substraitFilter)(rec)
+ }, nil
+}
diff --git a/table/write_records.go b/table/write_records.go
index 3db89ab5..9c215fbf 100644
--- a/table/write_records.go
+++ b/table/write_records.go
@@ -35,10 +35,12 @@ import (
type WriteRecordOption func(*writeRecordConfig)
type writeRecordConfig struct {
- targetFileSize int64
- writeUUID *uuid.UUID
- maxWriteWorkers int
- clustered bool
+ targetFileSize int64
+ writeUUID *uuid.UUID
+ maxWriteWorkers int
+ clustered bool
+ fileSchema *iceberg.Schema
+ preserveRowLineage bool
}
// WithTargetFileSize overrides the table's default target file size.
@@ -94,6 +96,24 @@ func WithClusteredWrite() WriteRecordOption {
}
}
+// WithPreserveRowLineage sets the output file schema to include the v3 row-
+// lineage metadata columns (_row_id, _last_updated_sequence_number) so that
+// row identity is preserved through rewrites and compactions. The input
+// records must already carry _row_id (e.g. from a scan that projected the
+// lineage columns).
+//
+// [WriteRecords] validates this option against the table state and the input
+// Arrow schema: it errors when applied to a v1/v2 table or when the input
+// records don't include the _row_id column. The schema parameter is the
+// projected Iceberg schema (typically [iceberg.SchemaWithRowLineage]) and is
+// used to write the output Parquet's field IDs.
+func WithPreserveRowLineage(schema *iceberg.Schema) WriteRecordOption {
+ return func(c *writeRecordConfig) {
+ c.fileSchema = schema
+ c.preserveRowLineage = true
+ }
+}
+
// WriteRecords writes Arrow record batches to Parquet data files for the given
// table, returning an iterator of the resulting DataFile objects.
//
@@ -112,11 +132,6 @@ func WriteRecords(ctx context.Context, tbl *Table,
records iter.Seq2[arrow.RecordBatch, error],
opts ...WriteRecordOption,
) iter.Seq2[iceberg.DataFile, error] {
- if err := checkArrowSchemaCompat(tbl.Schema(), schema, false); err !=
nil {
- return internal.SingleErrorIter[iceberg.DataFile](
- fmt.Errorf("arrow schema is not compatible with the
table schema: %w", err))
- }
-
cfg := writeRecordConfig{}
for _, opt := range opts {
opt(&cfg)
@@ -127,6 +142,30 @@ func WriteRecords(ctx context.Context, tbl *Table,
errors.New("WithClusteredWrite and WithMaxWriteWorkers
are incompatible: the clustered write path is single-threaded"))
}
+ if cfg.preserveRowLineage {
+ if v := tbl.metadata.Version(); v < 3 {
+ return internal.SingleErrorIter[iceberg.DataFile](
+ fmt.Errorf("WithPreserveRowLineage requires a
v3+ table, got v%d", v))
+ }
+ if len(schema.FieldIndices(iceberg.RowIDColumnName)) == 0 {
+ return internal.SingleErrorIter[iceberg.DataFile](
+ fmt.Errorf("WithPreserveRowLineage requires
input records to include the %q column", iceberg.RowIDColumnName))
+ }
+ }
+
+ // Validate the input arrow schema against the projected schema. When
+ // row lineage is being preserved, the projected schema includes the
+ // reserved metadata columns; using tbl.Schema() instead would reject
+ // the lineage columns since they're not declared in the user schema.
+ checkSchema := tbl.Schema()
+ if cfg.fileSchema != nil {
+ checkSchema = cfg.fileSchema
+ }
+ if err := checkArrowSchemaCompat(checkSchema, schema, false); err !=
nil {
+ return internal.SingleErrorIter[iceberg.DataFile](
+ fmt.Errorf("arrow schema is not compatible with the
table schema: %w", err))
+ }
+
fs, err := tbl.fsF(ctx)
if err != nil {
return internal.SingleErrorIter[iceberg.DataFile](err)
@@ -174,5 +213,9 @@ func WriteRecords(ctx context.Context, tbl *Table,
clustered: cfg.clustered,
}
+ if cfg.fileSchema != nil {
+ args.factoryOpts = append(args.factoryOpts,
withFactoryFileSchema(cfg.fileSchema))
+ }
+
return recordsToDataFiles(ctx, tbl.Location(), meta, args)
}