mystictraveler opened a new issue, #860:
URL: https://github.com/apache/iceberg-go/issues/860

   ## Summary
   
   Under concurrent producer load, `Transaction.Overwrite` produces a staged 
snapshot whose total row count is **less than** the input snapshot's row count, 
even though the operation should preserve rows. The losses are 
non-deterministic and small (typically 1k–10k rows on tables with millions of 
rows). They reproduce repeatably when a parallel Iceberg writer is committing 
into the same table while compaction runs.
   
   The transaction reports success — the row loss is silent unless the caller 
does a post-stage row-count consistency check.
   
   ## Versions
   
   - iceberg-go: v0.5.0
   - arrow-go: v18.5.2-0.20260220015023
   - Go: 1.24
   - OS: macOS 25.2.0 (also reproduced on Linux)
   - Storage backends tested: local fileblob (`gocloud.dev/blob/fileblob`)
   
   ## Symptom
   
   A janitor-style compactor runs:
   
   ```go
   tbl, _ := cat.LoadTable(ctx, ident)

   scan := tbl.Scan(table.WithRowFilter(filter))        // partition-equality filter
   schema, recIter, _ := scan.ToArrowRecords(ctx)
   reader := newStreamingRecordReader(schema, recIter)  // adapts iter.Seq2 → array.RecordReader
   defer reader.Release()

   tx := tbl.NewTransaction()
   if err := tx.Overwrite(ctx, reader, nil, table.WithOverwriteFilter(filter)); err != nil {
       return err
   }
   staged, _ := tx.StagedTable()

   // Verify row preservation:
   beforeRows := sumRowsAcrossManifests(tbl)            // walks tbl.CurrentSnapshot() manifests
   stagedRows := sumRowsAcrossManifests(staged.Table)
   if beforeRows != stagedRows {
       return fmt.Errorf("input rows %d != staged rows %d", beforeRows, stagedRows)
   }
   ```
   
   While a parallel goroutine commits new data to the same table at ~10 
commits/sec via a separate `cat.LoadTable` + `tx.Append`, this verification 
fails on roughly 60% of compaction attempts:
   
   ```
   input rows 5900000 != staged rows 5890013    (Δ -9987)
   input rows 6500000 != staged rows 6499023    (Δ -977)
   input rows 8000000 != staged rows 7989862    (Δ -10138)
   input rows 8800000 != staged rows 8797027    (Δ -2973)
   input rows 9800000 != staged rows 9789864    (Δ -10136)
   input rows 13000000 != staged rows 12990059  (Δ -9941)
   input rows 15600000 != staged rows 15596013  (Δ -3987)
   input rows 18000000 != staged rows 17989981  (Δ -10019)
   input rows 19000000 != staged rows 18996026  (Δ -3974)
   input rows 24600000 != staged rows 24596019  (Δ -3981)
   input rows 34200000 != staged rows 34192221  (Δ -7779)
   ```
   
   `staged < before` in every case. Rows are being dropped, not duplicated.
   
   Both `beforeRows` and `stagedRows` are computed by walking 
`Snapshot.Manifests(fs)` and summing `DataFile.Count()` across every entry — 
i.e., they read the immutable on-disk manifest list. The discrepancy means the 
staged snapshot's manifest list **actually contains data files whose total row 
count is less than the input snapshot's data files**. This is not a stat skew 
or a master-check arithmetic bug; it's fewer rows in the resulting table.
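
   For concreteness, the verification is pure addition over that immutable metadata. A minimal model (stdlib only; `dataFile` here is a local stub exposing just the `Count()` method the check uses, not iceberg-go's `DataFile` interface; the numbers are illustrative):

   ```go
   package main

   import "fmt"

   // Local stub standing in for iceberg.DataFile; only Count() matters
   // to the verification.
   type dataFile struct{ count int64 }

   func (d dataFile) Count() int64 { return d.count }

   // sumRowsAcrossManifests walks every manifest's entries and sums the
   // per-file row counts, mirroring the check described above.
   func sumRowsAcrossManifests(manifests [][]dataFile) int64 {
       var total int64
       for _, entries := range manifests {
           for _, df := range entries {
               total += df.Count()
           }
       }
       return total
   }

   func main() {
       before := [][]dataFile{{{200000}, {50000}}, {{100000}}}
       staged := [][]dataFile{{{200000}, {50000}}, {{90013}}}
       fmt.Println(sumRowsAcrossManifests(before), sumRowsAcrossManifests(staged))
   }
   ```

   There is no estimation or sampling anywhere in that walk, which is why a shortfall in the staged sum can only mean the staged manifest list genuinely references fewer rows.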
   
   The same compaction code path on the same table with **no concurrent 
writer** passes the verification cleanly every time (29200 in / 29200 out, 
600000 in / 600000 out across many runs). The race only fires under concurrent 
producer load.
   
   ## Suspect: `partitionedFanoutWriter` row-loss race
   
   `tx.Overwrite` → `recordsToDataFiles` (table/arrow_utils.go:1300) → 
`partitionedFanoutWriter.Write` (table/partitioned_fanout_writer.go:73).
   
   The fanout writer spawns N workers (default `config.EnvConfig.MaxWorkers = 
5`) all reading from a single `inputRecordsCh`:
   
   ```go
   // table/partitioned_fanout_writer.go:73-87
   func (p *partitionedFanoutWriter) Write(ctx context.Context, workers int) iter.Seq2[iceberg.DataFile, error] {
       inputRecordsCh := make(chan arrow.RecordBatch, workers)
       outputDataFilesCh := make(chan iceberg.DataFile, workers)

       fanoutWorkers, ctx := errgroup.WithContext(ctx)
       startRecordFeeder(ctx, p.itr, fanoutWorkers, inputRecordsCh)

       for range workers {
           fanoutWorkers.Go(func() error {
               return p.fanout(ctx, inputRecordsCh, outputDataFilesCh)
           })
       }

       return p.yieldDataFiles(fanoutWorkers, outputDataFilesCh)
   }
   ```
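
   A stripped-down, stdlib-only model of this fan-out shape (a `sync.WaitGroup` standing in for `errgroup`, plain row counts standing in for record batches and data files; none of these names are iceberg-go's) shows the invariant the real writer must uphold: every row pulled from the input channel surfaces on the output channel before it closes.

   ```go
   package main

   import (
       "fmt"
       "sync"
   )

   // Minimal model of the Write fan-out: N workers drain one input channel
   // and publish results on one output channel, which is closed only after
   // every worker has returned.
   func fanout(batches []int, workers int) (in, out int) {
       inputCh := make(chan int, workers)
       outputCh := make(chan int, workers)

       // Feeder: push every batch, then close the input channel.
       go func() {
           defer close(inputCh)
           for _, rows := range batches {
               in += rows
               inputCh <- rows
           }
       }()

       var wg sync.WaitGroup
       for i := 0; i < workers; i++ {
           wg.Add(1)
           go func() {
               defer wg.Done()
               for rows := range inputCh {
                   outputCh <- rows // a result that never reaches this send is silent row loss
               }
           }()
       }

       // Mirror yieldDataFiles: close the output after all workers drain.
       go func() {
           wg.Wait()
           close(outputCh)
       }()

       for rows := range outputCh {
           out += rows
       }
       return in, out
   }

   func main() {
       in, out := fanout([]int{200000, 50000, 100000, 200000}, 5)
       fmt.Println(in, out, in == out)
   }
   ```

   Any path in the real writer where a worker pulls a batch but the resulting file never reaches the data-files channel breaks `in == out`, which is exactly the failure shape reported above.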
   
   Each worker, on each batch:
   
   ```go
   // table/partitioned_fanout_writer.go:112-153
   func (p *partitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-chan arrow.RecordBatch, dataFilesChannel chan<- iceberg.DataFile) error {
       for {
           select {
           case <-ctx.Done():
               return context.Cause(ctx)

           case record, ok := <-inputRecordsCh:
               if !ok {
                   return nil
               }
               defer record.Release()    // ← line 122: defer inside a for loop

               partitions, err := p.getPartitions(record)
               ...
               for _, val := range partitions {
                   ...
                   rollingDataWriter, err := p.writerFactory.getOrCreateRollingDataWriter(
                       ctx, p.concurrentDataFileWriter, partitionPath, val.partitionValues, dataFilesChannel)
                   if err != nil {
                       return err
                   }

                   err = rollingDataWriter.Add(partitionRecord)
                   if err != nil {
                       return err
                   }
               }
           }
       }
   }
   ```
   
   Two concrete concerns:
   
   **1. `defer record.Release()` inside a `for` loop (line 122).** `defer` is function-scoped, so the releases accumulate and all fire when `fanout` returns, not once per iteration. At minimum this is a memory smell: every batch a worker has ever pulled stays retained until that worker exits. Under early-exit paths it could also mean a deferred-released record is still in use elsewhere in the pipeline (Arrow record batches are reference-counted; if a `Retain` happens elsewhere this might be benign, but it's worth auditing).
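
   A self-contained sketch (stdlib only; `batch` is a toy stand-in for a reference-counted `arrow.RecordBatch`, not an Arrow type) demonstrates the function-scoped semantics and the per-iteration fix:

   ```go
   package main

   import "fmt"

   // Toy stand-in for a refcounted record batch.
   type batch struct{ released *int }

   func (b batch) Release() { *b.released++ }

   // deferInLoop mirrors the fanout pattern: defer is function-scoped, so
   // no Release fires until the function returns. midLoop reports how many
   // releases had happened by the last iteration.
   func deferInLoop(batches []batch, counter *int) (midLoop int) {
       for _, b := range batches {
           defer b.Release()
           midLoop = *counter // still 0 on every iteration
       }
       return midLoop
   }

   // perIteration is the fix: release at the end of each loop body.
   func perIteration(batches []batch, counter *int) (midLoop int) {
       for _, b := range batches {
           b.Release()
           midLoop = *counter // keeps pace with the iterations
       }
       return midLoop
   }

   func main() {
       counter := 0
       bs := make([]batch, 3)
       for i := range bs {
           bs[i] = batch{released: &counter}
       }

       mid := deferInLoop(bs, &counter)
       fmt.Println(mid, counter) // 0 releases mid-loop, all 3 only after return

       counter = 0
       mid = perIteration(bs, &counter)
       fmt.Println(mid, counter)
   }
   ```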
   
   **2. Concurrent `writerFactory.getOrCreateRollingDataWriter` for the same 
partition path.** N workers all racing to read from a single channel will 
frequently pull adjacent batches that hash to the same partition. If 
`getOrCreate` isn't strictly mutually exclusive across workers (or if the 
per-writer `Add` isn't synchronized), two workers can:
   
   - Both observe the partition is missing, both create a rolling writer, only 
one ends up wired into the data-files channel → the other writer's flushed 
Parquet file is **never appended to the staged manifest list**, so its rows 
vanish from the staged total. This produces *exactly* the failure shape we see: 
a small, non-deterministic number of rows missing, biased toward partitions 
that received bursts during the compaction window.
   - Or: both register but interleave `Add` calls in a way that drops some 
row-index slices.
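
   I have not read `writer_factory.go`, so the following is the property the factory *must* provide rather than a claim about what it does: a single lock guarding lookup-and-create, so that concurrent workers resolving the same partition path always share one writer and no orphan is ever created. All names here (`safeFactory`, `rollingWriter`, `getOrCreate`) are hypothetical, stdlib-only stand-ins:

   ```go
   package main

   import (
       "fmt"
       "sync"
   )

   // Hypothetical rolling writer; only identity matters for this demo.
   type rollingWriter struct{ partition string }

   // safeFactory: one mutex guards the lookup-or-create, so two workers
   // can never both observe a partition as missing and both create.
   type safeFactory struct {
       mu      sync.Mutex
       writers map[string]*rollingWriter
       created int
   }

   func (f *safeFactory) getOrCreate(partition string) *rollingWriter {
       f.mu.Lock()
       defer f.mu.Unlock()
       if w, ok := f.writers[partition]; ok {
           return w
       }
       w := &rollingWriter{partition: partition}
       f.writers[partition] = w
       f.created++ // exactly one creation per partition, ever
       return w
   }

   func main() {
       f := &safeFactory{writers: make(map[string]*rollingWriter)}

       // 5 workers all resolve the same partition concurrently, as
       // adjacent batches pulled from one channel often will.
       var wg sync.WaitGroup
       for i := 0; i < 5; i++ {
           wg.Add(1)
           go func() {
               defer wg.Done()
               for j := 0; j < 1000; j++ {
                   f.getOrCreate("date=2026-02-20")
               }
           }()
       }
       wg.Wait()

       fmt.Println(f.created, len(f.writers)) // no orphaned second writer
   }
   ```

   If the real `getOrCreateRollingDataWriter` can ever end up with two writers for one partition path under this load, the orphaned writer's flushed file would never reach the data-files channel, which would be a plausible source of the observed loss.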
   
   I have not yet read `writer_factory.go` / `concurrent_data_file_writer.go` 
end-to-end to confirm whether the synchronization is sufficient — that would be 
the next step.
   
   ## What's confirmed vs. hypothesized
   
   **Confirmed:**
   - The symptom: `staged.totalRows < tbl.totalRows` after `tx.Overwrite` 
returns success.
   - It is reproducible against fileblob with the workload described below.
   - It does not reproduce against a quiescent table (same code, same filter, 
same sequence of operations).
   - The transaction's snapshot pinning at the metadata level looks correct: 
`t.meta.currentSnapshot()` is captured at `NewTransaction` time, 
`Snapshot.dataFiles(fs, nil)` reads the immutable on-disk manifest list, and 
`classifyFilesForDeletions` correctly uses that pinned snapshot. So this does 
NOT appear to be the classifier mis-classifying files; the classifier sees the 
correct file set.
   
   **Hypothesized:**
   - The row loss happens on the writer side (fanout) rather than the 
reader/classifier side.
   - The specific mechanism is one of the two `partitionedFanoutWriter` 
concerns above.
   - Setting `config.EnvConfig.MaxWorkers = 1` should make the symptom vanish 
entirely (single-worker fanout has no concurrent `getOrCreate`).
   
   I have a diagnostic build running with `MaxWorkers = 1` right now and will 
post a follow-up comment with the result. If the failures vanish at 
MaxWorkers=1 and reappear at MaxWorkers=5, the writer-race hypothesis is 
confirmed and the bug is upstream of any caller workaround.
   
   ## Reproduction
   
   The full reproduction harness lives at 
https://github.com/mystictraveler/iceberg-janitor (branch 
`feature/go-rewrite-mvp`). The relevant pieces:
   
   - `go/cmd/janitor-streamer/main.go` — TPC-DS streaming workload writer using iceberg-go's `SqlCatalog` (committing ~10 commits/sec per fact table at 50k–200k rows per commit, 50 partitions per fact)
   - `go/pkg/janitor/compact.go` — the compaction code path shown above
   - `go/pkg/safety/verify.go` — the row-count verification (master check)
   - `go/test/bench/bench-tpcds.sh` — driver
   
   ```bash
   DURATION_SECONDS=180 \
   COMMITS_PER_MINUTE=600 \
   STORE_SALES_PER_BATCH=200000 \
   STORE_RETURNS_PER_BATCH=50000 \
   CATALOG_SALES_PER_BATCH=100000 \
   QUERY_INTERVAL_SECONDS=45 \
   MAINTENANCE_INTERVAL_SECONDS=20 \
   ./go/test/bench/bench-tpcds.sh
   ```
   
   11/18 compactions trip the verification on the post-#1-fix iceberg-janitor 
build.
   
   ## Severity
   
   The transaction returns success on a row-losing operation. Any caller that 
doesn't independently row-count-check the staged snapshot will silently lose 
data. Concretely: a maintenance/compaction job using `Transaction.Overwrite` 
against any table receiving concurrent writes will accumulate row loss.
   
   We are working around this on our side by replacing `tx.Overwrite` with 
`Transaction.ReplaceDataFilesWithDataFiles` (table/transaction.go:599), which 
takes explicit `filesToDelete []iceberg.DataFile` and `filesToAdd 
[]iceberg.DataFile` lists with no row-filter classification and no 
partitionedFanoutWriter on the write side. Happy to share that as a separate 
issue/PR once it's landed and stable, in case it's useful for upstream tests.
   
   ## Adjacent
   
   - mystictraveler/iceberg-janitor#1 — different concurrency-failure mode (CAS 
retry on "branch main has changed" requirement validation). Already fixed on 
our side; not the same bug.
   - mystictraveler/iceberg-janitor#4 — our internal tracker for this bug, 
which links back here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

