UrsSchoenenbergerNu opened a new pull request, #15913:
URL: https://github.com/apache/iceberg/pull/15913

   Closes #15846
   
   ### Quick Summary
   
   When running with unaligned checkpoints, data files may reach 
IcebergFilesCommitter after the first barrier of the associated checkpoint.
   These files then need to be accounted for in the state of the 'next' 
checkpoint so they are not discarded on recovery or when a checkpoint fails.
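
For illustration, here is a minimal sketch of what "accounting for these files in the next checkpoint's state" could look like. The helper name `flushPending` and the simplified types are hypothetical and not the actual patch; the idea is that pending results keyed to an already-snapshotted checkpoint are rolled forward instead of overwriting the earlier entry:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CarryForwardSketch {
  // Hypothetical sketch, not the actual patch: pending results keyed to a
  // checkpoint that was already snapshotted are rolled into the entry for the
  // current checkpoint instead of overwriting the earlier one.
  static void flushPending(
      NavigableMap<Long, List<String>> dataFilesPerCheckpoint,
      Map<Long, List<String>> pending,
      long currentCheckpointId) {
    pending.forEach(
        (ckpt, files) -> {
          long target = dataFilesPerCheckpoint.containsKey(ckpt) ? currentCheckpointId : ckpt;
          dataFilesPerCheckpoint.computeIfAbsent(target, k -> new ArrayList<>()).addAll(files);
        });
    pending.clear();
  }

  public static void main(String[] args) {
    NavigableMap<Long, List<String>> state = new TreeMap<>();
    state.put(1L, new ArrayList<>(List.of("FlinkWriteResult1"))); // snapshotState(1) already ran
    Map<Long, List<String>> pending = new TreeMap<>();
    pending.put(1L, new ArrayList<>(List.of("FlinkWriteResult2"))); // post-barrier element of 1
    pending.put(2L, new ArrayList<>(List.of("FlinkWriteResult3")));
    flushPending(state, pending, 2L);
    System.out.println(state); // {1=[FlinkWriteResult1], 2=[FlinkWriteResult2, FlinkWriteResult3]}
  }
}
```

With this carry-forward, the late file survives as part of checkpoint 2 instead of clobbering the entry for checkpoint 1.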
   
   ### Details
   
   We have recently encountered a case of data loss in an application using 
FlinkSink (v1), running on Flink 2.x with unaligned checkpoints.
   
   What we saw:
   
   - One commit to the Iceberg table was missing roughly half of the data files 
written by the writer during that checkpoint.
   - Around the same time, a checkpoint was started and Flink snapshots were 
triggered, but that checkpoint timed out before completing.
   
   Based on our research, we believe the culprit might be the internal state 
tracking in IcebergFilesCommitter. Here's our current theory:
   
   For unaligned checkpoints, the order of operations on IcebergFilesCommitter 
can change subtly, since processElement can be called for a FlinkWriteResult 
that is part of checkpoint N even after snapshotState was called for 
checkpoint N:
   
   1. processElement(FlinkWriteResult1[part of checkpoint N]): recorded in 
writeResultsSinceLastSnapshot.
   2. snapshotState(checkpoint N): writeToManifestUpToLatestCheckpoint(N) is 
called, moving the contents of writeResultsSinceLastSnapshot into 
dataFilesPerCheckpoint(N) and clearing writeResultsSinceLastSnapshot.
   3. processElement(FlinkWriteResult2[part of checkpoint N]): recorded in 
writeResultsSinceLastSnapshot (now its only element).
   4. Checkpoint N times out, so notifyCheckpointComplete(N) is never called.
   5. processElement(FlinkWriteResult3[part of checkpoint N+1]): recorded in 
writeResultsSinceLastSnapshot.
   6. snapshotState(checkpoint N+1): writeToManifestUpToLatestCheckpoint(N+1) is 
called, moving the contents of writeResultsSinceLastSnapshot into 
dataFilesPerCheckpoint(N) and dataFilesPerCheckpoint(N+1) and clearing 
writeResultsSinceLastSnapshot. dataFilesPerCheckpoint(N) now contains only 
FlinkWriteResult2 and has lost FlinkWriteResult1, even though FlinkWriteResult1 
was never committed.
   7. notifyCheckpointComplete(checkpoint N+1): the Iceberg commit is started, 
but contains only FlinkWriteResult2 and FlinkWriteResult3, not FlinkWriteResult1.
   This sequence (or the equivalent one in which checkpoint N does not time out 
but completes after checkpoint N+1) therefore leads to data loss, and this 
explanation aligns with the effects we're seeing. The root cause seems to be 
that the 
snapshotState->writeToManifestUpToLatestCheckpoint->writeResultsSinceLastSnapshot
 path implicitly assumes that all records for a checkpoint have already been 
processed when snapshotState() is called - i.e., it assumes aligned 
checkpoints. If that assumption breaks AND, in addition, a later checkpoint is 
snapshotted before an earlier one is notified complete, the issue described 
above is observed.
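
The sequence above can be modeled in miniature. This sketch uses simplified, hypothetical types and helper signatures (it is not the real IcebergFilesCommitter code) to show how an unconditional put() during the second snapshot replaces the manifest entry for checkpoint 1 and silently drops the first write result:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical miniature of the committer's state; the field names mirror
// IcebergFilesCommitter but the types and signatures are simplified.
public class CommitterRaceSketch {
  static final NavigableMap<Long, List<String>> dataFilesPerCheckpoint = new TreeMap<>();
  static final Map<Long, List<String>> writeResultsSinceLastSnapshot = new HashMap<>();

  static void processElement(long checkpointId, String writeResult) {
    writeResultsSinceLastSnapshot
        .computeIfAbsent(checkpointId, k -> new ArrayList<>())
        .add(writeResult);
  }

  // Models the suspected bug: an unconditional put() replaces any manifest
  // entry already recorded for that checkpoint id.
  static void snapshotState() {
    writeResultsSinceLastSnapshot.forEach(
        (ckpt, files) -> dataFilesPerCheckpoint.put(ckpt, new ArrayList<>(files)));
    writeResultsSinceLastSnapshot.clear();
  }

  public static void main(String[] args) {
    processElement(1, "FlinkWriteResult1"); // pre-barrier element of checkpoint 1
    snapshotState();                        // dataFilesPerCheckpoint: {1=[FlinkWriteResult1]}
    processElement(1, "FlinkWriteResult2"); // post-barrier (unaligned) element of checkpoint 1
    // checkpoint 1 times out: notifyCheckpointComplete(1) never fires
    processElement(2, "FlinkWriteResult3"); // element of checkpoint 2
    snapshotState();                        // put() overwrites entry 1, dropping FlinkWriteResult1
    System.out.println(dataFilesPerCheckpoint); // {1=[FlinkWriteResult2], 2=[FlinkWriteResult3]}
  }
}
```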
   
   Additionally, we suspect there is a second failure mode with unaligned 
checkpoints that loses data on job recovery. If our theory above is correct, 
IcebergFilesCommitter can process elements for checkpoint N after 
snapshotState(N), yet the Iceberg commit triggered during 
notifyCheckpointComplete(N) only commits the records captured in that snapshot. 
On recovery, this means one of two things: either the .tailMap(N) call in 
initializeState() loses the information about not-yet-committed records, or, 
even if it kept them, it would treat those records as part of Flink checkpoint 
N. The underlying problem is that a checkpoint ID can appear in 
dataFilesPerCheckpoint after it has already been used as 
maxCommittedCheckpointId, and the strictly exclusive tailMap has no way to 
distinguish "already committed" from "deferred post-barrier" data.
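
The recovery-side concern can be illustrated with the strictly exclusive tailMap semantics alone. The restored state below is hypothetical example data and the helper method is an assumption, not the real initializeState() code:

```java
import java.util.List;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

public class TailMapRecoverySketch {
  // Mirrors the strictly exclusive tailMap used on restore: everything at or
  // below maxCommittedCheckpointId is treated as already committed.
  static SortedMap<Long, List<String>> uncommitted(
      NavigableMap<Long, List<String>> restored, long maxCommittedCheckpointId) {
    return restored.tailMap(maxCommittedCheckpointId, false);
  }

  public static void main(String[] args) {
    // Hypothetical restored state: checkpoint 5 was already committed, but a
    // deferred post-barrier file for checkpoint 5 was snapshotted afterwards.
    NavigableMap<Long, List<String>> restored = new TreeMap<>();
    restored.put(5L, List.of("DeferredPostBarrierFile")); // never committed
    restored.put(6L, List.of("FileForCheckpoint6"));

    // The deferred file is silently dropped on recovery.
    System.out.println(uncommitted(restored, 5L)); // {6=[FileForCheckpoint6]}
  }
}
```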
   
   ### Contents of this PR
   
   This PR contains three test cases that drive the test harness in ways that 
are only possible when using unaligned checkpoints. Without the associated 
change to the production code, these test cases fail.
   
   ### Disclaimer / disclosure as per contributing guidelines
   
   AI tools were used to initially pinpoint the possible issue and to draft 
test cases. The code was subsequently rewritten, condensed, and clarified by 
hand. To the best of my knowledge, the ways in which the reproduction test 
cases drive the harness are all cases that DO happen with unaligned 
checkpoints, and the data loss that we encountered at runtime aligns with the 
one that occurs in these reproduction test cases.
   
   ### Caveats
   
   The logic that I'm changing here was last changed by 
https://github.com/apache/iceberg/pull/10526 while fixing a data duplication 
bug; the test cases that were added for that bug are still green with the 
changes that I'm proposing here. I would still like to tag @pvary and 
@zhongqishang and kindly ask them for advice on this issue and the associated 
PR, as I'm sure they have a much deeper understanding of the inner workings of 
`IcebergFilesCommitter` than I do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

