koodin9 opened a new pull request, #15710:
URL: https://github.com/apache/iceberg/pull/15710
## Summary
After a partial commit (timeout), late-arriving `DataWritten` events from the previous commit cycle are added to the next cycle's `commitBuffer`. The current implementation groups all envelopes by table only (`tableCommitMap()`), so stale and current data files end up in the same `RowDelta`. Since all files in a single `RowDelta` receive the same sequence number, equality deletes fail to apply (`data_seq < delete_seq` is false), resulting in **duplicate rows**.
This PR fixes the issue by separating envelopes by `commitId` and committing each group in a distinct `RowDelta`, ensuring stale data files get a lower sequence number than the current cycle's equality deletes.
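The grouping at the heart of the fix can be sketched as follows. This is a simplified, hypothetical model — `Envelope` here is a stand-in for the real class in the Iceberg Kafka Connect module, and the method mirrors only the ordering logic described above:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Simplified stand-in for the real Envelope, which wraps a DataWritten
// event carrying the commitId of the cycle it belongs to.
record Envelope(UUID commitId, String tableName) {}

public class CommitGroupingSketch {

  // Group buffered envelopes by commitId, then by table. LinkedHashMap
  // preserves insertion order, so stale commitIds come out in arrival
  // order; the current commitId is explicitly moved to the end so its
  // batch is committed last and receives the highest sequence number.
  static List<Map<String, List<Envelope>>> tableCommitMaps(
      List<Envelope> buffer, UUID currentCommitId) {
    Map<UUID, Map<String, List<Envelope>>> byCommit = new LinkedHashMap<>();
    for (Envelope e : buffer) {
      byCommit
          .computeIfAbsent(e.commitId(), k -> new LinkedHashMap<>())
          .computeIfAbsent(e.tableName(), k -> new ArrayList<>())
          .add(e);
    }
    Map<String, List<Envelope>> current = byCommit.remove(currentCommitId);
    List<Map<String, List<Envelope>>> batches = new ArrayList<>(byCommit.values());
    if (current != null) {
      batches.add(current); // current cycle is always last
    }
    return batches;
  }

  public static void main(String[] args) {
    UUID stale = UUID.randomUUID();
    UUID current = UUID.randomUUID();
    List<Envelope> buffer =
        List.of(
            new Envelope(current, "db.tbl"), // current cycle's file arrived first
            new Envelope(stale, "db.tbl"));  // late DataWritten from the timed-out worker
    List<Map<String, List<Envelope>>> batches = tableCommitMaps(buffer, current);
    System.out.println(batches.size()); // prints 2
    // Stale batch comes first even though its envelope arrived second.
    System.out.println(batches.get(0).get("db.tbl").get(0).commitId().equals(stale)); // prints true
  }
}
```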
## Root Cause
1. Coordinator times out waiting for all workers → partial commit with the available files
2. `commitConsumerOffsets()` advances the control topic offsets
3. Late `DataWritten` from the timed-out worker arrives in the next `process()` call
4. `tableCommitMap()` merges stale + current envelopes into one `RowDelta`
5. Same sequence number → equality deletes don't apply → duplicate rows
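Step 5 hinges on Iceberg's equality-delete visibility rule: a delete applies only to data files with a strictly lower sequence number. A minimal illustration of why committing stale data in the same `RowDelta` as the current cycle's deletes produces duplicates:

```java
public class SequenceNumberRule {

  // Iceberg applies an equality delete to a data file only when the data
  // file's sequence number is strictly less than the delete's.
  static boolean deleteApplies(long dataSeq, long deleteSeq) {
    return dataSeq < deleteSeq;
  }

  public static void main(String[] args) {
    // Stale data file committed in the SAME RowDelta as the current cycle's
    // equality delete: both get the same sequence number, so the delete is
    // invisible to that file and the old row survives alongside the new one.
    System.out.println(deleteApplies(7, 7)); // prints false -> duplicate row

    // With the fix, the stale batch is committed first (seq 6) and the
    // current batch, carrying the equality delete, second (seq 7).
    System.out.println(deleteApplies(6, 7)); // prints true -> old row deleted
  }
}
```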
## Changes
- **`CommitState.java`**: Replace `tableCommitMap()` with `tableCommitMaps()`, which returns `List<Map<TableIdentifier, List<Envelope>>>` separated by `commitId`. Stale commitIds are ordered first (via `LinkedHashMap` insertion order); the current commitId is always last.
- **`Coordinator.java`**: `doCommit()` iterates over the list and commits each batch in a separate `RowDelta`. `offsetsJson` and `vtts` are stored only on the last batch, to prevent `lastCommittedOffsetsForTable()` from filtering out subsequent batches. Null guards added to `commitToTable()`.
- **`TestCoordinatorPartialCommit.java`**: New regression test that simulates the partial commit scenario and verifies that the stale snapshot's sequence number is strictly less than the current snapshot's sequence number.
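The `Coordinator` side of the change can be sketched roughly as below. Names and signatures are simplified and hypothetical — `commitToTable` here is a recording stub standing in for the real method, just to show why the offsets snapshot properties go on the last batch only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DoCommitSketch {

  record Commit(String table, String offsetsJson, Long vtts) {}

  static final List<Commit> committed = new ArrayList<>();

  // Stub for Coordinator.commitToTable(): records one RowDelta per table
  // per batch. offsetsJson / vtts may be null, hence the null guards
  // mentioned in the Changes section.
  static void commitToTable(String table, List<String> files, String offsetsJson, Long vtts) {
    committed.add(new Commit(table, offsetsJson, vtts));
  }

  // Each batch becomes its own RowDelta, so each gets its own sequence
  // number. The offsets properties are attached only to the LAST batch,
  // so lastCommittedOffsetsForTable() does not treat the earlier stale
  // batches as already committed and filter them out.
  static void doCommit(List<Map<String, List<String>>> batches, String offsetsJson, Long vtts) {
    for (int i = 0; i < batches.size(); i++) {
      boolean last = i == batches.size() - 1;
      for (Map.Entry<String, List<String>> entry : batches.get(i).entrySet()) {
        commitToTable(
            entry.getKey(), entry.getValue(), last ? offsetsJson : null, last ? vtts : null);
      }
    }
  }

  public static void main(String[] args) {
    doCommit(
        List.of(
            Map.of("db.tbl", List.of("stale-file.parquet")),
            Map.of("db.tbl", List.of("current-file.parquet"))),
        "{\"0\":42}",
        1000L);
    System.out.println(committed.size());               // prints 2
    System.out.println(committed.get(0).offsetsJson()); // prints null (stale batch)
    System.out.println(committed.get(1).offsetsJson()); // prints {"0":42}
  }
}
```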
## Why not clear the buffer? (re: #15651)
An alternative approach is to discard stale events in `startNewCommit()`. However, this causes **data loss** in the partial commit scenario: after `doCommit(true)` succeeds, consumer offsets are committed, so the timed-out worker will not re-send its `DataWritten`. Clearing the buffer discards data that cannot be recovered.
## Test Plan
- [x] New unit test `TestCoordinatorPartialCommit` — verifies that separate `RowDelta`s produce strictly increasing sequence numbers
- [x] Existing Coordinator and integration tests pass
- [x] Production-like stress test: 4,398,000 rows with CDC (INSERT/UPDATE) under aggressive partial commit settings (`commit.interval-ms=10000`, `commit.timeout-ms=1000`). Verified zero data loss and no duplicate rows via Trino row count comparison against the source MySQL.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]