This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new a59f2a27 fix(table): derive position-delete partition paths from table
schema (#1089)
a59f2a27 is described below
commit a59f2a276087562ea2d02dcfbb92e44e07d8dac0
Author: Andrei Tserakhau <[email protected]>
AuthorDate: Mon May 18 03:41:43 2026 +0200
fix(table): derive position-delete partition paths from table schema (#1089)
The partitioned position-delete writer was building partition paths from
`iceberg.PositionalDeleteSchema`, which only contains `file_path` and
`pos`.
That breaks for partition specs based on real table columns.
`PartitionSpec.PartitionType` silently skips fields whose source IDs are
not present in the schema, so the partition type collapses to an empty
struct and `PartitionToPath` returns `""` for every partition.
The downstream effect is delete files tagged with the wrong partition:
`writerFactory.getOrCreateRollingDataWriter` keys writers by that path,
so every partition ends up sharing the same rolling writer. Since the
writer keeps the first partition’s `partitionData`, the resulting delete
file is tagged as belonging to the first partition while containing
position deletes for files from multiple partitions.
This PR fixes that by deriving the partition path from
`metadata.CurrentSchema()`, matching what the data-side
`partitionedFanoutWriter` already does.
The schema of the file contents is unchanged: position-delete files
still use `PositionalDeleteSchema`, which is passed separately to the
writer factory via `withFactoryFileSchema`.
Fixes #1082.
---
table/pos_delete_partitioned_fanout_writer.go | 14 +-
table/pos_delete_partitioned_fanout_writer_test.go | 169 ++++++++++++++++++++-
2 files changed, 175 insertions(+), 8 deletions(-)
diff --git a/table/pos_delete_partitioned_fanout_writer.go
b/table/pos_delete_partitioned_fanout_writer.go
index fe54dba9..fbf3f476 100644
--- a/table/pos_delete_partitioned_fanout_writer.go
+++ b/table/pos_delete_partitioned_fanout_writer.go
@@ -34,19 +34,17 @@ import (
type positionDeletePartitionedFanoutWriter struct {
partitionContextByFilePath map[string]partitionContext
metadata Metadata
- schema *iceberg.Schema
itr iter.Seq2[arrow.RecordBatch, error]
writerFactory *writerFactory
concurrentDataFileWriter *concurrentDataFileWriter
}
// newPositionDeletePartitionedFanoutWriter creates a new
PartitionedFanoutWriter with the specified
-// partition specification, schema, and record iterator.
+// metadata, partition context, and record iterator.
func newPositionDeletePartitionedFanoutWriter(metadata Metadata,
concurrentWriter *concurrentDataFileWriter, partitionContextByFilePath
map[string]partitionContext, itr iter.Seq2[arrow.RecordBatch, error],
writerFactory *writerFactory) *positionDeletePartitionedFanoutWriter {
return &positionDeletePartitionedFanoutWriter{
partitionContextByFilePath: partitionContextByFilePath,
metadata: metadata,
- schema: iceberg.PositionalDeleteSchema,
itr: itr,
writerFactory: writerFactory,
concurrentDataFileWriter: concurrentWriter,
@@ -135,9 +133,15 @@ func (p *positionDeletePartitionedFanoutWriter)
partitionPath(partitionContext p
return "", fmt.Errorf("unexpected missing partition spec in
metadata for spec id %d", partitionContext.specID)
}
- data := newPartitionRecord(partitionContext.partitionData,
spec.PartitionType(p.schema))
+ // Resolve the schema at call time, not at construction. Unlike
partitionedFanoutWriter
+ // (which always writes against the current spec), this writer fans out
across historic
+ // specIDs from the target data files; callers must ensure those
specIDs resolve against
+ // fields still present in CurrentSchema. Dropped partition source
columns are assumed to
+ // be void-transformed per the Iceberg spec.
+ schema := p.metadata.CurrentSchema()
+ data := newPartitionRecord(partitionContext.partitionData,
spec.PartitionType(schema))
- return spec.PartitionToPath(data, p.schema), nil
+ return spec.PartitionToPath(data, schema), nil
}
func (p *positionDeletePartitionedFanoutWriter) yieldDataFiles(fanoutWorkers
*errgroup.Group, outputDataFilesCh chan iceberg.DataFile)
iter.Seq2[iceberg.DataFile, error] {
diff --git a/table/pos_delete_partitioned_fanout_writer_test.go
b/table/pos_delete_partitioned_fanout_writer_test.go
index 0b379700..83b6fc42 100644
--- a/table/pos_delete_partitioned_fanout_writer_test.go
+++ b/table/pos_delete_partitioned_fanout_writer_test.go
@@ -215,7 +215,6 @@ func
TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *te
writer := &positionDeletePartitionedFanoutWriter{
metadata: latestMeta,
- schema: iceberg.PositionalDeleteSchema,
}
ctx := partitionContext{
@@ -231,7 +230,7 @@ func
TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *te
ctx.partitionData[1000],
ctx.partitionData[1001],
ctx.partitionData[1002],
- }, iceberg.PositionalDeleteSchema)
+ }, latestMeta.CurrentSchema())
// run multiple times to ensure it consistently
// produces the same output for the same input context
@@ -246,6 +245,169 @@ func
TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *te
require.Contains(t, seen, expectedPath)
}
+// TestPositionDeletePartitionedFanoutWriterPartitionPathUsesTableSchema is a
+// regression test for https://github.com/apache/iceberg-go/issues/1082.
+//
+// The partitioned position-delete writer used to derive partition paths from
+// iceberg.PositionalDeleteSchema. That schema only contains file_path/pos, so
+// any partition field whose source column lives on the table data schema was
+// silently dropped by PartitionSpec.PartitionType, collapsing distinct target
+// partitions into a single empty partition path. Downstream, the rolling
+// writer factory keys writers by that path, so position-delete rows for
+// different target partitions ended up in the same delete file.
+//
+// This test pins the fix: when the partition spec references a real table
+// data column, two partitionContexts with different partition values must
+// produce distinct, non-empty partition paths.
+func TestPositionDeletePartitionedFanoutWriterPartitionPathUsesTableSchema(t
*testing.T) {
+ t.Parallel()
+
+ // Table schema: id int + data string. Partition identity on `id`.
+ tableSchema := iceberg.NewSchema(
+ 0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 2, Name: "data", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+
+ partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+ FieldID: 1000,
+ SourceIDs: []int{1},
+ Name: "id",
+ Transform: iceberg.IdentityTransform{},
+ })
+
+ metadataBuilder, err := NewMetadataBuilder(2)
+ require.NoError(t, err)
+ require.NoError(t, metadataBuilder.AddSchema(tableSchema))
+ require.NoError(t, metadataBuilder.SetCurrentSchemaID(0))
+ require.NoError(t, metadataBuilder.AddPartitionSpec(&partitionSpec,
true))
+ require.NoError(t, metadataBuilder.SetDefaultSpecID(0))
+ sortOrder, err := NewSortOrder(1, []SortField{{
+ SourceIDs: []int{1},
+ Direction: SortASC,
+ Transform: iceberg.IdentityTransform{},
+ NullOrder: NullsFirst,
+ }})
+ require.NoError(t, err)
+ require.NoError(t, metadataBuilder.AddSortOrder(&sortOrder))
+ require.NoError(t, metadataBuilder.SetDefaultSortOrderID(1))
+
+ latestMeta, err := metadataBuilder.Build()
+ require.NoError(t, err)
+
+ writer := &positionDeletePartitionedFanoutWriter{
+ metadata: latestMeta,
+ }
+
+ ctxA := partitionContext{specID: 0, partitionData: map[int]any{1000:
int32(1)}}
+ ctxB := partitionContext{specID: 0, partitionData: map[int]any{1000:
int32(2)}}
+
+ pathA, err := writer.partitionPath(ctxA)
+ require.NoError(t, err)
+ pathB, err := writer.partitionPath(ctxB)
+ require.NoError(t, err)
+
+ // Distinct, non-empty, table-schema-derived paths. If the writer
reverts to using
+ // PositionalDeleteSchema, both calls collapse to "" and the
rolling-writer factory
+ // keys them onto the same writer — the bug from #1082.
+ assert.Equal(t, "id=1", pathA)
+ assert.Equal(t, "id=2", pathB)
+}
+
+// TestPositionDeletePartitionedFanoutWriterRoutesPartitionsIndependently is
the end-to-end
+// regression for https://github.com/apache/iceberg-go/issues/1082. It drives
processBatch
+// with two batches whose target data files live in distinct partitions and
asserts that
+// the writer factory produced two separate delete files, each tagged with the
right
+// partition. Pre-fix, both batches collapsed onto one writer (one output
file, wrong
+// partition metadata for the second batch's rows).
+func TestPositionDeletePartitionedFanoutWriterRoutesPartitionsIndependently(t
*testing.T) {
+ t.Parallel()
+
+ tableSchema := iceberg.NewSchema(
+ 0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 2, Name: "data", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+
+ partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+ FieldID: 1000,
+ SourceIDs: []int{1},
+ Name: "id",
+ Transform: iceberg.IdentityTransform{},
+ })
+
+ metadataBuilder, err := NewMetadataBuilder(2)
+ require.NoError(t, err)
+ require.NoError(t, metadataBuilder.AddSchema(tableSchema))
+ require.NoError(t, metadataBuilder.SetCurrentSchemaID(0))
+ require.NoError(t, metadataBuilder.AddPartitionSpec(&partitionSpec,
true))
+ require.NoError(t, metadataBuilder.SetDefaultSpecID(0))
+ sortOrder, err := NewSortOrder(1, []SortField{{
+ SourceIDs: []int{1},
+ Direction: SortASC,
+ Transform: iceberg.IdentityTransform{},
+ NullOrder: NullsFirst,
+ }})
+ require.NoError(t, err)
+ require.NoError(t, metadataBuilder.AddSortOrder(&sortOrder))
+ require.NoError(t, metadataBuilder.SetDefaultSortOrderID(1))
+ latestMeta, err := metadataBuilder.Build()
+ require.NoError(t, err)
+
+ const (
+ pathA = "file://t/id=1/a.parquet"
+ pathB = "file://t/id=2/b.parquet"
+ )
+ pathToCtx := map[string]partitionContext{
+ pathA: {partitionData: map[int]any{1000: int32(1)}, specID: 0},
+ pathB: {partitionData: map[int]any{1000: int32(2)}, specID: 0},
+ }
+
+ writeUUID := uuid.New()
+ cw := newConcurrentDataFileWriter(func(rootLocation string, fs
io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts
...dataFileWriterOption) (dataFileWriter, error) {
+ return newPositionDeleteWriter(rootLocation, fs, meta, props,
opts...)
+ })
+ factory, err := newWriterFactory(t.TempDir(), recordWritingArgs{
+ fs: &io.LocalFS{},
+ sc: PositionalDeleteArrowSchema,
+ writeUUID: &writeUUID,
+ counter: internal.Counter(0),
+ }, metadataBuilder, iceberg.PositionalDeleteSchema, 1024*1024,
+ withContentType(iceberg.EntryContentPosDeletes),
+ withFactoryFileSchema(iceberg.PositionalDeleteSchema))
+ require.NoError(t, err)
+ writer := newPositionDeletePartitionedFanoutWriter(latestMeta, cw,
pathToCtx, nil, factory)
+
+ dataFileCh := make(chan iceberg.DataFile, 4)
+
+ batchA := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+ fmt.Sprintf(`[{"file_path": %q, "pos": 0},{"file_path": %q,
"pos": 1}]`, pathA, pathA))
+ batchB := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema,
+ fmt.Sprintf(`[{"file_path": %q, "pos": 5}]`, pathB))
+
+ require.NoError(t, writer.processBatch(t.Context(), batchA, dataFileCh))
+ require.NoError(t, writer.processBatch(t.Context(), batchB, dataFileCh))
+ require.NoError(t, factory.closeAll())
+ close(dataFileCh)
+
+ var files []iceberg.DataFile
+ for df := range dataFileCh {
+ files = append(files, df)
+ }
+ require.Len(t, files, 2, "expected one delete file per target
partition; pre-fix collapse produces only 1")
+
+ byPart := make(map[int32]iceberg.DataFile, 2)
+ for _, df := range files {
+ part, ok := df.Partition()[1000].(int32)
+ require.True(t, ok, "DataFile.Partition()[1000] must be int32,
got %T", df.Partition()[1000])
+ byPart[part] = df
+ }
+ require.Contains(t, byPart, int32(1))
+ require.Contains(t, byPart, int32(2))
+ assert.Equal(t, int64(2), byPart[1].Count(), "id=1 delete file must
contain only the two rows targeting pathA")
+ assert.Equal(t, int64(1), byPart[2].Count(), "id=2 delete file must
contain only the one row targeting pathB")
+}
+
func TestPositionDeletePartitionedNoGoroutineLeak(t *testing.T) {
t.Parallel()
@@ -569,7 +731,8 @@ func TestPositionDeleteUnpartitionedSortOrderID(t
*testing.T) {
func TestEqualityDeleteUnpartitionedSortOrderID(t *testing.T) {
t.Parallel()
- delSchema := iceberg.NewSchema(0,
+ delSchema := iceberg.NewSchema(
+ 0,
iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
)