laskoviymishka commented on code in PR #721:
URL: https://github.com/apache/iceberg-go/pull/721#discussion_r2849808292
##########
table/transaction.go:
##########
@@ -915,6 +920,45 @@ func (t *Transaction) performCopyOnWriteDeletion(ctx
context.Context, operation
return updater, nil
}
+func (t *Transaction) performMergeOnReadDeletion(ctx context.Context,
snapshotProps iceberg.Properties, filter iceberg.BooleanExpression,
caseSensitive bool, concurrency int) (*snapshotProducer, error) {
+ fs, err := t.tbl.fsF(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ if t.meta.NameMapping() == nil {
+ nameMapping := t.meta.CurrentSchema().NameMapping()
+ mappingJson, err := json.Marshal(nameMapping)
+ if err != nil {
+ return nil, err
+ }
+ err = t.SetProperties(iceberg.Properties{DefaultNameMappingKey:
string(mappingJson)})
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ commitUUID := uuid.New()
+ updater := t.updateSnapshot(fs, snapshotProps,
OpDelete).mergeOverwrite(&commitUUID)
Review Comment:
For merge-on-read deletes, this path still builds an overwrite-style
snapshot producer (mergeOverwrite) instead of a RowDelta-style update. In
Iceberg Java, MOR deletes are modeled through newRowDelta()
(BaseTransaction#newRowDelta) and validated via BaseRowDelta conflict checks.
Could we align this flow with RowDelta semantics to avoid divergence in commit
validation behavior?
see:
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/BaseTransaction.java
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/BaseRowDelta.java
##########
table/transaction_test.go:
##########
@@ -536,6 +536,120 @@ func (s *SparkIntegrationTestSuite)
TestDeleteInsensitive() {
+----------+---------+---+`)
}
+func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadUnpartitioned() {
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "first_name", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "last_name", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32},
+ )
+
+ tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default",
"go_test_merge_on_read_delete"), icebergSchema,
+ catalog.WithProperties(
+ map[string]string{
+ table.WriteDeleteModeKey:
table.WriteModeMergeOnRead,
+ },
+ ),
+ )
+ s.Require().NoError(err)
+
+ arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true,
false)
+ s.Require().NoError(err)
+
+ initialTable, err := array.TableFromJSON(memory.DefaultAllocator,
arrowSchema, []string{
+ `[
+ {"first_name": "alan", "last_name": "gopher", "age": 7},
+ {"first_name": "steve", "last_name": "gopher", "age":
5},
+ {"first_name": "dead", "last_name": "gopher", "age": 97}
+ ]`,
+ })
+ s.Require().NoError(err)
+ defer initialTable.Release()
+
+ tx := tbl.NewTransaction()
+ err = tx.AppendTable(s.ctx, initialTable, 3, nil)
+ s.Require().NoError(err)
+ tbl, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+ // Delete the dead gopher and confirm that alan and steve are still
present
+ filter := iceberg.EqualTo(iceberg.Reference("first_name"), "dead")
+ tx = tbl.NewTransaction()
+ err = tx.Delete(s.ctx, filter, nil)
+ s.Require().NoError(err)
+ _, err = tx.Commit(s.ctx)
+ s.Require().NoError(err)
+
+ output, err := recipe.ExecuteSpark(s.T(), "./validation.py", "--sql",
"SELECT * FROM default.go_test_merge_on_read_delete ORDER BY age")
+ s.Require().NoError(err)
+ s.Require().Contains(output, `|first_name|last_name|age|
++----------+---------+---+
+|steve |gopher |5 |
+|alan |gopher |7 |
++----------+---------+---+`)
+}
+
+func (s *SparkIntegrationTestSuite) TestDeleteMergeOnReadPartitioned() {
+ icebergSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "first_name", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "last_name", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 3, Name: "age", Type:
iceberg.PrimitiveTypes.Int32},
+ )
+
+ spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
+ SourceID: 3,
+ Name: "age_bucket",
+ Transform: iceberg.BucketTransform{
+ NumBuckets: 2,
+ },
+ })
+ tbl, err := s.cat.CreateTable(s.ctx, catalog.ToIdentifier("default",
"go_test_merge_on_read_delete_partitioned"), icebergSchema,
+ catalog.WithProperties(
+ map[string]string{
+ table.WriteDeleteModeKey:
table.WriteModeMergeOnRead,
+ },
+ ),
+ catalog.WithPartitionSpec(&spec),
+ )
+ s.Require().NoError(err)
+
+ arrowSchema, err := table.SchemaToArrowSchema(icebergSchema, nil, true,
false)
+ s.Require().NoError(err)
+
+ initialTable, err := array.TableFromJSON(memory.DefaultAllocator,
arrowSchema, []string{
+ `[
+ {"first_name": "alan", "last_name": "gopher", "age": 7},
+ {"first_name": "steve", "last_name": "gopher", "age":
5},
+ {"first_name": "dead", "last_name": "gopher", "age": 97}
Review Comment:
nit: missing comma — this malformed JSON should make the test panic/fail
##########
table/arrow_scanner.go:
##########
@@ -473,6 +523,78 @@ func (as *arrowScan) recordsFromTask(ctx context.Context,
task internal.Enumerat
return err
}
+func (as *arrowScan) producePosDeletesFromTask(ctx context.Context, task
internal.Enumerated[FileScanTask], positionalDeletes positionDeletes, out
chan<- enumeratedRecord) (err error) {
+ defer func() {
+ if err != nil {
+ out <- enumeratedRecord{Task: task, Err: err}
+ }
+ }()
+
+ var (
+ rdr internal.FileReader
+ iceSchema *iceberg.Schema
+ colIndices []int
+ filterFunc recProcessFn
+ dropFile bool
+ )
+
+ iceSchema, colIndices, rdr, err = as.prepareToRead(ctx, task.Value.File)
+ if err != nil {
+ return err
+ }
+ defer iceinternal.CheckedClose(rdr, &err)
+
+ fields := append(iceSchema.Fields(),
iceberg.PositionalDeleteSchema.Fields()...)
+ enrichedIcebergSchema := iceberg.NewSchema(iceSchema.ID+1, fields...)
+
+ pipeline := make([]recProcessFn, 0, 2)
+ pipeline = append(pipeline, enrichRecordsWithPosDeleteFields(ctx,
task.Value.File))
+ if len(positionalDeletes) > 0 {
+ deletes := set[int64]{}
+ for _, chunk := range positionalDeletes {
+ for _, a := range chunk.Chunks() {
+ for _, v := range
a.(*array.Int64).Int64Values() {
+ deletes[v] = struct{}{}
+ }
+ }
+ }
+
+ pipeline = append(pipeline, processPositionalDeletes(ctx,
deletes))
+ }
+
+ filterFunc, dropFile, err = as.getRecordFilter(ctx, iceSchema)
+ if err != nil {
+ return err
+ }
+
+ // Nothing to delete in a dropped file
+ if dropFile {
+ var emptySchema *arrow.Schema
+ emptySchema, err = SchemaToArrowSchema(as.projectedSchema, nil,
false, as.useLargeTypes)
Review Comment:
nit: when dropFile is true (filter resolves to AlwaysFalse for this file, so
nothing to delete), the empty sentinel batch is built with the data schema
(projectedSchema) rather than PositionalDeleteArrowSchema.
It's harmless today because createIterator skips zero-row batches, but it's
a latent schema mismatch. Suggest using PositionalDeleteArrowSchema here to
keep things consistent with the rest of the pipeline.
##########
table/transaction.go:
##########
@@ -1212,6 +1282,117 @@ func (t *Transaction) rewriteSingleFile(ctx
context.Context, fs io.IO, originalF
return result, nil
}
+// writePositionDeletesForFiles rewrites data files by preserving only rows
that do NOT match the filter
+func (t *Transaction) writePositionDeletesForFiles(ctx context.Context, fs
io.IO, updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression, caseSensitive bool, concurrency int) error {
+ posDeleteRecIter, err := t.makePositionDeleteRecordsForFilter(ctx, fs,
files, filter, caseSensitive, concurrency)
+ if err != nil {
+ return err
+ }
+
+ partitionDataPerFile := make(map[string]map[int]any, len(files))
+ for _, df := range files {
+ partitionPath, _ := path.Split(df.FilePath())
+ partitionDataPerFile[partitionPath] = df.Partition()
Review Comment:
Partition routing is currently keyed by the **parent directory string**
(`path.Split(df.FilePath())`). That’s a path convention, not something
guaranteed by the Iceberg spec.
If two data files from different partitions share the same directory
(possible with a custom `LocationProvider` or flat `write.data.path`), the
second entry silently overwrites the first. As a result, position delete files
can be written with incorrect partition values.
In Java, routing is never derived from the file path. It is keyed by the
`(PartitionSpec, partition)` pair taken directly from the data file metadata.
See
`SparkPositionDeltaWrite.DeleteOnlyDeltaWriter.delete()`:
[https://github.com/apache/iceberg/blob/master/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java](https://github.com/apache/iceberg/blob/master/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java)
```java
int specId = metadata.getInt(specIdOrdinal);
PartitionSpec spec = specs.get(specId);
InternalRow partition = metadata.getStruct(partitionOrdinal, ...);
delegate.write(positionDelete, spec, partitionProjection); // (spec,
partition)
```
And `ClusteredWriter.write()` opens a new writer per unique `(spec,
partition)` pair — again, not path-based.
**Simplest compatible fix:**
Key the routing map by the full `df.FilePath()` (not its parent directory),
and in `processBatch` look up by the full `file_path` column value instead of
stripping the filename.
This is potential data corruption, so marking it CRIT.
##########
table/pos_delete_partitioned_fanout_writer.go:
##########
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "fmt"
+ "iter"
+ "path"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/iceberg-go"
+ "golang.org/x/sync/errgroup"
+)
+
+// positionDeletePartitionedFanoutWriter distributes Arrow position delete
records across multiple partitions based on
+// a partition specification, writing data to separate delete files for each
partition using
+// a fanout pattern with configurable parallelism.
+type positionDeletePartitionedFanoutWriter struct {
+ partitionSpec iceberg.PartitionSpec
+ partitionDataByFilePath map[string]map[int]any
+ schema *iceberg.Schema
+ itr iter.Seq2[arrow.RecordBatch, error]
+ writerFactory *writerFactory
+ concurrentDataFileWriter *concurrentDataFileWriter
+}
+
+// newPositionDeletePartitionedFanoutWriter creates a new
PartitionedFanoutWriter with the specified
+// partition specification, schema, and record iterator.
+func newPositionDeletePartitionedFanoutWriter(partitionSpec
iceberg.PartitionSpec, concurrentWriter *concurrentDataFileWriter,
partitionDataByFilePath map[string]map[int]any, itr
iter.Seq2[arrow.RecordBatch, error], writerFactory *writerFactory)
*positionDeletePartitionedFanoutWriter {
+ return &positionDeletePartitionedFanoutWriter{
+ partitionSpec: partitionSpec,
+ partitionDataByFilePath: partitionDataByFilePath,
+ schema: iceberg.PositionalDeleteSchema,
+ itr: itr,
+ writerFactory: writerFactory,
+ concurrentDataFileWriter: concurrentWriter,
+ }
+}
+
+// Write writes the Arrow records to the specified location using a fanout
pattern with
+// the specified number of workers. The returned iterator yields the data
files written
+// by the fanout process.
+func (p *positionDeletePartitionedFanoutWriter) Write(ctx context.Context,
workers int) iter.Seq2[iceberg.DataFile, error] {
+ inputRecordsCh := make(chan arrow.RecordBatch, workers)
+ outputDataFilesCh := make(chan iceberg.DataFile, workers)
+
+ fanoutWorkers, ctx := errgroup.WithContext(ctx)
+ startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh)
+
+ for range workers {
+ fanoutWorkers.Go(func() error {
+ return p.fanout(ctx, inputRecordsCh, outputDataFilesCh)
+ })
+ }
+
+ return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh)
+}
+
+func (p *positionDeletePartitionedFanoutWriter) fanout(ctx context.Context,
inputRecordsCh <-chan arrow.RecordBatch, dataFilesChannel chan<-
iceberg.DataFile) error {
+ for {
+ select {
+ case <-ctx.Done():
+ return context.Cause(ctx)
+
+ case record, ok := <-inputRecordsCh:
+ if !ok {
+ return nil
+ }
+
+ err := p.processBatch(ctx, record, dataFilesChannel)
+ if err != nil {
+ return err
+ }
+ }
+ }
+}
+
+func (p *positionDeletePartitionedFanoutWriter) processBatch(ctx
context.Context, batch arrow.RecordBatch, dataFilesChannel chan<-
iceberg.DataFile) (err error) {
+ defer batch.Release()
+
+ select {
+ case <-ctx.Done():
+ return context.Cause(ctx)
+ default:
+ }
+
+ if batch.NumRows() == 0 {
+ return
+ }
+
+ columns := batch.Columns()
+ filePath := columns[0].(*array.String)
+ partitionPath, _ := path.Split(filePath.Value(0))
+
+ partitionValues, ok := p.partitionDataByFilePath[partitionPath]
Review Comment:
same as above
##########
table/snapshot_producers.go:
##########
@@ -628,11 +603,55 @@ func (sp *snapshotProducer) manifests() (_
[]iceberg.ManifestFile, err error) {
return nil, err
}
- manifests := slices.Concat(results[0], results[1], results[2])
+ manifests := slices.Concat(addedManifests, positionDeleteManifests,
deletedFilesManifests, existingManifests)
return sp.processManifests(manifests)
}
+func (sp *snapshotProducer) manifestProducer(content iceberg.ManifestContent,
files []iceberg.DataFile, output *[]iceberg.ManifestFile) func() (err error) {
+ return func() (err error) {
+ out, path, err := sp.newManifestOutput()
+ if err != nil {
+ return err
+ }
+ defer internal.CheckedClose(out, &err)
+
+ counter := &internal.CountingWriter{W: out}
+ currentSpec, err := sp.txn.meta.CurrentSpec()
+ if err != nil || currentSpec == nil {
+ return fmt.Errorf("could not get current partition
spec: %w", err)
+ }
+ wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion,
counter,
+ *currentSpec, sp.txn.meta.CurrentSchema(),
+ sp.snapshotID)
+ if err != nil {
+ return err
+ }
+ defer internal.CheckedClose(wr, &err)
+
+ for _, df := range files {
+ err :=
wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
+ nil, nil, df))
+ if err != nil {
+ return err
+ }
+ }
+
+ // close the writer to force a flush and ensure counter.Count
is accurate
+ if err := wr.Close(); err != nil {
+ return err
+ }
+
+ mf, err := wr.ToManifestFile(path, counter.Count,
iceberg.WithManifestFileContent(content))
+ if err != nil {
+ return err
+ }
+ *output = []iceberg.ManifestFile{mf}
+
+ return nil
+ }
+}
+
func (sp *snapshotProducer) summary(props iceberg.Properties) (Summary, error)
{
Review Comment:
summary() is never updated to iterate positionDeleteFiles, so MoR delete
snapshots will always report zero added-delete-files,
added-position-delete-files, and added-position-deletes. The fix is a sibling
loop to the existing addedFiles one in summary() — ssc.addFile already handles
EntryContentPosDeletes correctly, the call just never happens for this slice.
(could be a follow up to this PR)
##########
table/transaction.go:
##########
@@ -1212,6 +1282,117 @@ func (t *Transaction) rewriteSingleFile(ctx
context.Context, fs io.IO, originalF
return result, nil
}
+// writePositionDeletesForFiles rewrites data files by preserving only rows
that do NOT match the filter
+func (t *Transaction) writePositionDeletesForFiles(ctx context.Context, fs
io.IO, updater *snapshotProducer, files []iceberg.DataFile, filter
iceberg.BooleanExpression, caseSensitive bool, concurrency int) error {
+ posDeleteRecIter, err := t.makePositionDeleteRecordsForFilter(ctx, fs,
files, filter, caseSensitive, concurrency)
+ if err != nil {
+ return err
+ }
+
+ partitionDataPerFile := make(map[string]map[int]any, len(files))
+ for _, df := range files {
+ partitionPath, _ := path.Split(df.FilePath())
+ partitionDataPerFile[partitionPath] = df.Partition()
+ }
+
+ posDeleteFiles := positionDeleteRecordsToDataFiles(ctx,
t.tbl.Location(), t.meta, partitionDataPerFile, recordWritingArgs{
Review Comment:
nit: writeUUID is not set here, so positionDeleteRecordsToDataFiles
generates a fresh random UUID for the delete file names. Position delete files
that belong to the same commit should share the commitUUID for traceability —
it makes it easy to correlate which files were written together.
It would be better to pass writeUUID: &commitUUID here.
##########
table/arrow_utils.go:
##########
@@ -1358,12 +1370,87 @@ func recordsToDataFiles(ctx context.Context,
rootLocation string, meta *Metadata
}
}
- return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
- } else {
- rollingDataWriters := NewWriterFactory(rootLocation, args,
meta, taskSchema, targetFileSize)
- partitionWriter := newPartitionedFanoutWriter(*currentSpec,
meta.CurrentSchema(), args.itr, &rollingDataWriters)
- workers := config.EnvConfig.MaxWorkers
+ return cw.writeFiles(ctx, rootLocation, args.fs, meta,
meta.props, nil, tasks)
+ }
+
+ factory := NewWriterFactory(rootLocation, args, meta, taskSchema,
targetFileSize)
+ partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw,
meta.CurrentSchema(), args.itr, &factory)
+ workers := config.EnvConfig.MaxWorkers
- return partitionWriter.Write(ctx, workers)
+ return partitionWriter.Write(ctx, workers)
+}
+
+func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation
string, meta *MetadataBuilder, partitionDataPerFile map[string]map[int]any,
args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
+ if args.counter == nil {
+ args.counter = internal.Counter(0)
}
+
+ defer func() {
+ if r := recover(); r != nil {
+ var err error
+ switch e := r.(type) {
+ case error:
+ err = fmt.Errorf("error encountered during
position delete file writing: %w", e)
+ default:
+ err = fmt.Errorf("error encountered during
position delete file writing: %v", e)
+ }
+ ret = func(yield func(iceberg.DataFile, error) bool) {
+ yield(nil, err)
+ }
+ }
+ }()
+
+ if args.writeUUID == nil {
+ u := uuid.Must(uuid.NewRandom())
+ args.writeUUID = &u
+ }
+
+ targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
+ WriteTargetFileSizeBytesDefault))
+
+ currentSpec, err := meta.CurrentSpec()
Review Comment:
All position delete files are currently written using the table’s current
partition spec, regardless of which spec the referenced data files were written
under.
For tables that have undergone partition spec evolution, this can produce
delete manifest entries with partition values encoded against the wrong spec.
Readers that perform partition-based pruning on delete manifests may then
incorrectly skip relevant deletes.
In Java, PositionDeleteWriter writes each delete file using the spec of the
data file it references, not the table’s current spec.
At minimum, we should validate (and clearly document) that this
implementation is only safe for tables that have not evolved their partition
spec. Ideally, it should mirror the Java behavior and write delete files
against the matching data file spec.
This one is also CRIT; I think this could lead to data corruption.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]