UrsSchoenenbergerNu opened a new issue, #15846:
URL: https://github.com/apache/iceberg/issues/15846

   ### Apache Iceberg version
   
   1.10.0
   
   ### Query engine
   
   Flink
   
   ### Please describe the bug 🐞
   
   Hi,
   
   we have recently encountered a case of data loss in an application using 
`FlinkSink` (v1), running on Flink 2.x with unaligned checkpoints.
   
   What we saw: 
   - One commit to the Iceberg table was missing approximately half of the data 
files that were written by the writer during this checkpoint. 
   - Around this time, one checkpoint was started and Flink snapshots were 
triggered, but this checkpoint timed out before completing. 
   
   Based on our research, we believe that the culprit might be the internal 
state tracking in `IcebergFilesCommitter`. Here's our current theory:
   
   For unaligned checkpoints, the order of operations happening on 
`IcebergFilesCommitter` can change subtly since `processElement` can be called 
for a `FlinkWriteResult` that is part of checkpoint N even _after_ 
`snapshotState` was called for checkpoint N:
   - `processElement(FlinkWriteResult1[part of checkpoint N])`, recorded in 
`writeResultsSinceLastSnapshot`
   - `snapshotState(checkpoint N)`
   - `writeToManifestUptoLatestCheckpoint(N)` is called, moving the contents 
of `writeResultsSinceLastSnapshot` into `dataFilesPerCheckpoint(N)` and clearing 
`writeResultsSinceLastSnapshot`
   - `processElement(FlinkWriteResult2[part of checkpoint N])`, recorded in 
`writeResultsSinceLastSnapshot` (now the single element there)
   - checkpoint N times out, so `notifyCheckpointComplete(N)` is never called
   - `processElement(FlinkWriteResult3[part of checkpoint N+1])`
   - `snapshotState(checkpoint N+1)`
   - `writeToManifestUptoLatestCheckpoint(N+1)` is called, moving the contents 
of `writeResultsSinceLastSnapshot` into `dataFilesPerCheckpoint(N)` and 
`dataFilesPerCheckpoint(N+1)` and clearing `writeResultsSinceLastSnapshot`. 
The entry `dataFilesPerCheckpoint(N)` is overwritten: it now contains only 
`FlinkWriteResult2` and has lost `FlinkWriteResult1`, even though that result 
was never committed
   - `notifyCheckpointComplete(checkpoint N+1)`
   - Iceberg commit is started, but contains only `FlinkWriteResult2` and 
`FlinkWriteResult3`, not `FlinkWriteResult1`.
   
   This sequence, or equivalently one where checkpoint N does not time out but 
completes after checkpoint N+1, therefore leads to data loss.
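
The sequence above can be replayed with a small stand-alone Java model. This is only a sketch of the theory, not the actual `IcebergFilesCommitter` code: the class `CommitterStateModel`, the `replay` helper, and the plain strings standing in for `FlinkWriteResult` objects are hypothetical, and the model assumes that staging does a plain `put` per checkpoint ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified model of the committer's bookkeeping (not Iceberg source). */
class CommitterStateModel {
  // Results received since the last snapshot, keyed by the checkpoint they belong to.
  private final Map<Long, List<String>> writeResultsSinceLastSnapshot = new TreeMap<>();
  // Results staged per checkpoint, waiting for notifyCheckpointComplete.
  private final NavigableMap<Long, List<String>> dataFilesPerCheckpoint = new TreeMap<>();
  private final List<String> committed = new ArrayList<>();

  void processElement(long checkpointId, String result) {
    writeResultsSinceLastSnapshot
        .computeIfAbsent(checkpointId, k -> new ArrayList<>())
        .add(result);
  }

  void snapshotState(long checkpointId) {
    // Assumption: each buffered entry replaces (put, not merge) whatever was
    // already staged under the same checkpoint ID.
    for (Map.Entry<Long, List<String>> e : writeResultsSinceLastSnapshot.entrySet()) {
      dataFilesPerCheckpoint.put(e.getKey(), new ArrayList<>(e.getValue()));
    }
    writeResultsSinceLastSnapshot.clear();
  }

  void notifyCheckpointComplete(long checkpointId) {
    // Commit everything staged up to and including this checkpoint.
    NavigableMap<Long, List<String>> pending = dataFilesPerCheckpoint.headMap(checkpointId, true);
    for (List<String> results : pending.values()) {
      committed.addAll(results);
    }
    pending.clear(); // headMap is a view, so this also clears the staged entries
  }

  /** Replays the sequence from the list above with N = 1. */
  static List<String> replay() {
    CommitterStateModel c = new CommitterStateModel();
    long n = 1L;
    c.processElement(n, "FlinkWriteResult1");
    c.snapshotState(n);
    c.processElement(n, "FlinkWriteResult2"); // arrives after snapshotState(N)
    // Checkpoint N times out: notifyCheckpointComplete(N) is never called.
    c.processElement(n + 1, "FlinkWriteResult3");
    c.snapshotState(n + 1);
    c.notifyCheckpointComplete(n + 1);
    return c.committed;
  }

  public static void main(String[] args) {
    System.out.println(replay()); // prints [FlinkWriteResult2, FlinkWriteResult3]
  }
}
```

In this model `FlinkWriteResult1` never reaches the commit, because the second `put` under key N replaces the entry that held it.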
   
   This explanation aligns with the effects that we're seeing. The root cause 
seems to be that the path 
`snapshotState->writeToManifestUptoLatestCheckpoint->writeResultsSinceLastSnapshot`
 implicitly assumes that all records for a checkpoint have already been 
processed when `snapshotState()` is called, i.e. it assumes aligned 
checkpoints. If this assumption breaks AND a later checkpoint is snapshotted 
before an earlier one was notified complete, the issue described above is 
observed.
   
   Additionally, we suspect there's a second failure mode with unaligned 
checkpoints that loses data on job recovery. If our theory above is correct, 
then `IcebergFilesCommitter` has an issue with elements for checkpoint N being 
processed after `snapshotState(N)`, but the Iceberg commit triggered during 
`notifyCheckpointComplete(N)` only commits the records from the snapshot. On 
recovery, this means one of two things: either `initializeState()->.tailMap(N)` 
loses information about not-yet-committed records; or, even if it kept that 
information, it would assume that these records should be committed as part of 
Flink checkpoint N. It feels like there's a problem when a checkpoint ID can 
appear in `dataFilesPerCheckpoint` after it has already been used as 
`maxCommittedCheckpointId`, and the strict tailMap exclusion has no way to 
distinguish "already committed" from "deferred post-barrier" data.
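
The recovery concern can be illustrated with the standard `NavigableMap.tailMap(key, inclusive)` call. This is a minimal sketch under the assumption that restore keeps only entries strictly greater than `maxCommittedCheckpointId`; the class `TailMapRecoveryModel`, the method `restoreUncommitted`, and the string values are hypothetical and do not come from the Iceberg code base.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of the restore step (not Iceberg source). */
class TailMapRecoveryModel {
  // Assumption: restore keeps only checkpoints strictly newer than the last
  // committed one, i.e. tailMap with inclusive = false.
  static NavigableMap<Long, String> restoreUncommitted(
      NavigableMap<Long, String> restoredState, long maxCommittedCheckpointId) {
    return restoredState.tailMap(maxCommittedCheckpointId, false);
  }

  public static void main(String[] args) {
    NavigableMap<Long, String> restoredState = new TreeMap<>();
    // Checkpoint 5 was already committed, but a late result was staged
    // under the same ID afterwards.
    restoredState.put(5L, "deferred post-barrier result");
    restoredState.put(6L, "result of checkpoint 6");

    NavigableMap<Long, String> uncommitted = restoreUncommitted(restoredState, 5L);
    // Only key 6 survives; the deferred entry under key 5 is treated as committed.
    System.out.println(uncommitted.keySet()); // prints [6]
  }
}
```

With a strict bound, an entry staged under an already committed checkpoint ID is indistinguishable from committed data and is dropped on restore.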
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [x] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


