This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 8f3c302f feat(table): add support for merge-on-read delete (#721)
8f3c302f is described below
commit 8f3c302fa525a0914c261f28315e6c5f1c75c138
Author: Alex Normand <[email protected]>
AuthorDate: Thu Feb 26 08:10:17 2026 -0800
feat(table): add support for merge-on-read delete (#721)

This adds support for merge-on-read deletes. It offers an alternative to
copy-on-write by generating position delete files instead of rewriting
existing data files.
I'm not very confident in the elegance of my solution as I'm still new
to the internals of iceberg-go but the high-level is:
* Reuse the classification code from the existing delete implementation
to get the list of dropped files vs files with partial deletes
* Reuse the arrow scanning facilities to filter records from the data
files with partial deletes and emit position delete records with file
path and position.
* This is done by reusing the pipeline code and making the first stage
in the pipeline one that enriches the `RecordBatch` with the
file path and position before the original position is lost due to
filtering.
* After filtering, the RecordBatch is projected to the position delete
schema (i.e. the original schema fields are dropped)
* Once we have filtered PositionDelete records that need to be emitted,
we reuse the record-to-file writing code to generate position delete files.
## Testing
Integration tests were added to exercise the partitioned and
unpartitioned paths and the data is such that it's meant to actually
produce a position delete file rather than just go through the quick
path that drops an entire file because all records are gone.
## Indirect fixes
While working on this change and adding the testing for the partitioned
table deletions, I realized that the manifest evaluation when the filter
affected a field that was part of a partition spec was not built
correctly. It needed to use similar code as what's done during scanning
to build projections and build a manifest evaluator per partition id.
This is fixed in this PR but this technically also applies to
copy-on-write and overwrite paths so the fix goes beyond the scope of
the `merge-on-read`.
Fixes #487.
---
README.md | 22 +-
manifest.go | 57 +++-
table/arrow_scanner.go | 122 +++++++
table/arrow_scanner_test.go | 127 +++++++
table/arrow_utils.go | 111 ++++++-
table/arrow_utils_internal_test.go | 2 +-
table/evaluators_test.go | 45 +--
table/internal/interfaces.go | 1 +
table/internal/parquet_files.go | 2 +-
table/internal/parquet_files_test.go | 4 +-
table/internal/utils.go | 4 +-
table/partitioned_fanout_writer.go | 36 +-
table/partitioned_fanout_writer_test.go | 7 +-
table/pos_delete_partitioned_fanout_writer.go | 146 +++++++++
table/pos_delete_partitioned_fanout_writer_test.go | 364 +++++++++++++++++++++
table/rolling_data_writer.go | 48 +--
table/scanner.go | 20 +-
table/snapshot_producers.go | 132 +++++---
table/table_test.go | 151 ++++++++-
table/transaction.go | 253 ++++++++++++--
table/transaction_test.go | 114 +++++++
table/writer.go | 133 ++++++--
22 files changed, 1657 insertions(+), 244 deletions(-)
diff --git a/README.md b/README.md
index a714bfb2..c02480bb 100644
--- a/README.md
+++ b/README.md
@@ -143,17 +143,17 @@ make lint-install
As long as the FileSystem is supported and the Catalog supports altering
the table, the following tracks the current write support:
-| Operation | Supported |
-|:-----------------------:|:---------:|
-| Append Stream | X |
-| Append Data Files | X |
-| Rewrite Files | |
-| Rewrite manifests | |
-| Overwrite Files | X |
-| Copy-On-Write Delete | X |
-| Write Pos Delete | |
-| Write Eq Delete | |
-| Row Delta | |
+| Operation | Supported |
+|:---------------------|:---------:|
+| Append Stream | X |
+| Append Data Files | X |
+| Rewrite Files | |
+| Rewrite manifests | |
+| Overwrite Files | X |
+| Copy-On-Write Delete | X |
+| Write Pos Delete | X |
+| Write Eq Delete | |
+| Row Delta | |
### CLI Usage
diff --git a/manifest.go b/manifest.go
index 68a7234a..d8c0e79e 100644
--- a/manifest.go
+++ b/manifest.go
@@ -605,6 +605,9 @@ func NewManifestReader(file ManifestFile, in io.Reader)
(*ManifestReader, error)
if err != nil {
return nil, err
}
+ defer func() {
+ _ = dec.Close()
+ }()
metadata := dec.Metadata()
sc := dec.Schema()
@@ -832,13 +835,11 @@ func ReadManifestList(in io.Reader) ([]ManifestFile,
error) {
}
type writerImpl interface {
- content() ManifestContent
prepareEntry(*manifestEntry, int64) (ManifestEntry, error)
}
type v1writerImpl struct{}
-func (v1writerImpl) content() ManifestContent { return ManifestContentData }
func (v1writerImpl) prepareEntry(entry *manifestEntry, sn int64)
(ManifestEntry, error) {
if entry.Snapshot != nil && *entry.Snapshot != sn {
if entry.EntryStatus != EntryStatusEXISTING {
@@ -855,7 +856,6 @@ func (v1writerImpl) prepareEntry(entry *manifestEntry, sn
int64) (ManifestEntry,
type v2writerImpl struct{}
-func (v2writerImpl) content() ManifestContent { return ManifestContentData }
func (v2writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64)
(ManifestEntry, error) {
if entry.SeqNum == nil {
if entry.Snapshot != nil && *entry.Snapshot != snapshotID {
@@ -872,7 +872,6 @@ func (v2writerImpl) prepareEntry(entry *manifestEntry,
snapshotID int64) (Manife
type v3writerImpl struct{}
-func (v3writerImpl) content() ManifestContent { return ManifestContentData }
func (v3writerImpl) prepareEntry(entry *manifestEntry, snapshotID int64)
(ManifestEntry, error) {
if entry.SeqNum == nil {
if entry.Snapshot != nil && *entry.Snapshot != snapshotID {
@@ -1048,8 +1047,9 @@ type ManifestWriter struct {
output io.Writer
writer *ocf.Encoder
- spec PartitionSpec
- schema *Schema
+ spec PartitionSpec
+ schema *Schema
+ content ManifestContent
partFieldNameToID map[string]int
partFieldIDToType map[int]avro.LogicalType
@@ -1067,7 +1067,15 @@ type ManifestWriter struct {
reusedEntry manifestEntry
}
-func NewManifestWriter(version int, out io.Writer, spec PartitionSpec, schema
*Schema, snapshotID int64) (*ManifestWriter, error) {
+type ManifestWriterOption func(w *ManifestWriter)
+
+func WithManifestWriterContent(content ManifestContent) ManifestWriterOption {
+ return func(w *ManifestWriter) {
+ w.content = content
+ }
+}
+
+func NewManifestWriter(version int, out io.Writer, spec PartitionSpec, schema
*Schema, snapshotID int64, opts ...ManifestWriterOption) (*ManifestWriter,
error) {
var impl writerImpl
switch version {
@@ -1098,6 +1106,7 @@ func NewManifestWriter(version int, out io.Writer, spec
PartitionSpec, schema *S
version: version,
output: out,
spec: spec,
+ content: ManifestContentData,
schema: schema,
partFieldNameToID: nameToID,
partFieldIDToType: idToType,
@@ -1106,6 +1115,13 @@ func NewManifestWriter(version int, out io.Writer, spec
PartitionSpec, schema *S
partitions: make([]map[int]any, 0),
}
+ for _, apply := range opts {
+ apply(w)
+ }
+ if version < 2 && w.content != ManifestContentData {
+ return nil, fmt.Errorf("unsupported content '%s' for format
version '%d'", w.content, version)
+ }
+
md, err := w.meta()
if err != nil {
return nil, err
@@ -1136,7 +1152,17 @@ func (w *ManifestWriter) Close() error {
return w.writer.Close()
}
-func (w *ManifestWriter) ToManifestFile(location string, length int64)
(ManifestFile, error) {
+type ManifestFileOption func(mf *manifestFile)
+
+// WithManifestFileContent overrides the ManifestContent of a new manifest
file with the provided value
+// Default: ManifestContentData
+func WithManifestFileContent(content ManifestContent) ManifestFileOption {
+ return func(mf *manifestFile) {
+ mf.Content = content
+ }
+}
+
+func (w *ManifestWriter) ToManifestFile(location string, length int64, opts
...ManifestFileOption) (ManifestFile, error) {
if err := w.Close(); err != nil {
return nil, err
}
@@ -1150,7 +1176,7 @@ func (w *ManifestWriter) ToManifestFile(location string,
length int64) (Manifest
return nil, err
}
- return &manifestFile{
+ mf := manifestFile{
version: w.version,
Path: location,
Len: length,
@@ -1167,7 +1193,12 @@ func (w *ManifestWriter) ToManifestFile(location string,
length int64) (Manifest
DeletedRowsCount: w.deletedRows,
PartitionList: &partitions,
Key: nil,
- }, nil
+ }
+ for _, apply := range opts {
+ apply(&mf)
+ }
+
+ return &mf, nil
}
func (w *ManifestWriter) meta() (map[string][]byte, error) {
@@ -1193,7 +1224,7 @@ func (w *ManifestWriter) meta() (map[string][]byte,
error) {
"partition-spec": specFieldsJson,
"partition-spec-id": []byte(strconv.Itoa(w.spec.ID())),
"format-version": []byte(strconv.Itoa(w.version)),
- "content": []byte(w.impl.content().String()),
+ "content": []byte(w.content.String()),
}, nil
}
@@ -1416,7 +1447,7 @@ func (m *ManifestListWriter) AddManifests(files
[]ManifestFile) error {
// if the sequence number is being assigned
here,
// then the manifest must be created by the
current
// operation.
- // to validate this, check the snapshot id
matches the current commmit
+ // to validate this, check the snapshot id
matches the current commit
if m.commitSnapshotID !=
wrapped.AddedSnapshotID {
return fmt.Errorf("found unassigned
sequence number for a manifest from snapshot %d != %d",
m.commitSnapshotID,
wrapped.AddedSnapshotID)
@@ -2315,5 +2346,5 @@ type ManifestEntry interface {
var PositionalDeleteSchema = NewSchema(0,
NestedField{ID: 2147483546, Type: PrimitiveTypes.String, Name:
"file_path", Required: true},
- NestedField{ID: 2147483545, Type: PrimitiveTypes.Int32, Name: "pos",
Required: true},
+ NestedField{ID: 2147483545, Type: PrimitiveTypes.Int64, Name: "pos",
Required: true},
)
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 5e053388..6621f6a4 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -42,6 +42,8 @@ const (
ScanOptionArrowUseLargeTypes = "arrow.use_large_types"
)
+var PositionalDeleteArrowSchema, _ =
SchemaToArrowSchema(iceberg.PositionalDeleteSchema, nil, true, false)
+
type (
positionDeletes = []*arrow.Chunked
perFilePosDeletes = map[string]positionDeletes
@@ -189,6 +191,54 @@ func processPositionalDeletes(ctx context.Context, deletes
set[int64]) recProces
}
}
+// enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns
declared in the PositionalDeleteArrowSchema
+// so that during the pipeline filtering stages that sheds filtered out
records, we still have a way to
+// preserve the original position of those records.
+func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath
iceberg.DataFile) recProcessFn {
+ nextIdx, mem := int64(0), compute.GetAllocator(ctx)
+ filePathField, ok :=
PositionalDeleteArrowSchema.FieldsByName("file_path")
+ if !ok {
+ panic("position delete schema should have required field
'file_path'")
+ }
+ posField, ok := PositionalDeleteArrowSchema.FieldsByName("pos")
+ if !ok {
+ panic("position delete schema should have required field 'pos'")
+ }
+
+ return func(inData arrow.RecordBatch) (outData arrow.RecordBatch, err
error) {
+ defer inData.Release()
+
+ schema := inData.Schema()
+ fieldIdx := schema.NumFields()
+ schema, err = schema.AddField(fieldIdx, filePathField[0])
+ if err != nil {
+ return nil, err
+ }
+ schema, err = schema.AddField(fieldIdx+1, posField[0])
+ if err != nil {
+ return nil, err
+ }
+
+ filePathBuilder := array.NewStringBuilder(mem)
+ defer filePathBuilder.Release()
+ posBuilder := array.NewInt64Builder(mem)
+ defer posBuilder.Release()
+
+ startPos := nextIdx
+ nextIdx += inData.NumRows()
+
+ for i := startPos; i < nextIdx; i++ {
+ filePathBuilder.Append(filePath.FilePath())
+ posBuilder.Append(i)
+ }
+
+ columns := append(inData.Columns(), filePathBuilder.NewArray(),
posBuilder.NewArray())
+ outData = array.NewRecordBatch(schema, columns,
inData.NumRows())
+
+ return outData, err
+ }
+}
+
func filterRecords(ctx context.Context, recordFilter expr.Expression)
recProcessFn {
return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
defer rec.Release()
@@ -473,6 +523,78 @@ func (as *arrowScan) recordsFromTask(ctx context.Context,
task internal.Enumerat
return err
}
+func (as *arrowScan) producePosDeletesFromTask(ctx context.Context, task
internal.Enumerated[FileScanTask], positionalDeletes positionDeletes, out
chan<- enumeratedRecord) (err error) {
+ defer func() {
+ if err != nil {
+ out <- enumeratedRecord{Task: task, Err: err}
+ }
+ }()
+
+ var (
+ rdr internal.FileReader
+ iceSchema *iceberg.Schema
+ colIndices []int
+ filterFunc recProcessFn
+ dropFile bool
+ )
+
+ iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File)
+ if err != nil {
+ return err
+ }
+ defer iceinternal.CheckedClose(rdr, &err)
+
+ fields := append(iceSchema.Fields(),
iceberg.PositionalDeleteSchema.Fields()...)
+ enrichedIcebergSchema := iceberg.NewSchema(iceSchema.ID+1, fields...)
+
+ pipeline := make([]recProcessFn, 0, 2)
+ pipeline = append(pipeline, enrichRecordsWithPosDeleteFields(ctx,
task.Value.File))
+ if len(positionalDeletes) > 0 {
+ deletes := set[int64]{}
+ for _, chunk := range positionalDeletes {
+ for _, a := range chunk.Chunks() {
+ for _, v := range
a.(*array.Int64).Int64Values() {
+ deletes[v] = struct{}{}
+ }
+ }
+ }
+
+ pipeline = append(pipeline, processPositionalDeletes(ctx,
deletes))
+ }
+
+ filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema)
+ if err != nil {
+ return err
+ }
+
+ // Nothing to delete in a dropped file
+ if dropFile {
+ var emptySchema *arrow.Schema
+ emptySchema, err =
SchemaToArrowSchema(iceberg.PositionalDeleteSchema, nil, false,
as.useLargeTypes)
+ if err != nil {
+ return err
+ }
+ out <- enumeratedRecord{Task: task, Record:
internal.Enumerated[arrow.RecordBatch]{
+ Value: array.NewRecordBatch(emptySchema, nil, 0),
Index: 0, Last: true,
+ }}
+
+ return err
+ }
+
+ if filterFunc != nil {
+ pipeline = append(pipeline, filterFunc)
+ }
+ pipeline = append(pipeline, func(r arrow.RecordBatch)
(arrow.RecordBatch, error) {
+ defer r.Release()
+
+ return ToRequestedSchema(ctx, iceberg.PositionalDeleteSchema,
enrichedIcebergSchema, r, false, true, as.useLargeTypes)
+ })
+
+ err = as.processRecords(ctx, task, iceSchema, rdr, colIndices,
pipeline, out)
+
+ return err
+}
+
func createIterator(ctx context.Context, numWorkers uint, records <-chan
enumeratedRecord, deletesPerFile perFilePosDeletes, cancel
context.CancelCauseFunc, rowLimit int64) iter.Seq2[arrow.RecordBatch, error] {
isBeforeAny := func(batch enumeratedRecord) bool {
return batch.Task.Index < 0
diff --git a/table/arrow_scanner_test.go b/table/arrow_scanner_test.go
new file mode 100644
index 00000000..58e34322
--- /dev/null
+++ b/table/arrow_scanner_test.go
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "strconv"
+ "strings"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestEnrichRecordsWithPosDeleteFields(t *testing.T) {
+ testSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "first_name", Type: &arrow.StringType{}, Nullable:
false},
+ {Name: "last_name", Type: &arrow.StringType{}, Nullable: false},
+ {Name: "age", Type: &arrow.Int32Type{}, Nullable: true},
+ }, nil)
+ schemaWithPosDelete := arrow.NewSchema(append(testSchema.Fields(),
+ arrow.Field{Name: "file_path", Type: &arrow.StringType{},
Nullable: false, Metadata:
arrow.MetadataFrom(map[string]string{ArrowParquetFieldIDKey:
strconv.Itoa(2147483546)})},
+ arrow.Field{Name: "pos", Type: &arrow.Int64Type{}, Nullable:
false, Metadata: arrow.MetadataFrom(map[string]string{ArrowParquetFieldIDKey:
strconv.Itoa(2147483545)})},
+ ), nil)
+
+ testCases := []struct {
+ name string
+ inputBatches []arrow.RecordBatch
+ expectedOutputs []arrow.RecordBatch
+ }{
+ {
+ name: "one empty record batch",
+ inputBatches:
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(testSchema, `[]`)},
+ expectedOutputs:
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(schemaWithPosDelete, `[]`)},
+ },
+ {
+ name: "batch of one",
+ inputBatches:
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(testSchema, `[{"first_name":
"alan", "last_name": "gopher", "age": 7}]`)},
+ expectedOutputs:
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(schemaWithPosDelete,
`[{"first_name": "alan", "last_name": "gopher", "age": 7, "file_path":
"file://test_path.parquet", "pos": 0}]`)},
+ },
+ {
+ name: "batch of many",
+ inputBatches:
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(testSchema, `[{"first_name":
"alan", "last_name": "gopher", "age": 7},
+{"first_name": "steve", "last_name": "gopher", "age": 5},
+{"first_name": "dead", "last_name": "gopher", "age": 95}]`)},
+ expectedOutputs:
[]arrow.RecordBatch{mustLoadRecordBatchFromJSON(schemaWithPosDelete,
`[{"first_name": "alan", "last_name": "gopher", "age": 7, "file_path":
"file://test_path.parquet", "pos": 0},
+{"first_name": "steve", "last_name": "gopher", "age": 5, "file_path":
"file://test_path.parquet", "pos": 1},
+{"first_name": "dead", "last_name": "gopher", "age": 95, "file_path":
"file://test_path.parquet", "pos": 2}]`)},
+ },
+ {
+ name: "many batches",
+ inputBatches: []arrow.RecordBatch{
+ mustLoadRecordBatchFromJSON(testSchema,
`[{"first_name": "alan", "last_name": "gopher", "age": 7},
+{"first_name": "steve", "last_name": "gopher", "age": 5},
+{"first_name": "dead", "last_name": "gopher", "age": 95}]`),
+ mustLoadRecordBatchFromJSON(testSchema,
`[{"first_name": "matt", "last_name": "gopher", "age": 2},
+{"first_name": "alex", "last_name": "gopher", "age": 10}]`),
+ },
+ expectedOutputs: []arrow.RecordBatch{
+
mustLoadRecordBatchFromJSON(schemaWithPosDelete, `[{"first_name": "alan",
"last_name": "gopher", "age": 7, "file_path": "file://test_path.parquet",
"pos": 0},
+{"first_name": "steve", "last_name": "gopher", "age": 5, "file_path":
"file://test_path.parquet", "pos": 1},
+{"first_name": "dead", "last_name": "gopher", "age": 95, "file_path":
"file://test_path.parquet", "pos": 2}]`),
+
mustLoadRecordBatchFromJSON(schemaWithPosDelete, `[{"first_name": "matt",
"last_name": "gopher", "age": 2, "file_path": "file://test_path.parquet",
"pos": 3},
+{"first_name": "alex", "last_name": "gopher", "age": 10, "file_path":
"file://test_path.parquet", "pos": 4}]`),
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ defer func() {
+ for _, b := range tc.inputBatches {
+ b.Release()
+ }
+ }()
+
+ enrichFn :=
enrichRecordsWithPosDeleteFields(t.Context(), &mockDataFile{path:
"file://test_path.parquet"})
+ for i, b := range tc.inputBatches {
+ out, err := enrichFn(b)
+ require.NoError(t, err)
+ defer func() {
+ out.Release()
+ }()
+
+ assert.Equal(t, schemaWithPosDelete,
out.Schema())
+ assert.Equal(t, out.NumRows(), b.NumRows())
+
+ expectedOutputJSON, err :=
tc.expectedOutputs[i].MarshalJSON()
+ require.NoError(t, err)
+
+ outAsJSON, err := out.MarshalJSON()
+ require.NoError(t, err)
+
+ assert.Equal(t, string(expectedOutputJSON),
string(outAsJSON))
+ }
+ })
+ }
+}
+
+// mustLoadRecordBatchFromJSON is a convenience wrapper around
array.RecordFromJSON that returns the RecordBatch only
+// to make it friendlier to table-driven tests. In case of error parsing the
json content, it panics.
+func mustLoadRecordBatchFromJSON(schema *arrow.Schema, content string)
arrow.RecordBatch {
+ mem := memory.NewGoAllocator()
+ recordBatch, _, err := array.RecordFromJSON(mem, schema,
strings.NewReader(content))
+ if err != nil {
+ panic("failed to load test data from JSON: " + err.Error())
+ }
+
+ return recordBatch
+}
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index e599735a..66cdf29d 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1258,7 +1258,7 @@ func filesToDataFiles(ctx context.Context, fileIO
iceio.IO, meta *MetadataBuilde
}
}
- df := statistics.ToDataFile(currentSchema, currentSpec,
filePath, iceberg.ParquetFile, rdr.SourceFileSize(), partitionValues)
+ df := statistics.ToDataFile(currentSchema, currentSpec,
filePath, iceberg.ParquetFile, iceberg.EntryContentData, rdr.SourceFileSize(),
partitionValues)
if !yield(df, nil) {
return
}
@@ -1306,10 +1306,10 @@ func recordsToDataFiles(ctx context.Context,
rootLocation string, meta *Metadata
if r := recover(); r != nil {
var err error
switch e := r.(type) {
- case string:
- err = fmt.Errorf("error encountered during file
writing %s", e)
case error:
err = fmt.Errorf("error encountered during file
writing: %w", e)
+ default:
+ err = fmt.Errorf("error encountered during
position delete file writing: %v", e)
}
ret = func(yield func(iceberg.DataFile, error) bool) {
yield(nil, err)
@@ -1328,13 +1328,25 @@ func recordsToDataFiles(ctx context.Context,
rootLocation string, meta *Metadata
nameMapping := meta.CurrentSchema().NameMapping()
taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
if err != nil {
- panic(err)
+ return func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
}
currentSpec, err := meta.CurrentSpec()
- if err != nil || currentSpec == nil {
- panic(fmt.Errorf("%w: cannot write files without a current
spec", err))
+ if err != nil {
+ return func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
+ }
+ if currentSpec == nil {
+ return func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, fmt.Errorf("cannot write files without a
current spec: %w", err))
+ }
}
+ cw := newConcurrentDataFileWriter(func(rootLocation string, fs
iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts
...dataFileWriterOption) (dataFileWriter, error) {
+ return newDataFileWriter(rootLocation, fs, meta, props, opts...)
+ })
nextCount, stopCount := iter.Pull(args.counter)
if currentSpec.IsUnpartitioned() {
tasks := func(yield func(WriteTask) bool) {
@@ -1358,12 +1370,87 @@ func recordsToDataFiles(ctx context.Context,
rootLocation string, meta *Metadata
}
}
- return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
- } else {
- rollingDataWriters := NewWriterFactory(rootLocation, args,
meta, taskSchema, targetFileSize)
- partitionWriter := newPartitionedFanoutWriter(*currentSpec,
meta.CurrentSchema(), args.itr, &rollingDataWriters)
- workers := config.EnvConfig.MaxWorkers
+ return cw.writeFiles(ctx, rootLocation, args.fs, meta,
meta.props, nil, tasks)
+ }
+
+ factory := NewWriterFactory(rootLocation, args, meta, taskSchema,
targetFileSize)
+ partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw,
meta.CurrentSchema(), args.itr, &factory)
+ workers := config.EnvConfig.MaxWorkers
+
+ return partitionWriter.Write(ctx, workers)
+}
+
+type partitionContext struct {
+ partitionData map[int]any
+ specID int32
+}
+
+func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation
string, meta *MetadataBuilder, partitionContextByFilePath
map[string]partitionContext, args recordWritingArgs) (ret
iter.Seq2[iceberg.DataFile, error]) {
+ if args.counter == nil {
+ args.counter = internal.Counter(0)
+ }
+
+ defer func() {
+ if r := recover(); r != nil {
+ var err error
+ switch e := r.(type) {
+ case error:
+ err = fmt.Errorf("error encountered during
position delete file writing: %w", e)
+ default:
+ err = fmt.Errorf("error encountered during
position delete file writing: %v", e)
+ }
+ ret = func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
+ }
+ }()
- return partitionWriter.Write(ctx, workers)
+ latestMetadata, err := meta.Build()
+ if err != nil {
+ return func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
}
+
+ if args.writeUUID == nil {
+ u := uuid.Must(uuid.NewRandom())
+ args.writeUUID = &u
+ }
+
+ targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
+ WriteTargetFileSizeBytesDefault))
+
+ cw := newConcurrentDataFileWriter(func(rootLocation string, fs
iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts
...dataFileWriterOption) (dataFileWriter, error) {
+ return newPositionDeleteWriter(rootLocation, fs, meta, props,
opts...)
+ }, withSchemaSanitization(false))
+ nextCount, stopCount := iter.Pull(args.counter)
+ if latestMetadata.PartitionSpec().IsUnpartitioned() {
+ tasks := func(yield func(WriteTask) bool) {
+ defer stopCount()
+
+ fileCount := 0
+ for batch := range binPackRecords(args.itr,
defaultBinPackLookback, targetFileSize) {
+ cnt, _ := nextCount()
+ fileCount++
+ t := WriteTask{
+ Uuid: *args.writeUUID,
+ ID: cnt,
+ PartitionID:
iceberg.UnpartitionedSpec.ID(),
+ FileCount: fileCount,
+ Schema:
iceberg.PositionalDeleteSchema,
+ Batches: batch,
+ }
+ if !yield(t) {
+ return
+ }
+ }
+ }
+
+ return cw.writeFiles(ctx, rootLocation, args.fs, meta,
meta.props, nil, tasks)
+ }
+ writerFactory := NewWriterFactory(rootLocation, args, meta,
iceberg.PositionalDeleteSchema, targetFileSize)
+ partitionWriter :=
newPositionDeletePartitionedFanoutWriter(latestMetadata, cw,
partitionContextByFilePath, args.itr, &writerFactory)
+ workers := config.EnvConfig.MaxWorkers
+
+ return partitionWriter.Write(ctx, workers)
}
diff --git a/table/arrow_utils_internal_test.go
b/table/arrow_utils_internal_test.go
index d3229822..e463c748 100644
--- a/table/arrow_utils_internal_test.go
+++ b/table/arrow_utils_internal_test.go
@@ -200,7 +200,7 @@ func (suite *FileStatsMetricsSuite) getDataFile(meta
iceberg.Properties, writeSt
stats := format.DataFileStatsFromMeta(fileMeta, collector, mapping)
return stats.ToDataFile(tableMeta.CurrentSchema(),
tableMeta.PartitionSpec(), "fake-path.parquet",
- iceberg.ParquetFile, fileMeta.GetSourceFileSize(), nil)
+ iceberg.ParquetFile, iceberg.EntryContentData,
fileMeta.GetSourceFileSize(), nil)
}
func (suite *FileStatsMetricsSuite) TestRecordCount() {
diff --git a/table/evaluators_test.go b/table/evaluators_test.go
index cec68b43..3f86b27b 100644
--- a/table/evaluators_test.go
+++ b/table/evaluators_test.go
@@ -1071,6 +1071,7 @@ func (p *ProjectionTestSuite)
TestPartialProjectedFields() {
type mockDataFile struct {
path string
+ contentType iceberg.ManifestEntryContent
format iceberg.FileFormat
partition map[int]any
count int64
@@ -1085,28 +1086,28 @@ type mockDataFile struct {
specid int32
}
-func (*mockDataFile) ContentType() iceberg.ManifestEntryContent { return
iceberg.EntryContentData }
-func (m *mockDataFile) FilePath() string { return
m.path }
-func (m *mockDataFile) FileFormat() iceberg.FileFormat { return
m.format }
-func (m *mockDataFile) Partition() map[int]any { return
m.partition }
-func (m *mockDataFile) Count() int64 { return
m.count }
-func (m *mockDataFile) FileSizeBytes() int64 { return
m.filesize }
-func (m *mockDataFile) ColumnSizes() map[int]int64 { return
m.columnSizes }
-func (m *mockDataFile) ValueCounts() map[int]int64 { return
m.valueCounts }
-func (m *mockDataFile) NullValueCounts() map[int]int64 { return
m.nullCounts }
-func (m *mockDataFile) NaNValueCounts() map[int]int64 { return
m.nanCounts }
-func (*mockDataFile) DistinctValueCounts() map[int]int64 { return nil }
-func (m *mockDataFile) LowerBoundValues() map[int][]byte { return
m.lowerBounds }
-func (m *mockDataFile) UpperBoundValues() map[int][]byte { return
m.upperBounds }
-func (*mockDataFile) KeyMetadata() []byte { return nil }
-func (*mockDataFile) SplitOffsets() []int64 { return nil }
-func (*mockDataFile) EqualityFieldIDs() []int { return nil }
-func (*mockDataFile) SortOrderID() *int { return nil }
-func (m *mockDataFile) SpecID() int32 { return
m.specid }
-func (*mockDataFile) FirstRowID() *int64 { return nil }
-func (*mockDataFile) ReferencedDataFile() *string { return nil }
-func (*mockDataFile) ContentOffset() *int64 { return nil }
-func (*mockDataFile) ContentSizeInBytes() *int64 { return nil }
+func (m *mockDataFile) ContentType() iceberg.ManifestEntryContent { return
m.contentType }
+func (m *mockDataFile) FilePath() string { return
m.path }
+func (m *mockDataFile) FileFormat() iceberg.FileFormat { return
m.format }
+func (m *mockDataFile) Partition() map[int]any { return
m.partition }
+func (m *mockDataFile) Count() int64 { return
m.count }
+func (m *mockDataFile) FileSizeBytes() int64 { return
m.filesize }
+func (m *mockDataFile) ColumnSizes() map[int]int64 { return
m.columnSizes }
+func (m *mockDataFile) ValueCounts() map[int]int64 { return
m.valueCounts }
+func (m *mockDataFile) NullValueCounts() map[int]int64 { return
m.nullCounts }
+func (m *mockDataFile) NaNValueCounts() map[int]int64 { return
m.nanCounts }
+func (*mockDataFile) DistinctValueCounts() map[int]int64 { return nil
}
+func (m *mockDataFile) LowerBoundValues() map[int][]byte { return
m.lowerBounds }
+func (m *mockDataFile) UpperBoundValues() map[int][]byte { return
m.upperBounds }
+func (*mockDataFile) KeyMetadata() []byte { return nil
}
+func (*mockDataFile) SplitOffsets() []int64 { return nil
}
+func (*mockDataFile) EqualityFieldIDs() []int { return nil
}
+func (*mockDataFile) SortOrderID() *int { return nil
}
+func (m *mockDataFile) SpecID() int32 { return
m.specid }
+func (*mockDataFile) FirstRowID() *int64 { return nil
}
+func (*mockDataFile) ReferencedDataFile() *string { return nil
}
+func (*mockDataFile) ContentOffset() *int64 { return nil
}
+func (*mockDataFile) ContentSizeInBytes() *int64 { return nil
}
type InclusiveMetricsTestSuite struct {
suite.Suite
diff --git a/table/internal/interfaces.go b/table/internal/interfaces.go
index 0fdb8097..48830154 100644
--- a/table/internal/interfaces.go
+++ b/table/internal/interfaces.go
@@ -105,4 +105,5 @@ type WriteFileInfo struct {
FileName string
StatsCols map[int]StatisticsCollector
WriteProps any
+ Content iceberg.ManifestEntryContent
}
diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go
index 8e7cf252..2d0334c5 100644
--- a/table/internal/parquet_files.go
+++ b/table/internal/parquet_files.go
@@ -282,7 +282,7 @@ func (p parquetFormat) WriteDataFile(ctx context.Context,
fs iceio.WriteFileIO,
}
return p.DataFileStatsFromMeta(filemeta, info.StatsCols, colMapping).
- ToDataFile(info.FileSchema, info.Spec, info.FileName,
iceberg.ParquetFile, cntWriter.Count, partitionValues), nil
+ ToDataFile(info.FileSchema, info.Spec, info.FileName,
iceberg.ParquetFile, info.Content, cntWriter.Count, partitionValues), nil
}
type decAsIntAgg[T int32 | int64] struct {
diff --git a/table/internal/parquet_files_test.go
b/table/internal/parquet_files_test.go
index 85480da0..42adc22c 100644
--- a/table/internal/parquet_files_test.go
+++ b/table/internal/parquet_files_test.go
@@ -261,7 +261,7 @@ func TestMetricsPrimitiveTypes(t *testing.T) {
stats := format.DataFileStatsFromMeta(internal.Metadata(meta),
getCollector(), mapping)
df := stats.ToDataFile(tblMeta.CurrentSchema(),
tblMeta.PartitionSpec(), "fake-path.parquet",
- iceberg.ParquetFile, meta.GetSourceFileSize(), nil)
+ iceberg.ParquetFile, iceberg.EntryContentData,
meta.GetSourceFileSize(), nil)
assert.Len(t, df.ValueCounts(), 15)
assert.Len(t, df.NullValueCounts(), 15)
@@ -463,7 +463,7 @@ func TestDecimalPhysicalTypes(t *testing.T) {
require.NotNil(t, stats)
df := stats.ToDataFile(tableMeta.CurrentSchema(),
tableMeta.PartitionSpec(), "test.parquet",
- iceberg.ParquetFile, meta.GetSourceFileSize(),
nil)
+ iceberg.ParquetFile, iceberg.EntryContentData,
meta.GetSourceFileSize(), nil)
// Verify bounds are correctly extracted
require.Contains(t, df.LowerBoundValues(), 1)
diff --git a/table/internal/utils.go b/table/internal/utils.go
index 6227d746..4e9a5a05 100644
--- a/table/internal/utils.go
+++ b/table/internal/utils.go
@@ -234,7 +234,7 @@ func (d *DataFileStatistics) PartitionValue(field
iceberg.PartitionField, sc *ic
return lowerT.Val.Any()
}
-func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec
iceberg.PartitionSpec, path string, format iceberg.FileFormat, filesize int64,
partitionValues map[int]any) iceberg.DataFile {
+func (d *DataFileStatistics) ToDataFile(schema *iceberg.Schema, spec
iceberg.PartitionSpec, path string, format iceberg.FileFormat, content
iceberg.ManifestEntryContent, filesize int64, partitionValues map[int]any)
iceberg.DataFile {
var fieldIDToPartitionData map[int]any
fieldIDToLogicalType := make(map[int]avro.LogicalType)
fieldIDToFixedSize := make(map[int]int)
@@ -276,7 +276,7 @@ func (d *DataFileStatistics) ToDataFile(schema
*iceberg.Schema, spec iceberg.Par
}
}
- bldr, err := iceberg.NewDataFileBuilder(spec, iceberg.EntryContentData,
+ bldr, err := iceberg.NewDataFileBuilder(spec, content,
path, format, fieldIDToPartitionData, fieldIDToLogicalType,
fieldIDToFixedSize, d.RecordCount, filesize)
if err != nil {
panic(err)
diff --git a/table/partitioned_fanout_writer.go
b/table/partitioned_fanout_writer.go
index 624c5ad5..81479085 100644
--- a/table/partitioned_fanout_writer.go
+++ b/table/partitioned_fanout_writer.go
@@ -32,14 +32,15 @@ import (
"golang.org/x/sync/errgroup"
)
-// PartitionedFanoutWriter distributes Arrow records across multiple
partitions based on
+// partitionedFanoutWriter distributes Arrow records across multiple
partitions based on
// a partition specification, writing data to separate files for each
partition using
// a fanout pattern with configurable parallelism.
type partitionedFanoutWriter struct {
- partitionSpec iceberg.PartitionSpec
- schema *iceberg.Schema
- itr iter.Seq2[arrow.RecordBatch, error]
- writerFactory *writerFactory
+ partitionSpec iceberg.PartitionSpec
+ schema *iceberg.Schema
+ itr iter.Seq2[arrow.RecordBatch, error]
+ writerFactory *writerFactory
+ concurrentDataFileWriter *concurrentDataFileWriter
}
// PartitionInfo holds the row indices and partition values for a specific
partition,
@@ -52,12 +53,13 @@ type partitionInfo struct {
// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the
specified
// partition specification, schema, record iterator, and writerFactory.
-func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema
*iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error], writerFactory
*writerFactory) *partitionedFanoutWriter {
+func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec,
concurrentWriter *concurrentDataFileWriter, schema *iceberg.Schema, itr
iter.Seq2[arrow.RecordBatch, error], writerFactory *writerFactory)
*partitionedFanoutWriter {
return &partitionedFanoutWriter{
- partitionSpec: partitionSpec,
- schema: schema,
- itr: itr,
- writerFactory: writerFactory,
+ partitionSpec: partitionSpec,
+ schema: schema,
+ itr: itr,
+ writerFactory: writerFactory,
+ concurrentDataFileWriter: concurrentWriter,
}
}
@@ -73,7 +75,7 @@ func (p *partitionedFanoutWriter) Write(ctx context.Context,
workers int) iter.S
outputDataFilesCh := make(chan iceberg.DataFile, workers)
fanoutWorkers, ctx := errgroup.WithContext(ctx)
- p.startRecordFeeder(ctx, fanoutWorkers, inputRecordsCh)
+ startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh)
for range workers {
fanoutWorkers.Go(func() error {
@@ -84,11 +86,11 @@ func (p *partitionedFanoutWriter) Write(ctx
context.Context, workers int) iter.S
return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh)
}
-func (p *partitionedFanoutWriter) startRecordFeeder(ctx context.Context,
fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.RecordBatch) {
+func startRecordFeeder(ctx context.Context, itr iter.Seq2[arrow.RecordBatch,
error], fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.RecordBatch)
{
fanoutWorkers.Go(func() error {
defer close(inputRecordsCh)
- for record, err := range p.itr {
+ for record, err := range itr {
if err != nil {
return err
}
@@ -137,7 +139,7 @@ func (p *partitionedFanoutWriter) fanout(ctx
context.Context, inputRecordsCh <-c
}
partitionPath :=
p.partitionPath(val.partitionRec)
- rollingDataWriter, err :=
p.writerFactory.getOrCreateRollingDataWriter(ctx, partitionPath,
val.partitionValues, dataFilesChannel)
+ rollingDataWriter, err :=
p.writerFactory.getOrCreateRollingDataWriter(ctx, p.concurrentDataFileWriter,
partitionPath, val.partitionValues, dataFilesChannel)
if err != nil {
return err
}
@@ -152,13 +154,17 @@ func (p *partitionedFanoutWriter) fanout(ctx
context.Context, inputRecordsCh <-c
}
func (p *partitionedFanoutWriter) yieldDataFiles(fanoutWorkers
*errgroup.Group, outputDataFilesCh chan iceberg.DataFile)
iter.Seq2[iceberg.DataFile, error] {
+ return yieldDataFiles(p.writerFactory, fanoutWorkers, outputDataFilesCh)
+}
+
+func yieldDataFiles(writerFactory *writerFactory, fanoutWorkers
*errgroup.Group, outputDataFilesCh chan iceberg.DataFile)
iter.Seq2[iceberg.DataFile, error] {
// Use a channel to safely communicate the error from the goroutine
// to avoid a data race between writing err in the goroutine and
reading it in the iterator.
errCh := make(chan error, 1)
go func() {
defer close(outputDataFilesCh)
err := fanoutWorkers.Wait()
- err = errors.Join(err, p.writerFactory.closeAll())
+ err = errors.Join(err, writerFactory.closeAll())
errCh <- err
close(errCh)
}()
diff --git a/table/partitioned_fanout_writer_test.go
b/table/partitioned_fanout_writer_test.go
index 734e31c7..b2082c84 100644
--- a/table/partitioned_fanout_writer_test.go
+++ b/table/partitioned_fanout_writer_test.go
@@ -134,8 +134,11 @@ func (s *FanoutWriterTestSuite)
testTransformPartition(transform iceberg.Transfo
taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
s.Require().NoError(err)
- rollingDataWriters := NewWriterFactory(loc, args, metaBuilder,
icebergSchema, 1024*1024)
- partitionWriter := newPartitionedFanoutWriter(spec, taskSchema,
args.itr, &rollingDataWriters)
+ cw := newConcurrentDataFileWriter(func(rootLocation string, fs
iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts
...dataFileWriterOption) (dataFileWriter, error) {
+ return newDataFileWriter(rootLocation, fs, meta, props, opts...)
+ })
+ writerFactory := NewWriterFactory(loc, args, metaBuilder,
icebergSchema, 1024*1024)
+ partitionWriter := newPartitionedFanoutWriter(spec, cw, taskSchema,
args.itr, &writerFactory)
workers := config.EnvConfig.MaxWorkers
dataFiles := partitionWriter.Write(s.ctx, workers)
diff --git a/table/pos_delete_partitioned_fanout_writer.go
b/table/pos_delete_partitioned_fanout_writer.go
new file mode 100644
index 00000000..4b3daeaa
--- /dev/null
+++ b/table/pos_delete_partitioned_fanout_writer.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "fmt"
+ "iter"
+ "maps"
+ "slices"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/iceberg-go"
+ "golang.org/x/sync/errgroup"
+)
+
// positionDeletePartitionedFanoutWriter distributes Arrow position delete records across multiple partitions based on
// a partition specification, writing data to separate delete files for each partition using
// a fanout pattern with configurable parallelism.
type positionDeletePartitionedFanoutWriter struct {
	// partitionContextByFilePath maps a data file path to the partition
	// context (spec ID and partition values) of the file the deletes target.
	partitionContextByFilePath map[string]partitionContext
	metadata                   Metadata
	// schema is always iceberg.PositionalDeleteSchema (file_path, pos);
	// see newPositionDeletePartitionedFanoutWriter.
	schema *iceberg.Schema
	// itr yields the position delete record batches to be written.
	itr           iter.Seq2[arrow.RecordBatch, error]
	writerFactory *writerFactory
	// concurrentDataFileWriter performs the actual delete-file writes.
	concurrentDataFileWriter *concurrentDataFileWriter
}
+
// newPositionDeletePartitionedFanoutWriter creates a new positionDeletePartitionedFanoutWriter with the specified
// metadata, concurrent writer, per-file partition context, record iterator, and writer factory.
// The writer's schema is always the positional delete schema (file_path, pos).
func newPositionDeletePartitionedFanoutWriter(metadata Metadata, concurrentWriter *concurrentDataFileWriter, partitionContextByFilePath map[string]partitionContext, itr iter.Seq2[arrow.RecordBatch, error], writerFactory *writerFactory) *positionDeletePartitionedFanoutWriter {
	return &positionDeletePartitionedFanoutWriter{
		partitionContextByFilePath: partitionContextByFilePath,
		metadata:                   metadata,
		schema:                     iceberg.PositionalDeleteSchema,
		itr:                        itr,
		writerFactory:              writerFactory,
		concurrentDataFileWriter:   concurrentWriter,
	}
}
+
// Write writes the Arrow records to the specified location using a fanout pattern with
// the specified number of workers. The returned iterator yields the data files written
// by the fanout process.
func (p *positionDeletePartitionedFanoutWriter) Write(ctx context.Context, workers int) iter.Seq2[iceberg.DataFile, error] {
	// Buffer both channels to the worker count so the feeder can stay a
	// little ahead of the workers without unbounded queuing.
	inputRecordsCh := make(chan arrow.RecordBatch, workers)
	outputDataFilesCh := make(chan iceberg.DataFile, workers)

	// The errgroup carries the feeder goroutine and all fanout workers so
	// that the first error cancels ctx and stops the whole pipeline.
	fanoutWorkers, ctx := errgroup.WithContext(ctx)
	startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh)

	for range workers {
		fanoutWorkers.Go(func() error {
			return p.fanout(ctx, inputRecordsCh, outputDataFilesCh)
		})
	}

	return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh)
}
+
// fanout consumes record batches from inputRecordsCh until the channel is
// closed or ctx is cancelled, routing each batch to the appropriate partition
// writer and emitting completed delete files on dataFilesChannel.
func (p *positionDeletePartitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-chan arrow.RecordBatch, dataFilesChannel chan<- iceberg.DataFile) error {
	for {
		select {
		case <-ctx.Done():
			return context.Cause(ctx)

		case record, ok := <-inputRecordsCh:
			if !ok {
				// Feeder closed the channel: no more input.
				return nil
			}

			err := p.processBatch(ctx, record, dataFilesChannel)
			if err != nil {
				return err
			}
		}
	}
}
+
+func (p *positionDeletePartitionedFanoutWriter) processBatch(ctx
context.Context, batch arrow.RecordBatch, dataFilesChannel chan<-
iceberg.DataFile) (err error) {
+ defer batch.Release()
+
+ select {
+ case <-ctx.Done():
+ return context.Cause(ctx)
+ default:
+ }
+
+ if batch.NumRows() == 0 {
+ return
+ }
+
+ columns := batch.Columns()
+ filePathArray := columns[0].(*array.String)
+ filePath := filePathArray.ValueStr(0)
+ partitionContext, ok := p.partitionContextByFilePath[filePath]
+ if !ok {
+ return fmt.Errorf("unexpected missing partition context for
path %s", filePath)
+ }
+
+ partitionPath, err := p.partitionPath(partitionContext)
+ if err != nil {
+ return err
+ }
+ rollingDataWriter, err :=
p.writerFactory.getOrCreateRollingDataWriter(ctx, p.concurrentDataFileWriter,
partitionPath, partitionContext.partitionData, dataFilesChannel)
+ if err != nil {
+ return err
+ }
+
+ err = rollingDataWriter.Add(batch)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (p *positionDeletePartitionedFanoutWriter) partitionPath(partitionContext
partitionContext) (string, error) {
+ data :=
partitionRecord(slices.Collect(maps.Values(partitionContext.partitionData)))
+ spec := p.metadata.PartitionSpecByID(int(partitionContext.specID))
+ if spec == nil {
+ return "", fmt.Errorf("unexpected missing partition spec in
metadata for spec id %d", partitionContext.specID)
+ }
+
+ return spec.PartitionToPath(data, p.schema), nil
+}
+
// yieldDataFiles waits for the fanout workers, closes all writers owned by the
// factory, and yields the produced delete files (or the first error) to the caller.
func (p *positionDeletePartitionedFanoutWriter) yieldDataFiles(fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) iter.Seq2[iceberg.DataFile, error] {
	return yieldDataFiles(p.writerFactory, fanoutWorkers, outputDataFilesCh)
}
diff --git a/table/pos_delete_partitioned_fanout_writer_test.go
b/table/pos_delete_partitioned_fanout_writer_test.go
new file mode 100644
index 00000000..f0f1ff4c
--- /dev/null
+++ b/table/pos_delete_partitioned_fanout_writer_test.go
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "maps"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/internal"
+ "github.com/apache/iceberg-go/io"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
// TestPositionDeletePartitionedFanoutWriterProcessBatch exercises processBatch
// directly: empty batches, missing partition context, pre-cancelled context,
// unknown partition spec, and the success path that produces a position delete
// file verified with the dataFileMatcher helpers.
func TestPositionDeletePartitionedFanoutWriterProcessBatch(t *testing.T) {
	t.Parallel()

	testCases := []struct {
		name                   string
		pathToPartitionContext map[string]partitionContext
		ctx                    context.Context
		input                  arrow.RecordBatch
		expectedDataFile       iceberg.DataFile
		expectedErr            error
	}{
		{
			name:             "empty batch",
			input:            mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[]`),
			expectedDataFile: nil,
		},
		{
			name:        "error on missing required path to partition data",
			input:       mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[{"file_path": "file://test_path.parquet", "pos": 0}]`),
			expectedErr: errors.New("unexpected missing partition context"),
		},
		{
			name:        "abort on context already done",
			ctx:         onlyContext(context.WithDeadline(context.Background(), time.UnixMilli(0))),
			input:       mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[]`),
			expectedErr: errors.New("context deadline exceeded"),
		},
		{
			name:                   "error on partition context pointing to unknown partition spec",
			pathToPartitionContext: map[string]partitionContext{"file://namespace/age_bucket=1/test.parquet": {partitionData: map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 200}},
			input:                  mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[{"file_path": "file://namespace/age_bucket=1/test.parquet", "pos": 100}]`),
			expectedErr:            errors.New("unexpected missing partition spec"),
		},
		{
			name:                   "success",
			pathToPartitionContext: map[string]partitionContext{"file://namespace/age_bucket=1/test.parquet": {partitionData: map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 0}},
			input:                  mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[{"file_path": "file://namespace/age_bucket=1/test.parquet", "pos": 100}]`),
			expectedDataFile:       &mockDataFile{columnSizes: map[int]int64{2147483545: 88, 2147483546: 174}, format: iceberg.ParquetFile, partition: map[int]any{iceberg.PartitionDataIDStart: 1}, count: 1, specid: 0, contentType: iceberg.EntryContentPosDeletes},
		},
		// This test case illustrates how the positionDeletePartitionedFanoutWriter does not validate that all records
		// in a batch have the same file path. Doing so would be prohibitive in the current implementation and
		// the usage of the positionDeletePartitionedFanoutWriter is expected to ensure batches all have the same
		// file_path value.
		{
			name:                   "batch with records having different file paths",
			pathToPartitionContext: map[string]partitionContext{"file://namespace/age_bucket=1/test.parquet": {partitionData: map[int]any{iceberg.PartitionDataIDStart: 1}, specID: 0}},
			input:                  mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[{"file_path": "file://namespace/age_bucket=1/test.parquet", "pos": 100}, {"file_path": "file://namespace/age_bucket=0/test.parquet", "pos": 10}]`),
			expectedDataFile:       &mockDataFile{columnSizes: map[int]int64{2147483545: 96, 2147483546: 187}, format: iceberg.ParquetFile, partition: map[int]any{iceberg.PartitionDataIDStart: 1}, count: 2, specid: 0, contentType: iceberg.EntryContentPosDeletes},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			ctx := tc.ctx
			if ctx == nil {
				ctx = t.Context()
			}

			// Single bucket-partitioned spec on the extra "age" column.
			partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
				SourceID: 2,
				Name:     "age_bucket",
				Transform: iceberg.BucketTransform{
					NumBuckets: 2,
				},
			})

			// Build minimal table metadata: positional-delete fields plus an
			// "age" column, the partition spec above, and a sort order.
			metadataBuilder, err := NewMetadataBuilder(2)
			require.NoError(t, err)
			err = metadataBuilder.AddSchema(iceberg.NewSchema(0, append(iceberg.PositionalDeleteSchema.Fields(), iceberg.NestedField{Name: "age", ID: 2, Type: iceberg.Int64Type{}})...))
			require.NoError(t, err)
			err = metadataBuilder.SetCurrentSchemaID(0)
			require.NoError(t, err)
			err = metadataBuilder.AddPartitionSpec(&partitionSpec, true)
			require.NoError(t, err)
			err = metadataBuilder.SetDefaultSpecID(0)
			require.NoError(t, err)
			sortOrder, err := NewSortOrder(1, []SortField{{
				SourceID:  2,
				Direction: SortASC,
				Transform: iceberg.IdentityTransform{},
				NullOrder: NullsFirst,
			}})
			require.NoError(t, err)
			err = metadataBuilder.AddSortOrder(&sortOrder)
			require.NoError(t, err)
			err = metadataBuilder.SetDefaultSortOrderID(1)
			require.NoError(t, err)
			latestMeta, err := metadataBuilder.Build()
			require.NoError(t, err)

			writeUUID := uuid.New()
			// The concurrent writer produces position delete writers rather
			// than regular data file writers.
			cw := newConcurrentDataFileWriter(func(rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
				return newPositionDeleteWriter(rootLocation, fs, meta, props, opts...)
			})
			writerFactory := NewWriterFactory(t.TempDir(), recordWritingArgs{
				fs:        &io.LocalFS{},
				sc:        PositionalDeleteArrowSchema,
				writeUUID: &writeUUID,
				counter:   internal.Counter(0),
			}, metadataBuilder, iceberg.PositionalDeleteSchema, 1024*1024)
			writer := newPositionDeletePartitionedFanoutWriter(latestMeta, cw, tc.pathToPartitionContext, nil, &writerFactory)
			require.NoError(t, err)

			dataFileCh := make(chan iceberg.DataFile, 10)
			err = writer.processBatch(ctx, tc.input, dataFileCh)
			if tc.expectedErr != nil {
				require.ErrorContains(t, err, tc.expectedErr.Error())

				return
			}
			require.NoError(t, err)

			// Close the writers to flush any pending delete file.
			err = writerFactory.closeAll()
			require.NoError(t, err)

			close(dataFileCh)

			actualDataFile := <-dataFileCh
			assert.NoError(t, equalsDataFile(tc.expectedDataFile, actualDataFile, defaultPositionDeleteMatching...))
		})
	}
}
+
// onlyContext discards the CancelFunc returned by context constructors such as
// context.WithDeadline, keeping only the context. Handy for inline test setup;
// note the cancel func is intentionally dropped (tests rely on deadline expiry).
func onlyContext(ctx context.Context, _ func()) context.Context {
	return ctx
}
+
// dataFileMatcher implements a custom "matcher" that compares DataFiles. Because not all fields are always
// important to validate, the dataFileMatcher can be passed in a number of matching options to define
// which of its fields are compared during matching.
type dataFileMatcher struct {
	matchers   []fieldMatcher
	formatters []formatter
}

type (
	// dataFileMatcherOption configures which fields a dataFileMatcher compares.
	dataFileMatcherOption func(m *dataFileMatcher)
	// fieldMatcher compares one field of two DataFiles.
	fieldMatcher func(expected iceberg.DataFile, actual iceberg.DataFile) bool
	// formatter renders one field of a DataFile for mismatch messages.
	formatter func(val iceberg.DataFile) string
)
+
// withFileFormatMatching matches on DataFile.FileFormat.
func withFileFormatMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return expected.FileFormat() == actual.FileFormat()
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			return fmt.Sprintf("FileFormat: %s", val.FileFormat())
		})
	}
}

// withPathMatching matches on DataFile.FilePath.
func withPathMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return expected.FilePath() == actual.FilePath()
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			return "FilePath: " + val.FilePath()
		})
	}
}

// withSpecIDMatching matches on DataFile.SpecID.
func withSpecIDMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return expected.SpecID() == actual.SpecID()
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			return fmt.Sprintf("SpecID: %d", val.SpecID())
		})
	}
}

// withPartitionMatching matches on DataFile.Partition (map equality).
func withPartitionMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return maps.Equal(expected.Partition(), actual.Partition())
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			return fmt.Sprintf("Partition: %v", val.Partition())
		})
	}
}

// withContentTypeMatching matches on DataFile.ContentType.
func withContentTypeMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return expected.ContentType() == actual.ContentType()
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			return fmt.Sprintf("ContentType: %s", val.ContentType())
		})
	}
}

// withCountMatching matches on DataFile.Count.
func withCountMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return expected.Count() == actual.Count()
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			return fmt.Sprintf("Count: %d", val.Count())
		})
	}
}

// withColumnSizesMatching matches on DataFile.ColumnSizes (map equality).
func withColumnSizesMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return maps.Equal(expected.ColumnSizes(), actual.ColumnSizes())
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			return fmt.Sprintf("ColumnSizes: %v", val.ColumnSizes())
		})
	}
}
+
+func withContentOffsetMatching() dataFileMatcherOption {
+ return func(m *dataFileMatcher) {
+ m.matchers = append(m.matchers, func(expected iceberg.DataFile,
actual iceberg.DataFile) bool {
+ return comparePointerAndValue(expected.ContentOffset(),
actual.ContentOffset())
+ })
+ m.formatters = append(m.formatters, func(val iceberg.DataFile)
string {
+ if val.ContentSizeInBytes() == nil {
+ return "ContentOffset: nil"
+ }
+
+ return fmt.Sprintf("ContentOffset: %d",
*val.ContentOffset())
+ })
+ }
+}
+
// withContentSizeInBytesMatching matches on DataFile.ContentSizeInBytes (pointer-aware).
func withContentSizeInBytesMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return comparePointerAndValue(expected.ContentSizeInBytes(), actual.ContentSizeInBytes())
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			if val.ContentSizeInBytes() == nil {
				return "ContentSizeInBytes: nil"
			}

			return fmt.Sprintf("ContentSizeInBytes: %d", *val.ContentSizeInBytes())
		})
	}
}

// withSortOrderIDMatching matches on DataFile.SortOrderID (pointer-aware).
func withSortOrderIDMatching() dataFileMatcherOption {
	return func(m *dataFileMatcher) {
		m.matchers = append(m.matchers, func(expected iceberg.DataFile, actual iceberg.DataFile) bool {
			return comparePointerAndValue(expected.SortOrderID(), actual.SortOrderID())
		})
		m.formatters = append(m.formatters, func(val iceberg.DataFile) string {
			if val.SortOrderID() == nil {
				return "SortOrderID: nil"
			}

			return fmt.Sprintf("SortOrderID: %d", *val.SortOrderID())
		})
	}
}
+
// comparePointerAndValue reports whether two pointers are both nil or both
// point to equal values. A single nil on either side compares unequal.
func comparePointerAndValue[T comparable](left *T, right *T) bool {
	if left == nil || right == nil {
		// Equal only when both sides are nil.
		return left == right
	}

	return *left == *right
}
+
+func (m *dataFileMatcher) Matches(expected iceberg.DataFile, actual
iceberg.DataFile) bool {
+ if expected == nil && actual == nil {
+ return true
+ }
+ if expected == nil {
+ return false
+ }
+ if actual == nil {
+ return false
+ }
+ for _, m := range m.matchers {
+ if !m(expected, actual) {
+ return false
+ }
+ }
+
+ return true
+}
+
+func (m *dataFileMatcher) Format(val iceberg.DataFile) string {
+ if val == nil {
+ return "nil"
+ }
+ values := make([]string, 0, len(m.formatters))
+ for _, format := range m.formatters {
+ values = append(values, format(val))
+ }
+
+ return fmt.Sprintf("{%s}", strings.Join(values, ", "))
+}
+
+// defaultPositionDeleteMatching is a convenience preset for the options we
want to match for position delete matching
+var defaultPositionDeleteMatching =
[]dataFileMatcherOption{withContentTypeMatching(), withColumnSizesMatching(),
withCountMatching(), withFileFormatMatching(), withSpecIDMatching(),
withPartitionMatching(), withCountMatching()}
+
+// equalsDataFile invokes a dataFileMatcher with the specified matching
options and compares two DataFile values.
+// Its return value is nil if both values are equal and an error with a
meaningful formatted message to help
+// show the mismatch in case they are not. This is meant to be used with
testify like:
+//
+// assert.NoError(t, equalsDataFile(expected, actual))
+func equalsDataFile(expected iceberg.DataFile, actual iceberg.DataFile, opts
...dataFileMatcherOption) (err error) {
+ matcher := &dataFileMatcher{}
+ for _, apply := range opts {
+ apply(matcher)
+ }
+ if !matcher.Matches(expected, actual) {
+ return fmt.Errorf("Expected: %s\nActual: %s",
matcher.Format(expected), matcher.Format(actual))
+ }
+
+ return nil
+}
diff --git a/table/rolling_data_writer.go b/table/rolling_data_writer.go
index 35d42ebc..e8950e2f 100644
--- a/table/rolling_data_writer.go
+++ b/table/rolling_data_writer.go
@@ -65,32 +65,34 @@ func NewWriterFactory(rootLocation string, args
recordWritingArgs, meta *Metadat
// them to data files when the target file size is reached, implementing a
rolling
// file strategy to manage file sizes.
type RollingDataWriter struct {
- partitionKey string
- partitionID int // unique ID for this partition
- fileCount atomic.Int64 // counter for files in this partition
- recordCh chan arrow.RecordBatch
- errorCh chan error
- factory *writerFactory
- partitionValues map[int]any
- ctx context.Context
- cancel context.CancelFunc
- wg sync.WaitGroup
+ partitionKey string
+ partitionID int // unique ID for this partition
+ fileCount atomic.Int64 // counter for files in this partition
+ recordCh chan arrow.RecordBatch
+ errorCh chan error
+ factory *writerFactory
+ partitionValues map[int]any
+ ctx context.Context
+ cancel context.CancelFunc
+ wg sync.WaitGroup
+ concurrentWriter *concurrentDataFileWriter
}
// NewRollingDataWriter creates a new RollingDataWriter for the specified
partition
// with the given partition values.
-func (w *writerFactory) NewRollingDataWriter(ctx context.Context, partition
string, partitionValues map[int]any, outputDataFilesCh chan<- iceberg.DataFile)
*RollingDataWriter {
+func (w *writerFactory) NewRollingDataWriter(ctx context.Context,
concurrentWriter *concurrentDataFileWriter, partition string, partitionValues
map[int]any, outputDataFilesCh chan<- iceberg.DataFile) *RollingDataWriter {
ctx, cancel := context.WithCancel(ctx)
partitionID := int(w.partitionIDCounter.Add(1) - 1)
writer := &RollingDataWriter{
- partitionKey: partition,
- partitionID: partitionID,
- recordCh: make(chan arrow.RecordBatch, 64),
- errorCh: make(chan error, 1),
- factory: w,
- partitionValues: partitionValues,
- ctx: ctx,
- cancel: cancel,
+ partitionKey: partition,
+ partitionID: partitionID,
+ recordCh: make(chan arrow.RecordBatch, 64),
+ errorCh: make(chan error, 1),
+ factory: w,
+ partitionValues: partitionValues,
+ ctx: ctx,
+ concurrentWriter: concurrentWriter,
+ cancel: cancel,
}
writer.wg.Add(1)
@@ -99,7 +101,7 @@ func (w *writerFactory) NewRollingDataWriter(ctx
context.Context, partition stri
return writer
}
-func (w *writerFactory) getOrCreateRollingDataWriter(ctx context.Context,
partition string, partitionValues map[int]any, outputDataFilesCh chan<-
iceberg.DataFile) (*RollingDataWriter, error) {
+func (w *writerFactory) getOrCreateRollingDataWriter(ctx context.Context,
concurrentWriter *concurrentDataFileWriter, partition string, partitionValues
map[int]any, outputDataFilesCh chan<- iceberg.DataFile) (*RollingDataWriter,
error) {
w.mu.Lock()
defer w.mu.Unlock()
@@ -111,7 +113,7 @@ func (w *writerFactory) getOrCreateRollingDataWriter(ctx
context.Context, partit
return nil, fmt.Errorf("invalid writer type for partition: %s",
partition)
}
- writer := w.NewRollingDataWriter(ctx, partition, partitionValues,
outputDataFilesCh)
+ writer := w.NewRollingDataWriter(ctx, concurrentWriter, partition,
partitionValues, outputDataFilesCh)
w.writers.Store(partition, writer)
return writer, nil
@@ -165,7 +167,7 @@ func (r *RollingDataWriter) flushToDataFile(batch
[]arrow.RecordBatch, outputDat
return nil
}
- task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
+ tasks := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
cnt, _ := r.factory.nextCount()
fileCount := int(r.fileCount.Add(1))
@@ -190,7 +192,7 @@ func (r *RollingDataWriter) flushToDataFile(batch
[]arrow.RecordBatch, outputDat
}
partitionMeta.props[WriteDataPathKey] =
parseDataLoc.JoinPath("data").JoinPath(r.partitionKey).String()
- outputDataFiles := writeFiles(r.ctx, r.factory.rootLocation,
r.factory.args.fs, &partitionMeta, r.partitionValues, task)
+ outputDataFiles := r.concurrentWriter.writeFiles(r.ctx,
r.factory.rootLocation, r.factory.args.fs, &partitionMeta, partitionMeta.props,
r.partitionValues, tasks)
for dataFile, err := range outputDataFiles {
if err != nil {
return err
diff --git a/table/scanner.go b/table/scanner.go
index 1bc9d005..bc3c0902 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -242,23 +242,31 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
}
func (scan *Scan) buildPartitionProjection(specID int)
(iceberg.BooleanExpression, error) {
- spec := scan.metadata.PartitionSpecByID(specID)
+ return buildPartitionProjection(specID, scan.metadata, scan.rowFilter,
scan.caseSensitive)
+}
+
+func buildPartitionProjection(specID int, meta Metadata, rowFilter
iceberg.BooleanExpression, caseSensitive bool) (iceberg.BooleanExpression,
error) {
+ spec := meta.PartitionSpecByID(specID)
if spec == nil {
return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound,
specID)
}
- project := newInclusiveProjection(scan.metadata.CurrentSchema(), *spec,
true)
+ project := newInclusiveProjection(meta.CurrentSchema(), *spec,
caseSensitive)
- return project(scan.rowFilter)
+ return project(rowFilter)
}
func (scan *Scan) buildManifestEvaluator(specID int)
(func(iceberg.ManifestFile) (bool, error), error) {
- spec := scan.metadata.PartitionSpecByID(specID)
+ return buildManifestEvaluator(specID, scan.metadata,
scan.partitionFilters, scan.caseSensitive)
+}
+
+func buildManifestEvaluator(specID int, metadata Metadata, partitionFilters
*keyDefaultMap[int, iceberg.BooleanExpression], caseSensitive bool)
(func(iceberg.ManifestFile) (bool, error), error) {
+ spec := metadata.PartitionSpecByID(specID)
if spec == nil {
return nil, fmt.Errorf("%w: id %d", ErrPartitionSpecNotFound,
specID)
}
- return newManifestEvaluator(*spec, scan.metadata.CurrentSchema(),
- scan.partitionFilters.Get(specID), scan.caseSensitive)
+ return newManifestEvaluator(*spec, metadata.CurrentSchema(),
+ partitionFilters.Get(specID), caseSensitive)
}
func (scan *Scan) buildPartitionEvaluator(specID int) (func(iceberg.DataFile)
(bool, error), error) {
diff --git a/table/snapshot_producers.go b/table/snapshot_producers.go
index c310499f..9bbc0e85 100644
--- a/table/snapshot_producers.go
+++ b/table/snapshot_producers.go
@@ -182,7 +182,7 @@ func (of *overwriteFiles) existingManifests()
([]iceberg.ManifestFile, error) {
return nil, err
}
- return wr.ToManifestFile(path, counter.Count)
+ return wr.ToManifestFile(path, counter.Count,
iceberg.WithManifestFileContent(m.ManifestContent()))
}
mf, err := rewriteManifest(m, notDeleted)
@@ -427,16 +427,17 @@ func (m *mergeAppendFiles) processManifests(manifests
[]iceberg.ManifestFile) ([
type snapshotProducer struct {
producerImpl
- commitUuid uuid.UUID
- io iceio.WriteFileIO
- txn *Transaction
- op Operation
- snapshotID int64
- parentSnapshotID int64
- addedFiles []iceberg.DataFile
- manifestCount atomic.Int32
- deletedFiles map[string]iceberg.DataFile
- snapshotProps iceberg.Properties
+ commitUuid uuid.UUID
+ io iceio.WriteFileIO
+ txn *Transaction
+ op Operation
+ snapshotID int64
+ parentSnapshotID int64
+ addedFiles []iceberg.DataFile
+ positionDeleteFiles []iceberg.DataFile
+ manifestCount atomic.Int32
+ deletedFiles map[string]iceberg.DataFile
+ snapshotProps iceberg.Properties
}
func createSnapshotProducer(op Operation, txn *Transaction, fs
iceio.WriteFileIO, commitUUID *uuid.UUID, snapshotProps iceberg.Properties)
*snapshotProducer {
@@ -482,6 +483,12 @@ func (sp *snapshotProducer) appendDataFile(df
iceberg.DataFile) *snapshotProduce
return sp
}
+func (sp *snapshotProducer) appendPositionDeleteFile(df iceberg.DataFile)
*snapshotProducer {
+ sp.positionDeleteFiles = append(sp.positionDeleteFiles, df)
+
+ return sp
+}
+
func (sp *snapshotProducer) deleteDataFile(df iceberg.DataFile)
*snapshotProducer {
sp.deletedFiles[df.FilePath()] = df
@@ -531,49 +538,17 @@ func (sp *snapshotProducer) manifests() (_
[]iceberg.ManifestFile, err error) {
var g errgroup.Group
- results := [...][]iceberg.ManifestFile{nil, nil, nil}
+ addedManifests := make([]iceberg.ManifestFile, 0)
+ positionDeleteManifests := make([]iceberg.ManifestFile, 0)
+ var deletedFilesManifests []iceberg.ManifestFile
+ var existingManifests []iceberg.ManifestFile
if len(sp.addedFiles) > 0 {
- g.Go(func() (err error) {
- out, path, err := sp.newManifestOutput()
- if err != nil {
- return err
- }
- defer internal.CheckedClose(out, &err)
-
- counter := &internal.CountingWriter{W: out}
- currentSpec, err := sp.txn.meta.CurrentSpec()
- if err != nil || currentSpec == nil {
- return fmt.Errorf("could not get current
partition spec: %w", err)
- }
- wr, err :=
iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter,
- *currentSpec, sp.txn.meta.CurrentSchema(),
- sp.snapshotID)
- if err != nil {
- return err
- }
- defer internal.CheckedClose(wr, &err)
-
- for _, df := range sp.addedFiles {
- err :=
wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
- nil, nil, df))
- if err != nil {
- return err
- }
- }
-
- // close the writer to force a flush and ensure
counter.Count is accurate
- if err := wr.Close(); err != nil {
- return err
- }
-
- mf, err := wr.ToManifestFile(path, counter.Count)
- if err == nil {
- results[0] = append(results[0], mf)
- }
+ g.Go(sp.manifestProducer(iceberg.ManifestContentData,
sp.addedFiles, &addedManifests))
+ }
- return err
- })
+ if len(sp.positionDeleteFiles) > 0 {
+ g.Go(sp.manifestProducer(iceberg.ManifestContentDeletes,
sp.positionDeleteFiles, &positionDeleteManifests))
}
if len(deleted) > 0 {
@@ -607,7 +582,7 @@ func (sp *snapshotProducer) manifests() (_
[]iceberg.ManifestFile, err error) {
if err != nil {
return err
}
- results[1] = append(results[1], mf)
+ deletedFilesManifests =
append(deletedFilesManifests, mf)
}
return nil
@@ -619,7 +594,7 @@ func (sp *snapshotProducer) manifests() (_
[]iceberg.ManifestFile, err error) {
if err != nil {
return err
}
- results[2] = m
+ existingManifests = m
return nil
})
@@ -628,11 +603,55 @@ func (sp *snapshotProducer) manifests() (_
[]iceberg.ManifestFile, err error) {
return nil, err
}
- manifests := slices.Concat(results[0], results[1], results[2])
+ manifests := slices.Concat(addedManifests, positionDeleteManifests,
deletedFilesManifests, existingManifests)
return sp.processManifests(manifests)
}
+func (sp *snapshotProducer) manifestProducer(content iceberg.ManifestContent,
files []iceberg.DataFile, output *[]iceberg.ManifestFile) func() (err error) {
+ return func() (err error) {
+ out, path, err := sp.newManifestOutput()
+ if err != nil {
+ return err
+ }
+ defer internal.CheckedClose(out, &err)
+
+ counter := &internal.CountingWriter{W: out}
+ currentSpec, err := sp.txn.meta.CurrentSpec()
+ if err != nil || currentSpec == nil {
+ return fmt.Errorf("could not get current partition
spec: %w", err)
+ }
+ wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion,
counter,
+ *currentSpec, sp.txn.meta.CurrentSchema(),
+ sp.snapshotID,
iceberg.WithManifestWriterContent(content))
+ if err != nil {
+ return err
+ }
+ defer internal.CheckedClose(wr, &err)
+
+ for _, df := range files {
+ err :=
wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
+ nil, nil, df))
+ if err != nil {
+ return err
+ }
+ }
+
+ // close the writer to force a flush and ensure counter.Count
is accurate
+ if err := wr.Close(); err != nil {
+ return err
+ }
+
+ mf, err := wr.ToManifestFile(path, counter.Count,
iceberg.WithManifestFileContent(content))
+ if err != nil {
+ return err
+ }
+ *output = []iceberg.ManifestFile{mf}
+
+ return nil
+ }
+}
+
func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error)
{
var ssc SnapshotSummaryCollector
partitionSummaryLimit := sp.txn.meta.props.
@@ -649,6 +668,11 @@ func (sp *snapshotProducer) summary(props
iceberg.Properties) (Summary, error) {
return Summary{}, err
}
}
+ for _, df := range sp.positionDeleteFiles {
+ if err = ssc.addFile(df, currentSchema, *partitionSpec); err !=
nil {
+ return Summary{}, err
+ }
+ }
if len(sp.deletedFiles) > 0 {
specs := sp.txn.meta.specs
diff --git a/table/table_test.go b/table/table_test.go
index 0e6c3715..6b47bcb0 100644
--- a/table/table_test.go
+++ b/table/table_test.go
@@ -22,13 +22,14 @@ import (
"compress/gzip"
"context"
"encoding/json"
- "errors"
"fmt"
"io"
"io/fs"
"log"
+ "maps"
"os"
"path/filepath"
+ "reflect"
"runtime"
"slices"
"strconv"
@@ -2176,9 +2177,11 @@ func (t *TableWritingTestSuite) TestOverwriteRecord() {
// TestDelete verifies that Table.Delete properly delegates to
Transaction.Delete
func (t *TableWritingTestSuite) TestDelete() {
testCases := []struct {
- name string
- table *table.Table
- expectedErr error
+ name string
+ table *table.Table
+ formatVersionRequirement int
+ expectedSnapshotSummary *table.Summary
+ expectedErr error
}{
{
name: "success with copy-on-write",
@@ -2188,10 +2191,51 @@ func (t *TableWritingTestSuite) TestDelete() {
*iceberg.UnpartitionedSpec,
t.tableSchema,
),
+ expectedSnapshotSummary: &table.Summary{
+ Operation: table.OpDelete,
+ Properties: map[string]string{
+ "added-data-files": "1",
+ "added-records": "1",
+ "deleted-data-files": "1",
+ "deleted-records": "2",
+ "total-data-files": "1",
+ "total-delete-files": "0",
+ "total-equality-deletes": "0",
+ "total-position-deletes": "0",
+ "total-records": "1",
+ },
+ },
+ expectedErr: nil,
+ },
+ {
+ name: "fallback to copy-on-write
when on v1 format",
+ formatVersionRequirement: 1,
+ table: t.createTableWithProps(
+ table.Identifier{"default",
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
+ map[string]string{
+ table.PropertyFormatVersion:
strconv.Itoa(t.formatVersion),
+ table.WriteDeleteModeKey:
table.WriteModeMergeOnRead,
+ },
+ t.tableSchema,
+ ),
+ expectedSnapshotSummary: &table.Summary{
+ Operation: table.OpDelete,
+ Properties: map[string]string{
+ "added-data-files": "1",
+ "added-records": "1",
+ "deleted-data-files": "1",
+ "deleted-records": "2",
+ "total-data-files": "1",
+ "total-delete-files": "0",
+ "total-equality-deletes": "0",
+ "total-position-deletes": "0",
+ "total-records": "1",
+ },
+ },
expectedErr: nil,
},
{
- name: "abort on merge-on-read",
+ name: "success with merge-on-read on v2 format",
table: t.createTableWithProps(
table.Identifier{"default",
"overwrite_record_wrapper_v" + strconv.Itoa(t.formatVersion)},
map[string]string{
@@ -2200,15 +2244,38 @@ func (t *TableWritingTestSuite) TestDelete() {
},
t.tableSchema,
),
- expectedErr: errors.New("only 'copy-on-write' is
currently supported"),
+ // Position deletes are only supported on the v2 format and up
+ formatVersionRequirement: 2,
+ expectedSnapshotSummary: &table.Summary{
+ Operation: table.OpDelete,
+ Properties: map[string]string{
+ "added-delete-files": "1",
+ "added-position-delete-files": "1",
+ "added-position-deletes": "1",
+ "total-data-files": "1",
+ "total-delete-files": "1",
+ "total-equality-deletes": "0",
+ "total-position-deletes": "1",
+ "total-records": "2",
+ },
+ },
+ expectedErr: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func() {
- // Set up the test table with some data
+ // Skip this test case execution if the format version
isn't the one the test case is limited to. This
+ // is because some test cases don't have the same
expected behavior or don't apply because of format
+ // specific features
+ if tc.formatVersionRequirement > 0 &&
tc.formatVersionRequirement != t.formatVersion {
+ return
+ }
+ // Set up the test table with some data. Include more
than just one row so that the delete operation
+ // is not a straight file deletion
newTable, err :=
array.TableFromJSON(memory.DefaultAllocator, t.arrSchema, []string{
- `[{"foo": false, "bar": "wrapper_test", "baz":
123, "qux": "2024-01-01"}]`,
+ `[{"foo": false, "bar": "wrapper_test", "baz":
123, "qux": "2024-01-01"},
+ {"foo": true, "bar": "keep_this", "baz": 456, "qux":
"2024-01-02"}]`,
})
t.Require().NoError(err)
defer newTable.Release()
@@ -2219,7 +2286,7 @@ func (t *TableWritingTestSuite) TestDelete() {
// Validate the pre-requisite that data is present on
the table before we go ahead and delete it
arrowTable, err := tbl.Scan().ToArrowTable(t.ctx)
t.Require().NoError(err)
- t.Equal(int64(1), arrowTable.NumRows())
+ t.Equal(int64(2), arrowTable.NumRows())
tbl, err = tbl.Delete(t.ctx,
iceberg.EqualTo(iceberg.Reference("bar"), "wrapper_test"), nil)
// If an error was expected, check that it's the
correct one and abort validating the operation
@@ -2231,12 +2298,12 @@ func (t *TableWritingTestSuite) TestDelete() {
snapshot := tbl.CurrentSnapshot()
t.NotNil(snapshot)
- t.Equal(table.OpDelete, snapshot.Summary.Operation)
+
t.NoError(equalSnapshotSummary(tc.expectedSnapshotSummary, snapshot.Summary))
arrowTable, err = tbl.Scan().ToArrowTable(t.ctx)
t.Require().NoError(err)
- t.Equal(int64(0), arrowTable.NumRows())
+ t.Equal(int64(1), arrowTable.NumRows())
})
}
}
@@ -2370,13 +2437,13 @@ func (m *DeleteOldMetadataMockedCatalog)
CommitTable(ctx context.Context, ident
location := m.metadata.Location()
randid := uuid.New().String()
- metdatafile := fmt.Sprintf("%s/metadata/%s.metadata.json", location,
randid)
+ metadatafile := fmt.Sprintf("%s/metadata/%s.metadata.json", location,
randid)
// removing old metadata files
bldr.TrimMetadataLogs(0)
bldr.AppendMetadataLog(table.MetadataLogEntry{
- MetadataFile: metdatafile,
+ MetadataFile: metadatafile,
TimestampMs: time.Now().UnixMilli(),
})
@@ -2393,7 +2460,7 @@ func (m *DeleteOldMetadataMockedCatalog) CommitTable(ctx
context.Context, ident
m.metadata = meta
- return meta, metdatafile, nil
+ return meta, metadatafile, nil
}
func createMetadataFile(metadatadir, metadataFile string) error {
@@ -2702,3 +2769,59 @@ func (t *TableTestSuite)
TestMetadataCompressionRoundTrip() {
t.True(tbl.Equals(*tbl2))
}
+
+type snapshotSummaryMatcher struct{}
+
+func (m *snapshotSummaryMatcher) Matches(expected *table.Summary, actual
*table.Summary) bool {
+ if expected == nil && actual == nil {
+ return true
+ }
+ if expected == nil {
+ return false
+ }
+ if actual == nil {
+ return false
+ }
+ // Filter properties to validate by deleting all the ones that aren't
in the expected summary properties. This is
+ // to allow ignoring the properties that vary per environment/test
execution like file sizes
+ filtered := cloneSummaryAndFilterProperties(expected, actual)
+
+ return reflect.DeepEqual(expected, filtered)
+}
+
+// cloneSummaryAndFilterProperties clones a summary and filters out any
summary properties that aren't part of the
+// expected summary. This is useful to ignore properties that we don't wish to
validate
+func cloneSummaryAndFilterProperties(expected *table.Summary, actual
*table.Summary) *table.Summary {
+ actualPropertiesFiltered := maps.Clone(actual.Properties)
+ maps.DeleteFunc(actualPropertiesFiltered, func(k string, v string) bool
{
+ _, ok := expected.Properties[k]
+
+ return !ok
+ })
+ filtered := *actual
+ filtered.Properties = actualPropertiesFiltered
+
+ return &filtered
+}
+
+func (m *snapshotSummaryMatcher) Format(val *table.Summary) string {
+ if val == nil {
+ return "nil"
+ }
+
+ return fmt.Sprintf("{Operation: %s, Properties: %#v}", val.Operation,
val.Properties)
+}
+
+// equalSnapshotSummary invokes a snapshotSummaryMatcher to compare two
snapshot summary values.
+// Its return value is nil if both values are equal and an error with a
meaningful formatted message to help
+// show the mismatch in case they are not. This is meant to be used with
testify like:
+//
+// assert.NoError(t, equalSnapshotSummary(expected, actual))
+func equalSnapshotSummary(expected *table.Summary, actual *table.Summary) (err
error) {
+ matcher := &snapshotSummaryMatcher{}
+ if !matcher.Matches(expected, actual) {
+ return fmt.Errorf("Expected: %s\nActual: %s",
matcher.Format(expected), matcher.Format(actual))
+ }
+
+ return nil
+}
diff --git a/table/transaction.go b/table/transaction.go
index 765d9149..49081792 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "iter"
"runtime"
"slices"
"sync"
@@ -30,8 +31,11 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/compute/exprs"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table/internal"
+ "github.com/apache/iceberg-go/table/substrait"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
@@ -897,7 +901,7 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx
context.Context, operation
commitUUID := uuid.New()
updater := t.updateSnapshot(fs, snapshotProps,
operation).mergeOverwrite(&commitUUID)
- filesToDelete, filesToRewrite, err := t.classifyFilesForOverwrite(ctx,
fs, filter, caseSensitive, concurrency)
+ filesToDelete, filesToRewrite, err := t.classifyFilesForDeletions(ctx,
fs, filter, caseSensitive, concurrency)
if err != nil {
return nil, err
}
@@ -915,6 +919,45 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx
context.Context, operation
return updater, nil
}
+func (t *Transaction) performMergeOnReadDeletion(ctx context.Context,
snapshotProps iceberg.Properties, filter iceberg.BooleanExpression,
caseSensitive bool, concurrency int) (*snapshotProducer, error) {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return nil, err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs, snapshotProps,
OpDelete).mergeOverwrite(&commitUUID)
+
+ filesToDelete, withPartialDeletions, err :=
t.classifyFilesForDeletions(ctx, fs, filter, caseSensitive, concurrency)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, df := range filesToDelete {
+ updater.deleteDataFile(df)
+ }
+
+ if len(withPartialDeletions) > 0 {
+ if err := t.writePositionDeletesForFiles(ctx, fs, updater,
withPartialDeletions, filter, caseSensitive, concurrency, commitUUID); err !=
nil {
+ return nil, err
+ }
+ }
+
+ return updater, nil
+}
+
type DeleteOption func(deleteOp *deleteOperation)
type deleteOperation struct {
@@ -958,7 +1001,7 @@ func WithDeleteCaseInsensitive() DeleteOption {
//
// The concurrency parameter controls the level of parallelism for manifest
processing and file rewriting and
// can be overridden using the WithOverwriteConcurrency option. Defaults to
runtime.GOMAXPROCS(0).
-func (t *Transaction) Delete(ctx context.Context, filter
iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts
...DeleteOption) error {
+func (t *Transaction) Delete(ctx context.Context, filter
iceberg.BooleanExpression, snapshotProps iceberg.Properties, opts
...DeleteOption) (err error) {
deleteOp := deleteOperation{
concurrency: runtime.GOMAXPROCS(0),
caseSensitive: true,
@@ -967,14 +1010,28 @@ func (t *Transaction) Delete(ctx context.Context, filter
iceberg.BooleanExpressi
apply(&deleteOp)
}
- writeDeleteMode := t.meta.props.Get(WriteDeleteModeKey,
WriteDeleteModeDefault)
- if writeDeleteMode != WriteModeCopyOnWrite {
- return fmt.Errorf("'%s' is set to '%s' but only '%s' is
currently supported", WriteDeleteModeKey, writeDeleteMode, WriteModeCopyOnWrite)
+ var updater *snapshotProducer
+ writeDeleteMode := WriteDeleteModeDefault
+ // Only copy on write is supported on v1 so we ignore any override to
the write delete mode unless the version is
+ // 2 and up
+ if t.meta.formatVersion > 1 {
+ writeDeleteMode = t.meta.props.Get(WriteDeleteModeKey,
WriteDeleteModeDefault)
}
- updater, err := t.performCopyOnWriteDeletion(ctx, OpDelete,
snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency)
- if err != nil {
- return err
+ switch writeDeleteMode {
+ case WriteModeCopyOnWrite:
+ updater, err = t.performCopyOnWriteDeletion(ctx, OpDelete,
snapshotProps, filter, deleteOp.caseSensitive, deleteOp.concurrency)
+ if err != nil {
+ return err
+ }
+ case WriteModeMergeOnRead:
+ updater, err = t.performMergeOnReadDeletion(ctx, snapshotProps,
filter, deleteOp.caseSensitive, deleteOp.concurrency)
+ if err != nil {
+ return err
+ }
+ default:
+ return fmt.Errorf("unsupported write mode: '%s'",
writeDeleteMode)
}
+
updates, reqs, err := updater.commit()
if err != nil {
return err
@@ -983,9 +1040,9 @@ func (t *Transaction) Delete(ctx context.Context, filter
iceberg.BooleanExpressi
return t.apply(updates, reqs)
}
-// classifyFilesForOverwrite classifies existing data files based on the
provided filter.
+// classifyFilesForDeletions classifies existing data files based on the
provided filter.
// Returns files to delete completely, files to rewrite partially, and any
error.
-func (t *Transaction) classifyFilesForOverwrite(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int)
(filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
+func (t *Transaction) classifyFilesForDeletions(ctx context.Context, fs io.IO,
filter iceberg.BooleanExpression, caseSensitive bool, concurrency int)
(filesToDelete, filesWithPartialDeletions []iceberg.DataFile, err error) {
s := t.meta.currentSnapshot()
if s == nil {
return nil, nil, nil
@@ -1001,16 +1058,46 @@ func (t *Transaction) classifyFilesForOverwrite(ctx
context.Context, fs io.IO, f
}
}
- return filesToDelete, filesToRewrite, nil
+ return filesToDelete, filesWithPartialDeletions, nil
}
- return t.classifyFilesForFilteredOverwrite(ctx, fs, filter,
caseSensitive, concurrency)
+ return t.classifyFilesForFilteredDeletions(ctx, fs, filter,
caseSensitive, concurrency)
+}
+
+type fileClassificationTask struct {
+ meta Metadata
+ partitionFilters *keyDefaultMap[int, iceberg.BooleanExpression]
+ caseSensitive bool
+ rowFilter iceberg.BooleanExpression
}
-// classifyFilesForFilteredOverwrite classifies files for filtered overwrite
operations.
+func newFileClassificationTask(meta Metadata, rowFilter
iceberg.BooleanExpression, caseSensitive bool) *fileClassificationTask {
+ classificationTask := &fileClassificationTask{
+ meta: meta,
+ caseSensitive: caseSensitive,
+ rowFilter: rowFilter,
+ }
+ classificationTask.partitionFilters =
newKeyDefaultMapWrapErr(classificationTask.buildPartitionProjection)
+
+ return classificationTask
+}
+
+func (t *fileClassificationTask) buildManifestEvaluator(specID int)
(func(iceberg.ManifestFile) (bool, error), error) {
+ return buildManifestEvaluator(specID, t.meta, t.partitionFilters,
t.caseSensitive)
+}
+
+func (t *fileClassificationTask) buildPartitionProjection(specID int)
(iceberg.BooleanExpression, error) {
+ return buildPartitionProjection(specID, t.meta, t.rowFilter,
t.caseSensitive)
+}
+
+// classifyFilesForFilteredDeletions classifies files for filtered deletion
operations.
error.
-func (t *Transaction) classifyFilesForFilteredOverwrite(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency
int) (filesToDelete, filesToRewrite []iceberg.DataFile, err error) {
+func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context,
fs io.IO, filter iceberg.BooleanExpression, caseSensitive bool, concurrency
int) (filesToDelete, filesWithPartialDeletes []iceberg.DataFile, err error) {
schema := t.meta.CurrentSchema()
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, nil, err
+ }
inclusiveEvaluator, err := newInclusiveMetricsEvaluator(schema, filter,
caseSensitive, false)
if err != nil {
@@ -1022,18 +1109,8 @@ func (t *Transaction)
classifyFilesForFilteredOverwrite(ctx context.Context, fs
return nil, nil, fmt.Errorf("failed to create strict metrics
evaluator: %w", err)
}
- var manifestEval func(iceberg.ManifestFile) (bool, error)
- meta, err := t.meta.Build()
- if err != nil {
- return nil, nil, fmt.Errorf("failed to build metadata: %w", err)
- }
- spec := meta.PartitionSpec()
- if !spec.IsUnpartitioned() {
- manifestEval, err = newManifestEvaluator(spec, schema, filter,
caseSensitive)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to create manifest
evaluator: %w", err)
- }
- }
+ classificationTask := newFileClassificationTask(meta, filter,
caseSensitive)
+ manifestEvaluators :=
newKeyDefaultMapWrapErr(classificationTask.buildManifestEvaluator)
s := t.meta.currentSnapshot()
var manifests []iceberg.ManifestFile
@@ -1044,11 +1121,7 @@ func (t *Transaction)
classifyFilesForFilteredOverwrite(ctx context.Context, fs
}
}
- var (
- mu sync.Mutex
- allFilesToDel []iceberg.DataFile
- allFilesToRewr []iceberg.DataFile
- )
+ var mu sync.Mutex
g, _ := errgroup.WithContext(ctx)
g.SetLimit(min(concurrency, len(manifests)))
@@ -1056,6 +1129,7 @@ func (t *Transaction)
classifyFilesForFilteredOverwrite(ctx context.Context, fs
for _, manifest := range manifests {
manifest := manifest // capture loop variable
g.Go(func() error {
+ manifestEval :=
manifestEvaluators.Get(int(manifest.PartitionSpecID()))
if manifestEval != nil {
match, err := manifestEval(manifest)
if err != nil {
@@ -1107,8 +1181,8 @@ func (t *Transaction)
classifyFilesForFilteredOverwrite(ctx context.Context, fs
if len(localDelete) > 0 || len(localRewrite) > 0 {
mu.Lock()
- allFilesToDel = append(allFilesToDel,
localDelete...)
- allFilesToRewr = append(allFilesToRewr,
localRewrite...)
+ filesToDelete = append(filesToDelete,
localDelete...)
+ filesWithPartialDeletes =
append(filesWithPartialDeletes, localRewrite...)
mu.Unlock()
}
@@ -1120,7 +1194,7 @@ func (t *Transaction)
classifyFilesForFilteredOverwrite(ctx context.Context, fs
return nil, nil, err
}
- return allFilesToDel, allFilesToRewr, nil
+ return filesToDelete, filesWithPartialDeletes, nil
}
// rewriteFilesWithFilter rewrites data files by preserving only rows that do
NOT match the filter
@@ -1212,6 +1286,117 @@ func (t *Transaction) rewriteSingleFile(ctx
context.Context, fs io.IO, originalF
return result, nil
}
+// writePositionDeletesForFiles writes position delete files for the rows
that DO match the filter in the given data files
+func (t *Transaction) writePositionDeletesForFiles(ctx context.Context, fs
io.IO, updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression, caseSensitive bool, concurrency int, commitUUID
uuid.UUID) error {
+ posDeleteRecIter, err := t.makePositionDeleteRecordsForFilter(ctx, fs,
files, filter, caseSensitive, concurrency)
+ if err != nil {
+ return err
+ }
+
+ partitionContextByFilePath := make(map[string]partitionContext,
len(files))
+ for _, df := range files {
+ partitionContextByFilePath[df.FilePath()] =
partitionContext{partitionData: df.Partition(), specID: df.SpecID()}
+ }
+
+ posDeleteFiles := positionDeleteRecordsToDataFiles(ctx,
t.tbl.Location(), t.meta, partitionContextByFilePath, recordWritingArgs{
+ sc: PositionalDeleteArrowSchema,
+ itr: posDeleteRecIter,
+ writeUUID: &commitUUID,
+ fs: fs.(io.WriteFileIO),
+ })
+
+ for f, err := range posDeleteFiles {
+ if err != nil {
+ return err
+ }
+ updater.appendPositionDeleteFile(f)
+ }
+
+ return nil
+}
+
+func (t *Transaction) makePositionDeleteRecordsForFilter(ctx context.Context,
fs io.IO, files []iceberg.DataFile, filter iceberg.BooleanExpression,
caseSensitive bool, concurrency int) (seq2 iter.Seq2[arrow.RecordBatch, error],
err error) {
+ tasks := make([]FileScanTask, 0, len(files))
+ for _, f := range files {
+ tasks = append(tasks, FileScanTask{
+ File: f,
+ Start: 0,
+ Length: f.FileSizeBytes(),
+ })
+ }
+
+ boundFilter, err := iceberg.BindExpr(t.meta.CurrentSchema(), filter,
caseSensitive)
+ if err != nil {
+ return nil, fmt.Errorf("failed to bind filter: %w", err)
+ }
+
+ meta, err := t.meta.Build()
+ if err != nil {
+ return nil, fmt.Errorf("failed to build metadata: %w", err)
+ }
+
+ scanner := &arrowScan{
+ metadata: meta,
+ fs: fs,
+ projectedSchema: t.meta.CurrentSchema(),
+ boundRowFilter: boundFilter,
+ caseSensitive: caseSensitive,
+ rowLimit: -1, // No limit
+ concurrency: concurrency,
+ }
+
+ deletesPerFile, err := readAllDeleteFiles(ctx, fs, tasks, concurrency)
+ if err != nil {
+ return nil, err
+ }
+
+ extSet := substrait.NewExtensionSet()
+
+ ctx, cancel := context.WithCancelCause(exprs.WithExtensionIDSet(ctx,
extSet))
+ taskChan := make(chan internal.Enumerated[FileScanTask], len(tasks))
+
+ numWorkers := min(concurrency, len(tasks))
+ records := make(chan enumeratedRecord, numWorkers)
+
+ var wg sync.WaitGroup
+ wg.Add(numWorkers)
+ for i := 0; i < numWorkers; i++ {
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case task, ok := <-taskChan:
+ if !ok {
+ return
+ }
+
+ if err :=
scanner.producePosDeletesFromTask(ctx, task,
deletesPerFile[task.Value.File.FilePath()], records); err != nil {
+ cancel(err)
+
+ return
+ }
+ }
+ }
+ }()
+ }
+
+ go func() {
+ for i, t := range tasks {
+ taskChan <- internal.Enumerated[FileScanTask]{
+ Value: t, Index: i, Last: i == len(tasks)-1,
+ }
+ }
+ close(taskChan)
+
+ wg.Wait()
+ close(records)
+ }()
+
+ return createIterator(ctx, uint(numWorkers), records, deletesPerFile,
cancel, scanner.rowLimit), nil
+}
+
func (t *Transaction) Scan(opts ...ScanOption) (*Scan, error) {
updatedMeta, err := t.meta.Build()
if err != nil {
diff --git a/table/transaction_test.go b/table/transaction_test.go
index db06d64d..d356b468 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -536,6 +536,120 @@ func (s *SparkIntegrationTestSuite)
TestDeleteInsensitive() {
+----------+---------+---+`)
}
+func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadUnpartitioned() {
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "first_name", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "last_name", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32},
+ )
+
+ tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default",
"go_test_merge_on_read_delete"), icebergSchema,
+ catalog.WithProperties(
+ map[string]string{
+ table.WriteDeleteModeKey:
table.WriteModeMergeOnRead,
+ },
+ ),
+ )
+ s.Require().NoError(err)
+
+ arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true,
false)
+ s.Require().NoError(err)
+
+ initialTable, err := array.TableFromJSON(memory.DefaultAllocator,
arrowSchema, []string{
+ `[
+ {"first_name": "alan", "last_name": "gopher", "age": 7},
+ {"first_name": "steve", "last_name": "gopher", "age":
5},
+ {"first_name": "dead", "last_name": "gopher", "age": 97}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer initialTable.Release()
+
+ tx := tbl.NewTransaction()
+ err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+ s.Require().NoError(err)
+ tbl, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+ // Delete the dead gopher and confirm that alan and steve are still
present
+ filter := iceberg.EqualTo(iceberg.Reference("first_name"), "dead")
+ tx = tbl.NewTransaction()
+ err = tx.Delete(s.ctx, filter, nil)
+ s.Require().NoError(err)
+ _, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+ output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql",
"SELECT * FROM default.go_test_merge_on_read_delete ORDER BY age")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve |gopher |5 |
+|alan |gopher |7 |
++----------+---------+---+`)
+}
+
+func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadPartitioned() {
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "first_name", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "last_name", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32},
+ )
+
+ spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+ SourceID: 3,
+ Name: "age_bucket",
+ Transform: iceberg.BucketTransform{
+ NumBuckets: 2,
+ },
+ })
+ tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default",
"go_test_merge_on_read_delete_partitioned"), icebergSchema,
+ catalog.WithProperties(
+ map[string]string{
+ table.WriteDeleteModeKey:
table.WriteModeMergeOnRead,
+ },
+ ),
+ catalog.WithPartitionSpec(&spec),
+ )
+ s.Require().NoError(err)
+
+ arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true,
false)
+ s.Require().NoError(err)
+
+ initialTable, err := array.TableFromJSON(memory.DefaultAllocator,
arrowSchema, []string{
+ `[
+ {"first_name": "alan", "last_name": "gopher", "age": 7},
+ {"first_name": "steve", "last_name": "gopher", "age":
5},
+ {"first_name": "dead", "last_name": "gopher", "age":
97},
+ {"first_name": "uncle", "last_name": "gopher", "age":
90}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer initialTable.Release()
+
+ tx := tbl.NewTransaction()
+ err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+ s.Require().NoError(err)
+ tbl, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+ // Delete the dead gopher and confirm that alan and steve are still
present
+ filter := iceberg.NewAnd(iceberg.GreaterThan(iceberg.Reference("age"),
"50"), iceberg.EqualTo(iceberg.Reference("first_name"), "dead"))
+ tx = tbl.NewTransaction()
+ err = tx.Delete(s.ctx, filter, nil)
+ s.Require().NoError(err)
+ _, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+ output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql",
"SELECT * FROM default.go_test_merge_on_read_delete_partitioned ORDER BY age")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve |gopher |5 |
+|alan |gopher |7 |
+|uncle |gopher |90 |
++----------+---------+---+`)
+}
+
func TestSparkIntegration(t *testing.T) {
suite.Run(t, new(SparkIntegrationTestSuite))
}
diff --git a/table/writer.go b/table/writer.go
index 8952b6c6..cdda0357 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -47,16 +47,58 @@ func (w WriteTask) GenerateDataFileName(extension string)
string {
return fmt.Sprintf("%05d-%d-%s-%05d.%s", w.PartitionID, w.ID, w.Uuid,
w.FileCount, extension)
}
// defaultDataFileWriter writes record batches for a WriteTask to files at
// locations chosen by the LocationProvider. Depending on its content field it
// produces either data files or position delete files.
type defaultDataFileWriter struct {
	loc        LocationProvider             // resolves output file paths under the table location
	fs         io.WriteFileIO               // filesystem the files are written to
	fileSchema *iceberg.Schema              // schema written into each file
	format     internal.FileFormat          // physical file format (parquet by default)
	props      iceberg.Properties           // table/write properties, converted to format write props at write time
	content    iceberg.ManifestEntryContent // data vs. position-delete manifest entry content
	meta       *MetadataBuilder             // table metadata (current spec/schema) being built
}
-func (w *writer) writeFile(ctx context.Context, partitionValues map[int]any,
task WriteTask) (iceberg.DataFile, error) {
+type dataFileWriterOption func(writer *defaultDataFileWriter)
+
+func withFormat(format internal.FileFormat) dataFileWriterOption {
+ return func(writer *defaultDataFileWriter) {
+ writer.format = format
+ }
+}
+
+func withFileSchema(schema *iceberg.Schema) dataFileWriterOption {
+ return func(writer *defaultDataFileWriter) {
+ writer.fileSchema = schema
+ }
+}
+
+func withContent(content iceberg.ManifestEntryContent) dataFileWriterOption {
+ return func(writer *defaultDataFileWriter) {
+ writer.content = content
+ }
+}
+
+func newDataFileWriter(rootLocation string, fs io.WriteFileIO, meta
*MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption)
(*defaultDataFileWriter, error) {
+ locProvider, err := LoadLocationProvider(rootLocation, props)
+ if err != nil {
+ return nil, err
+ }
+ w := defaultDataFileWriter{
+ loc: locProvider,
+ fs: fs,
+ fileSchema: meta.CurrentSchema(),
+ format: internal.GetFileFormat(iceberg.ParquetFile),
+ content: iceberg.EntryContentData,
+ props: props,
+ meta: meta,
+ }
+ for _, apply := range opts {
+ apply(&w)
+ }
+
+ return &w, nil
+}
+
+func (w *defaultDataFileWriter) writeFile(ctx context.Context, partitionValues
map[int]any, task WriteTask) (iceberg.DataFile, error) {
defer func() {
for _, b := range task.Batches {
b.Release()
@@ -89,49 +131,76 @@ func (w *writer) writeFile(ctx context.Context,
partitionValues map[int]any, tas
return w.format.WriteDataFile(ctx, w.fs, partitionValues,
internal.WriteFileInfo{
FileSchema: w.fileSchema,
+ Content: w.content,
FileName: filePath,
StatsCols: statsCols,
- WriteProps: w.props,
+ WriteProps: w.format.GetWriteProperties(w.props),
Spec: *currentSpec,
}, batches)
}
-func writeFiles(ctx context.Context, rootLocation string, fs io.WriteFileIO,
meta *MetadataBuilder, partitionValues map[int]any, tasks iter.Seq[WriteTask])
iter.Seq2[iceberg.DataFile, error] {
- locProvider, err := LoadLocationProvider(rootLocation, meta.props)
- if err != nil {
- return func(yield func(iceberg.DataFile, error) bool) {
- yield(nil, err)
- }
// dataFileWriter abstracts writing a single WriteTask to a file, allowing the
// concurrent writer to produce data files or position delete files through the
// same pipeline.
type dataFileWriter interface {
	writeFile(ctx context.Context, partitionValues map[int]any, task WriteTask) (iceberg.DataFile, error)
}
+
+func newPositionDeleteWriter(rootLocation string, fs io.WriteFileIO, meta
*MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption)
(*defaultDataFileWriter, error) {
+ // Always enforce the file schema to be the Positional Delete Schema by
appending the option at the very end
+ return newDataFileWriter(rootLocation, fs, meta, props, append(opts,
withFileSchema(iceberg.PositionalDeleteSchema),
withContent(iceberg.EntryContentPosDeletes))...)
+}
+
// dataFileWriterMaker constructs a dataFileWriter for the given table root
// location, filesystem, metadata and properties.
type dataFileWriterMaker func(rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error)

// concurrentDataFileWriter fans WriteTasks out across workers, writing each
// task with a writer produced by newDataFileWriter.
type concurrentDataFileWriter struct {
	newDataFileWriter dataFileWriterMaker
	sanitizeSchema    bool // when true, column names are sanitized before writing
}
+
+type concurrentDataFileWriterOption func(w *concurrentDataFileWriter)
+
+func withSchemaSanitization(enabled bool) concurrentDataFileWriterOption {
+ return func(w *concurrentDataFileWriter) {
+ w.sanitizeSchema = enabled
}
+}
- format := internal.GetFileFormat(iceberg.ParquetFile)
+func newConcurrentDataFileWriter(newDataFileWriter dataFileWriterMaker, opts
...concurrentDataFileWriterOption) *concurrentDataFileWriter {
+ w := concurrentDataFileWriter{
+ newDataFileWriter: newDataFileWriter,
+ sanitizeSchema: true,
+ }
+ for _, apply := range opts {
+ apply(&w)
+ }
+
+ return &w
+}
+
+func (w *concurrentDataFileWriter) writeFiles(ctx context.Context,
rootLocation string, fs io.WriteFileIO, meta *MetadataBuilder, props
iceberg.Properties, partitionValues map[int]any, tasks iter.Seq[WriteTask])
iter.Seq2[iceberg.DataFile, error] {
fileSchema := meta.CurrentSchema()
- sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
- if err != nil {
- return func(yield func(iceberg.DataFile, error) bool) {
- yield(nil, err)
+ if w.sanitizeSchema {
+ sanitized, err := iceberg.SanitizeColumnNames(fileSchema)
+ if err != nil {
+ return func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
}
- }
- // if the schema needs to be transformed, use the transformed schema
- // and adjust the arrow schema appropriately. otherwise we just
- // use the original schema.
- if !sanitized.Equals(fileSchema) {
- fileSchema = sanitized
+ // if the schema needs to be transformed, use the transformed
schema
+ // and adjust the arrow schema appropriately. otherwise we just
+ // use the original schema.
+ if !sanitized.Equals(fileSchema) {
+ fileSchema = sanitized
+ }
}
- w := &writer{
- loc: locProvider,
- fs: fs,
- fileSchema: fileSchema,
- format: format,
- props: format.GetWriteProperties(meta.props),
- meta: meta,
+ fw, err := w.newDataFileWriter(rootLocation, fs, meta, props,
withFileSchema(fileSchema))
+ if err != nil {
+ return func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
}
- nworkers := config.EnvConfig.MaxWorkers
-
- return internal.MapExec(nworkers, tasks, func(t WriteTask)
(iceberg.DataFile, error) {
- return w.writeFile(ctx, partitionValues, t)
+ return internal.MapExec(config.EnvConfig.MaxWorkers, tasks, func(t
WriteTask) (iceberg.DataFile, error) {
+ return fw.writeFile(ctx, partitionValues, t)
})
}