This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new a59f2a27 fix(table): derive position-delete partition paths from table schema (#1089)
a59f2a27 is described below

commit a59f2a276087562ea2d02dcfbb92e44e07d8dac0
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Mon May 18 03:41:43 2026 +0200

    fix(table): derive position-delete partition paths from table schema (#1089)
    
    The partitioned position-delete writer was building partition paths from
    `iceberg.PositionalDeleteSchema`, which only contains `file_path` and
    `pos`.
    
    That breaks for partition specs whose source fields are real table columns.
    `PartitionSpec.PartitionType` silently skips fields whose source IDs are
    not present in the schema, so the derived partition type collapses to an
    empty struct and `PartitionToPath` returns `""` for every partition.
    
    The downstream effect is severe:
    `writerFactory.getOrCreateRollingDataWriter` keys writers by that path,
    so every partition ends up sharing the same rolling writer. Because that
    writer retains the `partitionData` of the first partition it was created
    for, the resulting delete file is tagged as belonging to that first
    partition while containing position deletes for files from multiple
    partitions.
    
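    This PR fixes that by deriving the partition path from
    `metadata.CurrentSchema()`, matching what the data-side
    `partitionedFanoutWriter` already does. Because this writer fans out
    across historic spec IDs taken from the target data files, the source
    columns of those specs must still be resolvable in the current schema.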
    
    The schema of the delete-file contents is unchanged: position-delete
    files still use `PositionalDeleteSchema`, which is passed separately to
    the writer factory via `withFactoryFileSchema`.
    
    Fixes #1082.
---
 table/pos_delete_partitioned_fanout_writer.go      |  14 +-
 table/pos_delete_partitioned_fanout_writer_test.go | 169 ++++++++++++++++++++-
 2 files changed, 175 insertions(+), 8 deletions(-)

diff --git a/table/pos_delete_partitioned_fanout_writer.go 
b/table/pos_delete_partitioned_fanout_writer.go
index fe54dba9..fbf3f476 100644
--- a/table/pos_delete_partitioned_fanout_writer.go
+++ b/table/pos_delete_partitioned_fanout_writer.go
@@ -34,19 +34,17 @@ import (
 type positionDeletePartitionedFanoutWriter struct {
        partitionContextByFilePath map[string]partitionContext
        metadata                   Metadata
-       schema                     *iceberg.Schema
        itr                        iter.Seq2[arrow.RecordBatch, error]
        writerFactory              *writerFactory
        concurrentDataFileWriter   *concurrentDataFileWriter
 }
 
 // newPositionDeletePartitionedFanoutWriter creates a new 
PartitionedFanoutWriter with the specified
-// partition specification, schema, and record iterator.
+// metadata, partition context, and record iterator.
 func newPositionDeletePartitionedFanoutWriter(metadata Metadata, 
concurrentWriter *concurrentDataFileWriter, partitionContextByFilePath 
map[string]partitionContext, itr iter.Seq2[arrow.RecordBatch, error], 
writerFactory *writerFactory) *positionDeletePartitionedFanoutWriter {
        return &positionDeletePartitionedFanoutWriter{
                partitionContextByFilePath: partitionContextByFilePath,
                metadata:                   metadata,
-               schema:                     iceberg.PositionalDeleteSchema,
                itr:                        itr,
                writerFactory:              writerFactory,
                concurrentDataFileWriter:   concurrentWriter,
@@ -135,9 +133,15 @@ func (p *positionDeletePartitionedFanoutWriter) 
partitionPath(partitionContext p
                return "", fmt.Errorf("unexpected missing partition spec in 
metadata for spec id %d", partitionContext.specID)
        }
 
-       data := newPartitionRecord(partitionContext.partitionData, 
spec.PartitionType(p.schema))
+       // Resolve the schema at call time, not at construction. Unlike 
partitionedFanoutWriter
+       // (which always writes against the current spec), this writer fans out 
across historic
+       // specIDs from the target data files; callers must ensure those 
specIDs resolve against
+       // fields still present in CurrentSchema. Dropped partition source 
columns are assumed to
+       // be void-transformed per the Iceberg spec.
+       schema := p.metadata.CurrentSchema()
+       data := newPartitionRecord(partitionContext.partitionData, 
spec.PartitionType(schema))
 
-       return spec.PartitionToPath(data, p.schema), nil
+       return spec.PartitionToPath(data, schema), nil
 }
 
 func (p *positionDeletePartitionedFanoutWriter) yieldDataFiles(fanoutWorkers 
*errgroup.Group, outputDataFilesCh chan iceberg.DataFile) 
iter.Seq2[iceberg.DataFile, error] {
diff --git a/table/pos_delete_partitioned_fanout_writer_test.go 
b/table/pos_delete_partitioned_fanout_writer_test.go
index 0b379700..83b6fc42 100644
--- a/table/pos_delete_partitioned_fanout_writer_test.go
+++ b/table/pos_delete_partitioned_fanout_writer_test.go
@@ -215,7 +215,6 @@ func 
TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *te
 
        writer := &positionDeletePartitionedFanoutWriter{
                metadata: latestMeta,
-               schema:   iceberg.PositionalDeleteSchema,
        }
 
        ctx := partitionContext{
@@ -231,7 +230,7 @@ func 
TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *te
                ctx.partitionData[1000],
                ctx.partitionData[1001],
                ctx.partitionData[1002],
-       }, iceberg.PositionalDeleteSchema)
+       }, latestMeta.CurrentSchema())
 
        // run multiple times to ensure it consistently
        // produces the same output for the same input context
@@ -246,6 +245,169 @@ func 
TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *te
        require.Contains(t, seen, expectedPath)
 }
 
+// TestPositionDeletePartitionedFanoutWriterPartitionPathUsesTableSchema is a
+// regression test for https://github.com/apache/iceberg-go/issues/1082.
+//
+// The partitioned position-delete writer used to derive partition paths from
+// iceberg.PositionalDeleteSchema. That schema only contains file_path/pos, so
+// any partition field whose source column lives on the table data schema was
+// silently dropped by PartitionSpec.PartitionType, collapsing distinct target
+// partitions into a single empty partition path. Downstream, the rolling
+// writer factory keys writers by that path, so position-delete rows for
+// different target partitions ended up in the same delete file.
+//
+// This test pins the fix: when the partition spec references a real table
+// data column, two partitionContexts with different partition values must
+// produce distinct, non-empty partition paths.
+func TestPositionDeletePartitionedFanoutWriterPartitionPathUsesTableSchema(t 
*testing.T) {
+       t.Parallel()
+
+       // Table schema: id int + data string. Partition identity on `id`.
+       tableSchema := iceberg.NewSchema(
+               0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+               FieldID:   1000,
+               SourceIDs: []int{1},
+               Name:      "id",
+               Transform: iceberg.IdentityTransform{},
+       })
+
+       metadataBuilder, err := NewMetadataBuilder(2)
+       require.NoError(t, err)
+       require.NoError(t, metadataBuilder.AddSchema(tableSchema))
+       require.NoError(t, metadataBuilder.SetCurrentSchemaID(0))
+       require.NoError(t, metadataBuilder.AddPartitionSpec(&partitionSpec, 
true))
+       require.NoError(t, metadataBuilder.SetDefaultSpecID(0))
+       sortOrder, err := NewSortOrder(1, []SortField{{
+               SourceIDs: []int{1},
+               Direction: SortASC,
+               Transform: iceberg.IdentityTransform{},
+               NullOrder: NullsFirst,
+       }})
+       require.NoError(t, err)
+       require.NoError(t, metadataBuilder.AddSortOrder(&sortOrder))
+       require.NoError(t, metadataBuilder.SetDefaultSortOrderID(1))
+
+       latestMeta, err := metadataBuilder.Build()
+       require.NoError(t, err)
+
+       writer := &positionDeletePartitionedFanoutWriter{
+               metadata: latestMeta,
+       }
+
+       ctxA := partitionContext{specID: 0, partitionData: map[int]any{1000: 
int32(1)}}
+       ctxB := partitionContext{specID: 0, partitionData: map[int]any{1000: 
int32(2)}}
+
+       pathA, err := writer.partitionPath(ctxA)
+       require.NoError(t, err)
+       pathB, err := writer.partitionPath(ctxB)
+       require.NoError(t, err)
+
+       // Distinct, non-empty, table-schema-derived paths. If the writer 
reverts to using
+       // PositionalDeleteSchema, both calls collapse to "" and the 
rolling-writer factory
+       // keys them onto the same writer — the bug from #1082.
+       assert.Equal(t, "id=1", pathA)
+       assert.Equal(t, "id=2", pathB)
+}
+
+// TestPositionDeletePartitionedFanoutWriterRoutesPartitionsIndependently is 
the end-to-end
+// regression for https://github.com/apache/iceberg-go/issues/1082. It drives 
processBatch
+// with two batches whose target data files live in distinct partitions and 
asserts that
+// the writer factory produced two separate delete files, each tagged with the 
right
+// partition. Pre-fix, both batches collapsed onto one writer (one output 
file, wrong
+// partition metadata for the second batch's rows).
+func TestPositionDeletePartitionedFanoutWriterRoutesPartitionsIndependently(t 
*testing.T) {
+       t.Parallel()
+
+       tableSchema := iceberg.NewSchema(
+               0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int32, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+
+       partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+               FieldID:   1000,
+               SourceIDs: []int{1},
+               Name:      "id",
+               Transform: iceberg.IdentityTransform{},
+       })
+
+       metadataBuilder, err := NewMetadataBuilder(2)
+       require.NoError(t, err)
+       require.NoError(t, metadataBuilder.AddSchema(tableSchema))
+       require.NoError(t, metadataBuilder.SetCurrentSchemaID(0))
+       require.NoError(t, metadataBuilder.AddPartitionSpec(&partitionSpec, 
true))
+       require.NoError(t, metadataBuilder.SetDefaultSpecID(0))
+       sortOrder, err := NewSortOrder(1, []SortField{{
+               SourceIDs: []int{1},
+               Direction: SortASC,
+               Transform: iceberg.IdentityTransform{},
+               NullOrder: NullsFirst,
+       }})
+       require.NoError(t, err)
+       require.NoError(t, metadataBuilder.AddSortOrder(&sortOrder))
+       require.NoError(t, metadataBuilder.SetDefaultSortOrderID(1))
+       latestMeta, err := metadataBuilder.Build()
+       require.NoError(t, err)
+
+       const (
+               pathA = "file://t/id=1/a.parquet"
+               pathB = "file://t/id=2/b.parquet"
+       )
+       pathToCtx := map[string]partitionContext{
+               pathA: {partitionData: map[int]any{1000: int32(1)}, specID: 0},
+               pathB: {partitionData: map[int]any{1000: int32(2)}, specID: 0},
+       }
+
+       writeUUID := uuid.New()
+       cw := newConcurrentDataFileWriter(func(rootLocation string, fs 
io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts 
...dataFileWriterOption) (dataFileWriter, error) {
+               return newPositionDeleteWriter(rootLocation, fs, meta, props, 
opts...)
+       })
+       factory, err := newWriterFactory(t.TempDir(), recordWritingArgs{
+               fs:        &io.LocalFS{},
+               sc:        PositionalDeleteArrowSchema,
+               writeUUID: &writeUUID,
+               counter:   internal.Counter(0),
+       }, metadataBuilder, iceberg.PositionalDeleteSchema, 1024*1024,
+               withContentType(iceberg.EntryContentPosDeletes),
+               withFactoryFileSchema(iceberg.PositionalDeleteSchema))
+       require.NoError(t, err)
+       writer := newPositionDeletePartitionedFanoutWriter(latestMeta, cw, 
pathToCtx, nil, factory)
+
+       dataFileCh := make(chan iceberg.DataFile, 4)
+
+       batchA := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+               fmt.Sprintf(`[{"file_path": %q, "pos": 0},{"file_path": %q, 
"pos": 1}]`, pathA, pathA))
+       batchB := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+               fmt.Sprintf(`[{"file_path": %q, "pos": 5}]`, pathB))
+
+       require.NoError(t, writer.processBatch(t.Context(), batchA, dataFileCh))
+       require.NoError(t, writer.processBatch(t.Context(), batchB, dataFileCh))
+       require.NoError(t, factory.closeAll())
+       close(dataFileCh)
+
+       var files []iceberg.DataFile
+       for df := range dataFileCh {
+               files = append(files, df)
+       }
+       require.Len(t, files, 2, "expected one delete file per target 
partition; pre-fix collapse produces only 1")
+
+       byPart := make(map[int32]iceberg.DataFile, 2)
+       for _, df := range files {
+               part, ok := df.Partition()[1000].(int32)
+               require.True(t, ok, "DataFile.Partition()[1000] must be int32, 
got %T", df.Partition()[1000])
+               byPart[part] = df
+       }
+       require.Contains(t, byPart, int32(1))
+       require.Contains(t, byPart, int32(2))
+       assert.Equal(t, int64(2), byPart[1].Count(), "id=1 delete file must 
contain only the two rows targeting pathA")
+       assert.Equal(t, int64(1), byPart[2].Count(), "id=2 delete file must 
contain only the one row targeting pathB")
+}
+
 func TestPositionDeletePartitionedNoGoroutineLeak(t *testing.T) {
        t.Parallel()
 
@@ -569,7 +731,8 @@ func TestPositionDeleteUnpartitionedSortOrderID(t 
*testing.T) {
 func TestEqualityDeleteUnpartitionedSortOrderID(t *testing.T) {
        t.Parallel()
 
-       delSchema := iceberg.NewSchema(0,
+       delSchema := iceberg.NewSchema(
+               0,
                iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
        )
 

Reply via email to