This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new bf990e88 feat(table): MoR equality-rewrite - insert/reinsert writer API (#1120)
bf990e88 is described below
commit bf990e88b23bb76669e06c813ac47e11d401bf2c
Author: Tanmay Rauth <[email protected]>
AuthorDate: Wed May 27 16:14:56 2026 +0530
feat(table): MoR equality-rewrite - insert/reinsert writer API (#1120)
Adds `table.PositionDeltaWriter` for the MoR insert/reinsert split on v3
tables. Mirrors the data-file half of Java's `SparkPositionDeltaWrite`:
- `Reinsert(batch)` — survivor rows from a position-delta rewrite, with
  explicit non-null `_row_id` values to preserve identity.
- `Insert(batch)` — fresh rows; the writer appends a null `_row_id` column
  so the reader synthesizes a new ID at scan time.
Output schema is `SchemaWithRowID` (a new helper): `_row_id` written
explicitly, `_last_updated_sequence_number` left absent so it inherits
from the manifest entry's `data_sequence_number` — exactly the value the
spec requires for rewritten rows.
Scope: data files only. The position-delete entries that pair with
reinserts (turning an UPDATE into delete-old + reinsert) are the engine
driver's responsibility — the writer doc spells this out so a reader
doesn't expect Java's full `SparkPositionDeltaWrite` semantics from the
name alone.
Part 2 of #999
Closes: #999
---
metadata_columns.go | 25 +++
table/position_delta_writer.go | 273 ++++++++++++++++++++++++++
table/position_delta_writer_internal_test.go | 178 +++++++++++++++++
table/position_delta_writer_test.go | 274 +++++++++++++++++++++++++++
4 files changed, 750 insertions(+)
diff --git a/metadata_columns.go b/metadata_columns.go
index fa9e2db4..52322025 100644
--- a/metadata_columns.go
+++ b/metadata_columns.go
@@ -100,3 +100,28 @@ func SchemaWithRowLineage(s *Schema) *Schema {
return NewSchemaWithIdentifiers(s.ID, s.IdentifierFieldIDs, fields...)
}
+
+// SchemaWithRowID derives a schema from s that carries the _row_id metadata
+// column but not _last_updated_sequence_number. The latter is deliberately
+// left out: when it is absent from the written Parquet file, readers
+// synthesize it from the manifest entry's data_sequence_number, which is the
+// new file's snapshot sequence number after the rewrite — exactly the value
+// the spec requires for rewritten rows without an explicit override.
+//
+// A nil schema yields nil. The call is idempotent on RowIDFieldID (a schema
+// that already contains that field ID does not get a second copy), and the
+// result is always built from a freshly cloned field slice.
+func SchemaWithRowID(s *Schema) *Schema {
+	if s == nil {
+		return nil
+	}
+
+	fields := slices.Clone(s.Fields())
+
+	hasRowID := false
+	for i := range fields {
+		if fields[i].ID == RowIDFieldID {
+			hasRowID = true
+
+			break
+		}
+	}
+
+	if !hasRowID {
+		fields = append(fields, RowID())
+	}
+
+	return NewSchemaWithIdentifiers(s.ID, s.IdentifierFieldIDs, fields...)
+}
diff --git a/table/position_delta_writer.go b/table/position_delta_writer.go
new file mode 100644
index 00000000..8facfcb5
--- /dev/null
+++ b/table/position_delta_writer.go
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "fmt"
+ "iter"
+ "strconv"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ "github.com/google/uuid"
+)
+
+// PositionDeltaWriter writes data files for the position-delta MoR pattern,
+// distinguishing reinserted rows (survivors of a position-delta rewrite that
+// preserve their original _row_id) from fresh inserts (rows that get a new
+// _row_id synthesized at read time).
+//
+// Scope: this writer produces *data files only*. The position-delete entries
+// that pair with reinserts (and that turn an UPDATE into delete-old +
+// reinsert) are not emitted here — the engine driver is responsible for
+// composing this writer with a position-delete writer and committing both via
+// a RowDelta-style snapshot. This mirrors the data-file half of Java's
+// SparkPositionDeltaWrite.reinsert(meta, row) vs insert(row) split.
+//
+// _last_updated_sequence_number is intentionally not written: the row-id-only
+// file schema lets the reader synthesize it from the manifest entry's
+// data_sequence_number, which after the rewrite is the new snapshot's sequence
+// number — the value the spec requires.
+//
+// Memory model: Reinsert and Insert retain each batch and accumulate them in
+// memory until [PositionDeltaWriter.Close] flushes everything to Parquet in a
+// single WriteRecords pass. Peak memory therefore scales with the total Arrow
+// payload of all staged batches, which is appropriate for engine-driver UPDATE
+// flows but not for full-partition compaction. A streaming variant that takes
+// the iterator at construction time is left for the position-delete writer PR.
+//
+// Lifetime: the references taken by Reinsert and Insert are released only
+// when Close runs, so a writer that is never closed keeps its staged batches
+// retained. Callers still own (and must release) their own reference to every
+// batch they pass in.
+//
+// Usage:
+//
+//	w, err := table.NewPositionDeltaWriter(tbl)
+//	if err != nil { ... }
+//	// survivorBatch must include a _row_id column with non-null values.
+//	if err := w.Reinsert(survivorBatch); err != nil { ... }
+//	// freshBatch has no _row_id column (the writer appends nulls).
+//	if err := w.Insert(freshBatch); err != nil { ... }
+//	dataFiles, err := w.Close(ctx)
+type PositionDeltaWriter struct {
+ // tbl is the target table; Close reads its schema and hands it to
+ // WriteRecords.
+ tbl *Table
+ // writeUUID is generated once by NewPositionDeltaWriter and forwarded to
+ // WriteRecords through WithWriteUUID.
+ writeUUID uuid.UUID
+ // userOpts are caller-supplied [WriteRecordOption]s threaded through to
+ // the underlying [WriteRecords] call. WithWriteUUID and
+ // WithPreserveRowLineage are owned by this writer and are appended after
+ // userOpts so they take precedence.
+ userOpts []WriteRecordOption
+
+ // reinsertBatches hold records with explicit _row_id values (preserving
+ // lineage). Each entry carries one reference taken by Reinsert.
+ reinsertBatches []arrow.RecordBatch
+ // insertBatches hold fresh records without lineage. Each entry carries one
+ // reference taken by Insert.
+ insertBatches []arrow.RecordBatch
+
+ // closed is set by Close; afterwards Reinsert, Insert and Close all return
+ // ErrInvalidOperation.
+ closed bool
+}
+
+// NewPositionDeltaWriter creates a writer for the position-delta MoR update
+// pattern on the given table. The table must be format version 3 or higher.
+//
+// Caller-supplied opts are forwarded to the underlying [WriteRecords] call at
+// [PositionDeltaWriter.Close]. WithWriteUUID and WithPreserveRowLineage are
+// reserved by this writer and will override any caller-supplied values for
+// those two options; everything else (target file size, worker count,
+// clustered write) flows through.
+func NewPositionDeltaWriter(tbl *Table, opts ...WriteRecordOption)
(*PositionDeltaWriter, error) {
+ if tbl.metadata.Version() < 3 {
+ return nil, fmt.Errorf("%w: PositionDeltaWriter requires format
version >= 3, got %d",
+ iceberg.ErrInvalidArgument, tbl.metadata.Version())
+ }
+
+ return &PositionDeltaWriter{
+ tbl: tbl,
+ writeUUID: uuid.New(),
+ userOpts: opts,
+ }, nil
+}
+
+// Reinsert adds survivor rows that preserve their original _row_id. The batch
+// MUST contain a _row_id column (field name "_row_id") with non-null int64
+// values representing the preserved row identities.
+//
+// Per the Iceberg spec, only position-delete rewrites and CoW can preserve
+// lineage. Equality deletes cannot preserve lineage because the engine writes
+// without reading old identity.
+func (w *PositionDeltaWriter) Reinsert(batch arrow.RecordBatch) error {
+ if w.closed {
+ return fmt.Errorf("%w: writer is already closed",
ErrInvalidOperation)
+ }
+
+ indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName)
+ if len(indices) == 0 {
+ return fmt.Errorf("%w: Reinsert batch must contain %s column",
+ iceberg.ErrInvalidArgument, iceberg.RowIDColumnName)
+ }
+
+ col := batch.Column(indices[0])
+ if col.NullN() > 0 {
+ return fmt.Errorf("%w: Reinsert batch %s column must not
contain null values",
+ iceberg.ErrInvalidArgument, iceberg.RowIDColumnName)
+ }
+
+ batch.Retain()
+ w.reinsertBatches = append(w.reinsertBatches, batch)
+
+ return nil
+}
+
+// Insert adds fresh rows that get a new _row_id at read time. The batch should
+// NOT contain a _row_id column; if it does, all values must be null. These
rows
+// represent genuinely new data (not survivors of a rewrite).
+func (w *PositionDeltaWriter) Insert(batch arrow.RecordBatch) error {
+ if w.closed {
+ return fmt.Errorf("%w: writer is already closed",
ErrInvalidOperation)
+ }
+
+ if indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName);
len(indices) > 0 {
+ if batch.Column(indices[0]).NullN() != int(batch.NumRows()) {
+ return fmt.Errorf("%w: Insert batch %s column must be
all null (use Reinsert for preserved IDs)",
+ iceberg.ErrInvalidArgument,
iceberg.RowIDColumnName)
+ }
+ }
+
+ batch.Retain()
+ w.insertBatches = append(w.insertBatches, batch)
+
+ return nil
+}
+
+// Close finalizes the writer and returns the data files produced. The returned
+// files contain both reinserted and fresh rows, with the _row_id column
+// written explicitly for reinserted rows (non-null) and left null for fresh
+// inserts.
+//
+// Close may be called once; a second call returns ErrInvalidOperation. With
+// nothing staged it returns (nil, nil) without writing. Every batch staged via
+// Reinsert/Insert is released before Close returns, on success and on error.
+//
+// The caller is responsible for adding these files to a snapshot (typically
+// via a Transaction's snapshot producer) along with any position-delete
+// entries that pair with the reinserts.
+func (w *PositionDeltaWriter) Close(ctx context.Context) ([]iceberg.DataFile, error) {
+	if w.closed {
+		return nil, fmt.Errorf("%w: writer is already closed", ErrInvalidOperation)
+	}
+	w.closed = true
+
+	// Drop the references taken by Reinsert/Insert on every exit path.
+	defer func() {
+		for _, staged := range [][]arrow.RecordBatch{w.reinsertBatches, w.insertBatches} {
+			for _, b := range staged {
+				b.Release()
+			}
+		}
+	}()
+
+	if len(w.reinsertBatches)+len(w.insertBatches) == 0 {
+		return nil, nil
+	}
+
+	fileSchema := iceberg.SchemaWithRowID(w.tbl.Schema())
+
+	arrowSchema, err := SchemaToArrowSchema(fileSchema, nil, true, false)
+	if err != nil {
+		return nil, fmt.Errorf("PositionDeltaWriter: build arrow schema: %w", err)
+	}
+
+	// Writer-owned options go last so they win over caller-supplied ones.
+	opts := make([]WriteRecordOption, 0, len(w.userOpts)+2)
+	opts = append(opts, w.userOpts...)
+	opts = append(opts, WithPreserveRowLineage(fileSchema), WithWriteUUID(w.writeUUID))
+
+	batches := w.buildUnifiedIterator()
+
+	var files []iceberg.DataFile
+	for file, writeErr := range WriteRecords(ctx, w.tbl, arrowSchema, batches, opts...) {
+		if writeErr != nil {
+			return nil, fmt.Errorf("PositionDeltaWriter: write records: %w", writeErr)
+		}
+		files = append(files, file)
+	}
+
+	return files, nil
+}
+
+// buildUnifiedIterator returns a single iterator over everything staged:
+// first every reinsert batch (which already carries _row_id), then every
+// insert batch after it has been passed through appendNullRowIDColumn.
+//
+// Ownership: each yielded batch carries one reference that belongs to the
+// consumer — an extra Retain for reinsert batches, and the reference returned
+// by appendNullRowIDColumn for insert batches. Per the contract described for
+// [WriteRecords]'s releasing wrapper, the consumer releases each yielded
+// batch exactly once on both the success and the early-stop paths, so this
+// producer must NOT release after a failed yield: doing so would drive
+// Arrow's refcount below zero (undefined behavior that happy-path tests do
+// not catch; any IO error mid-write would silently corrupt the allocator).
+func (w *PositionDeltaWriter) buildUnifiedIterator() iter.Seq2[arrow.RecordBatch, error] {
+	return func(yield func(arrow.RecordBatch, error) bool) {
+		// Phase 1: survivors, handed over with a reference of their own.
+		for _, survivor := range w.reinsertBatches {
+			survivor.Retain()
+			if !yield(survivor, nil) {
+				return
+			}
+		}
+
+		// Phase 2: fresh rows, each given a null _row_id column if missing.
+		alloc := memory.NewGoAllocator()
+		for _, fresh := range w.insertBatches {
+			withRowID, err := appendNullRowIDColumn(alloc, fresh)
+			if err != nil {
+				yield(nil, err)
+
+				return
+			}
+
+			if !yield(withRowID, nil) {
+				return
+			}
+		}
+	}
+}
+
+// appendNullRowIDColumn appends a null-filled _row_id column to a batch that
+// doesn't have one, signaling that these are fresh inserts. If the batch
+// already has _row_id (validated to be all-null by Insert), it is returned
+// retained as-is.
+//
+// The appended field is a nullable int64 tagged with the reserved _row_id
+// field ID under ArrowParquetFieldIDKey. The source batch's schema-level
+// metadata is carried over to the new schema so that adding the column does
+// not silently strip it.
+//
+// In both cases the returned batch carries one reference owned by the caller.
+// The error result is currently always nil.
+func appendNullRowIDColumn(alloc memory.Allocator, batch arrow.RecordBatch) (arrow.RecordBatch, error) {
+	srcSchema := batch.Schema()
+
+	if indices := srcSchema.FieldIndices(iceberg.RowIDColumnName); len(indices) > 0 {
+		batch.Retain()
+
+		return batch, nil
+	}
+
+	nrows := batch.NumRows()
+	bldr := array.NewInt64Builder(alloc)
+	defer bldr.Release()
+	bldr.AppendNulls(int(nrows))
+	nullCol := bldr.NewArray()
+	// NewRecordBatch retains its columns, so our reference can be dropped.
+	defer nullCol.Release()
+
+	rowIDField := arrow.Field{
+		Name:     iceberg.RowIDColumnName,
+		Type:     arrow.PrimitiveTypes.Int64,
+		Nullable: true,
+		Metadata: arrow.MetadataFrom(map[string]string{
+			ArrowParquetFieldIDKey: strconv.Itoa(iceberg.RowIDFieldID),
+		}),
+	}
+
+	// Three-index slice the source field/column slices: append into a
+	// shared backing array would silently mutate the source batch's
+	// internal arrs/fields when len < cap.
+	srcFields := srcSchema.Fields()
+	srcCols := batch.Columns()
+	fields := append(srcFields[:len(srcFields):len(srcFields)], rowIDField)
+	cols := append(srcCols[:len(srcCols):len(srcCols)], nullCol)
+
+	// Preserve schema-level metadata from the source batch.
+	srcMeta := srcSchema.Metadata()
+	newSchema := arrow.NewSchema(fields, &srcMeta)
+
+	return array.NewRecordBatch(newSchema, cols, nrows), nil
+}
diff --git a/table/position_delta_writer_internal_test.go
b/table/position_delta_writer_internal_test.go
new file mode 100644
index 00000000..3222501a
--- /dev/null
+++ b/table/position_delta_writer_internal_test.go
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ "github.com/stretchr/testify/require"
+)
+
+// TestPositionDeltaWriter_BuildUnifiedIterator_AbortNoDoubleRelease verifies
+// that buildUnifiedIterator does not release batches it has already yielded
+// when the consumer aborts mid-stream.
+//
+// The releasing wrapper inside [WriteRecords] (write_records.go) owns the
+// release on both the success and the early-stop paths. If the producer also
+// releases on !yield, refcount would go negative — undefined behavior in
+// Arrow that happy-path tests miss because no IO error ever fires the abort
+// branch. This test simulates that wrapper contract directly on both the
+// reinsert path and the insert (enriched) path.
+//
+// NOTE(review): buildUnifiedIterator builds the null _row_id column with its
+// own Go allocator rather than mem, so the checked allocator below tracks the
+// test-built batches but not that appended column.
+func TestPositionDeltaWriter_BuildUnifiedIterator_AbortNoDoubleRelease(t
*testing.T) {
+ cases := []struct {
+ name string
+ numReinserts int
+ numInserts int
+ // abortAfter yields are consumed and released by the test, then the
+ // loop breaks. The yield call that delivered the last consumed batch
+ // then returns false, so the producer's !yield branch fires in the
+ // phase that batch came from.
+ abortAfter int
+ // abortPhase names that phase; it is only used in the assertion message.
+ abortPhase string
+ }{
+ {name: "abort_on_reinsert", numReinserts: 2, numInserts: 1,
abortAfter: 1, abortPhase: "reinsert"},
+ {name: "abort_on_insert", numReinserts: 0, numInserts: 2,
abortAfter: 1, abortPhase: "insert"},
+ {name: "abort_at_phase_boundary", numReinserts: 1, numInserts:
1, abortAfter: 1, abortPhase: "reinsert"},
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ mem :=
memory.NewCheckedAllocator(memory.NewGoAllocator())
+ // Registered first so it runs last, after every release below.
+ defer mem.AssertSize(t, 0)
+
+ // Zero-value writer: only the staging slices are exercised here.
+ w := &PositionDeltaWriter{}
+
+ reinserts := make([]arrow.RecordBatch, 0,
tc.numReinserts)
+ for i := 0; i < tc.numReinserts; i++ {
+ b := buildIntPlusRowIDBatch(mem,
[]int64{int64(i)}, []int64{int64(1<<40 + i)})
+ reinserts = append(reinserts, b)
+ require.NoError(t, w.Reinsert(b))
+ }
+ inserts := make([]arrow.RecordBatch, 0, tc.numInserts)
+ for i := 0; i < tc.numInserts; i++ {
+ b := buildIntBatch(mem, []int64{int64(100 + i)})
+ inserts = append(inserts, b)
+ require.NoError(t, w.Insert(b))
+ }
+ // Release the test's own construction references.
+ defer func() {
+ for _, b := range reinserts {
+ b.Release()
+ }
+ for _, b := range inserts {
+ b.Release()
+ }
+ }()
+
+ records := w.buildUnifiedIterator()
+ count := 0
+ for rec, err := range records {
+ require.NoError(t, err)
+ // Play the consumer's role: release each yielded batch once.
+ rec.Release()
+ count++
+ if count >= tc.abortAfter {
+ break
+ }
+ }
+ require.Equal(t, tc.abortAfter, count, "unexpected
yield count for %s", tc.abortPhase)
+
+ // Run the slice-release deferred work that Close() would do, so
+ // the CheckedAllocator's AssertSize(0) check is meaningful.
+ for _, b := range w.reinsertBatches {
+ b.Release()
+ }
+ for _, b := range w.insertBatches {
+ b.Release()
+ }
+ })
+ }
+}
+
+// TestPositionDeltaWriter_AppendNullRowIDColumn_NoSourceAliasing verifies that
+// appendNullRowIDColumn does not mutate the source batch's internal field /
+// column slices when those slices have spare capacity. A naive append into a
+// shared backing array would silently corrupt the caller's batch.
+func TestPositionDeltaWriter_AppendNullRowIDColumn_NoSourceAliasing(t
*testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ // Build a batch whose schema fields and column slice both have capacity
+ // > length, so a plain append would write into the shared backing
array.
+ idBldr := array.NewInt64Builder(mem)
+ defer idBldr.Release()
+ idBldr.AppendValues([]int64{1, 2}, nil)
+ idArr := idBldr.NewArray()
+ defer idArr.Release()
+
+ srcCols := append(make([]arrow.Array, 0, 4), idArr)
+ srcFields := append(make([]arrow.Field, 0, 4), arrow.Field{
+ Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false,
+ })
+ srcSchema := arrow.NewSchema(srcFields, nil)
+ src := array.NewRecordBatch(srcSchema, srcCols, 2)
+ defer src.Release()
+
+ enriched, err := appendNullRowIDColumn(mem, src)
+ require.NoError(t, err)
+ defer enriched.Release()
+
+ require.Equal(t, 1, src.Schema().NumFields(),
+ "appendNullRowIDColumn must not mutate source schema field
count")
+ require.Equal(t, 1, len(src.Columns()),
+ "appendNullRowIDColumn must not mutate source columns")
+ require.Equal(t, 2, enriched.Schema().NumFields())
+ require.Equal(t, iceberg.RowIDColumnName,
enriched.Schema().Field(1).Name)
+
+ fieldIDStr, ok :=
enriched.Schema().Field(1).Metadata.GetValue(ArrowParquetFieldIDKey)
+ require.True(t, ok, "appended _row_id field must carry %s metadata",
ArrowParquetFieldIDKey)
+ require.Equal(t, "2147483540", fieldIDStr)
+}
+
+// buildIntPlusRowIDBatch returns a two-column batch (non-nullable int64 "id"
+// plus nullable int64 _row_id) populated from ids and rowIDs. The row count is
+// len(ids). The caller owns the returned batch and must release it.
+func buildIntPlusRowIDBatch(mem memory.Allocator, ids, rowIDs []int64) arrow.RecordBatch {
+	idBldr := array.NewInt64Builder(mem)
+	defer idBldr.Release()
+	idBldr.AppendValues(ids, nil)
+
+	rowIDBldr := array.NewInt64Builder(mem)
+	defer rowIDBldr.Release()
+	rowIDBldr.AppendValues(rowIDs, nil)
+
+	idCol := idBldr.NewArray()
+	defer idCol.Release()
+	rowIDCol := rowIDBldr.NewArray()
+	defer rowIDCol.Release()
+
+	fields := []arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+		{Name: iceberg.RowIDColumnName, Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+	}
+	cols := []arrow.Array{idCol, rowIDCol}
+
+	return array.NewRecordBatch(arrow.NewSchema(fields, nil), cols, int64(len(ids)))
+}
+
+// buildIntBatch returns a single-column batch holding ids in a non-nullable
+// int64 "id" column. The caller owns the returned batch and must release it.
+func buildIntBatch(mem memory.Allocator, ids []int64) arrow.RecordBatch {
+	bldr := array.NewInt64Builder(mem)
+	defer bldr.Release()
+	bldr.AppendValues(ids, nil)
+
+	idCol := bldr.NewArray()
+	defer idCol.Release()
+
+	fields := []arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+	}
+
+	return array.NewRecordBatch(arrow.NewSchema(fields, nil), []arrow.Array{idCol}, int64(len(ids)))
+}
diff --git a/table/position_delta_writer_test.go
b/table/position_delta_writer_test.go
new file mode 100644
index 00000000..f4289798
--- /dev/null
+++ b/table/position_delta_writer_test.go
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestPositionDeltaWriter_ReinsertPreservesRowID is an end-to-end check of the
+// reinsert/insert split: it appends two rows, stages them as survivors with
+// explicit _row_id values plus one fresh row, swaps the writer's output in for
+// the original file through a rewrite, and then scans with row lineage to
+// confirm the survivors come back with exactly the _row_id values staged.
+func TestPositionDeltaWriter_ReinsertPreservesRowID(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+
+ tbl := newV3RowLineageTestTable(t)
+
+ // Append initial data: id=1, id=2 → _row_id 0, 1 in the initial file.
+ dataSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+ initial, err := array.TableFromJSON(mem, dataSchema, []string{
+ `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}]`,
+ })
+ require.NoError(t, err)
+ defer initial.Release()
+
+ tbl, err = tbl.Append(ctx, array.NewTableReader(initial, -1), nil)
+ require.NoError(t, err)
+
+ // Capture the initial data file so we can replace it via NewRewrite.
+ initialFiles, err := planFilesAll(ctx, tbl)
+ require.NoError(t, err)
+ require.Len(t, initialFiles, 1)
+ initialFile := initialFiles[0]
+
+ // Create a PositionDeltaWriter and stage:
+ // - Reinsert(id=1, _row_id=preservedRowID1) — survivor with updated value.
+ // - Reinsert(id=2, _row_id=preservedRowID2) — unchanged survivor.
+ // - Insert(id=99) — fresh row, gets synthesized _row_id at read time.
+ w, err := table.NewPositionDeltaWriter(tbl)
+ require.NoError(t, err)
+
+ // Use _row_id values far outside any plausible synthesized first_row_id
+ // range so the assertions actually prove preservation: if the writer
+ // silently dropped _row_id and the reader fell back to synthesizing IDs
+ // from the new file's first_row_id, the survivors would *not* land on
+ // these high values. Picking small IDs (e.g. 0, 1) would be
+ // indistinguishable from a synthesized 0/1 in the rewritten file.
+ const (
+ preservedRowID1 = int64(1) << 40
+ preservedRowID2 = preservedRowID1 + 1
+ )
+
+ reinsertBatch := buildReinsertBatch(mem,
+ []int64{1, 2},
+ []string{"a_updated", "b"},
+ []int64{preservedRowID1, preservedRowID2},
+ )
+ defer reinsertBatch.Release()
+ require.NoError(t, w.Reinsert(reinsertBatch))
+
+ insertBatch := buildInsertBatch(mem, []int64{99}, []string{"new"})
+ defer insertBatch.Release()
+ require.NoError(t, w.Insert(insertBatch))
+
+ dataFiles, err := w.Close(ctx)
+ require.NoError(t, err)
+ require.NotEmpty(t, dataFiles)
+
+ // Atomically replace the initial file with the writer's output. This is
+ // a library-internal rewrite (NewRewrite skips the explicit first_row_id
+ // requirement that AddDataFiles enforces for externally-written files).
+ tx := tbl.NewTransaction()
+ rw := tx.NewRewrite(nil)
+ rw.DeleteFile(initialFile)
+ for _, df := range dataFiles {
+ rw.AddDataFile(df)
+ }
+ require.NoError(t, rw.Commit(ctx))
+ tbl, err = tx.Commit(ctx)
+ require.NoError(t, err)
+
+ // Scan with lineage — survivors must keep the _row_ids staged through
+ // Reinsert; the fresh row gets a new synthesized _row_id from the new
+ // file's first_row_id.
+ scan := tbl.Scan(table.WithRowLineage())
+ _, itr, err := scan.ToArrowRecords(ctx)
+ require.NoError(t, err)
+
+ // rowsByID maps each row's "id" value to the _row_id the scan returned.
+ rowsByID := map[int64]int64{}
+ for rec, err := range itr {
+ require.NoError(t, err)
+ idCol :=
rec.Column(rec.Schema().FieldIndices("id")[0]).(*array.Int64)
+ rowIDCol :=
rec.Column(rec.Schema().FieldIndices(iceberg.RowIDColumnName)[0]).(*array.Int64)
+ for i := 0; i < int(rec.NumRows()); i++ {
+ rowsByID[idCol.Value(i)] = rowIDCol.Value(i)
+ }
+ rec.Release()
+ }
+
+ require.Contains(t, rowsByID, int64(1))
+ require.Contains(t, rowsByID, int64(2))
+ require.Contains(t, rowsByID, int64(99))
+
+ assert.Equal(t, preservedRowID1, rowsByID[1], "reinserted id=1 must
preserve original _row_id")
+ assert.Equal(t, preservedRowID2, rowsByID[2], "reinserted id=2 must
preserve original _row_id")
+ // The fresh row's synthesized _row_id must not collide with the preserved
+ // survivors. With first_row_id allocated from a fresh assign-counter, it
+ // will sit in a low range and the high preserved values are unreachable.
+ assert.NotEqual(t, preservedRowID1, rowsByID[99])
+ assert.NotEqual(t, preservedRowID2, rowsByID[99])
+}
+
+// buildReinsertBatch assembles a three-column Arrow record batch — id, data
+// and _row_id — in the shape PositionDeltaWriter.Reinsert expects. The row
+// count is len(ids). The caller owns the returned batch and must release it.
+func buildReinsertBatch(mem memory.Allocator, ids []int64, data []string, rowIDs []int64) arrow.RecordBatch {
+	idBldr := array.NewInt64Builder(mem)
+	defer idBldr.Release()
+	idBldr.AppendValues(ids, nil)
+
+	dataBldr := array.NewStringBuilder(mem)
+	defer dataBldr.Release()
+	dataBldr.AppendValues(data, nil)
+
+	rowIDBldr := array.NewInt64Builder(mem)
+	defer rowIDBldr.Release()
+	rowIDBldr.AppendValues(rowIDs, nil)
+
+	idCol := idBldr.NewArray()
+	defer idCol.Release()
+	dataCol := dataBldr.NewArray()
+	defer dataCol.Release()
+	rowIDCol := rowIDBldr.NewArray()
+	defer rowIDCol.Release()
+
+	fields := []arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+		{Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+		{Name: iceberg.RowIDColumnName, Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+	}
+	cols := []arrow.Array{idCol, dataCol, rowIDCol}
+
+	return array.NewRecordBatch(arrow.NewSchema(fields, nil), cols, int64(len(ids)))
+}
+
+// buildInsertBatch assembles a two-column Arrow record batch (id, data) with
+// no _row_id column, i.e. the shape of a fresh insert. The row count is
+// len(ids). The caller owns the returned batch and must release it.
+func buildInsertBatch(mem memory.Allocator, ids []int64, data []string) arrow.RecordBatch {
+	idBldr := array.NewInt64Builder(mem)
+	defer idBldr.Release()
+	idBldr.AppendValues(ids, nil)
+
+	dataBldr := array.NewStringBuilder(mem)
+	defer dataBldr.Release()
+	dataBldr.AppendValues(data, nil)
+
+	idCol := idBldr.NewArray()
+	defer idCol.Release()
+	dataCol := dataBldr.NewArray()
+	defer dataCol.Release()
+
+	fields := []arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+		{Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+	}
+
+	return array.NewRecordBatch(arrow.NewSchema(fields, nil), []arrow.Array{idCol, dataCol}, int64(len(ids)))
+}
+
+// planFilesAll plans an unfiltered scan of tbl and returns the data file of
+// every resulting task, in plan order. Tests use it to grab an existing data
+// file and feed it back into NewRewrite.
+func planFilesAll(ctx context.Context, tbl *table.Table) ([]iceberg.DataFile, error) {
+	tasks, err := tbl.Scan().PlanFiles(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	files := make([]iceberg.DataFile, len(tasks))
+	for i := range tasks {
+		files[i] = tasks[i].File
+	}
+
+	return files, nil
+}
+
+// TestPositionDeltaWriter_RequiresV3 checks that construction succeeds on a
+// v3 table. It covers only the accepting side of the version gate: no
+// pre-v3 table is built here, so the rejection path is not exercised.
+func TestPositionDeltaWriter_RequiresV3(t *testing.T) {
+	// newV3RowLineageTestTable already creates a v3 table, so this should succeed.
+	w, err := table.NewPositionDeltaWriter(newV3RowLineageTestTable(t))
+
+	require.NoError(t, err)
+	require.NotNil(t, w)
+}
+
+// TestPositionDeltaWriter_ReinsertRejectsNullRowID checks that Reinsert
+// refuses a batch whose _row_id column contains a null.
+func TestPositionDeltaWriter_ReinsertRejectsNullRowID(t *testing.T) {
+	mem := memory.DefaultAllocator
+
+	w, err := table.NewPositionDeltaWriter(newV3RowLineageTestTable(t))
+	require.NoError(t, err)
+
+	// One row: id=1 with a null _row_id.
+	idBldr := array.NewInt64Builder(mem)
+	defer idBldr.Release()
+	idBldr.Append(1)
+
+	rowIDBldr := array.NewInt64Builder(mem)
+	defer rowIDBldr.Release()
+	rowIDBldr.AppendNull()
+
+	idCol := idBldr.NewArray()
+	defer idCol.Release()
+	rowIDCol := rowIDBldr.NewArray()
+	defer rowIDCol.Release()
+
+	fields := []arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+		{Name: iceberg.RowIDColumnName, Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+	}
+	batch := array.NewRecordBatch(arrow.NewSchema(fields, nil), []arrow.Array{idCol, rowIDCol}, 1)
+	defer batch.Release()
+
+	reinsertErr := w.Reinsert(batch)
+	require.Error(t, reinsertErr)
+	assert.Contains(t, reinsertErr.Error(), "must not contain null")
+}
+
+// TestPositionDeltaWriter_ReinsertRejectsMissingColumn checks that Reinsert
+// refuses a batch that has no _row_id column at all.
+func TestPositionDeltaWriter_ReinsertRejectsMissingColumn(t *testing.T) {
+	mem := memory.DefaultAllocator
+
+	w, err := table.NewPositionDeltaWriter(newV3RowLineageTestTable(t))
+	require.NoError(t, err)
+
+	// One row with only the "id" column.
+	idBldr := array.NewInt64Builder(mem)
+	defer idBldr.Release()
+	idBldr.Append(1)
+	idCol := idBldr.NewArray()
+	defer idCol.Release()
+
+	fields := []arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+	}
+	batch := array.NewRecordBatch(arrow.NewSchema(fields, nil), []arrow.Array{idCol}, 1)
+	defer batch.Release()
+
+	reinsertErr := w.Reinsert(batch)
+	require.Error(t, reinsertErr)
+	assert.Contains(t, reinsertErr.Error(), "must contain _row_id")
+}