This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 123a5583 feat(table): preserve row lineage through copy-on-write 
rewrites (#1099)
123a5583 is described below

commit 123a558339d5b83b0142a1305429a5dabf703d91
Author: Tanmay Rauth <[email protected]>
AuthorDate: Sat May 23 16:20:40 2026 +0530

    feat(table): preserve row lineage through copy-on-write rewrites (#1099)
    
    Parent: #999
    Preserves `_row_id` through CoW rewrites and compaction on v3 tables.
    The scanner synthesizes row IDs from file metadata, the filter is
    applied post-synthesis to preserve correct positions, and the writer
    stores `_row_id` explicitly in output Parquet.
    `_last_updated_sequence_number` is left null to inherit from the new
    file's `data_sequence_number`.
    Also documents that `next-row-id` accounting intentionally overcounts
    (matching Java's `ManifestListWriter.V3Writer`) and removes the old
    `Projection()` synthesis helpers replaced by the explicit
    `WithRowLineage()` scan option.
    ## Test plan
    
    - `TestCoWRewritePreservesRowID` : verifies surviving rows keep original
    `_row_id` after a CoW delete
    - `TestCoWRewriteRowIDNextRowIDAccounting` : validates next-row-id
    advances correctly
---
 metadata_columns.go               |  47 +++-
 table/arrow_utils.go              |   3 +-
 table/rewrite_data_files.go       |  67 ++++++
 table/row_lineage_rewrite_test.go | 294 ++++++++++++++++++++++++
 table/scanner.go                  | 187 +++++++--------
 table/scanner_internal_test.go    | 470 ++++++++++++++------------------------
 table/snapshot_producers.go       |   5 +
 table/table.go                    |  11 +
 table/transaction.go              | 295 ++++++++++++++++++++----
 table/write_records.go            |  61 ++++-
 10 files changed, 992 insertions(+), 448 deletions(-)

diff --git a/metadata_columns.go b/metadata_columns.go
index efe568f7..fa9e2db4 100644
--- a/metadata_columns.go
+++ b/metadata_columns.go
@@ -17,13 +17,16 @@
 
 package iceberg
 
-// Row lineage metadata column field IDs (v3+). Reserved IDs are 
Integer.MAX_VALUE - 107 and 108
-// per the Iceberg spec (Metadata Columns / Row Lineage).
+import "slices"
+
+// Row lineage metadata column field IDs (v3+). Reserved IDs are 
Integer.MAX_VALUE - 107
+// and Integer.MAX_VALUE - 108 per the Iceberg spec (Metadata Columns / Row 
Lineage).
 const (
        // RowIDFieldID is the field ID for _row_id (optional long). A unique 
long identifier for every row.
+       // Reserved as Integer.MAX_VALUE - 107.
        RowIDFieldID = 2147483540
        // LastUpdatedSequenceNumberFieldID is the field ID for 
_last_updated_sequence_number (optional long).
-       // The sequence number of the commit that last updated the row.
+       // The sequence number of the commit that last updated the row. 
Reserved as Integer.MAX_VALUE - 108.
        LastUpdatedSequenceNumberFieldID = 2147483539
 )
 
@@ -59,3 +62,41 @@ func LastUpdatedSequenceNumber() NestedField {
 func IsMetadataColumn(fieldID int) bool {
        return fieldID == RowIDFieldID || fieldID == 
LastUpdatedSequenceNumberFieldID
 }
+
+// SchemaWithRowLineage returns a new schema with the row-lineage metadata 
columns
+// (_row_id, _last_updated_sequence_number) appended to the given schema's 
fields.
+// Used when reading source files during a CoW rewrite or compaction so that 
row
+// identity and per-row update sequence are preserved in the output.
+//
+// Idempotent: if a row-lineage column is already present (by reserved field 
ID),
+// it is not appended again. The returned schema always allocates a fresh field
+// slice so it cannot alias the input schema's backing array.
+func SchemaWithRowLineage(s *Schema) *Schema {
+       if s == nil {
+               return nil
+       }
+       // Clone the field slice up front so we never share a backing array 
with the
+       // caller's schema — append-with-spare-capacity could otherwise mutate 
the
+       // source schema's fields when the caller next mutates either side.
+       fields := slices.Clone(s.Fields())
+
+       hasRowID := false
+       hasSeqNum := false
+       for _, f := range fields {
+               switch f.ID {
+               case RowIDFieldID:
+                       hasRowID = true
+               case LastUpdatedSequenceNumberFieldID:
+                       hasSeqNum = true
+               }
+       }
+
+       if !hasRowID {
+               fields = append(fields, RowID())
+       }
+       if !hasSeqNum {
+               fields = append(fields, LastUpdatedSequenceNumber())
+       }
+
+       return NewSchemaWithIdentifiers(s.ID, s.IdentifierFieldIDs, fields...)
+}
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 48c04c74..55507c92 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1516,6 +1516,7 @@ type recordWritingArgs struct {
        counter         iter.Seq[int]
        maxWriteWorkers int
        clustered       bool
+       factoryOpts     []writerFactoryOption
 }
 
 func recordsToDataFiles(ctx context.Context, rootLocation string, meta 
*MetadataBuilder, args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, 
error]) {
@@ -1554,7 +1555,7 @@ func recordsToDataFiles(ctx context.Context, rootLocation 
string, meta *Metadata
                }
        }
 
-       factory, err := newWriterFactory(rootLocation, args, meta, taskSchema, 
targetFileSize)
+       factory, err := newWriterFactory(rootLocation, args, meta, taskSchema, 
targetFileSize, args.factoryOpts...)
        if err != nil {
                panic(err)
        }
diff --git a/table/rewrite_data_files.go b/table/rewrite_data_files.go
index e2cee62f..d0d1b875 100644
--- a/table/rewrite_data_files.go
+++ b/table/rewrite_data_files.go
@@ -20,6 +20,7 @@ package table
 import (
        "context"
        "fmt"
+       "log/slog"
        "maps"
 
        "github.com/apache/iceberg-go"
@@ -289,6 +290,40 @@ func ExecuteCompactionGroup(ctx context.Context, tbl 
*Table, group CompactionTas
                scanOpts = append(scanOpts, 
WitMaxConcurrency(cfg.scanConcurrency))
        }
 
+       // Preserve row lineage only when every source file in the group carries
+       // it. A mixed group (some files with FirstRowID, some without — e.g.
+       // legacy files on a v3 table) would otherwise produce one output where
+       // post-lineage rows have explicit _row_id values and pre-lineage rows
+       // have nulls, which violates the per-file uniqueness/coverage
+       // invariant the v3 spec requires. Splitting mixed groups into separate
+       // outputs is a larger refactor and is left as a follow-up; for now we
+       // degrade gracefully (the rewrite still succeeds, but lineage is not
+       // preserved for the surviving rows).
+       preserveLineage := tbl.metadata.Version() >= 3 && 
allTasksHaveRowLineage(group.Tasks)
+       if preserveLineage {
+               scanOpts = append(scanOpts, WithRowLineage())
+       } else if tbl.metadata.Version() >= 3 {
+               // Mixed group on a v3 table — at least one source file lacks
+               // FirstRowID so we drop lineage on the surviving rows. This is 
the
+               // common case during a v1/v2→v3 migration; surface it so 
operators
+               // can detect silent lineage loss instead of having to diff
+               // metadata before/after.
+               var lineageFiles, legacyFiles int
+               for _, t := range group.Tasks {
+                       if t.FirstRowID != nil {
+                               lineageFiles++
+                       } else {
+                               legacyFiles++
+                       }
+               }
+               if lineageFiles > 0 {
+                       slog.Warn("compaction group has mixed row lineage; 
dropping _row_id on output",
+                               "partition_key", group.PartitionKey,
+                               "lineage_files", lineageFiles,
+                               "legacy_files", legacyFiles)
+               }
+       }
+
        arrowSchema, records, err := tbl.Scan(scanOpts...).ReadTasks(ctx, 
group.Tasks)
        if err != nil {
                return CompactionGroupResult{}, fmt.Errorf("read tasks for 
compaction group %q: %w", group.PartitionKey, err)
@@ -300,6 +335,20 @@ func ExecuteCompactionGroup(ctx context.Context, tbl 
*Table, group CompactionTas
        if cfg.targetFileSize > 0 {
                writeOpts = append(writeOpts, 
WithTargetFileSize(cfg.targetFileSize))
        }
+       if preserveLineage {
+               // Rebuild the arrow schema from the projected iceberg schema 
so the
+               // reserved row-lineage field IDs (_row_id, 
_last_updated_sequence_number)
+               // are attached as Arrow field metadata. ArrowSchemaToIceberg 
prefers
+               // embedded field IDs when present and otherwise falls back to 
the
+               // table's name mapping — which doesn't (and cannot) contain the
+               // reserved metadata column names, so the fallback path panics.
+               projectedSchema := iceberg.SchemaWithRowLineage(tbl.Schema())
+               arrowSchema, err = SchemaToArrowSchema(projectedSchema, nil, 
true, false)
+               if err != nil {
+                       return CompactionGroupResult{}, fmt.Errorf("build arrow 
schema for lineage write in group %q: %w", group.PartitionKey, err)
+               }
+               writeOpts = append(writeOpts, 
WithPreserveRowLineage(projectedSchema))
+       }
 
        var (
                newFiles   []iceberg.DataFile
@@ -328,6 +377,24 @@ func ExecuteCompactionGroup(ctx context.Context, tbl 
*Table, group CompactionTas
        }, nil
 }
 
+// allTasksHaveRowLineage returns true iff every task in the group has a
+// non-nil FirstRowID — i.e. every source file already carries v3 row lineage.
+// Used to gate the preservation path on compaction: mixed groups (some
+// lineage, some legacy) would otherwise produce per-file invariant
+// violations, so the gate is conservative.
+func allTasksHaveRowLineage(tasks []FileScanTask) bool {
+       if len(tasks) == 0 {
+               return false
+       }
+       for _, t := range tasks {
+               if t.FirstRowID == nil {
+                       return false
+               }
+       }
+
+       return true
+}
+
 // rewriteDataFilesPartial stages each group as its own rewrite
 // snapshot via [Transaction.ReplaceFiles] directly. Per-group staging
 // lets a mid-loop write failure leave already-staged groups on the
diff --git a/table/row_lineage_rewrite_test.go 
b/table/row_lineage_rewrite_test.go
new file mode 100644
index 00000000..9b1fc193
--- /dev/null
+++ b/table/row_lineage_rewrite_test.go
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "path/filepath"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func newV3RowLineageTestTable(t *testing.T) *table.Table {
+       t.Helper()
+
+       location := filepath.ToSlash(t.TempDir())
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, 
table.UnsortedSortOrder, location,
+               iceberg.Properties{table.PropertyFormatVersion: "3"})
+       require.NoError(t, err)
+
+       metaLoc := location + "/metadata/v1.metadata.json"
+       fsF := func(context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil }
+       cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF: 
fsF}
+
+       return table.New(table.Identifier{"db", "row_lineage_test"}, meta, 
metaLoc, fsF, cat)
+}
+
+// TestCoWRewritePreservesRowID verifies that a copy-on-write overwrite with a
+// row filter preserves the original _row_id and _last_updated_sequence_number
+// values in the rewritten file. Surviving rows must keep both values from the
+// pre-rewrite snapshot — they are "physically rewritten", not "logically
+// updated", per the v3 spec.
+func TestCoWRewritePreservesRowID(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+
+       tbl := newV3RowLineageTestTable(t)
+
+       // Append 3 rows: id=1,2,3
+       arrowSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       initialData, err := array.TableFromJSON(mem, arrowSchema, []string{
+               `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}, {"id": 3, 
"data": "c"}]`,
+       })
+       require.NoError(t, err)
+       defer initialData.Release()
+
+       tbl, err = tbl.Append(ctx, array.NewTableReader(initialData, -1), nil)
+       require.NoError(t, err)
+
+       // Verify the append created a valid v3 snapshot with row lineage.
+       snap := tbl.CurrentSnapshot()
+       require.NotNil(t, snap)
+       require.NotNil(t, snap.FirstRowID, "v3 snapshot must have first-row-id")
+       require.NotNil(t, snap.AddedRows, "v3 snapshot must have added-rows")
+       assert.Equal(t, int64(0), *snap.FirstRowID)
+       assert.Equal(t, int64(3), *snap.AddedRows)
+
+       // Capture the snapshot's sequence number so we can assert preservation
+       // after the rewrite. After the append, every row's effective
+       // _last_updated_sequence_number should be this value.
+       createSeq := snap.SequenceNumber
+
+       // Scan with row lineage to see synthesized _row_id values before the 
rewrite.
+       lineageScan := tbl.Scan(table.WithRowLineage())
+       schema, itr, err := lineageScan.ToArrowRecords(ctx)
+       require.NoError(t, err)
+
+       rowIDIdx := -1
+       for i, f := range schema.Fields() {
+               if f.Name == iceberg.RowIDColumnName {
+                       rowIDIdx = i
+
+                       break
+               }
+       }
+       require.GreaterOrEqual(t, rowIDIdx, 0, "_row_id should be in scan 
projection")
+
+       var originalRowIDs []int64
+       for rec, err := range itr {
+               require.NoError(t, err)
+               col := rec.Column(rowIDIdx).(*array.Int64)
+               for i := 0; i < col.Len(); i++ {
+                       originalRowIDs = append(originalRowIDs, col.Value(i))
+               }
+               rec.Release()
+       }
+       require.Equal(t, []int64{0, 1, 2}, originalRowIDs, "initial _row_id 
should be 0,1,2")
+
+       // CoW overwrite: delete the row where id=2, preserving id=1 and id=3.
+       filter := iceberg.EqualTo(iceberg.Reference("id"), int64(2))
+       tbl, err = tbl.Delete(ctx, filter, nil)
+       require.NoError(t, err)
+
+       snap = tbl.CurrentSnapshot()
+       require.NotNil(t, snap)
+       require.Greater(t, snap.SequenceNumber, createSeq,
+               "sanity: rewrite snapshot must have a higher sequence number 
than the create snapshot")
+
+       // Scan the result with row lineage. The surviving rows should preserve 
their
+       // original _row_id values: 0 and 2. Their 
_last_updated_sequence_number must
+       // also still report the create snapshot's seq, NOT the rewrite 
snapshot's
+       // seq — the rewrite is physical only, not a logical update.
+       lineageScan = tbl.Scan(table.WithRowLineage())
+       _, itr, err = lineageScan.ToArrowRecords(ctx)
+       require.NoError(t, err)
+
+       var afterRowIDs []int64
+       var afterIDs []int64
+       var afterSeq []int64
+       for rec, err := range itr {
+               require.NoError(t, err)
+               idIdx := rec.Schema().FieldIndices("id")
+               require.NotEmpty(t, idIdx)
+               rowIDIndices := 
rec.Schema().FieldIndices(iceberg.RowIDColumnName)
+               require.NotEmpty(t, rowIDIndices)
+               seqIndices := 
rec.Schema().FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+               require.NotEmpty(t, seqIndices, "_last_updated_sequence_number 
must be in projection")
+
+               idCol := rec.Column(idIdx[0]).(*array.Int64)
+               rowIDCol := rec.Column(rowIDIndices[0]).(*array.Int64)
+               seqCol := rec.Column(seqIndices[0]).(*array.Int64)
+               for i := 0; i < int(rec.NumRows()); i++ {
+                       afterIDs = append(afterIDs, idCol.Value(i))
+                       afterRowIDs = append(afterRowIDs, rowIDCol.Value(i))
+                       require.False(t, seqCol.IsNull(i),
+                               "row %d must have a non-null 
_last_updated_sequence_number after CoW rewrite", i)
+                       afterSeq = append(afterSeq, seqCol.Value(i))
+               }
+               rec.Release()
+       }
+
+       assert.Equal(t, []int64{1, 3}, afterIDs, "remaining rows should be 
id=1,3")
+       assert.Equal(t, []int64{0, 2}, afterRowIDs,
+               "_row_id must be preserved through CoW rewrite: row with id=1 
keeps _row_id=0, row with id=3 keeps _row_id=2")
+       assert.Equal(t, []int64{createSeq, createSeq}, afterSeq,
+               "_last_updated_sequence_number must report the original 
creation snapshot's sequence number, not the rewrite's")
+}
+
+// TestCoWRewriteRowIDNextRowIDAccounting verifies that row-id accounting 
remains
+// correct after a CoW rewrite. The overcounting (where next-row-id advances by
+// the full manifest row count including preserved survivors) is intentional 
and
+// matches Java's ManifestListWriter.V3Writer behavior.
+func TestCoWRewriteRowIDNextRowIDAccounting(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+
+       tbl := newV3RowLineageTestTable(t)
+
+       arrowSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       data, err := array.TableFromJSON(mem, arrowSchema, []string{
+               `[{"id": 10, "data": "x"}, {"id": 20, "data": "y"}, {"id": 30, 
"data": "z"}]`,
+       })
+       require.NoError(t, err)
+       defer data.Release()
+
+       tbl, err = tbl.Append(ctx, array.NewTableReader(data, -1), nil)
+       require.NoError(t, err)
+
+       // next-row-id should be 3 after appending 3 rows.
+       assert.Equal(t, int64(3), tbl.Metadata().NextRowID())
+
+       // Delete one row via CoW.
+       filter := iceberg.EqualTo(iceberg.Reference("id"), int64(20))
+       tbl, err = tbl.Delete(ctx, filter, nil)
+       require.NoError(t, err)
+
+       // next-row-id advances by the new manifest's added-rows count (2 here),
+       // even though the surviving rows preserve their old IDs: it goes from 3
+       // (after the initial append) to 5 (= prior NextRowID + manifest's added
+       // rows). This "wastes" ID space but doesn't violate uniqueness — actual
+       // row IDs come from the explicit Parquet column, not the global 
counter.
+       // See table/snapshot_producers.go, which mirrors Java's
+       // ManifestListWriter.V3Writer.prepare().
+       assert.Equal(t, int64(5), tbl.Metadata().NextRowID(),
+               "next-row-id should advance from 3 by the rewrite manifest's 2 
added rows")
+}
+
+// TestExecuteCompactionGroupPreservesRowID verifies that
+// ExecuteCompactionGroup preserves _row_id values through compaction on a v3
+// table. After compaction every row should retain its original _row_id even
+// though the underlying file paths have changed.
+func TestExecuteCompactionGroupPreservesRowID(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+
+       tbl := newV3RowLineageTestTable(t)
+
+       arrowSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       // Two appends produce two data files in two snapshots; row IDs should
+       // span both files (0..1 in the first, 2..3 in the second).
+       for i, payload := range []string{
+               `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}]`,
+               `[{"id": 3, "data": "c"}, {"id": 4, "data": "d"}]`,
+       } {
+               data, err := array.TableFromJSON(mem, arrowSchema, 
[]string{payload})
+               require.NoError(t, err, "append %d", i)
+               t.Cleanup(data.Release)
+
+               tbl, err = tbl.Append(ctx, array.NewTableReader(data, -1), nil)
+               require.NoError(t, err, "append %d", i)
+       }
+
+       tasks, err := tbl.Scan().PlanFiles(ctx)
+       require.NoError(t, err)
+       require.Len(t, tasks, 2, "two source files for compaction")
+
+       scanTasks := make([]table.FileScanTask, len(tasks))
+       var totalSize int64
+       for i, st := range tasks {
+               scanTasks[i] = st
+               totalSize += st.File.FileSizeBytes()
+       }
+       group := table.CompactionTaskGroup{
+               PartitionKey:   "single",
+               Tasks:          scanTasks,
+               TotalSizeBytes: totalSize,
+       }
+
+       gr, err := table.ExecuteCompactionGroup(ctx, tbl, group)
+       require.NoError(t, err)
+       require.Equal(t, 2, len(gr.OldDataFiles), "both source files should be 
replaced")
+       require.GreaterOrEqual(t, len(gr.NewDataFiles), 1, "compaction should 
produce at least one output file")
+
+       tx := tbl.NewTransaction()
+       rewrite := tx.NewRewrite(nil)
+       rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+       require.NoError(t, rewrite.Commit(ctx))
+       tbl, err = tx.Commit(ctx)
+       require.NoError(t, err)
+
+       // Read back with row lineage projected; every row should retain its
+       // pre-compaction _row_id (0,1,2,3).
+       _, itr, err := tbl.Scan(table.WithRowLineage()).ToArrowRecords(ctx)
+       require.NoError(t, err)
+
+       got := map[int64]int64{}
+       for rec, err := range itr {
+               require.NoError(t, err)
+               idIdx := rec.Schema().FieldIndices("id")
+               require.NotEmpty(t, idIdx)
+               rowIDIdx := rec.Schema().FieldIndices(iceberg.RowIDColumnName)
+               require.NotEmpty(t, rowIDIdx, "_row_id must be projected")
+
+               idCol := rec.Column(idIdx[0]).(*array.Int64)
+               rowIDCol := rec.Column(rowIDIdx[0]).(*array.Int64)
+               for i := 0; i < int(rec.NumRows()); i++ {
+                       got[idCol.Value(i)] = rowIDCol.Value(i)
+               }
+               rec.Release()
+       }
+
+       assert.Equal(t,
+               map[int64]int64{1: 0, 2: 1, 3: 2, 4: 3},
+               got,
+               "compaction must preserve every row's _row_id")
+}
diff --git a/table/scanner.go b/table/scanner.go
index 3e9c6e0e..8e94088f 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -194,6 +194,8 @@ type Scan struct {
        options        iceberg.Properties
        limit          int64
 
+       includeRowLineage bool
+
        partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
        concurrency      int
 }
@@ -243,7 +245,6 @@ func (scan *Scan) Snapshot() *Snapshot {
 func (scan *Scan) Projection() (*iceberg.Schema, error) {
        curSchema := scan.metadata.CurrentSchema()
        curVersion := scan.metadata.Version()
-       caseSensitive := scan.caseSensitive
        if scan.snapshotID != nil {
                snap := scan.metadata.SnapshotByID(*scan.snapshotID)
                if snap == nil {
@@ -261,26 +262,100 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
                }
        }
 
-       if slices.Contains(scan.selectedFields, "*") {
-               return curSchema, nil
+       if scan.includeRowLineage && curVersion < minFormatVersionRowLineage {
+               return nil, fmt.Errorf("%w: row lineage requires format version 
%d, table is v%d",
+                       ErrInvalidOperation, minFormatVersionRowLineage, 
curVersion)
        }
 
-       selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, 
caseSensitive)
-       schemaMeta := metaFieldsFromSchema(curSchema)
-       synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
-       if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+       var schema *iceberg.Schema
+       if slices.Contains(scan.selectedFields, "*") {
+               schema = curSchema
+       } else {
+               // Intercept row-lineage metadata column names (_row_id,
+               // _last_updated_sequence_number) before calling Select: they 
are
+               // reserved and never appear in the user schema's fields, so
+               // Select would fail with "could not find column" on v3 tables
+               // where they are otherwise legal to project. The scanner reads
+               // them from file metadata (or synthesizes them) at scan time;
+               // here we just need to ensure they survive into the projection.
+               userFields, lineageFields := 
splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive)
+               if len(lineageFields) > 0 && curVersion < 
minFormatVersionRowLineage {
+                       // Reject explicitly so the contract lives in the code 
rather
+                       // than emerging from Select's "could not find column" 
path —
+                       // a future v2 schema field literally named _row_id 
should not
+                       // silently succeed here.
+                       return nil, fmt.Errorf("%w: row lineage column %q 
requires format version %d, table is v%d",
+                               ErrInvalidOperation, lineageFields[0].Name, 
minFormatVersionRowLineage, curVersion)
+               }
 
-               // synthesis path
-               removedMetaSlice, missingMetaFields := 
removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
-               sch, err := curSchema.Select(scan.caseSensitive, 
removedMetaSlice...)
+               var err error
+               schema, err = curSchema.Select(scan.caseSensitive, 
userFields...)
                if err != nil {
                        return nil, err
                }
+               // Skip the per-name append when scan.includeRowLineage is set: 
the
+               // SchemaWithRowLineage call below adds both lineage columns
+               // unconditionally, and appendMissingLineageFields would just be
+               // redundant work whose result is overwritten.
+               if len(lineageFields) > 0 && !scan.includeRowLineage {
+                       schema = appendMissingLineageFields(schema, 
lineageFields)
+               }
+       }
+
+       if scan.includeRowLineage {
+               schema = iceberg.SchemaWithRowLineage(schema)
+       }
+
+       return schema, nil
+}
+
+// splitLineageMetadataFields partitions selectedFields into user fields and
+// row-lineage metadata fields (_row_id, _last_updated_sequence_number). The
+// returned lineage slice contains the canonical NestedField for each
+// metadata column name found, in the order encountered.
+func splitLineageMetadataFields(selectedFields []string, caseSensitive bool) 
(userFields []string, lineageFields []iceberg.NestedField) {
+       matches := func(field, target string) bool {
+               if caseSensitive {
+                       return field == target
+               }
+
+               return strings.EqualFold(field, target)
+       }
+
+       userFields = make([]string, 0, len(selectedFields))
+       for _, field := range selectedFields {
+               switch {
+               case matches(field, iceberg.RowIDColumnName):
+                       lineageFields = append(lineageFields, iceberg.RowID())
+               case matches(field, 
iceberg.LastUpdatedSequenceNumberColumnName):
+                       lineageFields = append(lineageFields, 
iceberg.LastUpdatedSequenceNumber())
+               default:
+                       userFields = append(userFields, field)
+               }
+       }
+
+       return userFields, lineageFields
+}
 
-               return iceberg.NewSchemaWithIdentifiers(sch.ID, 
sch.IdentifierFieldIDs, append(sch.Fields(), missingMetaFields...)...), nil
+// appendMissingLineageFields returns a new schema with each lineage field
+// appended only if no field with that ID is already present. Idempotent so
+// callers can pass schemas that already declare the reserved fields.
+func appendMissingLineageFields(s *iceberg.Schema, lineageFields 
[]iceberg.NestedField) *iceberg.Schema {
+       existing := make(map[int]struct{}, len(s.Fields()))
+       for _, f := range s.Fields() {
+               existing[f.ID] = struct{}{}
        }
 
-       return curSchema.Select(scan.caseSensitive, scan.selectedFields...)
+       fields := slices.Clone(s.Fields())
+       for _, f := range lineageFields {
+               if _, ok := existing[f.ID]; ok {
+                       continue
+               }
+               fields = append(fields, f)
+               existing[f.ID] = struct{}{}
+       }
+
+       return iceberg.NewSchemaWithIdentifiers(s.ID, s.IdentifierFieldIDs, 
fields...)
 }
 
 func (scan *Scan) buildPartitionProjection(specID int) 
(iceberg.BooleanExpression, error) {
@@ -621,10 +696,15 @@ func (scan *Scan) PlanFiles(ctx context.Context) 
([]FileScanTask, error) {
                        Length:              e.DataFile().FileSizeBytes(),
                }
                // Row lineage constants: readers use these to synthesize 
_row_id and
-               // _last_updated_sequence_number when requested.
+               // _last_updated_sequence_number when requested. Per spec the
+               // synthesized _last_updated_sequence_number is the manifest 
entry's
+               // data sequence number (field id 3), not file sequence number
+               // (field id 4); back-dated EXISTING entries can have the two
+               // diverge and Java/iceberg-rust use the data sequence number.
                task.FirstRowID = e.DataFile().FirstRowID()
-               if fseq := e.FileSequenceNum(); fseq != nil {
-                       task.DataSequenceNumber = fseq
+               if seq := e.SequenceNum(); seq >= 0 {
+                       s := seq
+                       task.DataSequenceNumber = &s
                }
                results = append(results, task)
        }
@@ -721,80 +801,3 @@ func (scan *Scan) ToArrowTable(ctx context.Context) 
(arrow.Table, error) {
 
        return array.NewTableFromRecords(schema, records), nil
 }
-
-// Removes metaFields from selectedField if it exists. Returns a []string 
representing the filtered selectedFields
-// and an iceberg.NestedField[] representing the removed metadata. Note that 
metaFields is passed in
-// after being validated from metaFieldsFromSelectedFields.
-func removeMetadataFromSelectedFields(selectedFields []string, metaFields 
[]string) ([]string, []iceberg.NestedField) {
-       filteredFields := []string{}
-       meta := []iceberg.NestedField{}
-
-       for _, field := range selectedFields {
-               if slices.Contains(metaFields, strings.ToLower(field)) {
-
-                       switch strings.ToLower(field) {
-                       case iceberg.LastUpdatedSequenceNumberColumnName:
-                               meta = append(meta, 
iceberg.LastUpdatedSequenceNumber())
-                       case iceberg.RowIDColumnName:
-                               meta = append(meta, iceberg.RowID())
-                       }
-
-                       continue
-               }
-
-               filteredFields = append(filteredFields, field)
-       }
-
-       return filteredFields, meta
-}
-
-func metaFieldsFromSelectedFields(selectedFields []string, caseSensitive bool) 
[]string {
-       meta := []string{}
-       if !caseSensitive {
-               for _, field := range selectedFields {
-                       if strings.EqualFold(field, iceberg.RowIDColumnName) || 
strings.EqualFold(field, iceberg.LastUpdatedSequenceNumberColumnName) {
-                               meta = append(meta, strings.ToLower(field))
-                       }
-               }
-
-               return meta
-       }
-
-       for _, field := range selectedFields {
-               if field == iceberg.RowIDColumnName || field == 
iceberg.LastUpdatedSequenceNumberColumnName {
-                       meta = append(meta, strings.ToLower(field))
-               }
-       }
-
-       return meta
-}
-
-// Takes in a *iceberg.Schema and returns a []string representing the row 
lineage metadata present
-// in the schema.
-func metaFieldsFromSchema(sch *iceberg.Schema) []string {
-       meta := []string{}
-       _, hasRowIDMeta := sch.FindFieldByName(iceberg.RowIDColumnName)
-       _, hasSeqMeta := 
sch.FindFieldByName(iceberg.LastUpdatedSequenceNumberColumnName)
-
-       if hasRowIDMeta {
-               meta = append(meta, iceberg.RowIDColumnName)
-       }
-       if hasSeqMeta {
-               meta = append(meta, iceberg.LastUpdatedSequenceNumberColumnName)
-       }
-
-       return meta
-}
-
-// Any metadata which is in selectedFieldsMeta and not in schemaMeta is a 
synthesis meta
-func synthesizeMeta(selectedFieldsMeta []string, schemaMeta []string) []string 
{
-       synthesis := []string{}
-
-       for _, f := range selectedFieldsMeta {
-               if !slices.Contains(schemaMeta, f) {
-                       synthesis = append(synthesis, f)
-               }
-       }
-
-       return synthesis
-}
diff --git a/table/scanner_internal_test.go b/table/scanner_internal_test.go
index 789d9906..077f315b 100644
--- a/table/scanner_internal_test.go
+++ b/table/scanner_internal_test.go
@@ -19,8 +19,6 @@ package table
 
 import (
        "runtime"
-       "slices"
-       "strconv"
        "sync"
        "sync/atomic"
        "testing"
@@ -238,98 +236,146 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t 
*testing.T) {
        assert.ErrorContains(t, err, "id 999")
 }
 
-// TestProjectionV3PreLineageFile verifies that Projection() succeeds and 
returns
-// _row_id and _last_updated_sequence_number as nullable (all-null-capable) 
fields when
-// the table is v3 with next-row-id set but the data file predates row lineage 
(those
-// columns are absent from the schema).
-func TestProjectionV3PreLineageFile(t *testing.T) {
-       schema := iceberg.NewSchema(
-               1,
-               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
-               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
-       )
+// TestSynthesizeRowLineageColumns verifies that _row_id and 
_last_updated_sequence_number
+// are filled from task constants when those columns are present and null.
+func TestSynthesizeRowLineageColumns(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       ctx := compute.WithAllocator(t.Context(), mem)
+       defer mem.AssertSize(t, 0)
+       firstRowID := int64(1000)
+       dataSeqNum := int64(5)
+       task := FileScanTask{FirstRowID: &firstRowID, DataSequenceNumber: 
&dataSeqNum}
+       rowOffset := int64(0)
 
-       metadata, err := NewMetadata(
-               schema,
-               iceberg.UnpartitionedSpec,
-               UnsortedSortOrder,
-               "s3://test-bucket/test_table",
-               iceberg.Properties{"format-version": "3"},
+       // Build a batch with a data column plus _row_id and 
_last_updated_sequence_number (all nulls).
+       schema := arrow.NewSchema(
+               []arrow.Field{
+                       {Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable: 
true},
+                       {Name: iceberg.RowIDColumnName, Type: 
arrow.PrimitiveTypes.Int64, Nullable: true},
+                       {Name: iceberg.LastUpdatedSequenceNumberColumnName, 
Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+               },
+               nil,
        )
+       const nrows = 3
+       xBldr := array.NewInt64Builder(mem)
+       defer xBldr.Release()
+       xBldr.AppendValues([]int64{1, 2, 3}, nil)
+       rowIDBldr := array.NewInt64Builder(mem)
+       defer rowIDBldr.Release()
+       rowIDBldr.AppendNulls(nrows)
+       seqBldr := array.NewInt64Builder(mem)
+       defer seqBldr.Release()
+       seqBldr.AppendNulls(nrows)
+
+       xArr := xBldr.NewArray()
+       rowIDArr := rowIDBldr.NewArray()
+       seqArr := seqBldr.NewArray()
+       batch := array.NewRecordBatch(schema, []arrow.Array{xArr, rowIDArr, 
seqArr}, nrows)
+       xArr.Release()
+       rowIDArr.Release()
+       seqArr.Release()
+       defer batch.Release()
+
+       out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch)
        require.NoError(t, err)
-       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+       defer out.Release()
 
-       // Request the two user columns plus both row-lineage metadata columns.
-       // These metadata columns do NOT exist in the physical schema of a 
pre-lineage file.
-       scan := &Scan{
-               metadata:       metadata,
-               selectedFields: []string{"id", "payload", 
iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName},
-               caseSensitive:  true,
+       // _row_id should be 1000, 1001, 1002
+       rowIDCol := out.Column(1).(*array.Int64)
+       require.Equal(t, nrows, rowIDCol.Len())
+       for i := 0; i < nrows; i++ {
+               assert.False(t, rowIDCol.IsNull(i), "row %d", i)
+               assert.EqualValues(t, 1000+int64(i), rowIDCol.Value(i), "row 
%d", i)
        }
+       // _last_updated_sequence_number should be 5 for all
+       seqCol := out.Column(2).(*array.Int64)
+       for i := 0; i < nrows; i++ {
+               assert.False(t, seqCol.IsNull(i), "row %d", i)
+               assert.EqualValues(t, 5, seqCol.Value(i), "row %d", i)
+       }
+       assert.EqualValues(t, 3, rowOffset)
+}
 
-       proj, err := scan.Projection()
-       require.NoError(t, err, "Projection must not error for pre-lineage 
metadata columns")
-       require.NotNil(t, proj)
-
-       fields := proj.Fields()
-       require.Len(t, fields, 4, "projected schema must contain all four 
requested fields")
+// TestSynthesizeRowLineageColumnsPreservesExplicit covers the spec's
+// null-coalescing rule: if a row already has an explicit (non-null) value in
+// the source file, that value MUST be preserved; only null entries inherit the
+// file-level constants. This is the case that arises when rewriting a file
+// that already carries explicit lineage from a prior rewrite.
+func TestSynthesizeRowLineageColumnsPreservesExplicit(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       ctx := compute.WithAllocator(t.Context(), mem)
+       defer mem.AssertSize(t, 0)
 
-       fieldByName := make(map[string]iceberg.NestedField, len(fields))
-       for _, f := range fields {
-               fieldByName[f.Name] = f
-       }
+       firstRowID := int64(1000)
+       dataSeqNum := int64(5)
+       task := FileScanTask{FirstRowID: &firstRowID, DataSequenceNumber: 
&dataSeqNum}
+       rowOffset := int64(0)
 
-       // Regular columns must survive unchanged.
-       idField, ok := fieldByName["id"]
-       require.True(t, ok, "id must be in projection")
-       assert.Equal(t, 1, idField.ID)
+       schema := arrow.NewSchema(
+               []arrow.Field{
+                       {Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable: 
true},
+                       {Name: iceberg.RowIDColumnName, Type: 
arrow.PrimitiveTypes.Int64, Nullable: true},
+                       {Name: iceberg.LastUpdatedSequenceNumberColumnName, 
Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+               },
+               nil,
+       )
+       const nrows = 3
 
-       payloadField, ok := fieldByName["payload"]
-       require.True(t, ok, "payload must be in projection")
-       assert.Equal(t, 2, payloadField.ID)
+       xBldr := array.NewInt64Builder(mem)
+       defer xBldr.Release()
+       xBldr.AppendValues([]int64{1, 2, 3}, nil)
 
-       // Row lineage columns must be present as optional (nullable) fields — 
the scanner
-       // will return all-nulls for any data file that was written before row 
lineage existed.
-       rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
-       require.True(t, ok, "_row_id must be in projection")
-       assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
-       assert.False(t, rowIDField.Required, "_row_id must be optional 
(nullable) for pre-lineage files")
+       // Mixed _row_id: explicit 42, null, explicit 99. Non-null values must
+       // survive untouched. Null gets firstRowID + row position = 1000 + 1.
+       rowIDBldr := array.NewInt64Builder(mem)
+       defer rowIDBldr.Release()
+       rowIDBldr.AppendValues([]int64{42, 0, 99}, []bool{true, false, true})
 
-       seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
-       require.True(t, ok, "_last_updated_sequence_number must be in 
projection")
-       assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID, 
"_last_updated_sequence_number field ID")
-       assert.False(t, seqField.Required, "_last_updated_sequence_number must 
be optional (nullable) for pre-lineage files")
-}
+       // Mixed _last_updated_sequence_number: explicit 7, null, explicit 9.
+       // Null gets task.DataSequenceNumber = 5; others survive.
+       seqBldr := array.NewInt64Builder(mem)
+       defer seqBldr.Release()
+       seqBldr.AppendValues([]int64{7, 0, 9}, []bool{true, false, true})
 
-func TestProjectionV3PreLineageFileCaseSensitive(t *testing.T) {
-       schema := iceberg.NewSchema(
-               1,
-               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
-               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
-       )
+       xArr := xBldr.NewArray()
+       rowIDArr := rowIDBldr.NewArray()
+       seqArr := seqBldr.NewArray()
+       batch := array.NewRecordBatch(schema, []arrow.Array{xArr, rowIDArr, 
seqArr}, nrows)
+       xArr.Release()
+       rowIDArr.Release()
+       seqArr.Release()
+       defer batch.Release()
 
-       metadata, err := NewMetadata(
-               schema,
-               iceberg.UnpartitionedSpec,
-               UnsortedSortOrder,
-               "s3://test-bucket/test_table",
-               iceberg.Properties{"format-version": "3"},
-       )
+       out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch)
        require.NoError(t, err)
-       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+       defer out.Release()
 
-       scan := &Scan{
-               metadata:       metadata,
-               selectedFields: []string{"id", "payload", "_Row_Id"},
-               caseSensitive:  true,
-       }
+       rowIDCol := out.Column(1).(*array.Int64)
+       require.Equal(t, nrows, rowIDCol.Len())
+       assert.False(t, rowIDCol.IsNull(0))
+       assert.EqualValues(t, 42, rowIDCol.Value(0), "explicit _row_id must 
survive")
+       assert.False(t, rowIDCol.IsNull(1))
+       assert.EqualValues(t, 1001, rowIDCol.Value(1), "null _row_id must 
inherit firstRowID + position")
+       assert.False(t, rowIDCol.IsNull(2))
+       assert.EqualValues(t, 99, rowIDCol.Value(2), "explicit _row_id must 
survive even with a different value")
 
-       _, err = scan.Projection()
-       require.Error(t, err)
-       require.ErrorContains(t, err, "could not find column _Row_Id")
+       seqCol := out.Column(2).(*array.Int64)
+       require.Equal(t, nrows, seqCol.Len())
+       assert.False(t, seqCol.IsNull(0))
+       assert.EqualValues(t, 7, seqCol.Value(0), "explicit seq must survive")
+       assert.False(t, seqCol.IsNull(1))
+       assert.EqualValues(t, 5, seqCol.Value(1), "null seq must inherit 
task.DataSequenceNumber")
+       assert.False(t, seqCol.IsNull(2))
+       assert.EqualValues(t, 9, seqCol.Value(2), "explicit seq must survive 
even with a different value")
+
+       assert.EqualValues(t, 3, rowOffset)
 }
 
-func TestProjectionV3PreLineageFileCaseInsensitive(t *testing.T) {
+// TestProjectionV3SelectRowLineageColumns verifies that explicitly selecting
+// _row_id (and _last_updated_sequence_number) on a v3 table yields a 
projection
+// containing those metadata columns, even though they are not declared in the
+// user schema. This is the legacy "Select(_row_id)" escape hatch.
+func TestProjectionV3SelectRowLineageColumns(t *testing.T) {
        schema := iceberg.NewSchema(
                1,
                iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
@@ -348,44 +394,37 @@ func TestProjectionV3PreLineageFileCaseInsensitive(t 
*testing.T) {
 
        scan := &Scan{
                metadata:       metadata,
-               selectedFields: []string{"id", "payload", "_Row_Id", 
"_Last_Updated_SEQUENCE_number"},
-               caseSensitive:  false,
+               selectedFields: []string{"id", "payload", 
iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName},
+               caseSensitive:  true,
        }
 
        proj, err := scan.Projection()
-       require.NoError(t, err)
+       require.NoError(t, err, "Projection must accept lineage column names on 
v3")
        require.NotNil(t, proj)
 
        fields := proj.Fields()
-       require.Len(t, fields, 4, "projected schema must contain all four 
requested fields")
+       require.Len(t, fields, 4, "projected schema must contain id, payload, 
_row_id, _last_updated_sequence_number")
 
        fieldByName := make(map[string]iceberg.NestedField, len(fields))
        for _, f := range fields {
                fieldByName[f.Name] = f
        }
 
-       idField, ok := fieldByName["id"]
-       require.True(t, ok, "id must be in projection")
-       assert.Equal(t, 1, idField.ID)
-
-       payloadField, ok := fieldByName["payload"]
-       require.True(t, ok, "payload must be in projection")
-       assert.Equal(t, 2, payloadField.ID)
-
        rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
        require.True(t, ok, "_row_id must be in projection")
-       assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
-       assert.False(t, rowIDField.Required, "_row_id must be optional 
(nullable) for pre-lineage files")
+       assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID)
+       assert.False(t, rowIDField.Required, "_row_id must be optional")
 
        seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
        require.True(t, ok, "_last_updated_sequence_number must be in 
projection")
-       assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID, 
"_last_updated_sequence_number field ID")
-       assert.False(t, seqField.Required, "_last_updated_sequence_number must 
be optional (nullable) for pre-lineage files")
+       assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID)
+       assert.False(t, seqField.Required, "_last_updated_sequence_number must 
be optional")
 }
 
-// TestProjectionV2RowLineage asserts that requesting row-lineage metadata 
columns on a v1 or v2
-// table does not use the v3-only synthesis path
-func TestProjectionV2RowLineage(t *testing.T) {
+// TestProjectionRowLineageRejectedOnV1V2 asserts that requesting a row-lineage
+// metadata column via Select on a v1 or v2 table is an error (those format
+// versions do not support row lineage).
+func TestProjectionRowLineageRejectedOnV1V2(t *testing.T) {
        schema := iceberg.NewSchema(
                1,
                iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
@@ -394,10 +433,10 @@ func TestProjectionV2RowLineage(t *testing.T) {
 
        for _, tc := range []struct {
                name string
-               ver  int
+               ver  string
        }{
-               {name: "v1", ver: 1},
-               {name: "v2", ver: 2},
+               {name: "v1", ver: "1"},
+               {name: "v2", ver: "2"},
        } {
                t.Run(tc.name, func(t *testing.T) {
                        metadata, err := NewMetadata(
@@ -405,10 +444,9 @@ func TestProjectionV2RowLineage(t *testing.T) {
                                iceberg.UnpartitionedSpec,
                                UnsortedSortOrder,
                                "s3://test-bucket/test_table",
-                               iceberg.Properties{PropertyFormatVersion: 
strconv.Itoa(tc.ver)},
+                               iceberg.Properties{PropertyFormatVersion: 
tc.ver},
                        )
                        require.NoError(t, err)
-                       assert.Equal(t, tc.ver, metadata.Version(), "sanity: 
metadata format version")
 
                        scan := &Scan{
                                metadata:       metadata,
@@ -417,21 +455,20 @@ func TestProjectionV2RowLineage(t *testing.T) {
                        }
 
                        _, err = scan.Projection()
-                       require.Error(t, err)
-                       assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
+                       require.Error(t, err, "Projection must reject lineage 
columns on pre-v3 tables")
+                       assert.ErrorIs(t, err, ErrInvalidOperation)
                        assert.ErrorContains(t, err, iceberg.RowIDColumnName)
+                       assert.ErrorContains(t, err, "format version")
                })
        }
 }
 
-// TestProjectionV3SchemaWithRowIDOnly covers a v3 table whose schema
-// already declares _row_id (reserved field id) but does not declare 
_last_updated_sequence_number.
-func TestProjectionV3SchemaWithRowIDOnly(t *testing.T) {
+// TestProjectionWithRowLineageRequiresV3 asserts that the WithRowLineage scan
+// option errors out on Projection() when the table format version is below 3.
+func TestProjectionWithRowLineageRequiresV3(t *testing.T) {
        schema := iceberg.NewSchema(
                1,
                iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
-               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
-               iceberg.RowID(),
        )
 
        metadata, err := NewMetadata(
@@ -439,67 +476,31 @@ func TestProjectionV3SchemaWithRowIDOnly(t *testing.T) {
                iceberg.UnpartitionedSpec,
                UnsortedSortOrder,
                "s3://test-bucket/test_table",
-               iceberg.Properties{"format-version": "3"},
+               iceberg.Properties{PropertyFormatVersion: "2"},
        )
        require.NoError(t, err)
-       assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
 
        scan := &Scan{
-               metadata: metadata,
-               selectedFields: []string{
-                       "id", "payload",
-                       iceberg.RowIDColumnName,
-                       iceberg.LastUpdatedSequenceNumberColumnName,
-               },
-               caseSensitive: true,
+               metadata:          metadata,
+               selectedFields:    []string{"*"},
+               caseSensitive:     true,
+               includeRowLineage: true,
        }
 
-       var proj *iceberg.Schema
-       require.NotPanics(t, func() {
-               var perr error
-               proj, perr = scan.Projection()
-               require.NoError(t, perr)
-       })
-       require.NotNil(t, proj)
-
-       fields := proj.Fields()
-       require.Len(t, fields, 4, "projection must include id, payload, 
_row_id, _last_updated_sequence_number")
-
-       fieldByName := make(map[string]iceberg.NestedField, len(fields))
-       idsSeen := make(map[int]string, len(fields))
-       for _, f := range fields {
-               if prev, dup := idsSeen[f.ID]; dup {
-                       t.Fatalf("duplicate field id %d: %q and %q", f.ID, 
prev, f.Name)
-               }
-               idsSeen[f.ID] = f.Name
-               fieldByName[f.Name] = f
-       }
-
-       idField, ok := fieldByName["id"]
-       require.True(t, ok)
-       assert.Equal(t, 1, idField.ID)
-
-       payloadField, ok := fieldByName["payload"]
-       require.True(t, ok)
-       assert.Equal(t, 2, payloadField.ID)
-
-       rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
-       require.True(t, ok)
-       assert.NotEqual(t, iceberg.RowIDFieldID, rowIDField.ID) // NewMetadata 
reorders schema field numbers
-       assert.False(t, rowIDField.Required)
-
-       seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
-       require.True(t, ok)
-       assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID)
-       assert.False(t, seqField.Required)
+       _, err = scan.Projection()
+       require.Error(t, err, "WithRowLineage on a v2 table must be an explicit 
error")
+       assert.ErrorContains(t, err, "row lineage")
 }
 
-func TestProjectionV3SchemaWithLastUpdatedSequenceNumberOnly(t *testing.T) {
+// TestProjectionV3SchemaAlreadyHasRowID covers the case where the user schema
+// already declares _row_id (a reserved field id, but legal in v3). The
+// projection helper must be idempotent and not panic on the duplicate ID.
+func TestProjectionV3SchemaAlreadyHasRowID(t *testing.T) {
        schema := iceberg.NewSchema(
                1,
                iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
                iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
-               iceberg.LastUpdatedSequenceNumber(),
+               iceberg.RowID(),
        )
 
        metadata, err := NewMetadata(
@@ -513,154 +514,23 @@ func 
TestProjectionV3SchemaWithLastUpdatedSequenceNumberOnly(t *testing.T) {
        assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
 
        scan := &Scan{
-               metadata: metadata,
-               selectedFields: []string{
-                       "id", "payload",
-                       iceberg.RowIDColumnName,
-                       iceberg.LastUpdatedSequenceNumberColumnName,
-               },
-               caseSensitive: true,
+               metadata:          metadata,
+               selectedFields:    []string{"*"},
+               caseSensitive:     true,
+               includeRowLineage: true,
        }
 
-       var proj *iceberg.Schema
        require.NotPanics(t, func() {
-               var perr error
-               proj, perr = scan.Projection()
+               proj, perr := scan.Projection()
                require.NoError(t, perr)
-       })
-       require.NotNil(t, proj)
+               require.NotNil(t, proj)
 
-       fields := proj.Fields()
-       require.Len(t, fields, 4, "projection must include id, payload, 
_row_id, _last_updated_sequence_number")
-
-       fieldByName := make(map[string]iceberg.NestedField, len(fields))
-       idsSeen := make(map[int]string, len(fields))
-       for _, f := range fields {
-               if prev, dup := idsSeen[f.ID]; dup {
-                       t.Fatalf("duplicate field id %d: %q and %q", f.ID, 
prev, f.Name)
+               seen := make(map[int]string, len(proj.Fields()))
+               for _, f := range proj.Fields() {
+                       if prev, dup := seen[f.ID]; dup {
+                               t.Fatalf("duplicate field id %d: %q and %q", 
f.ID, prev, f.Name)
+                       }
+                       seen[f.ID] = f.Name
                }
-               idsSeen[f.ID] = f.Name
-               fieldByName[f.Name] = f
-       }
-
-       idField, ok := fieldByName["id"]
-       require.True(t, ok)
-       assert.Equal(t, 1, idField.ID)
-
-       payloadField, ok := fieldByName["payload"]
-       require.True(t, ok)
-       assert.Equal(t, 2, payloadField.ID)
-
-       rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
-       require.True(t, ok)
-       assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID)
-       assert.False(t, rowIDField.Required)
-
-       seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
-       require.True(t, ok)
-       assert.NotEqual(t, iceberg.LastUpdatedSequenceNumberFieldID, 
seqField.ID) // NewMetadata reorders schema field numbers
-       assert.False(t, seqField.Required)
-}
-
-// TestSynthesizeRowLineageColumns verifies that _row_id and 
_last_updated_sequence_number
-// are filled from task constants when those columns are present and null.
-func TestSynthesizeRowLineageColumns(t *testing.T) {
-       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
-       ctx := compute.WithAllocator(t.Context(), mem)
-       defer mem.AssertSize(t, 0)
-       firstRowID := int64(1000)
-       dataSeqNum := int64(5)
-       task := FileScanTask{FirstRowID: &firstRowID, DataSequenceNumber: 
&dataSeqNum}
-       rowOffset := int64(0)
-
-       // Build a batch with a data column plus _row_id and 
_last_updated_sequence_number (all nulls).
-       schema := arrow.NewSchema(
-               []arrow.Field{
-                       {Name: "x", Type: arrow.PrimitiveTypes.Int64, Nullable: 
true},
-                       {Name: iceberg.RowIDColumnName, Type: 
arrow.PrimitiveTypes.Int64, Nullable: true},
-                       {Name: iceberg.LastUpdatedSequenceNumberColumnName, 
Type: arrow.PrimitiveTypes.Int64, Nullable: true},
-               },
-               nil,
-       )
-       const nrows = 3
-       xBldr := array.NewInt64Builder(mem)
-       defer xBldr.Release()
-       xBldr.AppendValues([]int64{1, 2, 3}, nil)
-       rowIDBldr := array.NewInt64Builder(mem)
-       defer rowIDBldr.Release()
-       rowIDBldr.AppendNulls(nrows)
-       seqBldr := array.NewInt64Builder(mem)
-       defer seqBldr.Release()
-       seqBldr.AppendNulls(nrows)
-
-       xArr := xBldr.NewArray()
-       rowIDArr := rowIDBldr.NewArray()
-       seqArr := seqBldr.NewArray()
-       batch := array.NewRecordBatch(schema, []arrow.Array{xArr, rowIDArr, 
seqArr}, nrows)
-       xArr.Release()
-       rowIDArr.Release()
-       seqArr.Release()
-       defer batch.Release()
-
-       out, err := synthesizeRowLineageColumns(ctx, &rowOffset, task, batch)
-       require.NoError(t, err)
-       defer out.Release()
-
-       // _row_id should be 1000, 1001, 1002
-       rowIDCol := out.Column(1).(*array.Int64)
-       require.Equal(t, nrows, rowIDCol.Len())
-       for i := 0; i < nrows; i++ {
-               assert.False(t, rowIDCol.IsNull(i), "row %d", i)
-               assert.EqualValues(t, 1000+int64(i), rowIDCol.Value(i), "row 
%d", i)
-       }
-       // _last_updated_sequence_number should be 5 for all
-       seqCol := out.Column(2).(*array.Int64)
-       for i := 0; i < nrows; i++ {
-               assert.False(t, seqCol.IsNull(i), "row %d", i)
-               assert.EqualValues(t, 5, seqCol.Value(i), "row %d", i)
-       }
-       assert.EqualValues(t, 3, rowOffset)
-}
-
-func TestRemoveMetadataFromSelectedFields(t *testing.T) {
-       selectedFields := []string{
-               "id",
-               "payload",
-       }
-
-       metaFields := []string{
-               "_row_id",
-       }
-
-       sf, mf := removeMetadataFromSelectedFields(selectedFields, metaFields)
-
-       assert.Equal(t, 2, len(sf))
-       assert.Equal(t, 0, len(mf))
-
-       assert.True(t, slices.Contains(sf, "id"))
-       assert.True(t, slices.Contains(sf, "payload"))
-}
-
-func TestRemoveMetadataFromSelectedFieldsCasing(t *testing.T) {
-       selectedFields := []string{
-               "id",
-               "payload",
-               "_ROW_Id",
-               "lastupdatedsequence_number",
-       }
-
-       metaFields := []string{
-               "_row_id",
-               "_last_updated_sequence_number",
-       }
-
-       sf, mf := removeMetadataFromSelectedFields(selectedFields, metaFields)
-
-       assert.Equal(t, 3, len(sf))
-       assert.Equal(t, 1, len(mf))
-
-       assert.True(t, slices.Contains(sf, "id"))
-       assert.True(t, slices.Contains(sf, "payload"))
-       assert.True(t, slices.Contains(sf, "lastupdatedsequence_number"))
-       assert.True(t, slices.Contains(mf, iceberg.RowID()))
+       })
 }
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index 4e98a822..05430710 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -917,6 +917,11 @@ func (sp *snapshotProducer) commit(ctx context.Context) (_ 
[]Update, _ []Require
                        return nil, nil, err
                }
                if writer.NextRowID() != nil {
+                       // addedRows counts ALL rows in new manifests (existing + added), even
+                       // for rewrites where survivors preserve old _row_id values. This
+                       // "wastes" ID space but doesn't violate uniqueness: actual row IDs come
+                       // from the explicit Parquet column, not the global counter. Java's
+                       // ManifestListWriter.V3Writer uses the same accounting.
                        addedRows = *writer.NextRowID() - firstRowID
                }
        } else {
diff --git a/table/table.go b/table/table.go
index 3dd78057..a9a725af 100644
--- a/table/table.go
+++ b/table/table.go
@@ -816,6 +816,17 @@ func WithOptions(opts iceberg.Properties) ScanOption {
        }
 }
 
+// WithRowLineage projects the row-lineage metadata columns (_row_id and
+// _last_updated_sequence_number) so that row identity and per-row update
+// sequence are preserved through rewrites and compactions. Requires a v3
+// table — calling Scan.Projection on a v1/v2 table after applying this
+// option returns an error.
+func WithRowLineage() ScanOption {
+       return func(scan *Scan) {
+               scan.includeRowLineage = true
+       }
+}
+
 func (t Table) Scan(opts ...ScanOption) *Scan {
        s := &Scan{
                metadata:       t.metadata,
diff --git a/table/transaction.go b/table/transaction.go
index 72d887dd..526bed6c 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -523,7 +523,15 @@ func validateDataFilePartitionData(df iceberg.DataFile, 
spec *iceberg.PartitionS
 
 // validateDataFilesToAdd performs metadata-only validation for caller-provided
 // DataFiles and returns a set of paths that passed validation.
-func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile, 
operation string) (map[string]struct{}, error) {
+//
+// requireFirstRowID gates the v3 first_row_id check: callers handing in
+// externally-written parquet files (e.g. [Transaction.AddDataFiles]) must set
+// first_row_id explicitly because the library cannot fabricate row IDs for
+// files it did not write. Library-internal rewrite paths (compaction via
+// [withRewriteSemantics]) pass false because the manifest-list writer assigns
+// first_row_id via inheritance at write time and the output files carry
+// explicit _row_id columns that win on read regardless.
+func (t *Transaction) validateDataFilesToAdd(dataFiles []iceberg.DataFile, 
operation string, requireFirstRowID bool) (map[string]struct{}, error) {
        currentSpec, err := t.meta.CurrentSpec()
        if err != nil {
                return nil, fmt.Errorf("could not get current partition spec: 
%w", err)
@@ -569,7 +577,7 @@ func (t *Transaction) validateDataFilesToAdd(dataFiles 
[]iceberg.DataFile, opera
                        return nil, fmt.Errorf("data file %s has invalid 
partition data for %s: %w", path, operation, err)
                }
 
-               if t.meta.formatVersion >= 3 && df.FirstRowID() == nil {
+               if requireFirstRowID && t.meta.formatVersion >= 3 && 
df.FirstRowID() == nil {
                        return nil, fmt.Errorf(
                                "data file %s is missing first_row_id which is 
required for v3 tables for %s: use DataFileBuilder.FirstRowID() to set it 
explicitly",
                                path, operation)
@@ -668,7 +676,7 @@ func (t *Transaction) AddDataFiles(ctx context.Context, 
dataFiles []iceberg.Data
                o(&cfg)
        }
 
-       setToAdd, err := t.validateDataFilesToAdd(dataFiles, "AddDataFiles")
+       setToAdd, err := t.validateDataFilesToAdd(dataFiles, "AddDataFiles", 
true)
        if err != nil {
                return err
        }
@@ -752,7 +760,7 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx 
context.Context, filesTo
                o(&cfg)
        }
 
-       setToAdd, err := t.validateDataFilesToAdd(filesToAdd, 
"ReplaceDataFilesWithDataFiles")
+       setToAdd, err := t.validateDataFilesToAdd(filesToAdd, 
"ReplaceDataFilesWithDataFiles", !cfg.rewriteSemantics)
        if err != nil {
                return err
        }
@@ -852,7 +860,7 @@ func (t *Transaction) ReplaceFiles(ctx context.Context, 
dataFilesToDelete, dataF
                o(&cfg)
        }
 
-       setToAdd, err := t.validateDataFilesToAdd(dataFilesToAdd, 
"ReplaceFiles")
+       setToAdd, err := t.validateDataFilesToAdd(dataFilesToAdd, 
"ReplaceFiles", !cfg.rewriteSemantics)
        if err != nil {
                return err
        }
@@ -1215,7 +1223,7 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx 
context.Context, operation
        commitUUID := uuid.New()
        updater := t.updateSnapshot(fs, snapshotProps, 
operation).mergeOverwrite(&commitUUID, filter)
 
-       filesToDelete, filesToRewrite, err := t.classifyFilesForDeletions(ctx, 
fs, filter, caseSensitive, concurrency)
+       filesToDelete, filesToRewrite, fileSeqByPath, err := 
t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency)
        if err != nil {
                return nil, err
        }
@@ -1225,7 +1233,7 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx 
context.Context, operation
        }
 
        if len(filesToRewrite) > 0 {
-               if err := t.rewriteFilesWithFilter(ctx, fs, updater, 
filesToRewrite, filter, caseSensitive, concurrency); err != nil {
+               if err := t.rewriteFilesWithFilter(ctx, fs, updater, 
filesToRewrite, fileSeqByPath, filter, caseSensitive, concurrency); err != nil {
                        return nil, err
                }
        }
@@ -1254,7 +1262,7 @@ func (t *Transaction) performMergeOnReadDeletion(ctx 
context.Context, snapshotPr
        commitUUID := uuid.New()
        updater := t.updateSnapshot(fs, snapshotProps, 
OpDelete).mergeOverwrite(&commitUUID, filter)
 
-       filesToDelete, withPartialDeletions, err := 
t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency)
+       filesToDelete, withPartialDeletions, _, err := 
t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency)
        if err != nil {
                return nil, err
        }
@@ -1355,24 +1363,26 @@ func (t *Transaction) Delete(ctx context.Context, 
filter iceberg.BooleanExpressi
 }
 
 // classifyFilesForDeletions classifies existing data files based on the 
provided filter.
-// Returns files to delete completely, files to rewrite partially, and any 
error.
-func (t *Transaction) classifyFilesForDeletions(ctx context.Context, fs io.IO, 
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) 
(filesToDelete, filesWithPartialDeletions []iceberg.DataFile, err error) {
+// Returns files to delete completely, files to rewrite partially, the
+// per-file data sequence number map for rewrite candidates (used to
+// synthesize row-lineage columns when reading), and any error.
+func (t *Transaction) classifyFilesForDeletions(ctx context.Context, fs io.IO, 
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int) 
(filesToDelete, filesWithPartialDeletions []iceberg.DataFile, fileSeqByPath 
map[string]*int64, err error) {
        s := t.meta.currentSnapshot()
        if s == nil {
-               return nil, nil, nil
+               return nil, nil, nil, nil
        }
 
        if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
                for df, err := range s.dataFiles(fs, nil) {
                        if err != nil {
-                               return nil, nil, err
+                               return nil, nil, nil, err
                        }
                        if df.ContentType() == iceberg.EntryContentData {
                                filesToDelete = append(filesToDelete, df)
                        }
                }
 
-               return filesToDelete, filesWithPartialDeletions, nil
+               return filesToDelete, filesWithPartialDeletions, nil, nil
        }
 
        return t.classifyFilesForFilteredDeletions(ctx, fs, filter, 
caseSensitive, concurrency)
@@ -1405,22 +1415,23 @@ func (t *fileClassificationTask) 
buildPartitionProjection(specID int) (iceberg.B
 }
 
 // classifyFilesForFilteredDeletions classifies files for filtered overwrite 
operations.
-// Returns files to delete completely, files to rewrite partially, and any 
error.
-func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context, 
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency 
int) (filesToDelete, filesWithPartialDeletes []iceberg.DataFile, err error) {
+// Returns files to delete completely, files to rewrite partially, the
+// per-file data sequence number map for rewrite candidates, and any error.
+func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context, 
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency 
int) (filesToDelete, filesWithPartialDeletes []iceberg.DataFile, fileSeqByPath 
map[string]*int64, err error) {
        schema := t.meta.CurrentSchema()
        meta, err := t.meta.Build()
        if err != nil {
-               return nil, nil, err
+               return nil, nil, nil, err
        }
 
        inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter, 
caseSensitive, false)
        if err != nil {
-               return nil, nil, fmt.Errorf("failed to create inclusive metrics 
evaluator: %w", err)
+               return nil, nil, nil, fmt.Errorf("failed to create inclusive 
metrics evaluator: %w", err)
        }
 
        strictEvaluator, err := newStrictMetricsEvaluator(schema, filter, 
caseSensitive, false)
        if err != nil {
-               return nil, nil, fmt.Errorf("failed to create strict metrics 
evaluator: %w", err)
+               return nil, nil, nil, fmt.Errorf("failed to create strict 
metrics evaluator: %w", err)
        }
 
        classificationTask := newFileClassificationTask(meta, filter, 
caseSensitive)
@@ -1431,11 +1442,12 @@ func (t *Transaction) 
classifyFilesForFilteredDeletions(ctx context.Context, fs
        if s != nil {
                manifests, err = s.Manifests(fs)
                if err != nil {
-                       return nil, nil, fmt.Errorf("failed to get manifests: 
%w", err)
+                       return nil, nil, nil, fmt.Errorf("failed to get 
manifests: %w", err)
                }
        }
 
        var mu sync.Mutex
+       fileSeqByPath = make(map[string]*int64)
 
        g, _ := errgroup.WithContext(ctx)
        g.SetLimit(min(concurrency, len(manifests)))
@@ -1456,6 +1468,7 @@ func (t *Transaction) 
classifyFilesForFilteredDeletions(ctx context.Context, fs
 
                        localDelete := make([]iceberg.DataFile, 0)
                        localRewrite := make([]iceberg.DataFile, 0)
+                       localSeqByPath := make(map[string]*int64)
 
                        for entry, err := range manifest.Entries(fs, false) {
                                if err != nil {
@@ -1488,6 +1501,19 @@ func (t *Transaction) 
classifyFilesForFilteredDeletions(ctx context.Context, fs
                                        localDelete = append(localDelete, df)
                                } else {
                                        localRewrite = append(localRewrite, df)
+                                       // Capture the entry's data sequence 
number so the
+                                       // rewrite path can synthesize
+                                       // _last_updated_sequence_number for 
source rows that
+                                       // have a null value (or no column) in 
the file. Per
+                                       // spec the synthesized value is the 
manifest entry's
+                                       // sequence_number (field id 3, the 
data sequence
+                                       // number) — not file_sequence_number 
(field id 4) —
+                                       // so back-dated EXISTING entries 
assign the correct
+                                       // value to surviving rows.
+                                       if seq := entry.SequenceNum(); seq >= 0 
{
+                                               s := seq
+                                               localSeqByPath[df.FilePath()] = 
&s
+                                       }
                                }
                        }
 
@@ -1495,6 +1521,9 @@ func (t *Transaction) 
classifyFilesForFilteredDeletions(ctx context.Context, fs
                                mu.Lock()
                                filesToDelete = append(filesToDelete, 
localDelete...)
                                filesWithPartialDeletes = 
append(filesWithPartialDeletes, localRewrite...)
+                               for k, v := range localSeqByPath {
+                                       fileSeqByPath[k] = v
+                               }
                                mu.Unlock()
                        }
 
@@ -1503,20 +1532,42 @@ func (t *Transaction) 
classifyFilesForFilteredDeletions(ctx context.Context, fs
        }
 
        if err := g.Wait(); err != nil {
-               return nil, nil, err
+               return nil, nil, nil, err
        }
 
-       return filesToDelete, filesWithPartialDeletes, nil
+       return filesToDelete, filesWithPartialDeletes, fileSeqByPath, nil
 }
 
-// rewriteFilesWithFilter rewrites data files by preserving only rows that do 
NOT match the filter
-func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, 
updater *snapshotProducer, files []iceberg.DataFile, filter 
iceberg.BooleanExpression, caseSensitive bool, concurrency int) error {
+// rewriteFilesWithFilter rewrites data files by preserving only rows that do 
NOT match the filter.
+// fileSeqByPath maps each file's path to its data sequence number from the 
source manifest entry,
+// used to synthesize _last_updated_sequence_number when the source row's 
value is null. The
+// filter binding and substrait conversion are computed once here and reused 
across every file.
+func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, 
updater *snapshotProducer, files []iceberg.DataFile, fileSeqByPath 
map[string]*int64, filter iceberg.BooleanExpression, caseSensitive bool, 
concurrency int) error {
        complementFilter := iceberg.NewNot(filter)
 
+       // Bind + convert the complement filter once for the whole rewrite. The
+       // per-batch filter function is reused across every record batch from
+       // every file in this rewrite — substrait conversion in particular is
+       // non-trivial so doing it inside the iterator loop wastes work.
+       postFilter, err := prepareBatchFilter(complementFilter, 
t.meta.CurrentSchema(), caseSensitive)
+       if err != nil {
+               return err
+       }
+
        for _, originalFile := range files {
                // Use a separate UUID for rewrite operations to avoid filename 
collisions with new data files
                rewriteUUID := uuid.New()
-               rewrittenFiles, err := t.rewriteSingleFile(ctx, fs, 
originalFile, complementFilter, caseSensitive, rewriteUUID, concurrency)
+               args := rewriteSingleFileArgs{
+                       fs:            fs,
+                       originalFile:  originalFile,
+                       fileSeqNum:    fileSeqByPath[originalFile.FilePath()],
+                       filter:        complementFilter,
+                       postFilter:    postFilter,
+                       caseSensitive: caseSensitive,
+                       commitUUID:    rewriteUUID,
+                       concurrency:   concurrency,
+               }
+               rewrittenFiles, err := t.rewriteSingleFile(ctx, args)
                if err != nil {
                        return fmt.Errorf("failed to rewrite file %s: %w", 
originalFile.FilePath(), err)
                }
@@ -1530,17 +1581,82 @@ func (t *Transaction) rewriteFilesWithFilter(ctx 
context.Context, fs io.IO, upda
        return nil
 }
 
-// rewriteSingleFile reads a single data file, applies the filter, and writes 
new files with filtered data
-func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, 
originalFile iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive 
bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+// rewriteSingleFileArgs bundles the parameters for 
[Transaction.rewriteSingleFile]
+// — there are eight of them (including two forms of the same filter), so
+// named fields keep call sites readable and hard to misorder.
+type rewriteSingleFileArgs struct {
+       fs           io.IO
+       originalFile iceberg.DataFile
+       // fileSeqNum is the source file's data sequence number from its
+       // manifest entry; required to synthesize _last_updated_sequence_number
+       // for rows whose value is null in the source file.
+       fileSeqNum *int64
+       // filter is the per-row complement (rows that survive the rewrite).
+       filter iceberg.BooleanExpression
+       // postFilter is the pre-bound, pre-substrait-converted version of
+       // filter. When preserveRowLineage applies it is the only place the
+       // filter is evaluated; the scanner's row filter is bypassed so
+       // _row_id positions stay correct.
+       postFilter    func(context.Context, arrow.RecordBatch) 
(arrow.RecordBatch, error)
+       caseSensitive bool
+       commitUUID    uuid.UUID
+       concurrency   int
+}
+
+// rewriteSingleFile reads a single data file, applies the filter, and writes
+// new files with filtered data. See [rewriteSingleFileArgs] for parameter
+// semantics.
+func (t *Transaction) rewriteSingleFile(ctx context.Context, args 
rewriteSingleFileArgs) ([]iceberg.DataFile, error) {
        scanTask := &FileScanTask{
-               File:   originalFile,
+               File:   args.originalFile,
                Start:  0,
-               Length: originalFile.FileSizeBytes(),
-       }
-
-       boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter, 
caseSensitive)
-       if err != nil {
-               return nil, fmt.Errorf("failed to bind filter: %w", err)
+               Length: args.originalFile.FileSizeBytes(),
+       }
+
+       // Preserve row lineage for v3 tables: include _row_id and
+       // _last_updated_sequence_number in the scan projection so they are read
+       // from the source file (or synthesized from file metadata when null) 
and
+       // written explicitly into the output Parquet. The Iceberg v3 spec says
+       // _last_updated_sequence_number on a data file row is the sequence
+       // number of the snapshot that last *logically* updated the row — for a
+       // pure CoW rewrite the surviving rows are unchanged, so they must keep
+       // their original value rather than inheriting the new commit's seq.
+       //
+       // When preserving lineage, the row filter must NOT be applied inside 
the
+       // scanner because _row_id synthesis depends on original file position. 
The
+       // filter is applied post-synthesis in the record iterator instead.
+       //
+       // TODO(perf): disabling the scan-time row filter forfeits both 
manifest-
+       // and file-level pushdown for the rewrite read. For selective deletes 
on
+       // wide partitions this means re-reading every survivor candidate 
end-to-
+       // end. File-level skipping (against the bound filter's residual on file
+       // stats) could be re-enabled here without breaking _row_id synthesis,
+       // since synthesis depends only on within-file row positions.
+       preserveRowLineage := t.meta.formatVersion >= 3 && 
args.originalFile.FirstRowID() != nil
+       projectedSchema := t.meta.CurrentSchema()
+       var factoryOpts []writerFactoryOption
+       if preserveRowLineage {
+               projectedSchema = iceberg.SchemaWithRowLineage(projectedSchema)
+               factoryOpts = append(factoryOpts, 
withFactoryFileSchema(projectedSchema))
+               scanTask.FirstRowID = args.originalFile.FirstRowID()
+               // fileSeqNum drives _last_updated_sequence_number synthesis for
+               // source rows whose value is null in the file (or for source
+               // files written before row lineage existed). When the column
+               // is non-null the existing per-row value wins, preserving the
+               // original update sequence across the rewrite.
+               scanTask.DataSequenceNumber = args.fileSeqNum
+       }
+
+       // When preserving row lineage, the bound scan-time filter is replaced
+       // with AlwaysTrue and the rebound version inside prepareBatchFilter is
+       // the one actually applied. Skip the wasted BindExpr in that case.
+       scanFilter := iceberg.BooleanExpression(iceberg.AlwaysTrue{})
+       if !preserveRowLineage {
+               boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), 
args.filter, args.caseSensitive)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to bind filter: %w", err)
+               }
+               scanFilter = boundFilter
        }
 
        meta, err := t.meta.Build()
@@ -1550,12 +1666,12 @@ func (t *Transaction) rewriteSingleFile(ctx 
context.Context, fs io.IO, originalF
 
        scanner := &arrowScan{
                metadata:        meta,
-               fs:              fs,
-               projectedSchema: t.meta.CurrentSchema(),
-               boundRowFilter:  boundFilter,
-               caseSensitive:   caseSensitive,
+               fs:              args.fs,
+               projectedSchema: projectedSchema,
+               boundRowFilter:  scanFilter,
+               caseSensitive:   args.caseSensitive,
                rowLimit:        -1, // No limit
-               concurrency:     concurrency,
+               concurrency:     args.concurrency,
        }
 
        arrowSchema, recordIter, err := scanner.GetRecords(ctx, 
[]FileScanTask{*scanTask})
@@ -1563,7 +1679,19 @@ func (t *Transaction) rewriteSingleFile(ctx 
context.Context, fs io.IO, originalF
                return nil, fmt.Errorf("failed to get records from original 
file: %w", err)
        }
 
-       // Wrap the iterator to release records after consumption
+       // When preserving row lineage, use an Arrow schema with field IDs so 
that
+       // recordsToDataFiles can resolve _row_id via field ID rather than the 
name
+       // mapping (which doesn't include metadata columns).
+       if preserveRowLineage {
+               arrowSchema, err = SchemaToArrowSchema(projectedSchema, nil, 
true, false)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to build arrow schema 
with field IDs: %w", err)
+               }
+       }
+
+       // Wrap the iterator to release records after consumption.
+       // When preserving row lineage, also apply the (pre-bound) filter here
+       // post-synthesis so _row_id positions stay correct.
        releaseIter := func(yield func(arrow.RecordBatch, error) bool) {
                for rec, err := range recordIter {
                        if err != nil {
@@ -1571,6 +1699,21 @@ func (t *Transaction) rewriteSingleFile(ctx 
context.Context, fs io.IO, originalF
 
                                return
                        }
+
+                       if preserveRowLineage {
+                               rec, err = args.postFilter(ctx, rec)
+                               if err != nil {
+                                       yield(nil, err)
+
+                                       return
+                               }
+                               if rec.NumRows() == 0 {
+                                       rec.Release()
+
+                                       continue
+                               }
+                       }
+
                        if !yield(rec, nil) {
                                rec.Release()
 
@@ -1582,10 +1725,11 @@ func (t *Transaction) rewriteSingleFile(ctx 
context.Context, fs io.IO, originalF
 
        var result []iceberg.DataFile
        itr := recordsToDataFiles(ctx, t.tbl.Location(), t.meta, 
recordWritingArgs{
-               sc:        arrowSchema,
-               itr:       releaseIter,
-               fs:        fs.(io.WriteFileIO),
-               writeUUID: &commitUUID,
+               sc:          arrowSchema,
+               itr:         releaseIter,
+               fs:          args.fs.(io.WriteFileIO),
+               writeUUID:   &args.commitUUID,
+               factoryOpts: factoryOpts,
        })
 
        for df, err := range itr {
@@ -1798,3 +1942,68 @@ func (s *StagedTable) Refresh(ctx context.Context) 
(*Table, error) {
 func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
        panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
 }
+
+// prepareBatchFilter binds the given Iceberg filter against schema and 
converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes a per-call ctx so allocator/deadline values from
+// the call-site context propagate into compute (the extension-ID set is
+// attached internally on each call rather than baked into a captured ctx).
+//
+// Ownership: the returned function takes ownership of the input batch — the
+// AlwaysFalse fast-path and the bound-filter path both Release rec internally.
+// Callers must not Release the input batch separately. The function returns a
+// possibly-new batch with one reference owned by the caller; the caller is
+// responsible for releasing it.
+//
+// The bound substrait filter is evaluated against the input record's column
+// indices, so each batch's columns must be laid out exactly as in schema —
+// extra trailing columns (e.g. row-lineage metadata) are tolerated only
+// because they live at indices past the filter's referenced fields.
+func prepareBatchFilter(filter iceberg.BooleanExpression, schema 
*iceberg.Schema, caseSensitive bool) (func(context.Context, arrow.RecordBatch) 
(arrow.RecordBatch, error), error) {
+       if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+               return func(_ context.Context, rec arrow.RecordBatch) 
(arrow.RecordBatch, error) {
+                       return rec, nil
+               }, nil
+       }
+
+       bound, err := iceberg.BindExpr(schema, filter, caseSensitive)
+       if err != nil {
+               return nil, fmt.Errorf("prepareBatchFilter: bind expression: 
%w", err)
+       }
+
+       if bound == nil {
+               return func(_ context.Context, rec arrow.RecordBatch) 
(arrow.RecordBatch, error) {
+                       return rec, nil
+               }, nil
+       }
+
+       if bound.Equals(iceberg.AlwaysFalse{}) {
+               // Return a record with the same schema and zero rows. 
NewSlice(0, 0)
+               // preserves the per-field arrays so downstream code that calls
+               // rec.Column(i) does not panic on the empty result.
+               return func(_ context.Context, rec arrow.RecordBatch) 
(arrow.RecordBatch, error) {
+                       defer rec.Release()
+
+                       return rec.NewSlice(0, 0), nil
+               }, nil
+       }
+
+       extSet, substraitFilter, err := substrait.ConvertExpr(schema, bound, 
caseSensitive)
+       if err != nil {
+               return nil, fmt.Errorf("prepareBatchFilter: convert expression: 
%w", err)
+       }
+
+       return func(ctx context.Context, rec arrow.RecordBatch) 
(arrow.RecordBatch, error) {
+               // Attach a fresh extension-ID set to the call-site ctx so 
allocator
+               // and deadline values flow through to compute, while the 
(immutable)
+               // extSet computed during prepare is reused across every batch.
+               ctx = exprs.WithExtensionIDSet(ctx, 
exprs.NewExtensionSetDefault(*extSet))
+
+               return filterRecords(ctx, substraitFilter)(rec)
+       }, nil
+}
diff --git a/table/write_records.go b/table/write_records.go
index 3db89ab5..9c215fbf 100644
--- a/table/write_records.go
+++ b/table/write_records.go
@@ -35,10 +35,12 @@ import (
 type WriteRecordOption func(*writeRecordConfig)
 
 type writeRecordConfig struct {
-       targetFileSize  int64
-       writeUUID       *uuid.UUID
-       maxWriteWorkers int
-       clustered       bool
+       targetFileSize     int64
+       writeUUID          *uuid.UUID
+       maxWriteWorkers    int
+       clustered          bool
+       fileSchema         *iceberg.Schema
+       preserveRowLineage bool
 }
 
 // WithTargetFileSize overrides the table's default target file size.
@@ -94,6 +96,24 @@ func WithClusteredWrite() WriteRecordOption {
        }
 }
 
+// WithPreserveRowLineage sets the output file schema to include the v3 row-
+// lineage metadata columns (_row_id, _last_updated_sequence_number) so that
+// row identity is preserved through rewrites and compactions. The input
+// records must already carry _row_id (e.g. from a scan that projected the
+// lineage columns).
+//
+// [WriteRecords] validates this option against the table state and the input
+// Arrow schema: it errors when applied to a v1/v2 table or when the input
+// records don't include the _row_id column. The schema parameter is the
+// projected Iceberg schema (typically [iceberg.SchemaWithRowLineage]) and is
+// used to write the output Parquet's field IDs.
+func WithPreserveRowLineage(schema *iceberg.Schema) WriteRecordOption {
+       return func(c *writeRecordConfig) {
+               c.fileSchema = schema
+               c.preserveRowLineage = true
+       }
+}
+
 // WriteRecords writes Arrow record batches to Parquet data files for the given
 // table, returning an iterator of the resulting DataFile objects.
 //
@@ -112,11 +132,6 @@ func WriteRecords(ctx context.Context, tbl *Table,
        records iter.Seq2[arrow.RecordBatch, error],
        opts ...WriteRecordOption,
 ) iter.Seq2[iceberg.DataFile, error] {
-       if err := checkArrowSchemaCompat(tbl.Schema(), schema, false); err != 
nil {
-               return internal.SingleErrorIter[iceberg.DataFile](
-                       fmt.Errorf("arrow schema is not compatible with the 
table schema: %w", err))
-       }
-
        cfg := writeRecordConfig{}
        for _, opt := range opts {
                opt(&cfg)
@@ -127,6 +142,30 @@ func WriteRecords(ctx context.Context, tbl *Table,
                        errors.New("WithClusteredWrite and WithMaxWriteWorkers 
are incompatible: the clustered write path is single-threaded"))
        }
 
+       if cfg.preserveRowLineage {
+               if v := tbl.metadata.Version(); v < 3 {
+                       return internal.SingleErrorIter[iceberg.DataFile](
+                               fmt.Errorf("WithPreserveRowLineage requires a 
v3+ table, got v%d", v))
+               }
+               if len(schema.FieldIndices(iceberg.RowIDColumnName)) == 0 {
+                       return internal.SingleErrorIter[iceberg.DataFile](
+                               fmt.Errorf("WithPreserveRowLineage requires 
input records to include the %q column", iceberg.RowIDColumnName))
+               }
+       }
+
+       // Validate the input arrow schema against the projected schema. When
+       // row lineage is being preserved, the projected schema includes the
+       // reserved metadata columns; using tbl.Schema() instead would reject
+       // the lineage columns since they're not declared in the user schema.
+       checkSchema := tbl.Schema()
+       if cfg.fileSchema != nil {
+               checkSchema = cfg.fileSchema
+       }
+       if err := checkArrowSchemaCompat(checkSchema, schema, false); err != 
nil {
+               return internal.SingleErrorIter[iceberg.DataFile](
+                       fmt.Errorf("arrow schema is not compatible with the 
table schema: %w", err))
+       }
+
        fs, err := tbl.fsF(ctx)
        if err != nil {
                return internal.SingleErrorIter[iceberg.DataFile](err)
@@ -174,5 +213,9 @@ func WriteRecords(ctx context.Context, tbl *Table,
                clustered:       cfg.clustered,
        }
 
+       if cfg.fileSchema != nil {
+               args.factoryOpts = append(args.factoryOpts, 
withFactoryFileSchema(cfg.fileSchema))
+       }
+
        return recordsToDataFiles(ctx, tbl.Location(), meta, args)
 }

Reply via email to