mystictraveler opened a new issue, #860:
URL: https://github.com/apache/iceberg-go/issues/860
## Summary
Under concurrent producer load, `Transaction.Overwrite` produces a staged
snapshot whose total row count is **less than** the input snapshot's row count,
even though the operation should preserve rows. The losses are
non-deterministic and small (typically 1k–10k rows on tables with millions of
rows). They reproduce repeatably when a parallel Iceberg writer is committing
into the same table while compaction runs.
The transaction reports success — the row loss is silent unless the caller
does a post-stage row-count consistency check.
## Versions
- iceberg-go: v0.5.0
- arrow-go: v18.5.2-0.20260220015023
- Go: 1.24
- OS: macOS 25.2.0 (also reproduced on Linux)
- Storage backends tested: local fileblob (`gocloud.dev/blob/fileblob`)
## Symptom
A janitor-style compactor runs:
```go
tbl, _ := cat.LoadTable(ctx, ident)
scan := tbl.Scan(table.WithRowFilter(filter)) // partition-equality filter
schema, recIter, _ := scan.ToArrowRecords(ctx)

reader := newStreamingRecordReader(schema, recIter) // adapts iter.Seq2 → array.RecordReader
defer reader.Release()

tx := tbl.NewTransaction()
if err := tx.Overwrite(ctx, reader, nil, table.WithOverwriteFilter(filter)); err != nil {
	return err
}
staged, _ := tx.StagedTable()

// Verify row preservation:
beforeRows := sumRowsAcrossManifests(tbl) // walks tbl.CurrentSnapshot() manifests
stagedRows := sumRowsAcrossManifests(staged.Table)
if beforeRows != stagedRows {
	return fmt.Errorf("input rows %d != staged rows %d", beforeRows, stagedRows)
}
```
While a parallel goroutine commits new data to the same table at ~10
commits/sec via a separate `cat.LoadTable` + `tx.Append`, this verification
fails on roughly 60% of compaction attempts:
```
input rows 5900000 != staged rows 5890013 (Δ -9987)
input rows 6500000 != staged rows 6499023 (Δ -977)
input rows 8000000 != staged rows 7989862 (Δ -10138)
input rows 8800000 != staged rows 8797027 (Δ -2973)
input rows 9800000 != staged rows 9789864 (Δ -10136)
input rows 13000000 != staged rows 12990059 (Δ -9941)
input rows 15600000 != staged rows 15596013 (Δ -3987)
input rows 18000000 != staged rows 17989981 (Δ -10019)
input rows 19000000 != staged rows 18996026 (Δ -3974)
input rows 24600000 != staged rows 24596019 (Δ -3981)
input rows 34200000 != staged rows 34192221 (Δ -7779)
```
`staged < before` in every case. Rows are being dropped, not duplicated.
Both `beforeRows` and `stagedRows` are computed by walking
`Snapshot.Manifests(fs)` and summing `DataFile.Count()` across every entry —
i.e., they read the immutable on-disk manifest list. The discrepancy means the
staged snapshot's manifest list **actually contains data files whose total row
count is less than the input snapshot's data files**. This is not a stat skew
or a master-check arithmetic bug; it's fewer rows in the resulting table.
The same compaction code path on the same table with **no concurrent
writer** passes the verification cleanly every time (29200 in / 29200 out,
600000 in / 600000 out across many runs). The race only fires under concurrent
producer load.
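As an aside on the repro code: `newStreamingRecordReader` is our own adapter, not part of iceberg-go. The underlying pattern of turning a push-style `iter.Seq2` into a pull-style reader is stdlib `iter.Pull2`. A minimal sketch of that pattern with plain ints standing in for record batches (all names here are hypothetical; no Arrow dependency):

```go
package main

import (
	"fmt"
	"iter"
)

// pullReader adapts a push-style iter.Seq2[T, error] into a pull-style
// reader with Next/Value/Err/Release, roughly the shape an
// array.RecordReader-like consumer wants.
type pullReader[T any] struct {
	next func() (T, error, bool)
	stop func()
	cur  T
	err  error
}

func newPullReader[T any](seq iter.Seq2[T, error]) *pullReader[T] {
	next, stop := iter.Pull2(seq)
	return &pullReader[T]{next: next, stop: stop}
}

// Next pulls the next element; it returns false on exhaustion or error.
func (r *pullReader[T]) Next() bool {
	v, err, ok := r.next()
	if !ok || err != nil {
		r.err = err
		return false
	}
	r.cur = v
	return true
}

func (r *pullReader[T]) Value() T   { return r.cur }
func (r *pullReader[T]) Err() error { return r.err }
func (r *pullReader[T]) Release()   { r.stop() } // frees the coroutine backing Pull2

func main() {
	seq := func(yield func(int, error) bool) {
		for _, n := range []int{100, 200, 300} {
			if !yield(n, nil) {
				return
			}
		}
	}
	r := newPullReader(seq)
	defer r.Release()
	total := 0
	for r.Next() {
		total += r.Value()
	}
	fmt.Println(total, r.Err()) // 600 <nil>
}
```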
## Suspect: `partitionedFanoutWriter` row-loss race
`tx.Overwrite` → `recordsToDataFiles` (table/arrow_utils.go:1300) →
`partitionedFanoutWriter.Write` (table/partitioned_fanout_writer.go:73).
The fanout writer spawns N workers (default `config.EnvConfig.MaxWorkers =
5`) all reading from a single `inputRecordsCh`:
```go
// table/partitioned_fanout_writer.go:73-87
func (p *partitionedFanoutWriter) Write(ctx context.Context, workers int) iter.Seq2[iceberg.DataFile, error] {
	inputRecordsCh := make(chan arrow.RecordBatch, workers)
	outputDataFilesCh := make(chan iceberg.DataFile, workers)
	fanoutWorkers, ctx := errgroup.WithContext(ctx)
	startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh)
	for range workers {
		fanoutWorkers.Go(func() error {
			return p.fanout(ctx, inputRecordsCh, outputDataFilesCh)
		})
	}
	return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh)
}
```
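For reference, the invariant this fan-out shape must maintain is simple: every batch pulled from the input channel must be accounted for on the output side, or rows vanish exactly as observed. A stdlib sketch of a lossless fan-out (plain goroutines and `sync.WaitGroup`; the `fanout` name and row-count stand-ins are mine, not iceberg-go's):

```go
package main

import (
	"fmt"
	"sync"
)

// fanout distributes batches (here just row counts) across workers and
// collects one output per batch. If a worker ever pulls a batch and fails
// to forward its result, the output total silently undercounts -- the
// failure shape described in this issue.
func fanout(batches []int, workers int) int {
	in := make(chan int, workers)
	out := make(chan int, workers)

	go func() { // feeder
		for _, b := range batches {
			in <- b
		}
		close(in)
	}()

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range in {
				out <- b // forward every pulled batch
			}
		}()
	}
	go func() { wg.Wait(); close(out) }() // close output once all workers drain

	total := 0
	for b := range out {
		total += b
	}
	return total
}

func main() {
	batches := []int{200000, 50000, 100000, 75000}
	fmt.Println(fanout(batches, 5)) // 425000: all rows accounted for
}
```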
Each worker, on each batch:
```go
// table/partitioned_fanout_writer.go:112-153
func (p *partitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-chan arrow.RecordBatch, dataFilesChannel chan<- iceberg.DataFile) error {
	for {
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		case record, ok := <-inputRecordsCh:
			if !ok {
				return nil
			}
			defer record.Release() // ← line 122: defer inside a for loop
			partitions, err := p.getPartitions(record)
			...
			for _, val := range partitions {
				...
				rollingDataWriter, err := p.writerFactory.getOrCreateRollingDataWriter(
					ctx, p.concurrentDataFileWriter, partitionPath, val.partitionValues, dataFilesChannel)
				if err != nil {
					return err
				}
				err = rollingDataWriter.Add(partitionRecord)
				if err != nil {
					return err
				}
			}
		}
	}
}
```
Two concrete concerns:
**1. `defer record.Release()` inside a `for` loop (line 122).** `defer` is
function-scoped, so releases accumulate until `fanout` returns rather than
firing once per iteration. At minimum this is a memory smell: every batch a
worker pulls stays retained for the lifetime of that worker. Under early-exit
paths it could also mean a released-on-defer record is still referenced
elsewhere (Arrow record batches are reference-counted; if a `Retain` happens
elsewhere this may be benign, but it's worth auditing).
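The function-scoped behavior of `defer` is easy to demonstrate in isolation. This standalone sketch (illustrating the language rule only, not the iceberg-go code) shows that deferred calls queued inside a loop all fire at function return, none per iteration:

```go
package main

import "fmt"

// processAll defers a "release" for every item inside the loop body.
// Because defer is function-scoped, no release fires until processAll
// returns -- every item stays "retained" for the whole loop.
func processAll(items []string, released *[]string) (releasedDuringLoop int) {
	for _, it := range items {
		defer func() { // accumulates; runs only when processAll returns
			*released = append(*released, it)
		}()
	}
	return len(*released) // 0: no deferred release has fired yet
}

func main() {
	var released []string
	duringLoop := processAll([]string{"a", "b", "c"}, &released)
	fmt.Println(duringLoop, len(released)) // 0 3
	// duringLoop == 0: nothing was released while looping.
	// len(released) == 3: all releases fired at function return, in
	// LIFO order ("c", "b", "a").
}
```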
**2. Concurrent `writerFactory.getOrCreateRollingDataWriter` for the same
partition path.** N workers all racing to read from a single channel will
frequently pull adjacent batches that hash to the same partition. If
`getOrCreate` isn't strictly mutually exclusive across workers (or if the
per-writer `Add` isn't synchronized), two workers can:
- Both observe the partition is missing, both create a rolling writer, only
one ends up wired into the data-files channel → the other writer's flushed
Parquet file is **never appended to the staged manifest list**, so its rows
vanish from the staged total. This produces *exactly* the failure shape we see:
a small, non-deterministic number of rows missing, biased toward partitions
that received bursts during the compaction window.
- Or: both register but interleave `Add` calls in a way that drops some
row-index slices.
I have not yet read `writer_factory.go` / `concurrent_data_file_writer.go`
end-to-end to confirm whether the synchronization is sufficient — that would be
the next step.
## What's confirmed vs. hypothesized
**Confirmed:**
- The symptom: `staged.totalRows < tbl.totalRows` after `tx.Overwrite`
returns success.
- It is reproducible against fileblob with the workload described below.
- It does not reproduce against a quiescent table (same code, same filter,
same sequence of operations).
- The transaction's snapshot pinning at the metadata level looks correct:
`t.meta.currentSnapshot()` is captured at `NewTransaction` time,
`Snapshot.dataFiles(fs, nil)` reads the immutable on-disk manifest list, and
`classifyFilesForDeletions` correctly uses that pinned snapshot. So this does
NOT appear to be the classifier mis-classifying files; the classifier sees the
correct file set.
**Hypothesized:**
- The row loss happens on the writer side (fanout) rather than the
reader/classifier side.
- The specific mechanism is one of the two `partitionedFanoutWriter`
concerns above.
- Setting `config.EnvConfig.MaxWorkers = 1` should make the symptom vanish
entirely (single-worker fanout has no concurrent `getOrCreate`).
I have a diagnostic build running with `MaxWorkers = 1` right now and will
post a follow-up comment with the result. If the failures vanish at
MaxWorkers=1 and reappear at MaxWorkers=5, the writer-race hypothesis is
confirmed and the bug is upstream of any caller workaround.
## Reproduction
The full reproduction harness lives at
https://github.com/mystictraveler/iceberg-janitor (branch
`feature/go-rewrite-mvp`). The relevant pieces:
- `go/cmd/janitor-streamer/main.go` — TPC-DS streaming workload writer using
iceberg-go's `SqlCatalog` (committing ~10/sec per fact table at 50k–200k rows
per commit, 50 partitions per fact)
- `go/pkg/janitor/compact.go` — the compaction code path shown above
- `go/pkg/safety/verify.go` — the row-count verification (master check)
- `go/test/bench/bench-tpcds.sh` — driver
```bash
DURATION_SECONDS=180 \
COMMITS_PER_MINUTE=600 \
STORE_SALES_PER_BATCH=200000 \
STORE_RETURNS_PER_BATCH=50000 \
CATALOG_SALES_PER_BATCH=100000 \
QUERY_INTERVAL_SECONDS=45 \
MAINTENANCE_INTERVAL_SECONDS=20 \
./go/test/bench/bench-tpcds.sh
```
With this driver, 11 of 18 compactions trip the verification on the
iceberg-janitor build that already includes the fix for issue #1.
## Severity
The transaction returns success on a row-losing operation. Any caller that
doesn't independently row-count-check the staged snapshot will silently lose
data. Concretely: a maintenance/compaction job using `Transaction.Overwrite`
against any table receiving concurrent writes will accumulate row loss.
We are working around this on our side by replacing `tx.Overwrite` with
`Transaction.ReplaceDataFilesWithDataFiles` (table/transaction.go:599), which
takes explicit `filesToDelete []iceberg.DataFile` and `filesToAdd
[]iceberg.DataFile` lists with no row-filter classification and no
partitionedFanoutWriter on the write side. Happy to share that as a separate
issue/PR once it's landed and stable, in case it's useful for upstream tests.
## Adjacent
- mystictraveler/iceberg-janitor#1 — different concurrency-failure mode (CAS
retry on "branch main has changed" requirement validation). Already fixed on
our side; not the same bug.
- mystictraveler/iceberg-janitor#4 — our internal tracker for this bug,
which links back here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]