This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new a7c49126 feat(table): wire DVWriter into position-delete producer with
property gating (#1113)
a7c49126 is described below
commit a7c491261d495dcb055033583db34695eabe81c2
Author: Tanmay Rauth <[email protected]>
AuthorDate: Tue May 26 22:21:50 2026 +0530
feat(table): wire DVWriter into position-delete producer with property
gating (#1113)
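Wires DVWriter into the position-delete producer with property gating.
Adds write.delete.format table property (v2 default: position, v3
default: dv). When set to "dv", positionDeleteRecordsToDataFiles routes
through DVWriter instead of the Parquet position-delete writer. The
existing v3 warning now only fires when write.delete.format is
explicitly set to "position".
(Note: the code in this commit does not read a write.delete.format
property. positionDeleteRecordsToDataFiles routes through DVWriter
whenever the table's format version is >= 3 and its partition spec is
unpartitioned. Partitioned v3 writes still use the Parquet
position-delete writer and are the only v3 writes that emit the
deprecation warning.)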
Part 2 of #997
Out of scope (tracked as follow-ups):
- In-snapshot DV merging when multiple writes target the same data file.
- v2 Parquet pos-delete → v3 DV upgrade path within a single snapshot.
- Full cross-client scan test lands in PR #3.
---
table/arrow_utils.go | 61 ++++++++-
table/dv_write_path_test.go | 286 +++++++++++++++++++++++++++++++++++++++
table/pos_delete_v3_warn_test.go | 44 +++---
3 files changed, 361 insertions(+), 30 deletions(-)
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 55507c92..9f4fcad2 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -40,6 +40,7 @@ import (
"github.com/apache/iceberg-go/config"
"github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table/dv"
tblutils "github.com/apache/iceberg-go/table/internal"
"github.com/geoarrow/geoarrow-go"
"github.com/google/uuid"
@@ -1664,10 +1665,18 @@ func positionDeleteRecordsToDataFiles(ctx
context.Context, rootLocation string,
}
}
+ // V3+ unpartitioned tables write deletion vectors via Puffin, matching
the
+ // Java reference implementation, which hardcodes DV for v3 and does not
+ // expose a property to opt out. DV+partitioned support is deferred to a
+ // follow-up; partitioned v3 writes fall through to the Parquet writer
+ // below and emit the deprecation warning.
+ if latestMetadata.Version() >= 3 &&
latestMetadata.PartitionSpec().IsUnpartitioned() {
+ return positionDeleteRecordsToDataFilesDV(ctx, rootLocation,
args)
+ }
+
// V3 and later prefer deletion vectors over Parquet position-delete
files;
- // warn so users migrate when DV-write support lands. The check is `>=
3`
- // rather than `== 3` so the warning carries forward to v4+ without
churn.
- // See apache/iceberg#12048.
+ // warn so users migrate. The check is `>= 3` rather than `== 3` so the
+ // warning carries forward to v4+ without churn. See
apache/iceberg#12048.
if latestMetadata.Version() >= 3 {
slog.Warn("writing Parquet position-delete file on a v3 table;
prefer deletion vectors",
"table_location", latestMetadata.Location())
@@ -1722,3 +1731,49 @@ func positionDeleteRecordsToDataFiles(ctx
context.Context, rootLocation string,
return partitionWriter.Write(ctx, workers)
}
+
+func positionDeleteRecordsToDataFilesDV(ctx context.Context, rootLocation
string, args recordWritingArgs) iter.Seq2[iceberg.DataFile, error] {
+ return func(yield func(iceberg.DataFile, error) bool) {
+ writer := dv.NewDVWriter(args.fs)
+
+ hasEntries := false
+ for batch, err := range args.itr {
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+
+ filePaths := batch.Column(0).(*array.String)
+ positions := batch.Column(1).(*array.Int64)
+
+ for i := range batch.NumRows() {
+ writer.Add(filePaths.Value(int(i)),
[]int64{positions.Value(int(i))})
+ hasEntries = true
+ }
+ }
+
+ if !hasEntries {
+ return
+ }
+
+ if args.writeUUID == nil {
+ u := uuid.Must(uuid.NewRandom())
+ args.writeUUID = &u
+ }
+ location := rootLocation + "/data/" +
fmt.Sprintf("00000-0-%s-deletes.puffin", *args.writeUUID)
+
+ dataFiles, err := writer.Flush(ctx, location)
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+
+ for _, df := range dataFiles {
+ if !yield(df, nil) {
+ return
+ }
+ }
+ }
+}
diff --git a/table/dv_write_path_test.go b/table/dv_write_path_test.go
new file mode 100644
index 00000000..cdd88d2c
--- /dev/null
+++ b/table/dv_write_path_test.go
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "errors"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table/dv"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestDVWritePathProducesReadableOutput(t *testing.T) {
+ mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+ fs := iceio.NewMemFS()
+
+ batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+ {"file_path": "s3://bucket/data/file-001.parquet", "pos": 1},
+ {"file_path": "s3://bucket/data/file-001.parquet", "pos": 3},
+ {"file_path": "s3://bucket/data/file-001.parquet", "pos": 5},
+ {"file_path": "s3://bucket/data/file-002.parquet", "pos": 0},
+ {"file_path": "s3://bucket/data/file-002.parquet", "pos": 10}
+ ]`)
+ itr := func(yield func(arrow.RecordBatch, error) bool) {
+ batch.Retain()
+ yield(batch, nil)
+ }
+
+ seq := positionDeleteRecordsToDataFiles(context.Background(),
"mem://test", mb, nil,
+ recordWritingArgs{
+ sc: PositionalDeleteArrowSchema,
+ itr: itr,
+ fs: fs,
+ })
+
+ var dataFiles []iceberg.DataFile
+ for df, err := range seq {
+ require.NoError(t, err)
+ dataFiles = append(dataFiles, df)
+ }
+
+ require.Len(t, dataFiles, 2)
+
+ df1 := dataFiles[0]
+ assert.Equal(t, iceberg.PuffinFile, df1.FileFormat())
+ assert.Equal(t, int64(3), df1.Count())
+ assert.Equal(t, "s3://bucket/data/file-001.parquet",
*df1.ReferencedDataFile())
+ assert.NotNil(t, df1.ContentOffset())
+ assert.NotNil(t, df1.ContentSizeInBytes())
+
+ df2 := dataFiles[1]
+ assert.Equal(t, iceberg.PuffinFile, df2.FileFormat())
+ assert.Equal(t, int64(2), df2.Count())
+ assert.Equal(t, "s3://bucket/data/file-002.parquet",
*df2.ReferencedDataFile())
+
+ bm1, err := dv.ReadDV(fs, df1)
+ require.NoError(t, err)
+ assert.Equal(t, int64(3), bm1.Cardinality())
+ assert.True(t, bm1.Contains(1))
+ assert.True(t, bm1.Contains(3))
+ assert.True(t, bm1.Contains(5))
+ assert.False(t, bm1.Contains(2))
+
+ bm2, err := dv.ReadDV(fs, df2)
+ require.NoError(t, err)
+ assert.Equal(t, int64(2), bm2.Cardinality())
+ assert.True(t, bm2.Contains(0))
+ assert.True(t, bm2.Contains(10))
+}
+
+func TestDVWritePathV2UsesParquetPositionDeletes(t *testing.T) {
+ mb := newPositionDeleteUnpartitionedMetadata(t, 2)
+
+ batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+ {"file_path": "s3://bucket/data/file-001.parquet", "pos": 1}
+ ]`)
+ itr := func(yield func(arrow.RecordBatch, error) bool) {
+ batch.Retain()
+ yield(batch, nil)
+ }
+
+ seq := positionDeleteRecordsToDataFiles(context.Background(),
t.TempDir(), mb, nil,
+ recordWritingArgs{
+ sc: PositionalDeleteArrowSchema,
+ itr: itr,
+ fs: iceio.LocalFS{},
+ })
+
+ var dataFiles []iceberg.DataFile
+ for df, err := range seq {
+ require.NoError(t, err)
+ dataFiles = append(dataFiles, df)
+ }
+
+ require.NotEmpty(t, dataFiles, "v2 position-delete write should produce
at least one DataFile")
+ assert.Equal(t, iceberg.ParquetFile, dataFiles[0].FileFormat(),
+ "v2 must use Parquet position-delete files, not Puffin DVs")
+}
+
+func TestDVWritePathV3PartitionedUsesParquetPositionDeletes(t *testing.T) {
+ // DV+partitioned support is deferred; v3 partitioned writes must still
+ // route through the Parquet position-delete writer.
+ mb := newPositionDeletePartitionedMetadata(t, 3)
+
+ const path = "file://namespace/age_bucket=1/test.parquet"
+ partitions := map[string]partitionContext{
+ path: {partitionData: map[int]any{iceberg.PartitionDataIDStart:
1}, specID: 0},
+ }
+ batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+ `[{"file_path": "`+path+`", "pos": 0}]`)
+ itr := func(yield func(arrow.RecordBatch, error) bool) {
+ batch.Retain()
+ yield(batch, nil)
+ }
+
+ seq := positionDeleteRecordsToDataFiles(context.Background(),
t.TempDir(), mb, partitions,
+ recordWritingArgs{
+ sc: PositionalDeleteArrowSchema,
+ itr: itr,
+ fs: iceio.LocalFS{},
+ })
+
+ var dataFiles []iceberg.DataFile
+ for df, err := range seq {
+ require.NoError(t, err)
+ dataFiles = append(dataFiles, df)
+ }
+
+ require.NotEmpty(t, dataFiles, "v3 partitioned write should still
produce Parquet position deletes")
+ assert.Equal(t, iceberg.ParquetFile, dataFiles[0].FileFormat(),
+ "v3 partitioned write must fall back to Parquet until
DV+partitioned support lands")
+}
+
+// TestDVWritePathCancelledContext verifies that when the iterator surfaces a
+// context cancellation error (e.g., the upstream scan honored a cancelled
+// ctx), the DV producer propagates it instead of silently producing partial
+// output.
+func TestDVWritePathCancelledContext(t *testing.T) {
+ mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+ fs := iceio.NewMemFS()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ itr := func(yield func(arrow.RecordBatch, error) bool) {
+ yield(nil, ctx.Err())
+ }
+
+ seq := positionDeleteRecordsToDataFiles(ctx, "mem://test", mb, nil,
+ recordWritingArgs{
+ sc: PositionalDeleteArrowSchema,
+ itr: itr,
+ fs: fs,
+ })
+
+ var sawErr bool
+ var dataFiles []iceberg.DataFile
+ for df, err := range seq {
+ if err != nil {
+ sawErr = true
+ assert.ErrorIs(t, err, context.Canceled)
+
+ continue
+ }
+ dataFiles = append(dataFiles, df)
+ }
+ assert.True(t, sawErr, "expected cancellation error from iterator to
propagate")
+ assert.Empty(t, dataFiles, "no DataFiles should be produced when the
iterator errors")
+}
+
+// TestDVWritePathPropagatesMidStreamError verifies that an error surfaced
+// after at least one good batch is propagated to the consumer rather than
+// being swallowed and producing a partial Puffin file.
+func TestDVWritePathPropagatesMidStreamError(t *testing.T) {
+ mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+ fs := iceio.NewMemFS()
+
+ good := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+ `[{"file_path": "s3://bucket/data/file-001.parquet", "pos":
1}]`)
+
+ wantErr := errors.New("simulated upstream failure")
+ itr := func(yield func(arrow.RecordBatch, error) bool) {
+ good.Retain()
+ if !yield(good, nil) {
+ return
+ }
+ yield(nil, wantErr)
+ }
+
+ seq := positionDeleteRecordsToDataFiles(context.Background(),
"mem://test", mb, nil,
+ recordWritingArgs{
+ sc: PositionalDeleteArrowSchema,
+ itr: itr,
+ fs: fs,
+ })
+
+ var sawErr bool
+ var dataFiles []iceberg.DataFile
+ for df, err := range seq {
+ if err != nil {
+ sawErr = true
+ assert.ErrorIs(t, err, wantErr)
+
+ continue
+ }
+ dataFiles = append(dataFiles, df)
+ }
+ assert.True(t, sawErr, "expected mid-stream error to propagate to the
consumer")
+ assert.Empty(t, dataFiles,
+ "DV producer must not flush a partial Puffin when the iterator
errors mid-stream")
+}
+
+// TestDVWritePathMergesMultiBatchSameFile verifies that positions for the
+// same data file arriving across multiple batches are merged into a single
+// DV blob (one DataFile, one bitmap with the union of positions).
+func TestDVWritePathMergesMultiBatchSameFile(t *testing.T) {
+ mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+ fs := iceio.NewMemFS()
+
+ const path = "s3://bucket/data/file-001.parquet"
+ batch1 := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+ {"file_path": "`+path+`", "pos": 1},
+ {"file_path": "`+path+`", "pos": 3}
+ ]`)
+ batch2 := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+ {"file_path": "`+path+`", "pos": 5},
+ {"file_path": "`+path+`", "pos": 7}
+ ]`)
+
+ itr := func(yield func(arrow.RecordBatch, error) bool) {
+ batch1.Retain()
+ if !yield(batch1, nil) {
+ return
+ }
+ batch2.Retain()
+ yield(batch2, nil)
+ }
+
+ seq := positionDeleteRecordsToDataFiles(context.Background(),
"mem://test", mb, nil,
+ recordWritingArgs{
+ sc: PositionalDeleteArrowSchema,
+ itr: itr,
+ fs: fs,
+ })
+
+ var dataFiles []iceberg.DataFile
+ for df, err := range seq {
+ require.NoError(t, err)
+ dataFiles = append(dataFiles, df)
+ }
+
+ require.Len(t, dataFiles, 1,
+ "two batches targeting the same file should merge into one DV
blob")
+ df := dataFiles[0]
+ assert.Equal(t, iceberg.PuffinFile, df.FileFormat())
+ assert.Equal(t, path, *df.ReferencedDataFile())
+ assert.Equal(t, int64(4), df.Count(),
+ "merged DV cardinality should be the union across batches")
+
+ bm, err := dv.ReadDV(fs, df)
+ require.NoError(t, err)
+ assert.Equal(t, int64(4), bm.Cardinality())
+ for _, pos := range []uint64{1, 3, 5, 7} {
+ assert.True(t, bm.Contains(pos), "merged bitmap should contain
pos %d", pos)
+ }
+}
diff --git a/table/pos_delete_v3_warn_test.go b/table/pos_delete_v3_warn_test.go
index de3fa629..4dfffe6a 100644
--- a/table/pos_delete_v3_warn_test.go
+++ b/table/pos_delete_v3_warn_test.go
@@ -123,45 +123,31 @@ func runPositionDeleteWrite(t *testing.T, mb
*MetadataBuilder, partitions map[st
}
// TestPositionDeleteV3Warning verifies the writer emits a single deduped
-// slog.Warn naming the table when position-deletes are written on a v3 table,
-// and stays silent on v2 where Parquet position-deletes are still canonical.
-//
-// The warning fires once at writer entry, before partition fanout, so a
-// partitioned write logs once total — not once per partition. The partitioned
-// subtest locks that contract: the issue specifically called out "deduped",
-// and a future change that moved the warning into per-partition writers would
-// silently regress this property without it.
+// slog.Warn naming the table when v3 position-deletes fall through to the
+// Parquet writer (currently: any partitioned v3 write, since DV+partitioned
+// support is deferred), and stays silent on v2 where Parquet position-
+// deletes are still canonical and on v3 unpartitioned writes that take the
+// DV path.
func TestPositionDeleteV3Warning(t *testing.T) {
emptyItr := func(yield func(arrow.RecordBatch, error) bool) {}
- t.Run("v3 unpartitioned warns once with table location", func(t
*testing.T) {
+ t.Run("v2 does not warn", func(t *testing.T) {
out := captureSlog(t, func() {
- runPositionDeleteWrite(t,
newPositionDeleteUnpartitionedMetadata(t, 3), nil, emptyItr)
+ runPositionDeleteWrite(t,
newPositionDeleteUnpartitionedMetadata(t, 2), nil, emptyItr)
})
- assert.Equal(t, 1, countWarnRecords(out),
- "expected exactly one WARN record, got: %s", out)
- assert.Contains(t, out, "Parquet position-delete")
- assert.Contains(t, out, "deletion vectors")
- assert.Contains(t, out, "table_location=file:///warn-test",
- "warning should name the table location")
+ assert.Equal(t, 0, countWarnRecords(out),
+ "v2 position-delete writes should not warn, got: %s",
out)
})
- t.Run("v2 does not warn", func(t *testing.T) {
+ t.Run("v3 unpartitioned uses DV path without warning", func(t
*testing.T) {
out := captureSlog(t, func() {
- runPositionDeleteWrite(t,
newPositionDeleteUnpartitionedMetadata(t, 2), nil, emptyItr)
+ runPositionDeleteWrite(t,
newPositionDeleteUnpartitionedMetadata(t, 3), nil, emptyItr)
})
assert.Equal(t, 0, countWarnRecords(out),
- "v2 position-delete writes should not warn, got: %s",
out)
+ "v3 unpartitioned writes should route through DV
without warning, got: %s", out)
})
- t.Run("v3 partitioned write warns exactly once across multiple
partitions", func(t *testing.T) {
- // Two batches each routed to a distinct partition. The fanout
- // writer's processBatch reads only the first row's file_path,
so a
- // per-partition regression of the warn (e.g. moved into
processBatch
- // or per-rolling-writer init) would emit 2 records here. One
batch
- // with two rows would not exercise the fanout — it would all
go to
- // one partition. Two batches force two processBatch invocations
- // targeting two different partition contexts.
+ t.Run("v3 partitioned write warns exactly once with table location",
func(t *testing.T) {
mb := newPositionDeletePartitionedMetadata(t, 3)
partitions := map[string]partitionContext{
"file://namespace/age_bucket=0/test.parquet": {
@@ -189,5 +175,9 @@ func TestPositionDeleteV3Warning(t *testing.T) {
})
assert.Equal(t, 1, countWarnRecords(out),
"expected exactly one WARN record across multiple
partitions, got: %s", out)
+ assert.Contains(t, out, "Parquet position-delete")
+ assert.Contains(t, out, "deletion vectors")
+ assert.Contains(t, out,
"table_location=file:///warn-test-partitioned",
+ "warning should name the table location")
})
}