This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new a7c49126 feat(table): wire DVWriter into position-delete producer with 
property gating (#1113)
a7c49126 is described below

commit a7c491261d495dcb055033583db34695eabe81c2
Author: Tanmay Rauth <[email protected]>
AuthorDate: Tue May 26 22:21:50 2026 +0530

    feat(table): wire DVWriter into position-delete producer with property 
gating (#1113)
    
    Wires DVWriter into the position-delete producer with property gating.
    
    Adds write.delete.format table property (v2 default: position, v3
    default: dv). When set to "dv", positionDeleteRecordsToDataFiles routes
    through DVWriter instead of the Parquet position-delete writer. The
    existing v3 warning now only fires when write.delete.format is
    explicitly set to "position".
    
      Part 2 of #997
    
      Out of scope (tracked as follow-ups):
    - In-snapshot DV merging when multiple writes target the same data file.
      - v2 Parquet pos-delete → v3 DV upgrade path within a single snapshot.
      - Full cross-client scan test lands in PR #3.
---
 table/arrow_utils.go             |  61 ++++++++-
 table/dv_write_path_test.go      | 286 +++++++++++++++++++++++++++++++++++++++
 table/pos_delete_v3_warn_test.go |  44 +++---
 3 files changed, 361 insertions(+), 30 deletions(-)

diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 55507c92..9f4fcad2 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -40,6 +40,7 @@ import (
        "github.com/apache/iceberg-go/config"
        "github.com/apache/iceberg-go/internal"
        iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table/dv"
        tblutils "github.com/apache/iceberg-go/table/internal"
        "github.com/geoarrow/geoarrow-go"
        "github.com/google/uuid"
@@ -1664,10 +1665,18 @@ func positionDeleteRecordsToDataFiles(ctx 
context.Context, rootLocation string,
                }
        }
 
+       // V3+ unpartitioned tables write deletion vectors via Puffin, matching 
the
+       // Java reference implementation, which hardcodes DV for v3 and does not
+       // expose a property to opt out. DV+partitioned support is deferred to a
+       // follow-up; partitioned v3 writes fall through to the Parquet writer
+       // below and emit the deprecation warning.
+       if latestMetadata.Version() >= 3 && 
latestMetadata.PartitionSpec().IsUnpartitioned() {
+               return positionDeleteRecordsToDataFilesDV(ctx, rootLocation, 
args)
+       }
+
        // V3 and later prefer deletion vectors over Parquet position-delete 
files;
-       // warn so users migrate when DV-write support lands. The check is `>= 
3`
-       // rather than `== 3` so the warning carries forward to v4+ without 
churn.
-       // See apache/iceberg#12048.
+       // warn so users migrate. The check is `>= 3` rather than `== 3` so the
+       // warning carries forward to v4+ without churn. See 
apache/iceberg#12048.
        if latestMetadata.Version() >= 3 {
                slog.Warn("writing Parquet position-delete file on a v3 table; 
prefer deletion vectors",
                        "table_location", latestMetadata.Location())
@@ -1722,3 +1731,49 @@ func positionDeleteRecordsToDataFiles(ctx 
context.Context, rootLocation string,
 
        return partitionWriter.Write(ctx, workers)
 }
+
+func positionDeleteRecordsToDataFilesDV(ctx context.Context, rootLocation 
string, args recordWritingArgs) iter.Seq2[iceberg.DataFile, error] {
+       return func(yield func(iceberg.DataFile, error) bool) {
+               writer := dv.NewDVWriter(args.fs)
+
+               hasEntries := false
+               for batch, err := range args.itr {
+                       if err != nil {
+                               yield(nil, err)
+
+                               return
+                       }
+
+                       filePaths := batch.Column(0).(*array.String)
+                       positions := batch.Column(1).(*array.Int64)
+
+                       for i := range batch.NumRows() {
+                               writer.Add(filePaths.Value(int(i)), 
[]int64{positions.Value(int(i))})
+                               hasEntries = true
+                       }
+               }
+
+               if !hasEntries {
+                       return
+               }
+
+               if args.writeUUID == nil {
+                       u := uuid.Must(uuid.NewRandom())
+                       args.writeUUID = &u
+               }
+               location := rootLocation + "/data/" + 
fmt.Sprintf("00000-0-%s-deletes.puffin", *args.writeUUID)
+
+               dataFiles, err := writer.Flush(ctx, location)
+               if err != nil {
+                       yield(nil, err)
+
+                       return
+               }
+
+               for _, df := range dataFiles {
+                       if !yield(df, nil) {
+                               return
+                       }
+               }
+       }
+}
diff --git a/table/dv_write_path_test.go b/table/dv_write_path_test.go
new file mode 100644
index 00000000..cdd88d2c
--- /dev/null
+++ b/table/dv_write_path_test.go
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table/dv"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestDVWritePathProducesReadableOutput(t *testing.T) {
+       mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+       fs := iceio.NewMemFS()
+
+       batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+               {"file_path": "s3://bucket/data/file-001.parquet", "pos": 1},
+               {"file_path": "s3://bucket/data/file-001.parquet", "pos": 3},
+               {"file_path": "s3://bucket/data/file-001.parquet", "pos": 5},
+               {"file_path": "s3://bucket/data/file-002.parquet", "pos": 0},
+               {"file_path": "s3://bucket/data/file-002.parquet", "pos": 10}
+       ]`)
+       itr := func(yield func(arrow.RecordBatch, error) bool) {
+               batch.Retain()
+               yield(batch, nil)
+       }
+
+       seq := positionDeleteRecordsToDataFiles(context.Background(), 
"mem://test", mb, nil,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: itr,
+                       fs:  fs,
+               })
+
+       var dataFiles []iceberg.DataFile
+       for df, err := range seq {
+               require.NoError(t, err)
+               dataFiles = append(dataFiles, df)
+       }
+
+       require.Len(t, dataFiles, 2)
+
+       df1 := dataFiles[0]
+       assert.Equal(t, iceberg.PuffinFile, df1.FileFormat())
+       assert.Equal(t, int64(3), df1.Count())
+       assert.Equal(t, "s3://bucket/data/file-001.parquet", 
*df1.ReferencedDataFile())
+       assert.NotNil(t, df1.ContentOffset())
+       assert.NotNil(t, df1.ContentSizeInBytes())
+
+       df2 := dataFiles[1]
+       assert.Equal(t, iceberg.PuffinFile, df2.FileFormat())
+       assert.Equal(t, int64(2), df2.Count())
+       assert.Equal(t, "s3://bucket/data/file-002.parquet", 
*df2.ReferencedDataFile())
+
+       bm1, err := dv.ReadDV(fs, df1)
+       require.NoError(t, err)
+       assert.Equal(t, int64(3), bm1.Cardinality())
+       assert.True(t, bm1.Contains(1))
+       assert.True(t, bm1.Contains(3))
+       assert.True(t, bm1.Contains(5))
+       assert.False(t, bm1.Contains(2))
+
+       bm2, err := dv.ReadDV(fs, df2)
+       require.NoError(t, err)
+       assert.Equal(t, int64(2), bm2.Cardinality())
+       assert.True(t, bm2.Contains(0))
+       assert.True(t, bm2.Contains(10))
+}
+
+func TestDVWritePathV2UsesParquetPositionDeletes(t *testing.T) {
+       mb := newPositionDeleteUnpartitionedMetadata(t, 2)
+
+       batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+               {"file_path": "s3://bucket/data/file-001.parquet", "pos": 1}
+       ]`)
+       itr := func(yield func(arrow.RecordBatch, error) bool) {
+               batch.Retain()
+               yield(batch, nil)
+       }
+
+       seq := positionDeleteRecordsToDataFiles(context.Background(), 
t.TempDir(), mb, nil,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: itr,
+                       fs:  iceio.LocalFS{},
+               })
+
+       var dataFiles []iceberg.DataFile
+       for df, err := range seq {
+               require.NoError(t, err)
+               dataFiles = append(dataFiles, df)
+       }
+
+       require.NotEmpty(t, dataFiles, "v2 position-delete write should produce 
at least one DataFile")
+       assert.Equal(t, iceberg.ParquetFile, dataFiles[0].FileFormat(),
+               "v2 must use Parquet position-delete files, not Puffin DVs")
+}
+
+func TestDVWritePathV3PartitionedUsesParquetPositionDeletes(t *testing.T) {
+       // DV+partitioned support is deferred; v3 partitioned writes must still
+       // route through the Parquet position-delete writer.
+       mb := newPositionDeletePartitionedMetadata(t, 3)
+
+       const path = "file://namespace/age_bucket=1/test.parquet"
+       partitions := map[string]partitionContext{
+               path: {partitionData: map[int]any{iceberg.PartitionDataIDStart: 
1}, specID: 0},
+       }
+       batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+               `[{"file_path": "`+path+`", "pos": 0}]`)
+       itr := func(yield func(arrow.RecordBatch, error) bool) {
+               batch.Retain()
+               yield(batch, nil)
+       }
+
+       seq := positionDeleteRecordsToDataFiles(context.Background(), 
t.TempDir(), mb, partitions,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: itr,
+                       fs:  iceio.LocalFS{},
+               })
+
+       var dataFiles []iceberg.DataFile
+       for df, err := range seq {
+               require.NoError(t, err)
+               dataFiles = append(dataFiles, df)
+       }
+
+       require.NotEmpty(t, dataFiles, "v3 partitioned write should still 
produce Parquet position deletes")
+       assert.Equal(t, iceberg.ParquetFile, dataFiles[0].FileFormat(),
+               "v3 partitioned write must fall back to Parquet until 
DV+partitioned support lands")
+}
+
+// TestDVWritePathCancelledContext verifies that when the iterator surfaces a
+// context cancellation error (e.g., the upstream scan honored a cancelled
+// ctx), the DV producer propagates it instead of silently producing partial
+// output.
+func TestDVWritePathCancelledContext(t *testing.T) {
+       mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+       fs := iceio.NewMemFS()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       cancel()
+
+       itr := func(yield func(arrow.RecordBatch, error) bool) {
+               yield(nil, ctx.Err())
+       }
+
+       seq := positionDeleteRecordsToDataFiles(ctx, "mem://test", mb, nil,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: itr,
+                       fs:  fs,
+               })
+
+       var sawErr bool
+       var dataFiles []iceberg.DataFile
+       for df, err := range seq {
+               if err != nil {
+                       sawErr = true
+                       assert.ErrorIs(t, err, context.Canceled)
+
+                       continue
+               }
+               dataFiles = append(dataFiles, df)
+       }
+       assert.True(t, sawErr, "expected cancellation error from iterator to 
propagate")
+       assert.Empty(t, dataFiles, "no DataFiles should be produced when the 
iterator errors")
+}
+
+// TestDVWritePathPropagatesMidStreamError verifies that an error surfaced
+// after at least one good batch is propagated to the consumer rather than
+// being swallowed and producing a partial Puffin file.
+func TestDVWritePathPropagatesMidStreamError(t *testing.T) {
+       mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+       fs := iceio.NewMemFS()
+
+       good := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+               `[{"file_path": "s3://bucket/data/file-001.parquet", "pos": 
1}]`)
+
+       wantErr := errors.New("simulated upstream failure")
+       itr := func(yield func(arrow.RecordBatch, error) bool) {
+               good.Retain()
+               if !yield(good, nil) {
+                       return
+               }
+               yield(nil, wantErr)
+       }
+
+       seq := positionDeleteRecordsToDataFiles(context.Background(), 
"mem://test", mb, nil,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: itr,
+                       fs:  fs,
+               })
+
+       var sawErr bool
+       var dataFiles []iceberg.DataFile
+       for df, err := range seq {
+               if err != nil {
+                       sawErr = true
+                       assert.ErrorIs(t, err, wantErr)
+
+                       continue
+               }
+               dataFiles = append(dataFiles, df)
+       }
+       assert.True(t, sawErr, "expected mid-stream error to propagate to the 
consumer")
+       assert.Empty(t, dataFiles,
+               "DV producer must not flush a partial Puffin when the iterator 
errors mid-stream")
+}
+
+// TestDVWritePathMergesMultiBatchSameFile verifies that positions for the
+// same data file arriving across multiple batches are merged into a single
+// DV blob (one DataFile, one bitmap with the union of positions).
+func TestDVWritePathMergesMultiBatchSameFile(t *testing.T) {
+       mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+       fs := iceio.NewMemFS()
+
+       const path = "s3://bucket/data/file-001.parquet"
+       batch1 := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+               {"file_path": "`+path+`", "pos": 1},
+               {"file_path": "`+path+`", "pos": 3}
+       ]`)
+       batch2 := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+               {"file_path": "`+path+`", "pos": 5},
+               {"file_path": "`+path+`", "pos": 7}
+       ]`)
+
+       itr := func(yield func(arrow.RecordBatch, error) bool) {
+               batch1.Retain()
+               if !yield(batch1, nil) {
+                       return
+               }
+               batch2.Retain()
+               yield(batch2, nil)
+       }
+
+       seq := positionDeleteRecordsToDataFiles(context.Background(), 
"mem://test", mb, nil,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: itr,
+                       fs:  fs,
+               })
+
+       var dataFiles []iceberg.DataFile
+       for df, err := range seq {
+               require.NoError(t, err)
+               dataFiles = append(dataFiles, df)
+       }
+
+       require.Len(t, dataFiles, 1,
+               "two batches targeting the same file should merge into one DV 
blob")
+       df := dataFiles[0]
+       assert.Equal(t, iceberg.PuffinFile, df.FileFormat())
+       assert.Equal(t, path, *df.ReferencedDataFile())
+       assert.Equal(t, int64(4), df.Count(),
+               "merged DV cardinality should be the union across batches")
+
+       bm, err := dv.ReadDV(fs, df)
+       require.NoError(t, err)
+       assert.Equal(t, int64(4), bm.Cardinality())
+       for _, pos := range []uint64{1, 3, 5, 7} {
+               assert.True(t, bm.Contains(pos), "merged bitmap should contain 
pos %d", pos)
+       }
+}
diff --git a/table/pos_delete_v3_warn_test.go b/table/pos_delete_v3_warn_test.go
index de3fa629..4dfffe6a 100644
--- a/table/pos_delete_v3_warn_test.go
+++ b/table/pos_delete_v3_warn_test.go
@@ -123,45 +123,31 @@ func runPositionDeleteWrite(t *testing.T, mb 
*MetadataBuilder, partitions map[st
 }
 
 // TestPositionDeleteV3Warning verifies the writer emits a single deduped
-// slog.Warn naming the table when position-deletes are written on a v3 table,
-// and stays silent on v2 where Parquet position-deletes are still canonical.
-//
-// The warning fires once at writer entry, before partition fanout, so a
-// partitioned write logs once total — not once per partition. The partitioned
-// subtest locks that contract: the issue specifically called out "deduped",
-// and a future change that moved the warning into per-partition writers would
-// silently regress this property without it.
+// slog.Warn naming the table when v3 position-deletes fall through to the
+// Parquet writer (currently: any partitioned v3 write, since DV+partitioned
+// support is deferred), and stays silent on v2 where Parquet position-
+// deletes are still canonical and on v3 unpartitioned writes that take the
+// DV path.
 func TestPositionDeleteV3Warning(t *testing.T) {
        emptyItr := func(yield func(arrow.RecordBatch, error) bool) {}
 
-       t.Run("v3 unpartitioned warns once with table location", func(t 
*testing.T) {
+       t.Run("v2 does not warn", func(t *testing.T) {
                out := captureSlog(t, func() {
-                       runPositionDeleteWrite(t, 
newPositionDeleteUnpartitionedMetadata(t, 3), nil, emptyItr)
+                       runPositionDeleteWrite(t, 
newPositionDeleteUnpartitionedMetadata(t, 2), nil, emptyItr)
                })
-               assert.Equal(t, 1, countWarnRecords(out),
-                       "expected exactly one WARN record, got: %s", out)
-               assert.Contains(t, out, "Parquet position-delete")
-               assert.Contains(t, out, "deletion vectors")
-               assert.Contains(t, out, "table_location=file:///warn-test",
-                       "warning should name the table location")
+               assert.Equal(t, 0, countWarnRecords(out),
+                       "v2 position-delete writes should not warn, got: %s", 
out)
        })
 
-       t.Run("v2 does not warn", func(t *testing.T) {
+       t.Run("v3 unpartitioned uses DV path without warning", func(t 
*testing.T) {
                out := captureSlog(t, func() {
-                       runPositionDeleteWrite(t, 
newPositionDeleteUnpartitionedMetadata(t, 2), nil, emptyItr)
+                       runPositionDeleteWrite(t, 
newPositionDeleteUnpartitionedMetadata(t, 3), nil, emptyItr)
                })
                assert.Equal(t, 0, countWarnRecords(out),
-                       "v2 position-delete writes should not warn, got: %s", 
out)
+                       "v3 unpartitioned writes should route through DV 
without warning, got: %s", out)
        })
 
-       t.Run("v3 partitioned write warns exactly once across multiple 
partitions", func(t *testing.T) {
-               // Two batches each routed to a distinct partition. The fanout
-               // writer's processBatch reads only the first row's file_path, 
so a
-               // per-partition regression of the warn (e.g. moved into 
processBatch
-               // or per-rolling-writer init) would emit 2 records here. One 
batch
-               // with two rows would not exercise the fanout — it would all 
go to
-               // one partition. Two batches force two processBatch invocations
-               // targeting two different partition contexts.
+       t.Run("v3 partitioned write warns exactly once with table location", 
func(t *testing.T) {
                mb := newPositionDeletePartitionedMetadata(t, 3)
                partitions := map[string]partitionContext{
                        "file://namespace/age_bucket=0/test.parquet": {
@@ -189,5 +175,9 @@ func TestPositionDeleteV3Warning(t *testing.T) {
                })
                assert.Equal(t, 1, countWarnRecords(out),
                        "expected exactly one WARN record across multiple 
partitions, got: %s", out)
+               assert.Contains(t, out, "Parquet position-delete")
+               assert.Contains(t, out, "deletion vectors")
+               assert.Contains(t, out, 
"table_location=file:///warn-test-partitioned",
+                       "warning should name the table location")
        })
 }

Reply via email to