This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new be4cf4cf feat(table): add RewriteFiles snapshot-op builder (#1033)
be4cf4cf is described below
commit be4cf4cfed619e4940829935b305977210b58b12
Author: Tobias Pütz <[email protected]>
AuthorDate: Tue May 12 11:58:53 2026 +0200
feat(table): add RewriteFiles snapshot-op builder (#1033)
Decompose the atomic rewrite snapshot produced by
`Transaction.RewriteDataFiles`, which assumes in-process compaction, so
that compaction can also run out of process (distributed compaction).
This is done by exposing a builder similar to the one iceberg-java
returns from `Table.newRewrite()`.
The builder mirrors iceberg-java's RewriteFiles (Table.newRewrite()) and
sits beside RowDelta. It suppresses the overwrite producer's default
validator and queues a rewrite-specific conflict validator internally, so
RewriteDataFiles and external callers (distributed compaction
coordinators) commit the same shape through one indivisible operation.
Transaction.NewRewrite(snapshotProps) returns a *RewriteFiles builder with
DeleteFile / AddDataFile / Apply / Commit methods. ExecuteCompactionGroup
is the worker-side read+write step and returns a plain-data
CompactionGroupResult.
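RewriteDataFiles drives the builder internally in its default (atomic)
mode. Partial-progress mode now stages each group as its own rewrite
snapshot inside the loop (via ReplaceFiles with rewrite semantics). This
was intended to fix a latent bug where a mid-loop failure left staged
groups uncovered by the rewrite validator. Note, however, that the
coalesced rewrite validator is registered only after the loop, so a
mid-loop failure still returns before registration and leaves the
already-staged groups uncovered.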
Tag rewrite snapshots OpReplace instead of OpOverwrite (matching
iceberg-java's BaseRewriteFiles.operation()). This is gated on
cfg.rewriteSemantics so that generic ReplaceFiles callers, whose row
content can change, keep OpOverwrite. This also addresses #841 and
should address the comments made in #867. Only the unexported
`cfg.rewriteSemantics` turns OpOverwrite into OpReplace. The unexported
`withRewriteSemantics()` is called only internally: by
`RewriteFiles.Commit()`, and by the partial-progress path of
`RewriteDataFiles`, which calls `ReplaceFiles` directly.
Black-box tests cover equivalence with RewriteDataFiles, safe pos-delete
expunge, and concurrent eq-delete rejection under refresh-and-replay.
---------
Signed-off-by: Tobias Pütz <[email protected]>
---
table/rewrite_data_files.go | 368 ++++++++++++++++------
table/rewrite_data_files_test.go | 137 +++++++++
table/rewrite_files.go | 240 +++++++++++++++
table/rewrite_files_test.go | 636 +++++++++++++++++++++++++++++++++++++++
table/row_delta.go | 2 +-
table/transaction.go | 37 ++-
6 files changed, 1320 insertions(+), 100 deletions(-)
diff --git a/table/rewrite_data_files.go b/table/rewrite_data_files.go
index 911488a9..e2cee62f 100644
--- a/table/rewrite_data_files.go
+++ b/table/rewrite_data_files.go
@@ -20,6 +20,7 @@ package table
import (
"context"
"fmt"
+ "maps"
"github.com/apache/iceberg-go"
)
@@ -58,7 +59,8 @@ type RewriteResult struct {
// import between table and table/compaction.
//
// Use [compaction.Config.PlanCompaction] to produce groups, then convert
-// [compaction.Group] → [CompactionTaskGroup] to call
[Transaction.RewriteDataFiles].
+// [compaction.Group] → [CompactionTaskGroup] to call
+// [Transaction.RewriteDataFiles] or [ExecuteCompactionGroup].
type CompactionTaskGroup struct {
// PartitionKey is an opaque grouping key for display/logging.
PartitionKey string
@@ -70,34 +72,120 @@ type CompactionTaskGroup struct {
TotalSizeBytes int64
}
+// CompactionGroupResult is the per-group output of a compaction
+// worker: the new files written, the old files being replaced, and
+// the position delete files safe to expunge in the rewrite snapshot.
+//
+// A distributed coordinator aggregates results from N workers and
+// applies them to a [RewriteFiles] builder via [RewriteFiles.Apply]
+// to commit a single atomic snapshot. Each field is plain data
+// ([]iceberg.DataFile values plus scalars) — callers serialize the
+// contained DataFiles across process boundaries themselves; the
+// typical pattern is to have the worker write a manifest containing
+// the new files and ship the manifest path to the coordinator, which
+// re-reads it.
+type CompactionGroupResult struct {
+ // PartitionKey mirrors [CompactionTaskGroup.PartitionKey] for
+ // display/logging on the coordinator.
+ PartitionKey string
+
+ // OldDataFiles are the data files this group replaces.
+ OldDataFiles []iceberg.DataFile
+
+ // NewDataFiles are the consolidated outputs the worker wrote.
+ NewDataFiles []iceberg.DataFile
+
+ // SafePosDeletes are position-delete files referenced by tasks in
+ // this group whose target data file is being rewritten, computed
+ // via [CollectSafePositionDeletes]. They are safe to expunge in
+ // the rewrite snapshot.
+ SafePosDeletes []iceberg.DataFile
+
+ // BytesBefore is [CompactionTaskGroup.TotalSizeBytes] passed
+ // through, recorded so the coordinator can roll up metrics
+ // without re-reading the plan.
+ BytesBefore int64
+
+ // BytesAfter is the sum of [iceberg.DataFile.FileSizeBytes] across
+ // NewDataFiles.
+ BytesAfter int64
+}
+
// RewriteDataFilesOptions bundles the per-rewrite knobs for
-// Transaction.RewriteDataFiles.
+// [Transaction.RewriteDataFiles].
type RewriteDataFilesOptions struct {
- // PartialProgress, when true, stages each group via ReplaceFiles
- // inside the loop so work survives a mid-loop write failure. When
- // false (the default), all groups are committed in a single atomic
- // snapshot.
+ // PartialProgress, when true, stages each group as its own
+ // rewrite snapshot inside the loop so a mid-loop write failure
+ // leaves the already-completed groups staged on this transaction
+ // (the in-memory transaction can be discarded by group rather
+ // than wholesale). When false (the default), every group lands in
+ // a single atomic rewrite snapshot.
//
- // In both modes the final catalog commit happens once at
- // Transaction.Commit() time. True per-group durability (matching
- // Java's behavior) requires committing separate transactions per
- // group, which is left to the caller.
+ // In both modes the catalog commit happens once at
+ // [Transaction.Commit] time, so a process crash mid-loop loses
+ // every staged group regardless of this flag. Callers who need
+ // true per-group catalog durability (matching Java's behavior)
+ // should drive [Transaction.NewRewrite] themselves and commit a
+ // fresh transaction per group.
PartialProgress bool
// SnapshotProps are added to the rewrite snapshot's summary.
+ // In partial-progress mode the same properties land on every
+ // per-group snapshot rather than being summed or split.
SnapshotProps iceberg.Properties
// ExtraDeleteFilesToRemove are delete files (typically equality
// deletes that are dead after the rewrite) that the caller wants
- // expunged in the same snapshot as the rewrite. The executor
- // passes them through to ReplaceFiles unchanged. Honored only
- // when PartialProgress is false.
+ // expunged in the same snapshot as the rewrite. Honored only when
+ // PartialProgress is false.
//
// Use [compaction.CollectDeadEqualityDeletes] to compute this list
// from the current snapshot. Position delete files that are fully
// applied are removed automatically and do NOT need to be passed
// in here.
ExtraDeleteFilesToRemove []iceberg.DataFile
+
+ // GroupOptions are forwarded to every [ExecuteCompactionGroup]
+ // call to tune the per-group read+write pipeline (target file
+ // size, scan concurrency). See the With* helpers returning
+ // [CompactionGroupOption].
+ GroupOptions []CompactionGroupOption
+}
+
+// CompactionGroupOption configures a single [ExecuteCompactionGroup]
+// call. Use the With* helpers to construct values.
+type CompactionGroupOption func(*compactionGroupConfig)
+
+type compactionGroupConfig struct {
+ targetFileSize int64
+ scanConcurrency int
+}
+
+// WithCompactionTargetFileSize sets the size target for output files
+// written by [ExecuteCompactionGroup]. Forwarded to [WriteRecords] as
+// [WithTargetFileSize]. A non-positive value (including the zero
+// default) means inherit the table's `write.target-file-size-bytes`
+// property.
+func WithCompactionTargetFileSize(size int64) CompactionGroupOption {
+ if size <= 0 {
+ return func(*compactionGroupConfig) {}
+ }
+
+ return func(c *compactionGroupConfig) {
+ c.targetFileSize = size
+ }
+}
+
+// WithCompactionScanConcurrency sets the scan concurrency used when
+// reading the group's tasks. Forwarded to [Table.Scan] as
+// [WitMaxConcurrency]. Zero (the default) means runtime.GOMAXPROCS.
+//
+// TODO: the [WitMaxConcurrency] link enshrines a pre-existing typo
+// (missing `h`). Update this reference when that symbol is renamed.
+func WithCompactionScanConcurrency(n int) CompactionGroupOption {
+ return func(c *compactionGroupConfig) {
+ c.scanConcurrency = n
+ }
}
// RewriteDataFiles compacts the given groups by reading data with
@@ -116,22 +204,20 @@ type RewriteDataFilesOptions struct {
//
// Use [compaction.Config.PlanCompaction] to produce the groups, then
// convert [compaction.Group] → [CompactionTaskGroup] and pass them
-// here.
+// here. Distributed coordinators stage worker results via
+// [ExecuteCompactionGroup] and commit them via [Transaction.NewRewrite]
+// + [RewriteFiles.Apply] + [RewriteFiles.Commit] instead.
func (t *Transaction) RewriteDataFiles(ctx context.Context, groups
[]CompactionTaskGroup, opts RewriteDataFilesOptions) (*RewriteResult, error) {
if len(groups) == 0 {
return &RewriteResult{}, nil
}
- // Use an unfiltered scan to read all surviving rows. Compaction must
- // preserve every non-deleted row in the data files being rewritten.
- scan := t.tbl.Scan()
- result := &RewriteResult{}
+ if opts.PartialProgress {
+ return t.rewriteDataFilesPartial(ctx, groups, opts)
+ }
- var (
- allOldData []iceberg.DataFile
- allNewData []iceberg.DataFile
- allOldDeletes []iceberg.DataFile
- )
+ result := &RewriteResult{}
+ rewrite := t.NewRewrite(opts.SnapshotProps)
for _, group := range groups {
if err := ctx.Err(); err != nil {
@@ -142,91 +228,168 @@ func (t *Transaction) RewriteDataFiles(ctx
context.Context, groups []CompactionT
continue
}
- // Read with deletes applied.
- arrowSchema, records, err := scan.ReadTasks(ctx, group.Tasks)
+ gr, err := ExecuteCompactionGroup(ctx, t.tbl, group,
opts.GroupOptions...)
if err != nil {
- return result, fmt.Errorf("read tasks for compaction
group %q: %w", group.PartitionKey, err)
+ return result, err
}
- // Each compaction group is single-partition by construction,
so the
- // read stream is trivially clustered and we can use the
clustered writer.
- var newFiles []iceberg.DataFile
- for df, err := range WriteRecords(ctx, t.tbl, arrowSchema,
records, WithClusteredWrite()) {
- if err != nil {
- return result, fmt.Errorf("write compacted
files for group %q: %w", group.PartitionKey, err)
- }
- newFiles = append(newFiles, df)
+ if len(gr.OldDataFiles) == 0 && len(gr.NewDataFiles) == 0 {
+ continue
}
- // Collect old data files.
- oldDataFiles := make([]iceberg.DataFile, 0, len(group.Tasks))
- for _, task := range group.Tasks {
- oldDataFiles = append(oldDataFiles, task.File)
+ rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles,
gr.SafePosDeletes)
+ accumulateGroupMetrics(result, gr)
+ }
+
+ if result.RewrittenGroups == 0 {
+ return result, nil
+ }
+
+ for _, df := range opts.ExtraDeleteFilesToRemove {
+ rewrite.DeleteFile(df)
+ result.RemovedEqualityDeleteFiles++
+ }
+
+ if err := rewrite.Commit(ctx); err != nil {
+ return result, fmt.Errorf("commit compaction: %w", err)
+ }
+
+ return result, nil
+}
+
+// ExecuteCompactionGroup reads a compaction group's tasks (with
+// deletes applied), writes consolidated output files via
+// [WriteRecords], and computes the position-delete files safe to
+// expunge in the rewrite snapshot. It does not commit — the caller
+// hands the result to a coordinator that uses [Transaction.NewRewrite]
+// + [RewriteFiles.Apply] + [RewriteFiles.Commit] to stage the
+// atomic commit.
+//
+// Empty groups return a zero [CompactionGroupResult] without doing
+// any I/O.
+//
+// In-process callers should prefer [Transaction.RewriteDataFiles],
+// which drives this and the commit step in one call.
+//
+// Tunables are exposed via [CompactionGroupOption]. The clustered
+// write path is always used (a compaction group is single-partition
+// by construction so its read stream is trivially clustered).
+func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group
CompactionTaskGroup, opts ...CompactionGroupOption) (CompactionGroupResult,
error) {
+ if len(group.Tasks) == 0 {
+ return CompactionGroupResult{PartitionKey: group.PartitionKey},
nil
+ }
+
+ cfg := compactionGroupConfig{}
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ var scanOpts []ScanOption
+ if cfg.scanConcurrency > 0 {
+ scanOpts = append(scanOpts,
WitMaxConcurrency(cfg.scanConcurrency))
+ }
+
+ arrowSchema, records, err := tbl.Scan(scanOpts...).ReadTasks(ctx,
group.Tasks)
+ if err != nil {
+ return CompactionGroupResult{}, fmt.Errorf("read tasks for
compaction group %q: %w", group.PartitionKey, err)
+ }
+
+ // Each compaction group is single-partition by construction, so the
+ // read stream is trivially clustered and we can use the clustered
writer.
+ writeOpts := []WriteRecordOption{WithClusteredWrite()}
+ if cfg.targetFileSize > 0 {
+ writeOpts = append(writeOpts,
WithTargetFileSize(cfg.targetFileSize))
+ }
+
+ var (
+ newFiles []iceberg.DataFile
+ bytesAfter int64
+ )
+ for df, err := range WriteRecords(ctx, tbl, arrowSchema, records,
writeOpts...) {
+ if err != nil {
+ return CompactionGroupResult{}, fmt.Errorf("write
compacted files for group %q: %w", group.PartitionKey, err)
}
+ newFiles = append(newFiles, df)
+ bytesAfter += df.FileSizeBytes()
+ }
+
+ oldFiles := make([]iceberg.DataFile, 0, len(group.Tasks))
+ for _, task := range group.Tasks {
+ oldFiles = append(oldFiles, task.File)
+ }
+
+ return CompactionGroupResult{
+ PartitionKey: group.PartitionKey,
+ OldDataFiles: oldFiles,
+ NewDataFiles: newFiles,
+ SafePosDeletes: CollectSafePositionDeletes(group.Tasks),
+ BytesBefore: group.TotalSizeBytes,
+ BytesAfter: bytesAfter,
+ }, nil
+}
- // Collect position delete files safe to remove.
- safeDeletes := collectSafePositionDeletes(group.Tasks)
+// rewriteDataFilesPartial stages each group as its own rewrite
+// snapshot via [Transaction.ReplaceFiles] directly. Per-group staging
+// lets a mid-loop write failure leave already-staged groups on the
+// transaction; the catalog still receives them at
+// [Transaction.Commit] time.
+//
+// Validator registration is coalesced: a single [rewriteValidator]
+// covering every rewritten path across all groups is registered once,
+// after the loop, instead of one per group. The transaction's
+// validator list otherwise grows linearly with the group count, and
+// each entry independently walks the concurrent-snapshot set on
+// refresh-replay — the union walk subsumes them.
+func (t *Transaction) rewriteDataFilesPartial(ctx context.Context, groups
[]CompactionTaskGroup, opts RewriteDataFilesOptions) (*RewriteResult, error) {
+ result := &RewriteResult{}
+ props := maps.Clone(opts.SnapshotProps)
+ var allRewritten []string
- // Update result metrics.
- var bytesAfter int64
- for _, df := range newFiles {
- bytesAfter += df.FileSizeBytes()
+ for _, group := range groups {
+ if err := ctx.Err(); err != nil {
+ return result, err
}
- result.RewrittenGroups++
- result.AddedDataFiles += len(newFiles)
- result.RemovedDataFiles += len(oldDataFiles)
- result.RemovedPositionDeleteFiles += len(safeDeletes)
- result.BytesBefore += group.TotalSizeBytes
- result.BytesAfter += bytesAfter
-
- // Always accumulate across groups; partial-progress mode also
- // stages each group via ReplaceFiles so work survives a
- // mid-loop write failure, but the final catalog commit is
- // always one atomic doCommit at Transaction.Commit() time.
- allOldData = append(allOldData, oldDataFiles...)
- allNewData = append(allNewData, newFiles...)
- allOldDeletes = append(allOldDeletes, safeDeletes...)
-
- if opts.PartialProgress {
- if err := t.ReplaceFiles(ctx, oldDataFiles, newFiles,
safeDeletes, opts.SnapshotProps, withRewriteSemantics()); err != nil {
- return result, fmt.Errorf("commit compaction
group %q: %w", group.PartitionKey, err)
- }
+ if len(group.Tasks) == 0 {
+ continue
}
- }
- // Register the rewrite-specific conflict validator covering every
- // rewritten data file across every group. The validator list is
- // drained at Transaction.Commit() → doCommit. Runs alongside the
- // overwrite producer's suppressed validator (via
- // withRewriteSemantics) so concurrent pos/eq-deletes targeting a
- // rewritten file are caught pre-flight.
- if len(allOldData) > 0 {
- rewritten := make([]string, 0, len(allOldData))
- for _, f := range allOldData {
- rewritten = append(rewritten, f.FilePath())
+ gr, err := ExecuteCompactionGroup(ctx, t.tbl, group,
opts.GroupOptions...)
+ if err != nil {
+ return result, err
}
- t.validators = append(t.validators, rewriteValidator(rewritten))
- }
- if !opts.PartialProgress {
- // Caller-supplied dead eq-deletes (typically from
- // [compaction.CollectDeadEqualityDeletes]). The caller is
- // responsible for computing these against the same snapshot
- // this transaction is staged on.
- if len(opts.ExtraDeleteFilesToRemove) > 0 {
- allOldDeletes = append(allOldDeletes,
opts.ExtraDeleteFilesToRemove...)
- result.RemovedEqualityDeleteFiles +=
len(opts.ExtraDeleteFilesToRemove)
+ if len(gr.OldDataFiles) == 0 && len(gr.NewDataFiles) == 0 {
+ continue
}
- if err := t.ReplaceFiles(ctx, allOldData, allNewData,
allOldDeletes, opts.SnapshotProps, withRewriteSemantics()); err != nil {
- return result, fmt.Errorf("commit compaction: %w", err)
+ if err := t.ReplaceFiles(ctx, gr.OldDataFiles, gr.NewDataFiles,
gr.SafePosDeletes,
+ props, withRewriteSemantics()); err != nil {
+ return result, fmt.Errorf("commit compaction group %q:
%w", group.PartitionKey, err)
}
+
+ for _, f := range gr.OldDataFiles {
+ allRewritten = append(allRewritten, f.FilePath())
+ }
+ accumulateGroupMetrics(result, gr)
+ }
+
+ if len(allRewritten) > 0 {
+ t.addValidator(rewriteValidator(allRewritten))
}
return result, nil
}
+func accumulateGroupMetrics(r *RewriteResult, gr CompactionGroupResult) {
+ r.RewrittenGroups++
+ r.AddedDataFiles += len(gr.NewDataFiles)
+ r.RemovedDataFiles += len(gr.OldDataFiles)
+ r.RemovedPositionDeleteFiles += len(gr.SafePosDeletes)
+ r.BytesBefore += gr.BytesBefore
+ r.BytesAfter += gr.BytesAfter
+}
+
// rewriteValidator builds a conflictValidatorFunc that rejects the
// commit if a concurrent snapshot added delete files pointing at any
// of the rewritten data-file paths (or eq-deletes during the rewrite,
@@ -242,19 +405,38 @@ func rewriteValidator(rewrittenPaths []string)
conflictValidatorFunc {
}
}
-// collectSafePositionDeletes returns position delete files from the given
-// tasks that are safe to remove during compaction.
+// CollectSafePositionDeletes returns position delete files from the
+// given tasks that are safe to remove during compaction.
//
-// A position delete file is safe to remove when it was matched to a data
-// file (via scan planning) and that data file is being rewritten in this
-// compaction group. Since ReadTasks applies the deletes during reading,
-// the new output files will not contain the deleted rows.
+// A position delete file is safe to remove when it was matched to a
+// data file (via scan planning) and that data file is being rewritten
+// in this compaction group. Since ReadTasks applies the deletes during
+// reading, the new output files will not contain the deleted rows.
//
// Only position deletes (EntryContentPosDeletes) are considered.
// Equality deletes are decided by [compaction.DecideDeadEqualityDeletes]
// (which needs partition-wide visibility, not just the task scope).
// Deletion vectors will be handled when DV read support lands.
-func collectSafePositionDeletes(tasks []FileScanTask) []iceberg.DataFile {
+//
+// Caller contract: every data file referenced by a returned pos-delete
+// must be in the caller's rewrite set across the entire commit.
+// This function only sees one group's tasks, but a pos-delete file
+// can reference data files across multiple groups (the planner
+// bin-packs within a partition via [compaction.Config.PlanCompaction]
+// and skips files via MinInputFiles). If a pos-delete is reported safe
+// by one group but references a still-live data file in another group
+// — or a file the planner skipped — committing only this group's
+// rewrite would orphan the still-live data file's deletes. Coordinators
+// that aggregate multiple groups into one rewrite snapshot are
+// responsible for re-checking against the full set of rewritten paths,
+// or for moving this computation leader-side once worker outputs have
+// aggregated.
+//
+// [ExecuteCompactionGroup] calls this internally to populate
+// [CompactionGroupResult.SafePosDeletes]. It is kept exported for
+// custom workers that want the spec-shaped predicate without taking
+// the rest of [ExecuteCompactionGroup]'s read+write pipeline.
+func CollectSafePositionDeletes(tasks []FileScanTask) []iceberg.DataFile {
seen := make(map[string]bool)
var safe []iceberg.DataFile
diff --git a/table/rewrite_data_files_test.go b/table/rewrite_data_files_test.go
index ce12a8fd..22d295df 100644
--- a/table/rewrite_data_files_test.go
+++ b/table/rewrite_data_files_test.go
@@ -243,6 +243,143 @@ func TestRewriteDataFiles_EmptyPlan(t *testing.T) {
assert.Equal(t, int64(0), result.BytesBefore)
}
+// TestExecuteCompactionGroup_TargetFileSizeForwarded verifies that
+// WithCompactionTargetFileSize reaches the underlying WriteRecords
+// call: a tiny target size on a multi-row group must force the
+// writer to emit more than one output file.
+func TestExecuteCompactionGroup_TargetFileSizeForwarded(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+
+ arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false,
false)
+ require.NoError(t, err)
+
+ for i := range 5 {
+ dataPath := tbl.Location() +
fmt.Sprintf("/data/file-%d.parquet", i)
+ writeParquetFile(t, dataPath, arrowSc,
+ fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+ tx := tbl.NewTransaction()
+ require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath},
nil, false))
+ tbl, err = tx.Commit(t.Context())
+ require.NoError(t, err)
+ }
+
+ tasks, err := tbl.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+ require.Len(t, tasks, 5)
+
+ // Build the group manually so option-forwarding is decoupled from
+ // the planner's bin-packing / MinInputFiles knobs.
+ scanTasks := make([]table.FileScanTask, len(tasks))
+ var totalSize int64
+ for i, st := range tasks {
+ scanTasks[i] = st
+ totalSize += st.File.FileSizeBytes()
+ }
+ group := table.CompactionTaskGroup{
+ PartitionKey: "single",
+ Tasks: scanTasks,
+ TotalSizeBytes: totalSize,
+ }
+
+ withTiny, err := table.ExecuteCompactionGroup(t.Context(), tbl, group,
+ table.WithCompactionTargetFileSize(1))
+ require.NoError(t, err)
+ assert.Greater(t, len(withTiny.NewDataFiles), 1,
+ "WithCompactionTargetFileSize(1) must force the writer to roll
over per row")
+
+ withDefault, err := table.ExecuteCompactionGroup(t.Context(), tbl,
group)
+ require.NoError(t, err)
+ assert.Len(t, withDefault.NewDataFiles, 1,
+ "without the option, the same group consolidates into a single
file")
+}
+
+// TestExecuteCompactionGroup_ScanConcurrencyForwarded is a smoke test
+// confirming WithCompactionScanConcurrency is wired through without
+// breaking the read path. We can't easily observe scan parallelism
+// from the result, so the assertion is correctness equivalence with a
+// no-option baseline: same group → same files in / out.
+func TestExecuteCompactionGroup_ScanConcurrencyForwarded(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+
+ arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false,
false)
+ require.NoError(t, err)
+
+ for i := range 3 {
+ dataPath := tbl.Location() +
fmt.Sprintf("/data/file-%d.parquet", i)
+ writeParquetFile(t, dataPath, arrowSc,
+ fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+ tx := tbl.NewTransaction()
+ require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath},
nil, false))
+ tbl, err = tx.Commit(t.Context())
+ require.NoError(t, err)
+ }
+
+ tasks, err := tbl.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+
+ plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+ require.NoError(t, err)
+ require.NotEmpty(t, plan.Groups)
+
+ group := toTaskGroups(plan.Groups)[0]
+
+ withOption, err := table.ExecuteCompactionGroup(t.Context(), tbl, group,
+ table.WithCompactionScanConcurrency(1))
+ require.NoError(t, err)
+
+ baseline, err := table.ExecuteCompactionGroup(t.Context(), tbl, group)
+ require.NoError(t, err)
+
+ assert.Equal(t, len(baseline.OldDataFiles),
len(withOption.OldDataFiles),
+ "setting scan concurrency must not change the set of old files
read")
+ assert.Equal(t, len(baseline.NewDataFiles),
len(withOption.NewDataFiles),
+ "setting scan concurrency must not change the set of
consolidated outputs")
+}
+
+// TestRewriteDataFiles_GroupOptionsForwarded verifies that
+// RewriteDataFilesOptions.GroupOptions are piped through to every
+// ExecuteCompactionGroup call.
+func TestRewriteDataFiles_GroupOptionsForwarded(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+
+ arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false,
false)
+ require.NoError(t, err)
+
+ for i := range 5 {
+ dataPath := tbl.Location() +
fmt.Sprintf("/data/file-%d.parquet", i)
+ writeParquetFile(t, dataPath, arrowSc,
+ fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+ tx := tbl.NewTransaction()
+ require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath},
nil, false))
+ tbl, err = tx.Commit(t.Context())
+ require.NoError(t, err)
+ }
+
+ tasks, err := tbl.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+
+ plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+ require.NoError(t, err)
+
+ tx := tbl.NewTransaction()
+ result, err := tx.RewriteDataFiles(t.Context(),
toTaskGroups(plan.Groups), table.RewriteDataFilesOptions{
+ GroupOptions: []table.CompactionGroupOption{
+ table.WithCompactionTargetFileSize(1),
+ },
+ })
+ require.NoError(t, err)
+
+ assert.Greater(t, result.AddedDataFiles, 1,
+ "GroupOptions must reach ExecuteCompactionGroup; tiny target
size should split output")
+
+ // Drive the commit through to catch regressions that break
+ // ReplaceFiles under tiny-target rewrites — the in-process counter
+ // above only proves the option reached the writer.
+ committed, err := tx.Commit(t.Context())
+ require.NoError(t, err)
+ assertRowCount(t, committed, 5)
+}
+
func TestRewriteDataFiles_EmptyGroupSkipped(t *testing.T) {
tbl := newRewriteTestTable(t)
diff --git a/table/rewrite_files.go b/table/rewrite_files.go
new file mode 100644
index 00000000..6ee2b805
--- /dev/null
+++ b/table/rewrite_files.go
@@ -0,0 +1,240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "fmt"
+ "maps"
+
+ "github.com/apache/iceberg-go"
+)
+
+// RewriteFiles is the snapshot-operation builder for rewrite
+// (compaction) commits. It is the snapshot-level sibling of [RowDelta]
+// and mirrors Java's org.apache.iceberg.RewriteFiles interface
+// (returned by Table.newRewrite() in Java).
+//
+// Compared to a raw [Transaction.ReplaceFiles] call, the builder
+// owns the rewrite-specific isolation contract internally:
+//
+// - The overwrite producer's default isolation validator is suppressed
+// (concurrent appends into rewritten partitions are allowed; this
+// is the defining behavior of a rewrite).
+// - A rewrite-specific conflict validator is registered and checked
+// at [Transaction.Commit] time (before the catalog commit and again
+// on refresh-and-replay retries): concurrent pos-delete files that
+// reference a rewritten data file are rejected, as is —
+// conservatively — any concurrent eq-delete file. The pos-delete
+// branch only fires when the concurrent writer populated the
+// manifest's referenced_data_file column (field id 143). That
+// column is optional in V2 and required in V3 for deletion-vector
+// deletes; V2 pos-delete writers commonly leave it empty, in
+// which case only the conservative eq-delete-during-rewrite rule
+// fires.
+//
+// Distributed compaction coordinators construct one [RewriteFiles] on
+// the leader transaction, feed worker outputs in via [RewriteFiles.Apply],
+// and commit one snapshot. In-process callers can use
+// [Transaction.RewriteDataFiles] which drives this builder internally.
+//
+// The builder follows the same fail-fast pattern as
+// [view.MetadataBuilder]: a method that hits an invalid input stages
+// the error and short-circuits all subsequent calls until
+// [RewriteFiles.Commit] drains it. The builder is single-use; once
+// Commit has been called, a second call returns an error regardless
+// of whether the first call succeeded.
+//
+// Adding new delete files (e.g., rewriting position deletes into
+// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile]
+// rejects pos/eq-delete inputs at insertion time. Add the support to
+// the underlying [Transaction.ReplaceFiles] before lifting that
+// restriction.
+type RewriteFiles struct {
+ txn *Transaction
+ dataFilesToDelete []iceberg.DataFile
+ dataFilesToAdd []iceberg.DataFile
+ deleteFilesToRemove []iceberg.DataFile
+ snapshotProps iceberg.Properties
+ err error
+ committed bool
+}
+
+// NewRewrite returns a [RewriteFiles] builder bound to this transaction.
+// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is
+// cloned and the clone is added to the rewrite snapshot's summary;
+// pass nil for none.
+//
+// Usage:
+//
+// rewrite := tx.NewRewrite(nil)
+// rewrite.DeleteFile(oldDataFile)
+// rewrite.AddDataFile(newDataFile)
+// if err := rewrite.Commit(ctx); err != nil { ... }
+// committed, err := tx.Commit(ctx)
+func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties)
*RewriteFiles {
+ return &RewriteFiles{txn: t, snapshotProps: maps.Clone(snapshotProps)}
+}
+
+// DeleteFile marks a file for removal in this rewrite. Routes by
+// content type: data files are queued as data-file replacements;
+// pos/eq-delete files are queued for delete-file removal alongside
+// the data rewrite (typical when a delete is fully applied to data
+// files being rewritten and is therefore safe to expunge).
+//
+// A nil file, or any other content type, stages an error that is
+// returned from the next [RewriteFiles.Commit] call.
+func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles {
+ if r.err != nil {
+ return r
+ }
+ if df == nil {
+ r.err = fmt.Errorf("%w: DeleteFile got nil data file",
ErrInvalidOperation)
+
+ return r
+ }
+
+ switch df.ContentType() {
+ case iceberg.EntryContentData:
+ r.dataFilesToDelete = append(r.dataFilesToDelete, df)
+ case iceberg.EntryContentPosDeletes, iceberg.EntryContentEqDeletes:
+ r.deleteFilesToRemove = append(r.deleteFilesToRemove, df)
+ default:
+ r.err = fmt.Errorf("%w: DeleteFile got unsupported content type
%s (%s)",
+ ErrInvalidOperation, df.ContentType(), df.FilePath())
+ }
+
+ return r
+}
+
+// AddDataFile queues a new data file. Adding delete files is not yet
+// supported by the underlying snapshot machinery; a nil file or a
+// pos/eq-delete here stages an error that is returned from the next
+// [RewriteFiles.Commit] call. For content-type rejections the error
+// names the offending file path so callers driving the builder via
+// [RewriteFiles.Apply] can identify it without tracking queue order.
+func (r *RewriteFiles) AddDataFile(df iceberg.DataFile) *RewriteFiles {
+ if r.err != nil {
+ return r
+ }
+ if df == nil {
+ r.err = fmt.Errorf("%w: AddDataFile got nil data file",
ErrInvalidOperation)
+
+ return r
+ }
+
+ if df.ContentType() != iceberg.EntryContentData {
+ r.err = fmt.Errorf("%w: AddDataFile only supports data files;
got content type %s (%s)",
+ ErrInvalidOperation, df.ContentType(), df.FilePath())
+
+ return r
+ }
+ r.dataFilesToAdd = append(r.dataFilesToAdd, df)
+
+ return r
+}
+
+// Apply is a bulk shortcut that routes three slices onto this builder:
+// every entry in deletes and safeDeletes is queued via
+// [RewriteFiles.DeleteFile] (which routes data vs. delete files by
+// content type), and every entry in adds via [RewriteFiles.AddDataFile].
+//
+// Distributed coordinators should prefer [RewriteFiles.ApplyResult],
+// which takes a [CompactionGroupResult] directly: the three positional
+// same-typed slices here transpose silently under refactor.
+func (r *RewriteFiles) Apply(deletes, adds, safeDeletes []iceberg.DataFile)
*RewriteFiles {
+ if r.err != nil {
+ return r
+ }
+
+ for _, df := range deletes {
+ r.DeleteFile(df)
+ }
+ for _, df := range adds {
+ r.AddDataFile(df)
+ }
+ for _, df := range safeDeletes {
+ r.DeleteFile(df)
+ }
+
+ return r
+}
+
+// ApplyResult is the typed coordinator entry point: it queues a worker's
+// [CompactionGroupResult] onto this builder by routing OldDataFiles
+// (via DeleteFile), NewDataFiles (via AddDataFile), and SafePosDeletes
+// (via DeleteFile) in one call. Prefer this over [RewriteFiles.Apply]
+// when feeding worker outputs — the field names line up with the
+// builder semantics, so a refactor of CompactionGroupResult cannot
+// silently transpose roles.
+//
+// Typical distributed-coordinator pattern:
+//
+// rewrite := leaderTxn.NewRewrite(snapshotProps)
+// for _, gr := range workerResults {
+// rewrite.ApplyResult(gr)
+// }
+// if err := rewrite.Commit(ctx); err != nil { ... }
+func (r *RewriteFiles) ApplyResult(gr CompactionGroupResult) *RewriteFiles {
+ return r.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes)
+}
+
+// Commit stages the rewrite snapshot on the underlying transaction.
+// The catalog commit happens once, later, at [Transaction.Commit] time.
+//
+// Commit is single-shot: any second call returns an error regardless
+// of whether the first call succeeded, and neither re-stages the
+// rewrite nor re-registers the conflict validator. Returns an error
+// if an earlier [RewriteFiles.AddDataFile] or [RewriteFiles.DeleteFile]
+// call staged one (nil file or unsupported content type), if the
+// builder has no file changes, if data files are added without any
+// data file being deleted, or if the underlying
+// [Transaction.ReplaceFiles] call fails.
+func (r *RewriteFiles) Commit(ctx context.Context) error {
+ if r.committed {
+ return fmt.Errorf("%w: RewriteFiles.Commit already called on
this builder", ErrInvalidOperation)
+ }
+ r.committed = true
+
+ if r.err != nil {
+ return r.err
+ }
+ if len(r.dataFilesToDelete) == 0 && len(r.dataFilesToAdd) == 0 &&
len(r.deleteFilesToRemove) == 0 {
+ return fmt.Errorf("%w: rewrite must have at least one file
change", ErrInvalidOperation)
+ }
+ // Adds-without-deletes would route through ReplaceFiles →
+ // ReplaceDataFilesWithDataFiles → AddDataFiles, an OpAppend
+ // producer that never reads cfg.rewriteSemantics. The snapshot
+ // would be tagged append with no rewrite validator — silently
+ // wrong for a rewrite. A pure delete-file expunge (only
+ // deleteFilesToRemove non-empty) is still legitimate.
+ if len(r.dataFilesToDelete) == 0 && len(r.dataFilesToAdd) > 0 {
+ return fmt.Errorf("%w: rewrite must delete at least one data
file when adding data files", ErrInvalidOperation)
+ }
+
+ if err := r.txn.ReplaceFiles(ctx, r.dataFilesToDelete,
r.dataFilesToAdd, r.deleteFilesToRemove, r.snapshotProps,
withRewriteSemantics()); err != nil {
+ return err
+ }
+
+ if len(r.dataFilesToDelete) > 0 {
+ rewritten := make([]string, 0, len(r.dataFilesToDelete))
+ for _, df := range r.dataFilesToDelete {
+ rewritten = append(rewritten, df.FilePath())
+ }
+ r.txn.addValidator(rewriteValidator(rewritten))
+ }
+
+ return nil
+}
diff --git a/table/rewrite_files_test.go b/table/rewrite_files_test.go
new file mode 100644
index 00000000..7647dd54
--- /dev/null
+++ b/table/rewrite_files_test.go
@@ -0,0 +1,636 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+// Black-box coverage for the [table.RewriteFiles] snapshot-operation
+// builder, including the supporting [table.ExecuteCompactionGroup]
+// worker function and the [table.CollectSafePositionDeletes]
+// predicate. Tests cover the in-process path (one transaction stages
+// and commits) and the distributed-coordinator path (workers produce
+// [table.CompactionGroupResult]s, a leader builds a single
+// [table.RewriteFiles] from them and commits).
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "sync/atomic"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// concurrentTestCatalog enforces Requirement.Validate against its
+// current metadata on every CommitTable, so a leader transaction
+// whose AssertRefSnapshotID points at a stale snapshot fails its
+// first attempt and triggers refresh-and-replay. attempts counts
+// CommitTable invocations so tests can prove the retry boundary.
+type concurrentTestCatalog struct {
+ metadata table.Metadata
+ location string
+ fsF table.FSysF
+ attempts atomic.Int32
+}
+
+func (c *concurrentTestCatalog) LoadTable(_ context.Context, ident
table.Identifier) (*table.Table, error) {
+ return table.New(ident, c.metadata, c.location, c.fsF, c), nil
+}
+
+func (c *concurrentTestCatalog) CommitTable(_ context.Context, _
table.Identifier, reqs []table.Requirement, updates []table.Update)
(table.Metadata, string, error) {
+ c.attempts.Add(1)
+ for _, req := range reqs {
+ if err := req.Validate(c.metadata); err != nil {
+ return nil, "", fmt.Errorf("%w: %w",
table.ErrCommitFailed, err)
+ }
+ }
+ meta, err := table.UpdateTableMetadata(c.metadata, updates, "")
+ if err != nil {
+ return nil, "", err
+ }
+ c.metadata = meta
+
+ // Returning the seed location is enough to keep subsequent
+ // NewTransaction calls from adding AssertCreate; the value is not
+ // re-read by the table machinery between commits in this stub.
+ return meta, c.location, nil
+}
+
+func newConcurrentRewriteTestTable(t *testing.T) (*table.Table,
*concurrentTestCatalog) {
+ t.Helper()
+
+ location := filepath.ToSlash(t.TempDir())
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "data", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+ meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec,
table.UnsortedSortOrder, location,
+ iceberg.Properties{
+ table.PropertyFormatVersion: "2",
+ table.CommitNumRetriesKey: "2",
+ table.CommitMinRetryWaitMsKey: "1",
+ table.CommitMaxRetryWaitMsKey: "2",
+ table.CommitTotalRetryTimeoutMsKey: "1000",
+ })
+ require.NoError(t, err)
+
+ metaLoc := location + "/metadata/v1.metadata.json"
+ fsF := func(context.Context) (iceio.IO, error) { return
iceio.LocalFS{}, nil }
+ cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF:
fsF}
+
+ return table.New(table.Identifier{"db", "concurrent_rewrite"}, meta,
metaLoc, fsF, cat), cat
+}
+
+func newPosDeleteFile(t *testing.T, path string) iceberg.DataFile {
+ t.Helper()
+
+ b, err := iceberg.NewDataFileBuilder(
+ *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+ path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+ require.NoError(t, err)
+
+ return b.Build()
+}
+
+func newEqDeleteFile(t *testing.T, path string) iceberg.DataFile {
+ t.Helper()
+
+ b, err := iceberg.NewDataFileBuilder(
+ *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+ path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+ require.NoError(t, err)
+ b = b.EqualityFieldIDs([]int{1})
+
+ return b.Build()
+}
+
+func newDataFile(t *testing.T, path string) iceberg.DataFile {
+ t.Helper()
+
+ b, err := iceberg.NewDataFileBuilder(
+ *iceberg.UnpartitionedSpec, iceberg.EntryContentData,
+ path, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+ require.NoError(t, err)
+
+ return b.Build()
+}
+
+func TestCollectSafePositionDeletes_FiltersAndDedupes(t *testing.T) {
+ posA := newPosDeleteFile(t, "pos-a.parquet")
+ posB := newPosDeleteFile(t, "pos-b.parquet")
+ eq := newEqDeleteFile(t, "eq.parquet")
+
+ tasks := []table.FileScanTask{
+ {DeleteFiles: []iceberg.DataFile{posA, posB},
EqualityDeleteFiles: []iceberg.DataFile{eq}},
+ {DeleteFiles: []iceberg.DataFile{posA}}, // duplicate of posA
+ }
+
+ got := table.CollectSafePositionDeletes(tasks)
+ require.Len(t, got, 2, "duplicate pos-deletes must be deduped,
eq-deletes must be filtered out")
+
+ paths := []string{got[0].FilePath(), got[1].FilePath()}
+ assert.ElementsMatch(t, []string{"pos-a.parquet", "pos-b.parquet"},
paths)
+}
+
+func TestCollectSafePositionDeletes_EmptyTasks(t *testing.T) {
+ assert.Empty(t, table.CollectSafePositionDeletes(nil))
+ assert.Empty(t,
table.CollectSafePositionDeletes([]table.FileScanTask{{}}))
+}
+
+func TestExecuteCompactionGroup_EmptyGroup(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+
+ got, err := table.ExecuteCompactionGroup(t.Context(), tbl,
+ table.CompactionTaskGroup{PartitionKey: "p"})
+ require.NoError(t, err)
+ assert.Equal(t, "p", got.PartitionKey)
+ assert.Empty(t, got.OldDataFiles)
+ assert.Empty(t, got.NewDataFiles)
+ assert.Empty(t, got.SafePosDeletes)
+}
+
+func TestRewriteFiles_EmptyCommit_Errors(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+ tx := tbl.NewTransaction()
+
+ err := tx.NewRewrite(nil).Commit(t.Context())
+ require.Error(t, err, "an empty rewrite has nothing to stage and must
reject")
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "at least one file change")
+}
+
+func TestRewriteFiles_AddDataFile_RejectsNonDataFile(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+ tx := tbl.NewTransaction()
+
+ posDel := newPosDeleteFile(t, "spurious-pos-del.parquet")
+
+ err := tx.NewRewrite(nil).AddDataFile(posDel).Commit(t.Context())
+ require.Error(t, err)
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "AddDataFile only supports data files",
+ "adding a delete file via AddDataFile must be reported at
commit time")
+ assert.Contains(t, err.Error(), "spurious-pos-del.parquet",
+ "error must name the offending file path")
+}
+
+func TestRewriteFiles_Commit_SingleShot(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+
+ arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false,
false)
+ require.NoError(t, err)
+
+ for i := range 2 {
+ dataPath := tbl.Location() +
fmt.Sprintf("/data/seed-%d.parquet", i)
+ writeParquetFile(t, dataPath, arrowSc,
+ fmt.Sprintf(`[{"id": %d, "data": "seed"}]`, i+1))
+ seedTx := tbl.NewTransaction()
+ require.NoError(t, seedTx.AddFiles(t.Context(),
[]string{dataPath}, nil, false))
+ tbl, err = seedTx.Commit(t.Context())
+ require.NoError(t, err)
+ }
+
+ tasks, err := tbl.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+
+ plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+ require.NoError(t, err)
+ require.NotEmpty(t, plan.Groups)
+
+ groups := toTaskGroups(plan.Groups)
+
+ results := make([]table.CompactionGroupResult, 0, len(groups))
+ for _, g := range groups {
+ gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+ require.NoError(t, err)
+ results = append(results, gr)
+ }
+
+ tx := tbl.NewTransaction()
+ rewrite := tx.NewRewrite(nil)
+ for _, gr := range results {
+ rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles,
gr.SafePosDeletes)
+ }
+ require.NoError(t, rewrite.Commit(t.Context()))
+
+ err = rewrite.Commit(t.Context())
+ require.Error(t, err, "double commit must reject so validators are not
re-appended")
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "already called")
+}
+
+// TestRewriteFiles_Commit_SingleShot_AfterFailure pins down the
+// stricter contract: a builder is dead after any failed Commit,
+// whether it failed before reaching ReplaceFiles (empty rewrite) or
+// inside it (stranger data file). Without this guard, a failed
+// builder would stay reusable, and a retry could re-stage
+// ReplaceFiles or register the conflict validator more than once.
+func TestRewriteFiles_Commit_SingleShot_AfterFailure(t *testing.T) {
+ t.Run("empty rewrite", func(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+ tx := tbl.NewTransaction()
+
+ rewrite := tx.NewRewrite(nil)
+
+ err := rewrite.Commit(t.Context())
+ require.Error(t, err, "first Commit on an empty builder must
fail")
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "at least one file change")
+
+ err = rewrite.Commit(t.Context())
+ require.Error(t, err, "second Commit must reject even though
the first never staged anything")
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "already called")
+ })
+
+ t.Run("ReplaceFiles failure", func(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+ tx := tbl.NewTransaction()
+
+ stranger := newDataFile(t,
tbl.Location()+"/data/stranger.parquet")
+ replacement := newDataFile(t,
tbl.Location()+"/data/replacement.parquet")
+
+ rewrite := tx.NewRewrite(nil).
+ DeleteFile(stranger).
+ AddDataFile(replacement)
+
+ err := rewrite.Commit(t.Context())
+ require.Error(t, err, "ReplaceFiles must reject a stranger data
file")
+
+ err = rewrite.Commit(t.Context())
+ require.Error(t, err, "second Commit must reject so a retry
can't re-stage ReplaceFiles or re-append the validator")
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "already called")
+ })
+}
+
+func TestRewriteFiles_DeleteFile_RoutesByContentType(t *testing.T) {
+ // Validates routing into the right slice via observable behavior:
+ // the data-slice and delete-file-slice membership checks in
+ // ReplaceFiles produce distinct error messages, so a mis-routed
+ // file would surface the wrong error class.
+ //
+ // Builder A — DeleteFile(strangerData): if routed correctly, the
+ // builder's deleteFilesToRemove slice stays empty, so ReplaceFiles
+ // falls through to ReplaceDataFilesWithDataFiles, which rejects with
+ // "cannot delete files that do not belong to the table". A
+ // mis-route into deleteFilesToRemove would error with "cannot
+ // remove delete files that do not belong to the table" instead.
+ //
+ // Builder B — DeleteFile(strangerPosDel) only: if routed
+ // correctly, ReplaceFiles' main path rejects with "cannot remove
+ // delete files that do not belong to the table". A mis-route
+ // into dataFilesToDelete would surface "cannot delete files
+ // that do not belong to the table" instead. (No AddDataFile
+ // here: adds-without-data-deletes is rejected pre-flight by
+ // Commit, so it would mask the routing error.)
+ tbl := newRewriteTestTable(t)
+
+ arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false,
false)
+ require.NoError(t, err)
+ dataPath := tbl.Location() + "/data/seed.parquet"
+ writeParquetFile(t, dataPath, arrowSc, `[{"id": 1, "data": "seed"}]`)
+ seedTx := tbl.NewTransaction()
+ require.NoError(t, seedTx.AddFiles(t.Context(), []string{dataPath},
nil, false))
+ tbl, err = seedTx.Commit(t.Context())
+ require.NoError(t, err)
+
+ strangerData := newDataFile(t, tbl.Location()+"/data/stranger.parquet")
+ strangerPosDel := newPosDeleteFile(t,
tbl.Location()+"/data/stranger-pos-del.parquet")
+ replacement := newDataFile(t,
tbl.Location()+"/data/replacement.parquet")
+
+ dataSliceMiss := "cannot delete files that do not belong to the table"
+ deleteFileSliceMiss := "cannot remove delete files that do not belong
to the table"
+
+ dataTx := tbl.NewTransaction()
+ err = dataTx.NewRewrite(nil).
+ DeleteFile(strangerData).
+ AddDataFile(replacement).
+ Commit(t.Context())
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), dataSliceMiss,
+ "DeleteFile(data) must route into dataFilesToDelete; mis-routed
it would surface the delete-file-slice error")
+ assert.NotContains(t, err.Error(), deleteFileSliceMiss)
+
+ delTx := tbl.NewTransaction()
+ err = delTx.NewRewrite(nil).
+ DeleteFile(strangerPosDel).
+ Commit(t.Context())
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), deleteFileSliceMiss,
+ "DeleteFile(pos-delete) must route into deleteFilesToRemove;
mis-routed it would surface the data-slice error")
+ assert.NotContains(t, err.Error(), dataSliceMiss)
+}
+
+// TestRewriteFiles_RejectsAddsWithoutDataDeletes proves that staging
+// new data files with no data files to delete is rejected up front —
+// otherwise the chain would slip through ReplaceFiles →
+// ReplaceDataFilesWithDataFiles → AddDataFiles (an OpAppend producer)
+// and silently tag the snapshot append instead of replace, with no
+// rewrite validator registered.
+func TestRewriteFiles_RejectsAddsWithoutDataDeletes(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+ tx := tbl.NewTransaction()
+
+ add := newDataFile(t, tbl.Location()+"/data/lonely-add.parquet")
+
+ err := tx.NewRewrite(nil).AddDataFile(add).Commit(t.Context())
+ require.Error(t, err)
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "must delete at least one data file
when adding")
+}
+
+func TestRewriteFiles_RejectsNilDataFile(t *testing.T) {
+ t.Run("DeleteFile", func(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+ tx := tbl.NewTransaction()
+
+ err := tx.NewRewrite(nil).DeleteFile(nil).Commit(t.Context())
+ require.Error(t, err)
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "DeleteFile got nil data file")
+ })
+
+ t.Run("AddDataFile", func(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+ tx := tbl.NewTransaction()
+
+ err := tx.NewRewrite(nil).AddDataFile(nil).Commit(t.Context())
+ require.Error(t, err)
+ assert.ErrorIs(t, err, table.ErrInvalidOperation)
+ assert.Contains(t, err.Error(), "AddDataFile got nil data file")
+ })
+}
+
+// TestRewriteFiles_DistributedEquivalence proves the worker+coordinator
+// pipeline lands at the same end state as the bundled in-process
+// [Transaction.RewriteDataFiles]: workers run [table.ExecuteCompactionGroup]
+// per group (here inline; in distributed compaction this happens on
+// remote peers and the results travel over the wire), then a single
+// leader transaction stages [table.RewriteFiles.Apply] +
+// [table.RewriteFiles.Commit] for one atomic snapshot.
+func TestRewriteFiles_DistributedEquivalence(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+
+ arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false,
false)
+ require.NoError(t, err)
+
+ for i := range 4 {
+ dataPath := tbl.Location() +
fmt.Sprintf("/data/file-%d.parquet", i)
+ writeParquetFile(t, dataPath, arrowSc,
+ fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+ tx := tbl.NewTransaction()
+ require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath},
nil, false))
+ tbl, err = tx.Commit(t.Context())
+ require.NoError(t, err)
+ }
+
+ tasks, err := tbl.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+ require.Len(t, tasks, 4)
+
+ plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+ require.NoError(t, err)
+ require.NotEmpty(t, plan.Groups)
+
+ groups := toTaskGroups(plan.Groups)
+
+ results := make([]table.CompactionGroupResult, 0, len(groups))
+ for _, g := range groups {
+ gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+ require.NoError(t, err)
+ require.NotEmpty(t, gr.NewDataFiles)
+ results = append(results, gr)
+ }
+
+ leaderTxn := tbl.NewTransaction()
+ rewrite := leaderTxn.NewRewrite(nil)
+ for _, gr := range results {
+ rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles,
gr.SafePosDeletes)
+ }
+ require.NoError(t, rewrite.Commit(t.Context()))
+
+ committed, err := leaderTxn.Commit(t.Context())
+ require.NoError(t, err)
+
+ assertRowCount(t, committed, 4)
+
+ snap := committed.CurrentSnapshot()
+ require.NotNil(t, snap)
+ assert.Equal(t, table.OpReplace, snap.Summary.Operation,
+ "rewrite snapshot must be tagged replace, not overwrite")
+
+ // Lock the cross-client summary contract under OpReplace. PyIceberg
+ // (and other readers) parse these keys; the OpOverwrite → OpReplace
+ // flip must not silently drop any of them.
+ props := snap.Summary.Properties
+ require.NotNil(t, props)
+ assert.Equal(t, "1", props["added-data-files"])
+ assert.Equal(t, "4", props["deleted-data-files"])
+ assert.Equal(t, "4", props["added-records"])
+ assert.Equal(t, "4", props["deleted-records"])
+ assert.Equal(t, "1", props["total-data-files"])
+ assert.Equal(t, "4", props["total-records"])
+ assert.NotEmpty(t, props["added-files-size"], "byte counter must be
populated")
+ assert.NotEmpty(t, props["removed-files-size"], "byte counter must be
populated")
+ assert.NotEmpty(t, props["total-files-size"], "byte counter must be
populated")
+
+ postTasks, err := committed.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+ assert.Len(t, postTasks, 1, "leader-side commit must produce one
consolidated file")
+}
+
+// TestRewriteFiles_DropsSafePositionDeletes drives the pipeline with a
+// position-delete present at plan time and asserts the pos-delete is
+// expunged in the rewrite snapshot.
+func TestRewriteFiles_DropsSafePositionDeletes(t *testing.T) {
+ tbl := newRewriteTestTable(t)
+
+ arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false,
false)
+ require.NoError(t, err)
+
+ for i := range 3 {
+ dataPath := tbl.Location() +
fmt.Sprintf("/data/file-%d.parquet", i)
+ writeParquetFile(t, dataPath, arrowSc, fmt.Sprintf(
+ `[{"id": %d, "data": "a"}, {"id": %d, "data": "b"}]`,
i*2+1, i*2+2))
+ tx := tbl.NewTransaction()
+ require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath},
nil, false))
+ tbl, err = tx.Commit(t.Context())
+ require.NoError(t, err)
+ }
+
+ firstDataPath := tbl.Location() + "/data/file-0.parquet"
+ posDelPath := tbl.Location() + "/data/pos-del-001.parquet"
+ writeParquetFile(t, posDelPath, table.PositionalDeleteArrowSchema,
+ fmt.Sprintf(`[{"file_path": "%s", "pos": 0}]`, firstDataPath))
+
+ posDelBuilder, err := iceberg.NewDataFileBuilder(
+ *iceberg.UnpartitionedSpec, iceberg.EntryContentPosDeletes,
+ posDelPath, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+ require.NoError(t, err)
+
+ tx := tbl.NewTransaction()
+ rd := tx.NewRowDelta(nil)
+ rd.AddDeletes(posDelBuilder.Build())
+ require.NoError(t, rd.Commit(t.Context()))
+ tbl, err = tx.Commit(t.Context())
+ require.NoError(t, err)
+ assertRowCount(t, tbl, 5)
+
+ tasks, err := tbl.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+
+ cfg := defaultTestCompactionCfg
+ cfg.DeleteFileThreshold = 1
+ plan, err := cfg.PlanCompaction(tasks)
+ require.NoError(t, err)
+ require.NotEmpty(t, plan.Groups)
+
+ groups := toTaskGroups(plan.Groups)
+
+ results := make([]table.CompactionGroupResult, 0, len(groups))
+ totalSafe := 0
+ for _, g := range groups {
+ gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+ require.NoError(t, err)
+ results = append(results, gr)
+ totalSafe += len(gr.SafePosDeletes)
+ }
+ require.Equal(t, 1, totalSafe, "the staged pos-delete must be reported
safe by exactly one group")
+
+ leaderTxn := tbl.NewTransaction()
+ rewrite := leaderTxn.NewRewrite(nil)
+ for _, gr := range results {
+ rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles,
gr.SafePosDeletes)
+ }
+ require.NoError(t, rewrite.Commit(t.Context()))
+
+ committed, err := leaderTxn.Commit(t.Context())
+ require.NoError(t, err)
+
+ assertRowCount(t, committed, 5)
+
+ // Pos-delete removal must show up in the OpReplace summary —
+ // `removed-position-delete-files` (count of files) and
+ // `removed-position-deletes` (count of rows) are the keys other
+ // clients read.
+ snap := committed.CurrentSnapshot()
+ require.NotNil(t, snap)
+ props := snap.Summary.Properties
+ require.NotNil(t, props)
+ assert.Equal(t, "1", props["removed-position-delete-files"])
+ assert.Equal(t, "1", props["removed-position-deletes"])
+
+ postTasks, err := committed.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+ for _, task := range postTasks {
+ assert.Empty(t, task.DeleteFiles,
+ "safe pos-delete must be expunged in the rewrite
snapshot")
+ }
+}
+
+// TestRewriteFiles_RejectsConcurrentEqDelete is the negative path: a
+// leader stages a rewrite, a concurrent peer commits an equality-delete
+// during the rewrite window, and the leader's [table.Transaction.Commit]
+// must fail with [table.ErrConflictingDeleteFiles] — proving the
+// rewrite-specific conflict validator that [table.RewriteFiles.Commit]
+// registers internally fires under refresh-and-replay.
+//
+// First Commit attempt fails on the stale AssertRefSnapshotID (the
+// catalog has advanced past the leader's base). The retry refreshes,
+// builds a fresh conflictContext covering the snapshots committed
+// since the leader's base (here, the peer's eq-delete snapshot), and
+// the rewrite validator's conservative rule rejects any concurrent
+// equality-delete during a rewrite — terminal exit before any further
+// CommitTable call.
+//
+// Equality deletes are used here rather than positional deletes because
+// this table is format-version 2, where pos-delete entries need not
+// carry the referenced data file path (referenced_data_file is only
+// required for v3 deletion vectors), so the validator's pos-delete
+// branch cannot be relied on here. The eq-delete branch is the
+// conservative rule that applies on v2.
+func TestRewriteFiles_RejectsConcurrentEqDelete(t *testing.T) {
+ tbl, cat := newConcurrentRewriteTestTable(t)
+
+ arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false,
false)
+ require.NoError(t, err)
+
+ for i := range 3 {
+ dataPath := tbl.Location() +
fmt.Sprintf("/data/file-%d.parquet", i)
+ writeParquetFile(t, dataPath, arrowSc,
+ fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1))
+ tx := tbl.NewTransaction()
+ require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath},
nil, false))
+ tbl, err = tx.Commit(t.Context())
+ require.NoError(t, err)
+ }
+
+ tasks, err := tbl.Scan().PlanFiles(t.Context())
+ require.NoError(t, err)
+
+ plan, err := defaultTestCompactionCfg.PlanCompaction(tasks)
+ require.NoError(t, err)
+ require.NotEmpty(t, plan.Groups)
+
+ groups := toTaskGroups(plan.Groups)
+
+ results := make([]table.CompactionGroupResult, 0, len(groups))
+ for _, g := range groups {
+ gr, err := table.ExecuteCompactionGroup(t.Context(), tbl, g)
+ require.NoError(t, err)
+ require.NotEmpty(t, gr.OldDataFiles)
+ results = append(results, gr)
+ }
+
+ leaderTxn := tbl.NewTransaction()
+ rewrite := leaderTxn.NewRewrite(nil)
+ for _, gr := range results {
+ rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles,
gr.SafePosDeletes)
+ }
+ require.NoError(t, rewrite.Commit(t.Context()),
+ "staging the rewrite must succeed; the conflict surfaces at
Commit time")
+
+ eqDelPath := tbl.Location() + "/data/concurrent-eq-del.parquet"
+ eqDelBuilder, err := iceberg.NewDataFileBuilder(
+ *iceberg.UnpartitionedSpec, iceberg.EntryContentEqDeletes,
+ eqDelPath, iceberg.ParquetFile, nil, nil, nil, 1, 128)
+ require.NoError(t, err)
+ eqDelBuilder = eqDelBuilder.EqualityFieldIDs([]int{1})
+
+ peerTxn := tbl.NewTransaction()
+ rd := peerTxn.NewRowDelta(nil)
+ rd.AddDeletes(eqDelBuilder.Build())
+ require.NoError(t, rd.Commit(t.Context()))
+ _, err = peerTxn.Commit(t.Context())
+ require.NoError(t, err, "peer commit advances the catalog so the
leader's first attempt fails")
+
+ beforeLeader := cat.attempts.Load()
+ _, err = leaderTxn.Commit(t.Context())
+ require.Error(t, err)
+ assert.ErrorIs(t, err, table.ErrConflictingDeleteFiles,
+ "refresh-and-replay must detect the concurrent equality-delete
during a rewrite")
+ // The expected delta is exactly 1 CommitTable invocation, the
+ // first attempt that fails on the stale AssertRefSnapshotID. The
+ // retry refreshes the conflictContext, the rewrite validator
+ // rejects the concurrent eq-delete, and the leader exits before
+ // re-issuing CommitTable. A delta of 0 means the validator fired
+ // pre-flight; a delta of ≥2 means the retry reached the catalog.
+ assert.Equal(t, int32(1), cat.attempts.Load()-beforeLeader,
+ "only the stale-assertion attempt landed; the retry never
reached CommitTable")
+}
diff --git a/table/row_delta.go b/table/row_delta.go
index 2f7de4f1..03c73613 100644
--- a/table/row_delta.go
+++ b/table/row_delta.go
@@ -174,7 +174,7 @@ func (rd *RowDelta) Commit(ctx context.Context) error {
// fast-append producer's validator is a no-op; RowDelta semantics
// (pos-delete references, eq-delete predicate) require a dedicated
// check that snapshot_producers does not know about.
- rd.txn.validators = append(rd.txn.validators, rd.validate)
+ rd.txn.addValidator(rd.validate)
return rd.txn.apply(updates, reqs)
}
diff --git a/table/transaction.go b/table/transaction.go
index 1ffa18de..2fdc9872 100644
--- a/table/transaction.go
+++ b/table/transaction.go
@@ -142,6 +142,18 @@ func (t *Transaction) apply(updates []Update, reqs []Requirement) error {
return nil
}
+// addValidator appends v to t.validators while holding t.mx. Producers
+// that register validators outside doCommit (RowDelta, RewriteFiles)
+// must use this helper instead of appending to t.validators directly:
+// the RewriteFiles type doc endorses fanout builders against a single
+// transaction, and an unsynchronized append would race with
+// Transaction.Commit, which reads the validator list under t.mx.
+func (t *Transaction) addValidator(v conflictValidatorFunc) {
+ t.mx.Lock()
+ defer t.mx.Unlock()
+ t.validators = append(t.validators, v)
+}
+
func (t *Transaction) appendSnapshotProducer(afs io.IO, props
iceberg.Properties) *snapshotProducer {
manifestMerge := t.meta.props.GetBool(ManifestMergeEnabledKey,
ManifestMergeEnabledDefault)
updateSnapshot := t.updateSnapshot(afs, props, OpAppend)
@@ -550,10 +562,13 @@ type dataFileCfg struct {
// withRewriteSemantics marks an overwrite/replace operation as a
// rewrite (compaction) rather than a user-facing overwrite. The
// overwrite producer's default pre-commit conflict validator is
-// bypassed; the caller registers a rewrite-specific validator on the
-// transaction separately via validateNoNewDeletesForRewrittenFiles.
-// Unexported: only RewriteDataFiles passes this; there is no public
-// surface for user code to bypass overwrite isolation.
+// suppressed in favor of the rewrite-specific validator built by
+// [rewriteValidator] and queued onto the transaction's validator list.
+//
+// Unexported: the only safe way to flip this flag is via the
+// [RewriteFiles] builder ([Transaction.NewRewrite]), which always
+// pairs the suppression with the matching validator. Direct callers
+// of [Transaction.ReplaceFiles] cannot bypass overwrite isolation.
func withRewriteSemantics() WriteOption {
return func(cfg *dataFileCfg) {
cfg.rewriteSemantics = true
@@ -765,8 +780,13 @@ func (t *Transaction) ReplaceDataFilesWithDataFiles(ctx context.Context, filesTo
}
}
+ op := OpOverwrite
+ if cfg.rewriteSemantics {
+ op = OpReplace
+ }
+
commitUUID := uuid.New()
- updater := t.updateSnapshot(fs, snapshotProps,
OpOverwrite).mergeOverwrite(&commitUUID, nil)
+ updater := t.updateSnapshot(fs, snapshotProps,
op).mergeOverwrite(&commitUUID, nil)
if cfg.rewriteSemantics {
// mergeOverwrite guarantees an *overwriteFiles producerImpl.
updater.producerImpl.(*overwriteFiles).skipDefaultValidator =
true
@@ -882,8 +902,13 @@ func (t *Transaction) ReplaceFiles(ctx context.Context, dataFilesToDelete, dataF
}
}
+ op := OpOverwrite
+ if cfg.rewriteSemantics {
+ op = OpReplace
+ }
+
commitUUID := uuid.New()
- updater := t.updateSnapshot(fs, snapshotProps,
OpOverwrite).mergeOverwrite(&commitUUID, nil)
+ updater := t.updateSnapshot(fs, snapshotProps,
op).mergeOverwrite(&commitUUID, nil)
if cfg.rewriteSemantics {
// mergeOverwrite guarantees an *overwriteFiles producerImpl.
updater.producerImpl.(*overwriteFiles).skipDefaultValidator =
true