t3hw opened a new pull request, #15651:
URL: https://github.com/apache/iceberg/pull/15651
## Fix: CommitState buffer accumulation causes duplicate records after failed commits
### Problem
When `Coordinator.doCommit()` throws, `clearResponses()` is never called.
Stale `DataWritten` events persist in `commitBuffer` and get committed
alongside new events in a single `RowDelta`. Both the stale and new data files
are added to the table, producing duplicate rows for the same key.
This is workload-agnostic — it affects CDC, upsert, and append-only modes.
The bug is in the Coordinator's `CommitState`, not in the writer layer.
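To make the failure mode concrete, here is a minimal toy model of the accumulation, not the real Coordinator code. The class `BufferAccumulationSketch`, the string-based events, and the file names are all hypothetical stand-ins; only the control flow (the buffer is cleared solely after a successful commit) is taken from the description above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the accumulation bug; names are illustrative, not the real Coordinator API. */
class BufferAccumulationSketch {
  // Stand-in for CommitState's commitBuffer; each string represents the data file
  // referenced by one DataWritten event.
  static final List<String> commitBuffer = new ArrayList<>();
  // Stand-in for the table: every data file added by a successful commit.
  static final List<String> committedFiles = new ArrayList<>();

  // Mirrors the pre-fix flow: the buffer is cleared only when the commit succeeds.
  static void doCommit(boolean fail) {
    if (fail) {
      throw new IllegalStateException("simulated commit failure");
    }
    committedFiles.addAll(commitBuffer);
    commitBuffer.clear(); // clearResponses() equivalent, skipped when the commit throws
  }

  static List<String> run() {
    commitBuffer.clear();
    committedFiles.clear();

    // Cycle 1: a worker reports a data file, then the commit fails.
    commitBuffer.add("cycle1/key-42.parquet");
    try {
      doCommit(true);
    } catch (IllegalStateException e) {
      // The coordinator moves on to the next cycle; the buffer still holds the stale event.
    }

    // Cycle 2: workers re-read the same source records and report a fresh data file.
    commitBuffer.add("cycle2/key-42.parquet");
    doCommit(false);

    return new ArrayList<>(committedFiles);
  }

  public static void main(String[] args) {
    // Prints [cycle1/key-42.parquet, cycle2/key-42.parquet]
    System.out.println(run());
  }
}
```

Both files end up in the same commit even though they hold the same source records, which is the duplicate-row outcome described above.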
### Fix (CommitState.java)
1-line change — clear `commitBuffer` at the start of each new commit cycle:
```java
void startNewCommit() {
  // Discard stale events from any prior failed commit cycle.
  commitBuffer.clear();
  currentCommitId = UUID.randomUUID();
  startTime = System.currentTimeMillis();
}
```
**Why this is safe:** Workers always re-produce `DataWritten` events for
each new `StartCommit` — source records are still in Kafka (consumer offsets
weren't committed for the failed cycle), so workers re-read and re-process
them. Old events in the buffer reference orphaned data files from the previous
cycle's writers. There is no mechanism to "adopt" those files into a new commit
cycle.
**Why this approach over alternatives:**
| Option | Where | Issue |
|--------|-------|-------|
| A: Filter by commitId in `tableCommitMap()` | Read path | Memory leak: stale events accumulate indefinitely under sustained failures |
| B: Clear in `endCurrentCommit()` | End of cycle | Conflates success/failure cleanup; less explicit about "clean slate" intent |
| **C: Clear in `startNewCommit()`** | **Start of cycle** | **Chosen: simplest, no memory leak, handles both same-JVM failure and restart/rebalance** |
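The memory argument against option A can be sketched with a toy buffer. This is an illustration under simplifying assumptions, not the connector's code: `OptionComparisonSketch`, its `Event` class, and the one-event-per-cycle workload are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy comparison of option A (filter on read) and option C (clear on start). */
class OptionComparisonSketch {
  // Hypothetical event: the commit cycle that produced it and the data file it references.
  static final class Event {
    final UUID commitId;
    final String dataFile;

    Event(UUID commitId, String dataFile) {
      this.commitId = commitId;
      this.dataFile = dataFile;
    }
  }

  // Simulates consecutive failed cycles and returns how many events the buffer retains.
  static int bufferSizeAfterFailures(int failedCycles, boolean clearOnStart) {
    List<Event> buffer = new ArrayList<>();
    for (int i = 0; i < failedCycles; i++) {
      if (clearOnStart) {
        buffer.clear(); // option C: every cycle starts from a clean slate
      }
      UUID commitId = UUID.randomUUID();
      buffer.add(new Event(commitId, "file-" + i + ".parquet"));

      // Option A would filter on the read path. The filtered view is correct,
      // but the stale events behind it are never released.
      long visible = buffer.stream().filter(e -> e.commitId.equals(commitId)).count();
      if (visible != 1) {
        throw new IllegalStateException("filtered view should contain only the current event");
      }
      // The commit fails here, so nothing else clears the buffer.
    }
    return buffer.size();
  }

  public static void main(String[] args) {
    System.out.println("Option A buffer size: " + bufferSizeAfterFailures(1000, false)); // 1000
    System.out.println("Option C buffer size: " + bufferSizeAfterFailures(1000, true)); // 1
  }
}
```

In this model both options yield the same view of the current cycle, but only clearing at the start keeps the buffer bounded while failures persist.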
The existing `clearResponses()` call in `doCommit()` (line 173) is now redundant
on the happy path but is retained as defense-in-depth.
### Test coverage
**CommitState unit tests** (`TestCommitState.java` — 2 new) — *regression
tests for the fix:*
- `testStartNewCommitClearsStaleResponses` — failed commit cycle leaves
stale events; `startNewCommit()` must clear them
- `testRecoveryEventsDiscardedOnNewCommit` — old events from
restart/rebalance recovery; `startNewCommit()` must discard them
**Recovery scenario tests** (`TestRecoveryScenario.java` — 2 tests) —
*document the consequence, not the fix:*
Table-level integration tests using append-only writers that prove combined
commits produce duplicate rows. Write via `RecordUtils.createTableWriter()`,
commit via `table.newRowDelta()`, read back via `IcebergGenerics.read(table)`,
assert with `StructLikeSet`. Parameterized across format versions (2, 3), file
formats (Parquet, ORC), and partition modes (unpartitioned, partitioned).
- `testRecoveryScenarioInsertThenUpdateCombinedCommit` — two batches for the
same key committed in a single RowDelta → both rows survive (duplicate)
- `testRecoveryScenarioInsertThenReInsertCombinedCommit` — two INSERTs for
the same key combined → both survive
**Test infrastructure** (`TableLevelTestBase.java`):
Abstract base class providing parameterized test configuration, writer
creation via `RecordUtils.createTableWriter()`, commit helpers
(`commitTransaction`, `commitCombined`), and read-back utilities
(`actualRowSet`). Schema-agnostic — subclasses implement `schema()` and
`sinkConfig()` to define their table schema and writer mode.
**Build** (`build.gradle`):
Added `testArtifacts` dependencies for `iceberg-api` and `iceberg-core` to
support `TestBase`, `StructLikeSet`, and `IcebergGenerics` in integration tests.
### Red-green verification
Commenting out `commitBuffer.clear()` in `startNewCommit()` causes 2 of 20
tests to fail:
- `testStartNewCommitClearsStaleResponses` — `tableCommitMap()` not empty
after `startNewCommit()`
- `testRecoveryEventsDiscardedOnNewCommit` — same assertion failure
Restoring the fix brings all 20 tests back to green. The recovery scenario
tests pass in both cases — they document the consequence (duplicates from
combined commits), not the fix. The CommitState unit tests are the regression
safety net.
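The red-green check can be reproduced in miniature with a toy state object. This is a simplified stand-in, not the real `CommitState` or the tests in `TestCommitState.java`: `ToyCommitState`, the `clearOnStart` toggle, and the string-keyed `tableCommitMap()` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy red-green check; ToyCommitState is a simplified stand-in, not the real CommitState. */
class RedGreenSketch {
  static class ToyCommitState {
    // Each entry is {table name, data file}; the real buffer holds event envelopes.
    private final List<String[]> commitBuffer = new ArrayList<>();
    // Toggle standing in for commenting the fix in or out.
    private final boolean clearOnStart;

    ToyCommitState(boolean clearOnStart) {
      this.clearOnStart = clearOnStart;
    }

    void addResponse(String table, String dataFile) {
      commitBuffer.add(new String[] {table, dataFile});
    }

    void startNewCommit() {
      if (clearOnStart) {
        commitBuffer.clear(); // the fix under test
      }
    }

    // Groups buffered data files by table, loosely modelled on tableCommitMap().
    Map<String, List<String>> tableCommitMap() {
      Map<String, List<String>> byTable = new HashMap<>();
      for (String[] event : commitBuffer) {
        byTable.computeIfAbsent(event[0], t -> new ArrayList<>()).add(event[1]);
      }
      return byTable;
    }
  }

  // Returns true when a new cycle starts with an empty commit map.
  static boolean staleEventsCleared(boolean clearOnStart) {
    ToyCommitState state = new ToyCommitState(clearOnStart);
    state.startNewCommit();
    state.addResponse("db.tbl", "stale-file.parquet"); // left behind by a failed cycle
    state.startNewCommit();
    return state.tableCommitMap().isEmpty();
  }

  public static void main(String[] args) {
    System.out.println("with fix:    " + staleEventsCleared(true)); // true (green)
    System.out.println("without fix: " + staleEventsCleared(false)); // false (red)
  }
}
```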
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]