UrsSchoenenbergerNu opened a new pull request, #15913: URL: https://github.com/apache/iceberg/pull/15913
Closes #15846 ### Quick Summary When running with unaligned checkpoints, data files may reach IcebergFilesCommitter after the associated checkpoint's first barrier. We then need to account for these files in the state for the 'next' checkpoint to avoid discarding them during recovery or failed checkpoints. ### Details We have recently encountered a case of data loss in an application using FlinkSink (v1), running on Flink 2.x with unaligned checkpoints. What we saw: One commit to the Iceberg table was missing approximately half of the data files that were written by the writer during this checkpoint. Around this time, one checkpoint was started and Flink snapshots were triggered, but this checkpoint timed out before completing. Based on our research, we believe that the culprit might be the internal state tracking in IcebergFilesCommitter. Here's our current theory: For unaligned checkpoints, the order of operations happening on IcebergFilesCommitter can change subtly since processElement can be called for a FlinkWriteResult that is part of checkpoint N even after snapshotState was called for checkpoint N: processElement(FlinkWriteResult1[part of checkpoint N]), recorded in writeResultsSinceLastSnapshot snapshotState(checkpoint N) writeToManifestUptoLatestCheckpoint(N) is called, putting the contents of writeResultsSinceLastSnapshot to dataFilesPerCheckpoint(N) and clearing writeResultsSinceLastSnapshot processElement(FlinkWriteResult2[part of checkpoint N]), recorded in writeResultsSinceLastSnapshot (now the single element there) checkpoint N times out, so notifyCheckpointComplete(N) is never called processElement(FlinkWriteResult3[part of checkpoint N+1]) snapshotState(checkpoint N+1) writeToManifestUptoLatestCheckpoint(N+1) is called, putting the contents of writeResultsSinceLastSnapshot to dataFilesPerCheckpoint(N) and dataFilesPerCheckpoint(N+1) and clearing writeResultsSinceLastSnapshot. dataFilesPerCheckpoint(N) now loses FlinkWriteResult1 even though it was never committed and contains only FlinkWriteResult2 notifyCheckpointComplete(checkpoint N+1) Iceberg commit is started, but contains only FlinkWriteResult2 and FlinkWriteResult3, not FlinkWriteResult1. This sequence, or equivalently one where checkpoint N does not time out, but completes after checkpoint N+1, therefore leads to data loss. This explanation aligns with the effects that we're seeing. The root cause seems to be that snapshotState->writeToManifestUpToLatestCheckpoint->writeResultsSinceLastSnapshot seems to implicitly assume that all records for the checkpoints have already been processed when snapshotState() is called - i.e. it assumes aligned checkpoints. If this assumption breaks, AND in addition a later checkpoint is snapshotted before an earlier one was notified complete, the issue described above is observed. Additionally, we suspect there's a second failure mode with unaligned checkpoints that loses data on job recovery: If our above theory is correct, then IcebergFilesCommitter has an issue with elements for checkpoint N being processed after snapshotState(N). But the iceberg commit triggered during notifyComplete(N) only commits the records from the snapshot. On recovery, this means one of two things: Either initializeState()->.tailMap(N) loses information about not-yet-committed records; or even if it remembered to do so, it would think that these records should be committed as part of Flink checkpoint N. It feels like there's a problem when a checkpoint ID can appear in dataFilesPerCheckpoint after it has already been used as maxCommittedCheckpointId, and the strict tailMap exclusion has no way to distinguish "already committed" from "deferred post-barrier" data. ### Contents of this PR This PR contains three test cases that trigger the test harness in ways that are only possible when using unaligned checkpoints. Without the associated code change in prod code, these test cases fail. ### Disclaimer / disclosure as per contributing guidelines AI tools were used to initially pinpoint the possible issue and draft test cases. The code was subsequently rewritten condensed and clarified by hand. To the best of my knowledge, I believe that the way in which the reproduction test cases trigger the harness are all cases that DO happen with unaligned checkpoints, and the data loss that we had encountered at runtime aligns with the one that happens in these reproduction test cases. ### Caveats The logic that I'm changing here was last changed by https://github.com/apache/iceberg/pull/10526 while fixing a data duplication bug. The test cases that were added for that bug are still green with the changes that I'm proposing here. I would still like to tag @pvary and @zhongqishang and kindly ask them for advice on this issue here and this associated PR, as I'm sure they have a much deeper understanding of the inner workings of `IcebergFileCommitter` than I do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
