pvary commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1650849668
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -426,30 +425,44 @@ private void commitOperation(
}
@Override
- public void processElement(StreamRecord<WriteResult> element) {
- this.writeResultsOfCurrentCkpt.add(element.getValue());
+ public void processElement(StreamRecord<FlinkWriteResult> element) {
+ FlinkWriteResult flinkWriteResult = element.getValue();
+ List<WriteResult> writeResults =
+ writeResultsSinceLastSnapshot.computeIfAbsent(
+ flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+ writeResults.add(flinkWriteResult.writeResult());
}
@Override
public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
- long currentCheckpointId = Long.MAX_VALUE;
- dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
- writeResultsOfCurrentCkpt.clear();
-
+ long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+ writeToManifestSinceLastSnapshot(currentCheckpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
}
+ private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
+ if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+ dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
+ }
+
+ for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
+ writeResultsSinceLastSnapshot.entrySet()) {
+ dataFilesPerCheckpoint.put(
Review Comment:
Is this a scenario which could happen?
- Writer 1 writes data
- Writer 2 writes data
- Writer 1 prepareSnapshotPreBarrier for Checkpoint 1, File 1-1 with checkpoint 1 created
- Committer receives File 1-1
- Writer 2 prepareSnapshotPreBarrier for Checkpoint 1, File 1-2 with checkpoint 1 created
- Committer receives File 1-2
- Writer 1 writes data
- Writer 2 writes data
- Writer 1 prepareSnapshotPreBarrier for Checkpoint 2, File 2-1 with checkpoint 2 created
- Committer receives File 2-1
- Committer starts snapshotState for Checkpoint 1. At this stage `writeResultsSinceLastSnapshot` already holds results for Checkpoint 2 as well, but not all of them are ready to commit.
- Writer 2 prepareSnapshotPreBarrier for Checkpoint 2, File 2-2 with checkpoint 2 created
- Committer receives File 2-2
- Committer starts snapshotState for Checkpoint 2
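
To make the concern concrete, here is a minimal sketch, not the PR's code, of a buffer keyed by checkpoint ID that only hands out entries at or below the checkpoint being snapshotted. `CheckpointBuffer`, `receive`, `drainUpTo` and `pending` are hypothetical names, plain strings stand in for `WriteResult`, and "File 2-2" is assumed to be Writer 2's file for Checkpoint 2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CheckpointBuffer {
  // Hypothetical stand-in for writeResultsSinceLastSnapshot: checkpoint ID -> file names.
  private final NavigableMap<Long, List<String>> pending = new TreeMap<>();

  // Mirrors the grouping in processElement: file each result under its checkpoint ID.
  public void receive(long checkpointId, String file) {
    pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(file);
  }

  // Returns and removes only the entries whose checkpoint ID is <= checkpointId.
  // Results for later checkpoints stay buffered until their own snapshotState.
  public NavigableMap<Long, List<String>> drainUpTo(long checkpointId) {
    NavigableMap<Long, List<String>> readyView = pending.headMap(checkpointId, true);
    NavigableMap<Long, List<String>> ready = new TreeMap<>(readyView);
    readyView.clear(); // clearing the view removes those entries from 'pending'
    return ready;
  }

  // Exposes what is still buffered, for inspection only.
  public NavigableMap<Long, List<String>> pending() {
    return pending;
  }

  public static void main(String[] args) {
    CheckpointBuffer buffer = new CheckpointBuffer();
    buffer.receive(1L, "File 1-1");
    buffer.receive(1L, "File 1-2");
    buffer.receive(2L, "File 2-1"); // arrives before snapshotState for Checkpoint 1

    // snapshotState for Checkpoint 1: only Checkpoint 1 results are ready.
    System.out.println(buffer.drainUpTo(1L)); // {1=[File 1-1, File 1-2]}

    buffer.receive(2L, "File 2-2");
    // snapshotState for Checkpoint 2: both Checkpoint 2 results are now present.
    System.out.println(buffer.drainUpTo(2L)); // {2=[File 2-1, File 2-2]}
  }
}
```

If the loop over `writeResultsSinceLastSnapshot.entrySet()` instead wrote every entry during snapshotState for Checkpoint 1, File 2-1 would be included before File 2-2 has arrived, which is the case I am asking about.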
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]