stevenzwu commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1685640900
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -426,30 +425,44 @@ private void commitOperation(
}
@Override
- public void processElement(StreamRecord<WriteResult> element) {
- this.writeResultsOfCurrentCkpt.add(element.getValue());
+ public void processElement(StreamRecord<FlinkWriteResult> element) {
+ FlinkWriteResult flinkWriteResult = element.getValue();
+ List<WriteResult> writeResults =
+ writeResultsSinceLastSnapshot.computeIfAbsent(
+ flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+ writeResults.add(flinkWriteResult.writeResult());
}
@Override
public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
- long currentCheckpointId = Long.MAX_VALUE;
- dataFilesPerCheckpoint.put(currentCheckpointId,
writeToManifest(currentCheckpointId));
- writeResultsOfCurrentCkpt.clear();
-
+ long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+ writeToManifestSinceLastSnapshot(currentCheckpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId,
currentCheckpointId);
}
+ private void writeToManifestSinceLastSnapshot(long checkpointId) throws
IOException {
+ if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+ dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
+ }
+
+ for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
Review Comment:
Checked out the linked comment. It is the same scenario. But I think it
could happen. let me call out the step where it can be problematic when
concurrent checkpoints are allowed. cc @pvary
- Writer 1 writes data
- Writer 2 writes data
- Writer 1 prepareSnapshotPreBarrier for Checkpoint 1, File 1-1 with
checkpoint 1 created
- Committer receives File 1-1
- Writer 2 prepareSnapshotPreBarrier for Checkpoint 1, File 1-2 with
checkpoint 1 created
- Committer receives File 1-2
- Writer 1 writes data
- Writer 2 writes data
- Writer 1 prepareSnapshotPreBarrier for Checkpoint 2, File 2-1 with
checkpoint 2 created
- Committer receives File 2-1
- Committer starts snapshotState for Checkpoint 1 - At this stage we have
writeResultsSinceLastSnapshot with more data, but not all of them are ready to
commit. `<--- In this step, current implementation will commit files from
checkpoint 2, which is incorrect. commuter should only commit files up to
checkpoint 1.`
- Committer receives File 2-2
- Committer starts snapshotState for Checkpoint 2
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]