UrsSchoenenbergerNu opened a new issue, #15846: URL: https://github.com/apache/iceberg/issues/15846
### Apache Iceberg version

1.10.0

### Query engine

Flink

### Please describe the bug 🐞

Hi, we recently encountered a case of data loss in an application that uses `FlinkSink` (v1) and runs on Flink 2.x with unaligned checkpoints.

What we saw:

- One commit to the Iceberg table was missing approximately half of the data files that the writer produced during that checkpoint.
- Around the same time, a checkpoint was started and Flink snapshots were triggered, but that checkpoint timed out before completing.

Based on our research, we believe the culprit is likely the internal state tracking in `IcebergFilesCommitter`. Here is our current theory. With unaligned checkpoints, the order of operations on `IcebergFilesCommitter` can change subtly, because `processElement` can be called for a `FlinkWriteResult` that belongs to checkpoint N even _after_ `snapshotState` has been called for checkpoint N:

- `processElement(FlinkWriteResult1[part of checkpoint N])`, recorded in `writeResultsSinceLastSnapshot`
- `snapshotState(checkpoint N)`
- `writeToManifestUptoLatestCheckpoint(N)` is called, moving the contents of `writeResultsSinceLastSnapshot` into `dataFilesPerCheckpoint(N)` and clearing `writeResultsSinceLastSnapshot`
- `processElement(FlinkWriteResult2[part of checkpoint N])`, recorded in `writeResultsSinceLastSnapshot` (now the only element there)
- Checkpoint N times out, so `notifyCheckpointComplete(N)` is never called
- `processElement(FlinkWriteResult3[part of checkpoint N+1])`
- `snapshotState(checkpoint N+1)`
- `writeToManifestUptoLatestCheckpoint(N+1)` is called, writing the contents of `writeResultsSinceLastSnapshot` into `dataFilesPerCheckpoint(N)` (`FlinkWriteResult2`) and `dataFilesPerCheckpoint(N+1)` (`FlinkWriteResult3`) and clearing `writeResultsSinceLastSnapshot`.
  In the process, `dataFilesPerCheckpoint(N)` is overwritten. It now contains only `FlinkWriteResult2`, and `FlinkWriteResult1` is lost even though it was never committed.
- `notifyCheckpointComplete(checkpoint N+1)`
- The Iceberg commit is started, but it contains only `FlinkWriteResult2` and `FlinkWriteResult3`, not `FlinkWriteResult1`.

This sequence therefore leads to data loss. So does an equivalent one in which checkpoint N does not time out but completes after checkpoint N+1. This explanation matches the effects we are seeing.

The root cause appears to be in the `snapshotState()` -> `writeToManifestUptoLatestCheckpoint()` path, which drains `writeResultsSinceLastSnapshot`. This path implicitly assumes that all records for a checkpoint have already been processed when `snapshotState()` is called, i.e. it assumes aligned checkpoints. The issue described above occurs if this assumption breaks AND a later checkpoint is snapshotted before an earlier one has been notified as complete.

Additionally, we suspect a second failure mode with unaligned checkpoints that loses data on job recovery. If the theory above is correct, `IcebergFilesCommitter` mishandles elements for checkpoint N that are processed after `snapshotState(N)`. However, the Iceberg commit triggered by `notifyCheckpointComplete(N)` only commits the records captured in that snapshot. On recovery, one of two things follows. Either `initializeState()` -> `tailMap(N)` loses track of the not-yet-committed records, or, even if it kept them, it would assume they should be committed as part of Flink checkpoint N.

There seems to be a problem whenever a checkpoint ID can appear in `dataFilesPerCheckpoint` after it has already been used as `maxCommittedCheckpointId`. The strict `tailMap` exclusion has no way to distinguish "already committed" data from "deferred post-barrier" data.
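The following sketch makes the first failure sequence concrete. It is a heavily simplified, hypothetical model of the committer's bookkeeping, not Iceberg's actual code. `CommitterModel` and its methods are invented for illustration, and data files are plain strings. It assumes the behavior described above: the snapshot step replaces any entry already stored for a checkpoint ID, and a completed checkpoint commits every pending entry up to and including its own ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified model of the bookkeeping described above (NOT Iceberg's code). */
public class CommitterModel {
  // Results received since the last snapshotState(), keyed by the checkpoint they belong to.
  final Map<Long, List<String>> writeResultsSinceLastSnapshot = new HashMap<>();
  // Snapshotted but not yet committed results, keyed by checkpoint ID.
  final NavigableMap<Long, List<String>> dataFilesPerCheckpoint = new TreeMap<>();
  // Everything this model has "committed" to the table so far.
  final List<String> committed = new ArrayList<>();

  void processElement(long checkpointId, String dataFile) {
    writeResultsSinceLastSnapshot
        .computeIfAbsent(checkpointId, k -> new ArrayList<>())
        .add(dataFile);
  }

  void snapshotState(long checkpointId) {
    // Make sure the snapshotted checkpoint always gets an entry, even with no results.
    writeResultsSinceLastSnapshot.computeIfAbsent(checkpointId, k -> new ArrayList<>());
    // Assumed behavior: put() REPLACES any entry already stored for that checkpoint ID.
    for (Map.Entry<Long, List<String>> e : writeResultsSinceLastSnapshot.entrySet()) {
      dataFilesPerCheckpoint.put(e.getKey(), new ArrayList<>(e.getValue()));
    }
    writeResultsSinceLastSnapshot.clear();
  }

  void notifyCheckpointComplete(long checkpointId) {
    // Commit every pending checkpoint up to and including checkpointId, then drop them.
    NavigableMap<Long, List<String>> pending = dataFilesPerCheckpoint.headMap(checkpointId, true);
    pending.values().forEach(committed::addAll);
    pending.clear();
  }

  public static void main(String[] args) {
    final long n = 1L;
    CommitterModel m = new CommitterModel();
    m.processElement(n, "file1");     // FlinkWriteResult1, before snapshotState(N)
    m.snapshotState(n);               // dataFilesPerCheckpoint(N) = [file1]
    m.processElement(n, "file2");     // FlinkWriteResult2, after snapshotState(N)
    // Checkpoint N times out: notifyCheckpointComplete(N) is never called.
    m.processElement(n + 1, "file3"); // FlinkWriteResult3
    m.snapshotState(n + 1);           // dataFilesPerCheckpoint(N) overwritten with [file2]
    m.notifyCheckpointComplete(n + 1);
    System.out.println("committed = " + m.committed);
  }
}
```

Running `main` prints `committed = [file2, file3]`, so `file1` never reaches the table. If `file2` had arrived before `snapshotState(N)` (the aligned case), the same model would commit all three files.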
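The suspected recovery problem comes down to the strict exclusion in `java.util.NavigableMap.tailMap(fromKey, false)`. The sketch below is hypothetical. It assumes the restored state holds late, never-committed results under checkpoint N's key while N is already recorded as `maxCommittedCheckpointId`. `TailMapRecovery` and `uncommittedAfter` are illustrative names, not Iceberg APIs.

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration of strict tailMap exclusion on recovery (NOT Iceberg's code). */
public class TailMapRecovery {
  // Keeps only checkpoints strictly greater than maxCommittedCheckpointId.
  // Entries keyed by maxCommittedCheckpointId itself are dropped, even if they were never committed.
  static NavigableMap<Long, List<String>> uncommittedAfter(
      NavigableMap<Long, List<String>> restored, long maxCommittedCheckpointId) {
    return restored.tailMap(maxCommittedCheckpointId, false);
  }

  public static void main(String[] args) {
    NavigableMap<Long, List<String>> restored = new TreeMap<>();
    // Arrived after snapshotState(5) and was never committed, but 5 is already maxCommitted.
    restored.put(5L, List.of("late-file-for-5"));
    restored.put(6L, List.of("file-for-6"));
    System.out.println(uncommittedAfter(restored, 5L));
  }
}
```

This prints `{6=[file-for-6]}`: the deferred data for checkpoint 5 is silently discarded, because the map key alone cannot say whether that entry was already committed.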
### Willingness to contribute

- [ ] I can contribute a fix for this bug independently
- [x] I would be willing to contribute a fix for this bug with guidance from the Iceberg community
- [ ] I cannot contribute a fix for this bug at this time
