This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 055eee54 feat(table): warn when writing Parquet position-deletes on v3 (#1040)
055eee54 is described below

commit 055eee547baf6fc6e00d66a61479b22b471b92ca
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Sun May 10 20:12:54 2026 +0200

    feat(table): warn when writing Parquet position-deletes on v3 (#1040)
    
    Closes #1007.
    
    V3 prefers deletion vectors over Parquet position-delete files
    (apache/iceberg#12048). Java logs a warning when the legacy path is
    used; iceberg-go silently writes the deprecated path. This emits a
    single slog.Warn at the entry of positionDeleteRecordsToDataFiles when
    the table is v3+, naming the table location so users can identify which
    table is on the deprecated path.
    
    Once the DV writer (#997) lands and flips the default on v3, this
    becomes a backstop for users who explicitly opt back into Parquet
    position-deletes via write.delete.format=position.
    
    The warning fires at writer entry, before partition fanout, so a
    partitioned write logs once total — not once per partition. The
    partitioned regression test yields two batches (one per partition path)
    because positionDeletePartitionedFanoutWriter.processBatch reads only
    the first row's file_path and routes the entire batch to one partition,
    so a single batch with two rows would not actually exercise the fanout.
    Verified by temporarily moving the warn into processBatch and confirming
    the test fails with count==2.
---
 table/arrow_utils.go             |   9 ++
 table/pos_delete_v3_warn_test.go | 193 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 202 insertions(+)

diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 583ef254..873eb727 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1607,6 +1607,15 @@ func positionDeleteRecordsToDataFiles(ctx 
context.Context, rootLocation string,
                }
        }
 
+       // V3 and later prefer deletion vectors over Parquet position-delete 
files;
+       // warn so users migrate when DV-write support lands. The check is `>= 
3`
+       // rather than `== 3` so the warning carries forward to v4+ without 
churn.
+       // See apache/iceberg#12048.
+       if latestMetadata.Version() >= 3 {
+               slog.Warn("writing Parquet position-delete file on a v3 table; 
prefer deletion vectors",
+                       "table_location", latestMetadata.Location())
+       }
+
        if args.writeUUID == nil {
                u := uuid.Must(uuid.NewRandom())
                args.writeUUID = &u
diff --git a/table/pos_delete_v3_warn_test.go b/table/pos_delete_v3_warn_test.go
new file mode 100644
index 00000000..de3fa629
--- /dev/null
+++ b/table/pos_delete_v3_warn_test.go
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
import (
	"bytes"
	"context"
	"log"
	"log/slog"
	"strings"
	"testing"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/iceberg-go"
	"github.com/apache/iceberg-go/io"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)
+
// captureSlog redirects the default slog logger to an in-memory text handler
// (Debug level and above) while fn runs, and returns everything logged
// through it.
//
// slog.SetDefault with a non-default handler also re-routes the standard
// log package's output and clears its flags. Restoring only the previous
// slog logger is therefore not enough: if that logger is slog's built-in
// default, its handler writes through the log package, which would still
// point at this capture buffer. So the log package's writer and flags are
// saved and restored alongside the slog default.
//
// Restoration happens via defer as soon as fn returns, including when fn
// panics or calls t.FailNow. This keeps the window of process-global
// mutation as narrow as possible. Callers must still not run parallel
// slog-emitting tests while fn executes.
// Sibling test TestOverwriteRowCountWarning uses the same pattern.
func captureSlog(t *testing.T, fn func()) string {
	t.Helper()

	var buf bytes.Buffer

	prevLogger := slog.Default()
	prevLogOut := log.Writer()
	prevLogFlags := log.Flags()
	defer func() {
		// Restore slog first. Its SetDefault may itself touch the log
		// package; then put the log package back exactly as it was.
		slog.SetDefault(prevLogger)
		log.SetOutput(prevLogOut)
		log.SetFlags(prevLogFlags)
	}()

	slog.SetDefault(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})))

	fn()

	return buf.String()
}
+
+// countWarnRecords counts WARN-level records in slog text-handler output.
+// Each record is one line, so we scan lines rather than coupling to the
+// exact key=value layout (which can shift if attributes are added).
+func countWarnRecords(out string) int {
+       n := 0
+       for line := range strings.SplitSeq(out, "\n") {
+               if strings.Contains(line, "level=WARN") {
+                       n++
+               }
+       }
+
+       return n
+}
+
+// addUnsortedSortOrder gives a metadata builder the unsorted default order
+// so v2+ "default sort order id must be set" validation passes. Position-
+// delete writes in these tests do not depend on sort order.
+func addUnsortedSortOrder(t *testing.T, mb *MetadataBuilder) {
+       t.Helper()
+       so := UnsortedSortOrder
+       require.NoError(t, mb.AddSortOrder(&so))
+       require.NoError(t, mb.SetDefaultSortOrderID(0))
+}
+
+func newPositionDeleteUnpartitionedMetadata(t *testing.T, formatVersion int) 
*MetadataBuilder {
+       t.Helper()
+       mb, err := NewMetadataBuilder(formatVersion)
+       require.NoError(t, err)
+       require.NoError(t, mb.AddSchema(iceberg.PositionalDeleteSchema))
+       require.NoError(t, mb.SetCurrentSchemaID(0))
+       require.NoError(t, mb.AddPartitionSpec(iceberg.UnpartitionedSpec, true))
+       require.NoError(t, mb.SetDefaultSpecID(0))
+       require.NoError(t, mb.SetLoc("file:///warn-test"))
+       addUnsortedSortOrder(t, mb)
+
+       return mb
+}
+
+func newPositionDeletePartitionedMetadata(t *testing.T, formatVersion int) 
*MetadataBuilder {
+       t.Helper()
+       mb, err := NewMetadataBuilder(formatVersion)
+       require.NoError(t, err)
+       require.NoError(t, mb.AddSchema(iceberg.NewSchema(0,
+               append(iceberg.PositionalDeleteSchema.Fields(),
+                       iceberg.NestedField{Name: "age", ID: 2, Type: 
iceberg.Int64Type{}})...)))
+       require.NoError(t, mb.SetCurrentSchemaID(0))
+       partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+               SourceIDs: []int{2},
+               Name:      "age_bucket",
+               Transform: iceberg.BucketTransform{NumBuckets: 2},
+       })
+       require.NoError(t, mb.AddPartitionSpec(&partitionSpec, true))
+       require.NoError(t, mb.SetDefaultSpecID(0))
+       require.NoError(t, mb.SetLoc("file:///warn-test-partitioned"))
+       addUnsortedSortOrder(t, mb)
+
+       return mb
+}
+
+// runPositionDeleteWrite drives positionDeleteRecordsToDataFiles to 
completion.
+// The for-range over seq is load-bearing: it drains the writer's goroutines.
+// Do not "simplify" by discarding the iterator.
+func runPositionDeleteWrite(t *testing.T, mb *MetadataBuilder, partitions 
map[string]partitionContext, itr func(yield func(arrow.RecordBatch, error) 
bool)) {
+       t.Helper()
+       seq := positionDeleteRecordsToDataFiles(context.Background(), 
t.TempDir(), mb, partitions,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: itr,
+                       fs:  io.LocalFS{},
+               })
+       for _, err := range seq {
+               require.NoError(t, err)
+       }
+}
+
+// TestPositionDeleteV3Warning verifies the writer emits a single deduped
+// slog.Warn naming the table when position-deletes are written on a v3 table,
+// and stays silent on v2 where Parquet position-deletes are still canonical.
+//
+// The warning fires once at writer entry, before partition fanout, so a
+// partitioned write logs once total — not once per partition. The partitioned
+// subtest locks that contract: the issue specifically called out "deduped",
+// and a future change that moved the warning into per-partition writers would
+// silently regress this property without it.
+func TestPositionDeleteV3Warning(t *testing.T) {
+       emptyItr := func(yield func(arrow.RecordBatch, error) bool) {}
+
+       t.Run("v3 unpartitioned warns once with table location", func(t 
*testing.T) {
+               out := captureSlog(t, func() {
+                       runPositionDeleteWrite(t, 
newPositionDeleteUnpartitionedMetadata(t, 3), nil, emptyItr)
+               })
+               assert.Equal(t, 1, countWarnRecords(out),
+                       "expected exactly one WARN record, got: %s", out)
+               assert.Contains(t, out, "Parquet position-delete")
+               assert.Contains(t, out, "deletion vectors")
+               assert.Contains(t, out, "table_location=file:///warn-test",
+                       "warning should name the table location")
+       })
+
+       t.Run("v2 does not warn", func(t *testing.T) {
+               out := captureSlog(t, func() {
+                       runPositionDeleteWrite(t, 
newPositionDeleteUnpartitionedMetadata(t, 2), nil, emptyItr)
+               })
+               assert.Equal(t, 0, countWarnRecords(out),
+                       "v2 position-delete writes should not warn, got: %s", 
out)
+       })
+
+       t.Run("v3 partitioned write warns exactly once across multiple 
partitions", func(t *testing.T) {
+               // Two batches each routed to a distinct partition. The fanout
+               // writer's processBatch reads only the first row's file_path, 
so a
+               // per-partition regression of the warn (e.g. moved into 
processBatch
+               // or per-rolling-writer init) would emit 2 records here. One 
batch
+               // with two rows would not exercise the fanout — it would all 
go to
+               // one partition. Two batches force two processBatch invocations
+               // targeting two different partition contexts.
+               mb := newPositionDeletePartitionedMetadata(t, 3)
+               partitions := map[string]partitionContext{
+                       "file://namespace/age_bucket=0/test.parquet": {
+                               partitionData: 
map[int]any{iceberg.PartitionDataIDStart: 0}, specID: 0,
+                       },
+                       "file://namespace/age_bucket=1/test.parquet": {
+                               partitionData: 
map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 0,
+                       },
+               }
+               batch0 := 
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+                       `[{"file_path": 
"file://namespace/age_bucket=0/test.parquet", "pos": 0}]`)
+               batch1 := 
mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+                       `[{"file_path": 
"file://namespace/age_bucket=1/test.parquet", "pos": 0}]`)
+               itr := func(yield func(arrow.RecordBatch, error) bool) {
+                       batch0.Retain()
+                       if !yield(batch0, nil) {
+                               return
+                       }
+                       batch1.Retain()
+                       yield(batch1, nil)
+               }
+
+               out := captureSlog(t, func() {
+                       runPositionDeleteWrite(t, mb, partitions, itr)
+               })
+               assert.Equal(t, 1, countWarnRecords(out),
+                       "expected exactly one WARN record across multiple 
partitions, got: %s", out)
+       })
+}

Reply via email to