This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new bf990e88 feat(table): MoR equality-rewrite - insert/reinsert writer 
API (#1120)
bf990e88 is described below

commit bf990e88b23bb76669e06c813ac47e11d401bf2c
Author: Tanmay Rauth <[email protected]>
AuthorDate: Wed May 27 16:14:56 2026 +0530

    feat(table): MoR equality-rewrite - insert/reinsert writer API (#1120)
    
    Adds `table.PositionDeltaWriter` for the MoR insert/reinsert split on v3
    tables. Mirrors the data-file half of Java's `SparkPositionDeltaWrite`:
    
    - `Reinsert(batch)` — survivor rows from a position-delta rewrite, with
    explicit non-null `_row_id` values to preserve identity.
    - `Insert(batch)` — fresh rows; writer appends a null `_row_id` column
    so the reader synthesizes a new ID at scan time.
    
    Output schema is `SchemaWithRowID` (a new helper): `_row_id` written
    explicitly, `_last_updated_sequence_number` left absent so it inherits
    from the manifest entry's `data_sequence_number` — exactly the value the
    spec requires for rewritten rows.
    
    Scope: data files only. The position-delete entries that pair with
    reinserts (turning an UPDATE into delete-old + reinsert) are the engine
    driver's responsibility — the writer doc spells this out so a reader
    doesn't expect Java's full `SparkPositionDeltaWrite` semantics from the
    name alone.
    
    
    Part 2 of #999
    Closes: #999
---
 metadata_columns.go                          |  25 +++
 table/position_delta_writer.go               | 273 ++++++++++++++++++++++++++
 table/position_delta_writer_internal_test.go | 178 +++++++++++++++++
 table/position_delta_writer_test.go          | 274 +++++++++++++++++++++++++++
 4 files changed, 750 insertions(+)

diff --git a/metadata_columns.go b/metadata_columns.go
index fa9e2db4..52322025 100644
--- a/metadata_columns.go
+++ b/metadata_columns.go
@@ -100,3 +100,28 @@ func SchemaWithRowLineage(s *Schema) *Schema {
 
        return NewSchemaWithIdentifiers(s.ID, s.IdentifierFieldIDs, fields...)
 }
+
+// SchemaWithRowID returns a new schema with only the _row_id metadata column
+// appended. _last_updated_sequence_number is intentionally omitted: leaving it
+// absent in the written Parquet means readers synthesize it from the manifest
+// entry's data_sequence_number, which is the new file's snapshot sequence
+// number after the rewrite — exactly the value the spec requires for rewritten
+// rows without an explicit override.
+//
+// Idempotent on RowIDFieldID; allocates a fresh field slice.
+func SchemaWithRowID(s *Schema) *Schema {
+       if s == nil {
+               return nil
+       }
+       fields := slices.Clone(s.Fields())
+
+       for _, f := range fields {
+               if f.ID == RowIDFieldID {
+                       return NewSchemaWithIdentifiers(s.ID, 
s.IdentifierFieldIDs, fields...)
+               }
+       }
+
+       fields = append(fields, RowID())
+
+       return NewSchemaWithIdentifiers(s.ID, s.IdentifierFieldIDs, fields...)
+}
diff --git a/table/position_delta_writer.go b/table/position_delta_writer.go
new file mode 100644
index 00000000..8facfcb5
--- /dev/null
+++ b/table/position_delta_writer.go
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "iter"
+       "strconv"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       "github.com/google/uuid"
+)
+
+// PositionDeltaWriter writes data files for the position-delta MoR pattern,
+// distinguishing reinserted rows (survivors of a position-delta rewrite that
+// preserve their original _row_id) from fresh inserts (rows that get a new
+// _row_id synthesized at read time).
+//
+// Scope: this writer produces *data files only*. The position-delete entries
+// that pair with reinserts (and that turn an UPDATE into delete-old + 
reinsert)
+// are not emitted here — the engine driver is responsible for composing this
+// writer with a position-delete writer and committing both via a 
RowDelta-style
+// snapshot. This mirrors the data-file half of Java's
+// SparkPositionDeltaWrite.reinsert(meta, row) vs insert(row) split.
+//
+// _last_updated_sequence_number is intentionally not written: the row-id-only
+// file schema lets the reader synthesize it from the manifest entry's
+// data_sequence_number, which after the rewrite is the new snapshot's sequence
+// number — the value the spec requires.
+//
+// Memory model: Reinsert and Insert retain each batch and accumulate them in
+// memory until [PositionDeltaWriter.Close] flushes everything to Parquet in a
+// single WriteRecords pass. Peak memory therefore scales with the total Arrow
+// payload of all staged batches, which is appropriate for engine-driver UPDATE
+// flows but not for full-partition compaction. A streaming variant that takes
+// the iterator at construction time is left for the position-delete writer PR.
+//
+// Usage:
+//
+//     w, err := table.NewPositionDeltaWriter(tbl)
+//     w.Reinsert(survivorBatch)  // batch must include _row_id column with 
non-null values
+//     w.Insert(freshBatch)       // batch without _row_id (writer appends 
nulls)
+//     dataFiles, err := w.Close(ctx)
+type PositionDeltaWriter struct {
+       tbl       *Table
+       writeUUID uuid.UUID
+       // userOpts are caller-supplied [WriteRecordOption]s threaded through to
+       // the underlying [WriteRecords] call. WithWriteUUID and
+       // WithPreserveRowLineage are owned by this writer and are appended 
after
+       // userOpts so they take precedence.
+       userOpts []WriteRecordOption
+
+       // reinsertBatches hold records with explicit _row_id values 
(preserving lineage).
+       reinsertBatches []arrow.RecordBatch
+       // insertBatches hold fresh records without lineage.
+       insertBatches []arrow.RecordBatch
+
+       closed bool
+}
+
+// NewPositionDeltaWriter creates a writer for the position-delta MoR update
+// pattern on the given table. The table must be format version 3 or higher.
+//
+// Caller-supplied opts are forwarded to the underlying [WriteRecords] call at
+// [PositionDeltaWriter.Close]. WithWriteUUID and WithPreserveRowLineage are
+// reserved by this writer and will override any caller-supplied values for
+// those two options; everything else (target file size, worker count,
+// clustered write) flows through.
+func NewPositionDeltaWriter(tbl *Table, opts ...WriteRecordOption) 
(*PositionDeltaWriter, error) {
+       if tbl.metadata.Version() < 3 {
+               return nil, fmt.Errorf("%w: PositionDeltaWriter requires format 
version >= 3, got %d",
+                       iceberg.ErrInvalidArgument, tbl.metadata.Version())
+       }
+
+       return &PositionDeltaWriter{
+               tbl:       tbl,
+               writeUUID: uuid.New(),
+               userOpts:  opts,
+       }, nil
+}
+
+// Reinsert adds survivor rows that preserve their original _row_id. The batch
+// MUST contain a _row_id column (field name "_row_id") with non-null int64
+// values representing the preserved row identities.
+//
+// Per the Iceberg spec, only position-delete rewrites and CoW can preserve
+// lineage. Equality deletes cannot preserve lineage because the engine writes
+// without reading old identity.
+func (w *PositionDeltaWriter) Reinsert(batch arrow.RecordBatch) error {
+       if w.closed {
+               return fmt.Errorf("%w: writer is already closed", 
ErrInvalidOperation)
+       }
+
+       indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName)
+       if len(indices) == 0 {
+               return fmt.Errorf("%w: Reinsert batch must contain %s column",
+                       iceberg.ErrInvalidArgument, iceberg.RowIDColumnName)
+       }
+
+       col := batch.Column(indices[0])
+       if col.NullN() > 0 {
+               return fmt.Errorf("%w: Reinsert batch %s column must not 
contain null values",
+                       iceberg.ErrInvalidArgument, iceberg.RowIDColumnName)
+       }
+
+       batch.Retain()
+       w.reinsertBatches = append(w.reinsertBatches, batch)
+
+       return nil
+}
+
+// Insert adds fresh rows that get a new _row_id at read time. The batch should
+// NOT contain a _row_id column; if it does, all values must be null. These 
rows
+// represent genuinely new data (not survivors of a rewrite).
+func (w *PositionDeltaWriter) Insert(batch arrow.RecordBatch) error {
+       if w.closed {
+               return fmt.Errorf("%w: writer is already closed", 
ErrInvalidOperation)
+       }
+
+       if indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName); 
len(indices) > 0 {
+               if batch.Column(indices[0]).NullN() != int(batch.NumRows()) {
+                       return fmt.Errorf("%w: Insert batch %s column must be 
all null (use Reinsert for preserved IDs)",
+                               iceberg.ErrInvalidArgument, 
iceberg.RowIDColumnName)
+               }
+       }
+
+       batch.Retain()
+       w.insertBatches = append(w.insertBatches, batch)
+
+       return nil
+}
+
+// Close finalizes the writer and returns the data files produced. The returned
+// files contain both reinserted and fresh rows, with the _row_id column 
written
+// explicitly for reinserted rows (non-null) and left null for fresh inserts.
+//
+// The caller is responsible for adding these files to a snapshot (typically 
via
+// a Transaction's snapshot producer) along with any position-delete entries
+// that pair with the reinserts.
+func (w *PositionDeltaWriter) Close(ctx context.Context) ([]iceberg.DataFile, 
error) {
+       if w.closed {
+               return nil, fmt.Errorf("%w: writer is already closed", 
ErrInvalidOperation)
+       }
+       w.closed = true
+
+       defer func() {
+               for _, b := range w.reinsertBatches {
+                       b.Release()
+               }
+               for _, b := range w.insertBatches {
+                       b.Release()
+               }
+       }()
+
+       if len(w.reinsertBatches) == 0 && len(w.insertBatches) == 0 {
+               return nil, nil
+       }
+
+       fileSchema := iceberg.SchemaWithRowID(w.tbl.Schema())
+       arrowSc, err := SchemaToArrowSchema(fileSchema, nil, true, false)
+       if err != nil {
+               return nil, fmt.Errorf("PositionDeltaWriter: build arrow 
schema: %w", err)
+       }
+
+       writeOpts := append(append([]WriteRecordOption{}, w.userOpts...),
+               WithPreserveRowLineage(fileSchema),
+               WithWriteUUID(w.writeUUID),
+       )
+
+       records := w.buildUnifiedIterator()
+
+       var result []iceberg.DataFile
+       for df, err := range WriteRecords(ctx, w.tbl, arrowSc, records, 
writeOpts...) {
+               if err != nil {
+                       return nil, fmt.Errorf("PositionDeltaWriter: write 
records: %w", err)
+               }
+               result = append(result, df)
+       }
+
+       return result, nil
+}
+
+// buildUnifiedIterator merges reinsert and insert batches into a single
+// iterator. Reinsert batches already have _row_id; insert batches get a null
+// _row_id column appended.
+//
+// Releases are owned by [WriteRecords]'s releasing wrapper: each batch yielded
+// here will be released exactly once by the consumer, on both the success and
+// the early-stop paths. Releasing again here would drive Arrow's refcount
+// below zero (undefined behavior — happy-path tests don't catch it; any IO
+// error mid-write would silently corrupt the allocator).
+func (w *PositionDeltaWriter) buildUnifiedIterator() 
iter.Seq2[arrow.RecordBatch, error] {
+       return func(yield func(arrow.RecordBatch, error) bool) {
+               alloc := memory.NewGoAllocator()
+
+               for _, batch := range w.reinsertBatches {
+                       batch.Retain()
+                       if !yield(batch, nil) {
+                               return
+                       }
+               }
+
+               for _, batch := range w.insertBatches {
+                       enriched, err := appendNullRowIDColumn(alloc, batch)
+                       if err != nil {
+                               yield(nil, err)
+
+                               return
+                       }
+                       if !yield(enriched, nil) {
+                               return
+                       }
+               }
+       }
+}
+
+// appendNullRowIDColumn appends a null-filled _row_id column to a batch that
+// doesn't have one, signaling that these are fresh inserts. If the batch
+// already has _row_id (validated to be all-null by Insert), it is returned
+// retained as-is.
+func appendNullRowIDColumn(alloc memory.Allocator, batch arrow.RecordBatch) 
(arrow.RecordBatch, error) {
+       if indices := batch.Schema().FieldIndices(iceberg.RowIDColumnName); 
len(indices) > 0 {
+               batch.Retain()
+
+               return batch, nil
+       }
+
+       nrows := batch.NumRows()
+       bldr := array.NewInt64Builder(alloc)
+       defer bldr.Release()
+       bldr.AppendNulls(int(nrows))
+       nullCol := bldr.NewArray()
+       defer nullCol.Release()
+
+       rowIDField := arrow.Field{
+               Name:     iceberg.RowIDColumnName,
+               Type:     arrow.PrimitiveTypes.Int64,
+               Nullable: true,
+               Metadata: arrow.MetadataFrom(map[string]string{
+                       ArrowParquetFieldIDKey: 
strconv.Itoa(iceberg.RowIDFieldID),
+               }),
+       }
+
+       // Three-index slice the source field/column slices: append into a
+       // shared backing array would silently mutate the source batch's
+       // internal arrs/fields when len < cap.
+       srcFields := batch.Schema().Fields()
+       srcCols := batch.Columns()
+       fields := append(srcFields[:len(srcFields):len(srcFields)], rowIDField)
+       cols := append(srcCols[:len(srcCols):len(srcCols)], nullCol)
+       newSchema := arrow.NewSchema(fields, nil)
+
+       return array.NewRecordBatch(newSchema, cols, nrows), nil
+}
diff --git a/table/position_delta_writer_internal_test.go 
b/table/position_delta_writer_internal_test.go
new file mode 100644
index 00000000..3222501a
--- /dev/null
+++ b/table/position_delta_writer_internal_test.go
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       "github.com/stretchr/testify/require"
+)
+
+// TestPositionDeltaWriter_BuildUnifiedIterator_AbortNoDoubleRelease verifies
+// that buildUnifiedIterator does not release batches it has already yielded
+// when the consumer aborts mid-stream.
+//
+// The releasing wrapper inside [WriteRecords] (write_records.go) owns the
+// release on both the success and the early-stop paths. If the producer also
+// releases on !yield, refcount would go negative — undefined behavior in
+// Arrow that happy-path tests miss because no IO error ever fires the abort
+// branch. This test simulates that wrapper contract directly on both the
+// reinsert path and the insert (enriched) path.
+func TestPositionDeltaWriter_BuildUnifiedIterator_AbortNoDoubleRelease(t 
*testing.T) {
+       cases := []struct {
+               name         string
+               numReinserts int
+               numInserts   int
+               // abortAfter yields are consumed and released by the test, 
then the
+               // loop breaks — triggering the producer's !yield branch on 
whichever
+               // phase the next yield would have come from.
+               abortAfter int
+               abortPhase string
+       }{
+               {name: "abort_on_reinsert", numReinserts: 2, numInserts: 1, 
abortAfter: 1, abortPhase: "reinsert"},
+               {name: "abort_on_insert", numReinserts: 0, numInserts: 2, 
abortAfter: 1, abortPhase: "insert"},
+               {name: "abort_at_phase_boundary", numReinserts: 1, numInserts: 
1, abortAfter: 1, abortPhase: "reinsert"},
+       }
+
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       mem := 
memory.NewCheckedAllocator(memory.NewGoAllocator())
+                       defer mem.AssertSize(t, 0)
+
+                       w := &PositionDeltaWriter{}
+
+                       reinserts := make([]arrow.RecordBatch, 0, 
tc.numReinserts)
+                       for i := 0; i < tc.numReinserts; i++ {
+                               b := buildIntPlusRowIDBatch(mem, 
[]int64{int64(i)}, []int64{int64(1<<40 + i)})
+                               reinserts = append(reinserts, b)
+                               require.NoError(t, w.Reinsert(b))
+                       }
+                       inserts := make([]arrow.RecordBatch, 0, tc.numInserts)
+                       for i := 0; i < tc.numInserts; i++ {
+                               b := buildIntBatch(mem, []int64{int64(100 + i)})
+                               inserts = append(inserts, b)
+                               require.NoError(t, w.Insert(b))
+                       }
+                       defer func() {
+                               for _, b := range reinserts {
+                                       b.Release()
+                               }
+                               for _, b := range inserts {
+                                       b.Release()
+                               }
+                       }()
+
+                       records := w.buildUnifiedIterator()
+                       count := 0
+                       for rec, err := range records {
+                               require.NoError(t, err)
+                               rec.Release()
+                               count++
+                               if count >= tc.abortAfter {
+                                       break
+                               }
+                       }
+                       require.Equal(t, tc.abortAfter, count, "unexpected 
yield count for %s", tc.abortPhase)
+
+                       // Run the slice-release deferred work that Close() 
would do, so
+                       // the CheckedAllocator's AssertSize(0) check is 
meaningful.
+                       for _, b := range w.reinsertBatches {
+                               b.Release()
+                       }
+                       for _, b := range w.insertBatches {
+                               b.Release()
+                       }
+               })
+       }
+}
+
+// TestPositionDeltaWriter_AppendNullRowIDColumn_NoSourceAliasing verifies that
+// appendNullRowIDColumn does not mutate the source batch's internal field /
+// column slices when those slices have spare capacity. A naive append into a
+// shared backing array would silently corrupt the caller's batch.
+func TestPositionDeltaWriter_AppendNullRowIDColumn_NoSourceAliasing(t 
*testing.T) {
+       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+       defer mem.AssertSize(t, 0)
+
+       // Build a batch whose schema fields and column slice both have capacity
+       // > length, so a plain append would write into the shared backing 
array.
+       idBldr := array.NewInt64Builder(mem)
+       defer idBldr.Release()
+       idBldr.AppendValues([]int64{1, 2}, nil)
+       idArr := idBldr.NewArray()
+       defer idArr.Release()
+
+       srcCols := append(make([]arrow.Array, 0, 4), idArr)
+       srcFields := append(make([]arrow.Field, 0, 4), arrow.Field{
+               Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false,
+       })
+       srcSchema := arrow.NewSchema(srcFields, nil)
+       src := array.NewRecordBatch(srcSchema, srcCols, 2)
+       defer src.Release()
+
+       enriched, err := appendNullRowIDColumn(mem, src)
+       require.NoError(t, err)
+       defer enriched.Release()
+
+       require.Equal(t, 1, src.Schema().NumFields(),
+               "appendNullRowIDColumn must not mutate source schema field 
count")
+       require.Equal(t, 1, len(src.Columns()),
+               "appendNullRowIDColumn must not mutate source columns")
+       require.Equal(t, 2, enriched.Schema().NumFields())
+       require.Equal(t, iceberg.RowIDColumnName, 
enriched.Schema().Field(1).Name)
+
+       fieldIDStr, ok := 
enriched.Schema().Field(1).Metadata.GetValue(ArrowParquetFieldIDKey)
+       require.True(t, ok, "appended _row_id field must carry %s metadata", 
ArrowParquetFieldIDKey)
+       require.Equal(t, "2147483540", fieldIDStr)
+}
+
+func buildIntPlusRowIDBatch(mem memory.Allocator, ids, rowIDs []int64) 
arrow.RecordBatch {
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: iceberg.RowIDColumnName, Type: 
arrow.PrimitiveTypes.Int64, Nullable: true},
+       }, nil)
+       idBldr := array.NewInt64Builder(mem)
+       defer idBldr.Release()
+       idBldr.AppendValues(ids, nil)
+       rowIDBldr := array.NewInt64Builder(mem)
+       defer rowIDBldr.Release()
+       rowIDBldr.AppendValues(rowIDs, nil)
+       idArr := idBldr.NewArray()
+       defer idArr.Release()
+       rowIDArr := rowIDBldr.NewArray()
+       defer rowIDArr.Release()
+
+       return array.NewRecordBatch(schema, []arrow.Array{idArr, rowIDArr}, 
int64(len(ids)))
+}
+
+func buildIntBatch(mem memory.Allocator, ids []int64) arrow.RecordBatch {
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+       }, nil)
+       idBldr := array.NewInt64Builder(mem)
+       defer idBldr.Release()
+       idBldr.AppendValues(ids, nil)
+       idArr := idBldr.NewArray()
+       defer idArr.Release()
+
+       return array.NewRecordBatch(schema, []arrow.Array{idArr}, 
int64(len(ids)))
+}
diff --git a/table/position_delta_writer_test.go 
b/table/position_delta_writer_test.go
new file mode 100644
index 00000000..f4289798
--- /dev/null
+++ b/table/position_delta_writer_test.go
@@ -0,0 +1,274 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestPositionDeltaWriter_ReinsertPreservesRowID(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+
+       tbl := newV3RowLineageTestTable(t)
+
+       // Append initial data: id=1, id=2 → _row_id 0, 1 in the initial file.
+       dataSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+       initial, err := array.TableFromJSON(mem, dataSchema, []string{
+               `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}]`,
+       })
+       require.NoError(t, err)
+       defer initial.Release()
+
+       tbl, err = tbl.Append(ctx, array.NewTableReader(initial, -1), nil)
+       require.NoError(t, err)
+
+       // Capture the initial data file so we can replace it via NewRewrite.
+       initialFiles, err := planFilesAll(ctx, tbl)
+       require.NoError(t, err)
+       require.Len(t, initialFiles, 1)
+       initialFile := initialFiles[0]
+
+       // Create a PositionDeltaWriter and stage:
+       //   - Reinsert(id=1, _row_id=0) — survivor with updated value.
+       //   - Reinsert(id=2, _row_id=1) — unchanged survivor.
+       //   - Insert(id=99) — fresh row, gets synthesized _row_id at read time.
+       w, err := table.NewPositionDeltaWriter(tbl)
+       require.NoError(t, err)
+
+       // Use _row_id values far outside any plausible synthesized first_row_id
+       // range so the assertions actually prove preservation: if the writer
+       // silently dropped _row_id and the reader fell back to synthesizing IDs
+       // from the new file's first_row_id, the survivors would *not* land on
+       // these high values. Picking small IDs (e.g. 0, 1) would be
+       // indistinguishable from a synthesized 0/1 in the rewritten file.
+       const (
+               preservedRowID1 = int64(1) << 40
+               preservedRowID2 = preservedRowID1 + 1
+       )
+
+       reinsertBatch := buildReinsertBatch(mem,
+               []int64{1, 2},
+               []string{"a_updated", "b"},
+               []int64{preservedRowID1, preservedRowID2},
+       )
+       defer reinsertBatch.Release()
+       require.NoError(t, w.Reinsert(reinsertBatch))
+
+       insertBatch := buildInsertBatch(mem, []int64{99}, []string{"new"})
+       defer insertBatch.Release()
+       require.NoError(t, w.Insert(insertBatch))
+
+       dataFiles, err := w.Close(ctx)
+       require.NoError(t, err)
+       require.NotEmpty(t, dataFiles)
+
+       // Atomically replace the initial file with the writer's output. This is
+       // a library-internal rewrite (NewRewrite skips the explicit 
first_row_id
+       // requirement that AddDataFiles enforces for externally-written files).
+       tx := tbl.NewTransaction()
+       rw := tx.NewRewrite(nil)
+       rw.DeleteFile(initialFile)
+       for _, df := range dataFiles {
+               rw.AddDataFile(df)
+       }
+       require.NoError(t, rw.Commit(ctx))
+       tbl, err = tx.Commit(ctx)
+       require.NoError(t, err)
+
+       // Scan with lineage — survivors must keep their original _row_ids; the
+       // fresh row gets a new synthesized _row_id from the new file's 
first_row_id.
+       scan := tbl.Scan(table.WithRowLineage())
+       _, itr, err := scan.ToArrowRecords(ctx)
+       require.NoError(t, err)
+
+       rowsByID := map[int64]int64{}
+       for rec, err := range itr {
+               require.NoError(t, err)
+               idCol := 
rec.Column(rec.Schema().FieldIndices("id")[0]).(*array.Int64)
+               rowIDCol := 
rec.Column(rec.Schema().FieldIndices(iceberg.RowIDColumnName)[0]).(*array.Int64)
+               for i := 0; i < int(rec.NumRows()); i++ {
+                       rowsByID[idCol.Value(i)] = rowIDCol.Value(i)
+               }
+               rec.Release()
+       }
+
+       require.Contains(t, rowsByID, int64(1))
+       require.Contains(t, rowsByID, int64(2))
+       require.Contains(t, rowsByID, int64(99))
+
+       assert.Equal(t, preservedRowID1, rowsByID[1], "reinserted id=1 must 
preserve original _row_id")
+       assert.Equal(t, preservedRowID2, rowsByID[2], "reinserted id=2 must 
preserve original _row_id")
+       // The fresh row's synthesized _row_id must not collide with the 
preserved
+       // survivors. With first_row_id allocated from a fresh assign-counter, 
it
+       // will sit in a low range and the high preserved values are 
unreachable.
+       assert.NotEqual(t, preservedRowID1, rowsByID[99])
+       assert.NotEqual(t, preservedRowID2, rowsByID[99])
+}
+
+// buildReinsertBatch creates an Arrow record batch with id, data, and _row_id
+// columns suitable for PositionDeltaWriter.Reinsert.
+func buildReinsertBatch(mem memory.Allocator, ids []int64, data []string, 
rowIDs []int64) arrow.RecordBatch {
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: iceberg.RowIDColumnName, Type: 
arrow.PrimitiveTypes.Int64, Nullable: true},
+       }, nil)
+
+       idBldr := array.NewInt64Builder(mem)
+       defer idBldr.Release()
+       idBldr.AppendValues(ids, nil)
+
+       dataBldr := array.NewStringBuilder(mem)
+       defer dataBldr.Release()
+       for _, s := range data {
+               dataBldr.Append(s)
+       }
+
+       rowIDBldr := array.NewInt64Builder(mem)
+       defer rowIDBldr.Release()
+       rowIDBldr.AppendValues(rowIDs, nil)
+
+       idArr := idBldr.NewArray()
+       defer idArr.Release()
+       dataArr := dataBldr.NewArray()
+       defer dataArr.Release()
+       rowIDArr := rowIDBldr.NewArray()
+       defer rowIDArr.Release()
+
+       return array.NewRecordBatch(schema, []arrow.Array{idArr, dataArr, 
rowIDArr}, int64(len(ids)))
+}
+
+// buildInsertBatch creates an Arrow record batch with id and data columns.
+func buildInsertBatch(mem memory.Allocator, ids []int64, data []string) 
arrow.RecordBatch {
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       idBldr := array.NewInt64Builder(mem)
+       defer idBldr.Release()
+       idBldr.AppendValues(ids, nil)
+
+       dataBldr := array.NewStringBuilder(mem)
+       defer dataBldr.Release()
+       for _, s := range data {
+               dataBldr.Append(s)
+       }
+
+       idArr := idBldr.NewArray()
+       defer idArr.Release()
+       dataArr := dataBldr.NewArray()
+       defer dataArr.Release()
+
+       return array.NewRecordBatch(schema, []arrow.Array{idArr, dataArr}, 
int64(len(ids)))
+}
+
+// planFilesAll returns every data file currently visible in the table's
+// scan plan — used by tests that want to take a single existing data file
+// and feed it back into NewRewrite.
+func planFilesAll(ctx context.Context, tbl *table.Table) ([]iceberg.DataFile, 
error) {
+       tasks, err := tbl.Scan().PlanFiles(ctx)
+       if err != nil {
+               return nil, err
+       }
+       files := make([]iceberg.DataFile, 0, len(tasks))
+       for _, t := range tasks {
+               files = append(files, t.File)
+       }
+
+       return files, nil
+}
+
+func TestPositionDeltaWriter_RequiresV3(t *testing.T) {
+       tbl := newV3RowLineageTestTable(t)
+       // newV3RowLineageTestTable already creates a v3 table, so this should 
succeed.
+       w, err := table.NewPositionDeltaWriter(tbl)
+       require.NoError(t, err)
+       require.NotNil(t, w)
+}
+
+func TestPositionDeltaWriter_ReinsertRejectsNullRowID(t *testing.T) {
+       mem := memory.DefaultAllocator
+       tbl := newV3RowLineageTestTable(t)
+
+       w, err := table.NewPositionDeltaWriter(tbl)
+       require.NoError(t, err)
+
+       // Build a batch with a null _row_id.
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: iceberg.RowIDColumnName, Type: 
arrow.PrimitiveTypes.Int64, Nullable: true},
+       }, nil)
+
+       idBldr := array.NewInt64Builder(mem)
+       defer idBldr.Release()
+       idBldr.Append(1)
+       rowIDBldr := array.NewInt64Builder(mem)
+       defer rowIDBldr.Release()
+       rowIDBldr.AppendNull()
+
+       idArr := idBldr.NewArray()
+       defer idArr.Release()
+       rowIDArr := rowIDBldr.NewArray()
+       defer rowIDArr.Release()
+
+       batch := array.NewRecordBatch(schema, []arrow.Array{idArr, rowIDArr}, 1)
+       defer batch.Release()
+
+       err = w.Reinsert(batch)
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "must not contain null")
+}
+
+func TestPositionDeltaWriter_ReinsertRejectsMissingColumn(t *testing.T) {
+       mem := memory.DefaultAllocator
+       tbl := newV3RowLineageTestTable(t)
+
+       w, err := table.NewPositionDeltaWriter(tbl)
+       require.NoError(t, err)
+
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+       }, nil)
+
+       idBldr := array.NewInt64Builder(mem)
+       defer idBldr.Release()
+       idBldr.Append(1)
+       idArr := idBldr.NewArray()
+       defer idArr.Release()
+
+       batch := array.NewRecordBatch(schema, []arrow.Array{idArr}, 1)
+       defer batch.Release()
+
+       err = w.Reinsert(batch)
+       require.Error(t, err)
+       assert.Contains(t, err.Error(), "must contain _row_id")
+}

Reply via email to