This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 055eee54 feat(table): warn when writing Parquet position-deletes on v3 (#1040)
055eee54 is described below
commit 055eee547baf6fc6e00d66a61479b22b471b92ca
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Sun May 10 20:12:54 2026 +0200
feat(table): warn when writing Parquet position-deletes on v3 (#1040)
Closes #1007.
V3 prefers deletion vectors over Parquet position-delete files
(apache/iceberg#12048). Java logs a warning when the legacy path is
used; iceberg-go silently takes the deprecated path. This change emits a
single slog.Warn at the entry of positionDeleteRecordsToDataFiles when
the table's format version is 3 or later, naming the table location so
users can identify which table is on the deprecated path.
Once the DV writer (#997) lands and flips the default on v3, this
becomes a backstop for users who explicitly opt back into Parquet
position-deletes via write.delete.format=position.
The warning fires at writer entry, before partition fanout, so a
partitioned write logs once in total, not once per partition. The
partitioned regression test yields two batches (one per partition path)
because positionDeletePartitionedFanoutWriter.processBatch reads only
the first row's file_path and routes the entire batch to one partition,
so a single batch with two rows would not actually exercise the fanout.
Verified by temporarily moving the warning into processBatch and confirming
that the test then fails with a count of 2.
---
table/arrow_utils.go | 9 ++
table/pos_delete_v3_warn_test.go | 193 +++++++++++++++++++++++++++++++++++++++
2 files changed, 202 insertions(+)
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 583ef254..873eb727 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1607,6 +1607,15 @@ func positionDeleteRecordsToDataFiles(ctx
context.Context, rootLocation string,
}
}
+ // V3 and later prefer deletion vectors over Parquet position-delete
files;
+ // warn so users migrate when DV-write support lands. The check is `>=
3`
+ // rather than `== 3` so the warning carries forward to v4+ without
churn.
+ // See apache/iceberg#12048.
+ if latestMetadata.Version() >= 3 {
+ slog.Warn("writing Parquet position-delete file on a v3 table;
prefer deletion vectors",
+ "table_location", latestMetadata.Location())
+ }
+
if args.writeUUID == nil {
u := uuid.Must(uuid.NewRandom())
args.writeUUID = &u
diff --git a/table/pos_delete_v3_warn_test.go b/table/pos_delete_v3_warn_test.go
new file mode 100644
index 00000000..de3fa629
--- /dev/null
+++ b/table/pos_delete_v3_warn_test.go
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
import (
	"bytes"
	"context"
	"log"
	"log/slog"
	"strings"
	"testing"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/io"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
+
// captureSlog redirects the default slog logger to an in-memory text handler
// (Debug level and above) while fn runs, and returns everything logged.
//
// All global logging state is restored as soon as fn returns — including when
// fn exits via t.FailNow or a panic — rather than at test cleanup, so the
// capture window is exactly fn. Restoring the log package's writer and flags
// matters: slog.SetDefault with a non-default handler points the log package's
// output at that handler and clears its flags, and setting the original slog
// default back does not undo that. Without this, every later slog/log call in
// the test binary would be written into this function's stale buffer.
//
// slog.SetDefault mutates process-global state: callers must not run parallel
// slog-emitters during the capture window. Sibling test
// TestOverwriteRowCountWarning uses a similar capture pattern.
func captureSlog(t *testing.T, fn func()) string {
	t.Helper()

	var buf bytes.Buffer
	prevLogger := slog.Default()
	prevWriter := log.Writer()
	prevFlags := log.Flags()

	slog.SetDefault(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})))
	defer func() {
		slog.SetDefault(prevLogger)
		// Restore after slog so log's output is the pre-capture writer
		// regardless of what SetDefault(prevLogger) did to it.
		log.SetOutput(prevWriter)
		log.SetFlags(prevFlags)
	}()

	fn()

	return buf.String()
}
+
// countWarnRecords reports how many WARN-level records appear in output
// produced by slog's text handler. That handler writes one record per line,
// so each line is tested for the level marker on its own instead of parsing
// key=value pairs; this keeps the count stable if attributes are added or
// reordered.
func countWarnRecords(out string) int {
	warnings := 0
	for _, record := range strings.Split(out, "\n") {
		if !strings.Contains(record, "level=WARN") {
			continue
		}
		warnings++
	}

	return warnings
}
+
+// addUnsortedSortOrder gives a metadata builder the unsorted default order
+// so v2+ "default sort order id must be set" validation passes. Position-
+// delete writes in these tests do not depend on sort order.
+func addUnsortedSortOrder(t *testing.T, mb *MetadataBuilder) {
+ t.Helper()
+ so := UnsortedSortOrder
+ require.NoError(t, mb.AddSortOrder(&so))
+ require.NoError(t, mb.SetDefaultSortOrderID(0))
+}
+
+func newPositionDeleteUnpartitionedMetadata(t *testing.T, formatVersion int)
*MetadataBuilder {
+ t.Helper()
+ mb, err := NewMetadataBuilder(formatVersion)
+ require.NoError(t, err)
+ require.NoError(t, mb.AddSchema(iceberg.PositionalDeleteSchema))
+ require.NoError(t, mb.SetCurrentSchemaID(0))
+ require.NoError(t, mb.AddPartitionSpec(iceberg.UnpartitionedSpec, true))
+ require.NoError(t, mb.SetDefaultSpecID(0))
+ require.NoError(t, mb.SetLoc("file:///warn-test"))
+ addUnsortedSortOrder(t, mb)
+
+ return mb
+}
+
+func newPositionDeletePartitionedMetadata(t *testing.T, formatVersion int)
*MetadataBuilder {
+ t.Helper()
+ mb, err := NewMetadataBuilder(formatVersion)
+ require.NoError(t, err)
+ require.NoError(t, mb.AddSchema(iceberg.NewSchema(0,
+ append(iceberg.PositionalDeleteSchema.Fields(),
+ iceberg.NestedField{Name: "age", ID: 2, Type:
iceberg.Int64Type{}})...)))
+ require.NoError(t, mb.SetCurrentSchemaID(0))
+ partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+ SourceIDs: []int{2},
+ Name: "age_bucket",
+ Transform: iceberg.BucketTransform{NumBuckets: 2},
+ })
+ require.NoError(t, mb.AddPartitionSpec(&partitionSpec, true))
+ require.NoError(t, mb.SetDefaultSpecID(0))
+ require.NoError(t, mb.SetLoc("file:///warn-test-partitioned"))
+ addUnsortedSortOrder(t, mb)
+
+ return mb
+}
+
+// runPositionDeleteWrite drives positionDeleteRecordsToDataFiles to
completion.
+// The for-range over seq is load-bearing: it drains the writer's goroutines.
+// Do not "simplify" by discarding the iterator.
+func runPositionDeleteWrite(t *testing.T, mb *MetadataBuilder, partitions
map[string]partitionContext, itr func(yield func(arrow.RecordBatch, error)
bool)) {
+ t.Helper()
+ seq := positionDeleteRecordsToDataFiles(context.Background(),
t.TempDir(), mb, partitions,
+ recordWritingArgs{
+ sc: PositionalDeleteArrowSchema,
+ itr: itr,
+ fs: io.LocalFS{},
+ })
+ for _, err := range seq {
+ require.NoError(t, err)
+ }
+}
+
+// TestPositionDeleteV3Warning verifies the writer emits a single deduped
+// slog.Warn naming the table when position-deletes are written on a v3 table,
+// and stays silent on v2 where Parquet position-deletes are still canonical.
+//
+// The warning fires once at writer entry, before partition fanout, so a
+// partitioned write logs once total — not once per partition. The partitioned
+// subtest locks that contract: the issue specifically called out "deduped",
+// and a future change that moved the warning into per-partition writers would
+// silently regress this property without it.
+func TestPositionDeleteV3Warning(t *testing.T) {
+ emptyItr := func(yield func(arrow.RecordBatch, error) bool) {}
+
+ t.Run("v3 unpartitioned warns once with table location", func(t
*testing.T) {
+ out := captureSlog(t, func() {
+ runPositionDeleteWrite(t,
newPositionDeleteUnpartitionedMetadata(t, 3), nil, emptyItr)
+ })
+ assert.Equal(t, 1, countWarnRecords(out),
+ "expected exactly one WARN record, got: %s", out)
+ assert.Contains(t, out, "Parquet position-delete")
+ assert.Contains(t, out, "deletion vectors")
+ assert.Contains(t, out, "table_location=file:///warn-test",
+ "warning should name the table location")
+ })
+
+ t.Run("v2 does not warn", func(t *testing.T) {
+ out := captureSlog(t, func() {
+ runPositionDeleteWrite(t,
newPositionDeleteUnpartitionedMetadata(t, 2), nil, emptyItr)
+ })
+ assert.Equal(t, 0, countWarnRecords(out),
+ "v2 position-delete writes should not warn, got: %s",
out)
+ })
+
+ t.Run("v3 partitioned write warns exactly once across multiple
partitions", func(t *testing.T) {
+ // Two batches each routed to a distinct partition. The fanout
+ // writer's processBatch reads only the first row's file_path,
so a
+ // per-partition regression of the warn (e.g. moved into
processBatch
+ // or per-rolling-writer init) would emit 2 records here. One
batch
+ // with two rows would not exercise the fanout — it would all
go to
+ // one partition. Two batches force two processBatch invocations
+ // targeting two different partition contexts.
+ mb := newPositionDeletePartitionedMetadata(t, 3)
+ partitions := map[string]partitionContext{
+ "file://namespace/age_bucket=0/test.parquet": {
+ partitionData:
map[int]any{iceberg.PartitionDataIDStart: 0}, specID: 0,
+ },
+ "file://namespace/age_bucket=1/test.parquet": {
+ partitionData:
map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 0,
+ },
+ }
+ batch0 :=
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+ `[{"file_path":
"file://namespace/age_bucket=0/test.parquet", "pos": 0}]`)
+ batch1 :=
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+ `[{"file_path":
"file://namespace/age_bucket=1/test.parquet", "pos": 0}]`)
+ itr := func(yield func(arrow.RecordBatch, error) bool) {
+ batch0.Retain()
+ if !yield(batch0, nil) {
+ return
+ }
+ batch1.Retain()
+ yield(batch1, nil)
+ }
+
+ out := captureSlog(t, func() {
+ runPositionDeleteWrite(t, mb, partitions, itr)
+ })
+ assert.Equal(t, 1, countWarnRecords(out),
+ "expected exactly one WARN record across multiple
partitions, got: %s", out)
+ })
+}