pvary commented on code in PR #10526:
URL: https://github.com/apache/iceberg/pull/10526#discussion_r1695119335
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -426,30 +425,44 @@ private void commitOperation(
}
@Override
- public void processElement(StreamRecord<WriteResult> element) {
- this.writeResultsOfCurrentCkpt.add(element.getValue());
+ public void processElement(StreamRecord<FlinkWriteResult> element) {
+ FlinkWriteResult flinkWriteResult = element.getValue();
+ List<WriteResult> writeResults =
+ writeResultsSinceLastSnapshot.computeIfAbsent(
+ flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
+ writeResults.add(flinkWriteResult.writeResult());
}
@Override
public void endInput() throws IOException {
// Flush the buffered data files into 'dataFilesPerCheckpoint' firstly.
- long currentCheckpointId = Long.MAX_VALUE;
- dataFilesPerCheckpoint.put(currentCheckpointId, writeToManifest(currentCheckpointId));
- writeResultsOfCurrentCkpt.clear();
-
+ long currentCheckpointId = IcebergStreamWriter.END_INPUT_CHECKPOINT_ID;
+ writeToManifestSinceLastSnapshot(currentCheckpointId);
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, currentCheckpointId);
}
+ private void writeToManifestSinceLastSnapshot(long checkpointId) throws IOException {
+ if (!writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
+ dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
+ }
+
+ for (Map.Entry<Long, List<WriteResult>> writeResultsOfCkpt :
Review Comment:
I think the process above will translate to the following messages on the wire:
- Writer-1's prepareSnapshotPreBarrier emits File 1-1, produced by that method
- Writer-1 emits the checkpoint barrier for CHK-1
- Writer-2's prepareSnapshotPreBarrier emits File 1-2, produced by that method
- Writer-2 emits the checkpoint barrier for CHK-1
- Committer receives File 1-1
- Committer receives File 1-2
- Writer-1's prepareSnapshotPreBarrier emits File 2-1, produced by that method
- Writer-1 emits the checkpoint barrier for CHK-2
- Committer will **NOT** receive File 2-1 until it has received the CHK-1 barrier from every upstream writer subtask and executed its snapshotState method.
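
To make the ordering argument concrete, here is a minimal toy simulation of barrier alignment on the committer's input. It is not Flink's implementation; it assumes one FIFO channel per writer, always reads the lowest-index unblocked channel first (modelling a Writer-1 that runs ahead), blocks a channel once its barrier arrives, and runs the snapshot only when that barrier has arrived on every channel. The class, record and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of a committer reading two writer channels with aligned checkpoint barriers. */
public class AlignedBarrierDemo {
    /** One element on a writer-to-committer channel: a data file or a checkpoint barrier. */
    record Element(String file, long checkpointId) {
        static Element ofFile(String name) { return new Element(name, -1L); }
        static Element ofBarrier(long id) { return new Element(null, id); }
        boolean isBarrier() { return file == null; }
    }

    /** The wire order from the comment: one FIFO channel per writer. */
    static List<Deque<Element>> scenario() {
        Deque<Element> writer1 = new ArrayDeque<>(List.of(
                Element.ofFile("File 1-1"), Element.ofBarrier(1),
                Element.ofFile("File 2-1"), Element.ofBarrier(2)));
        Deque<Element> writer2 = new ArrayDeque<>(List.of(
                Element.ofFile("File 1-2"), Element.ofBarrier(1)));
        return List.of(writer1, writer2);
    }

    /** Returns what the committer observes, in order. */
    static List<String> replay(List<Deque<Element>> channels) {
        List<String> log = new ArrayList<>();
        boolean[] blocked = new boolean[channels.size()];
        while (true) {
            // Prefer the lowest-index readable channel, i.e. Writer-1 whenever it has data.
            int next = -1;
            for (int i = 0; i < channels.size(); i++) {
                if (!blocked[i] && !channels.get(i).isEmpty()) {
                    next = i;
                    break;
                }
            }
            if (next < 0) {
                return log; // every channel is drained or blocked
            }
            Element e = channels.get(next).poll();
            if (!e.isBarrier()) {
                log.add("receive " + e.file());
                continue;
            }
            // Alignment: stop reading this channel until the barrier is seen on all channels.
            blocked[next] = true;
            boolean aligned = true;
            for (boolean b : blocked) {
                aligned &= b;
            }
            if (aligned) {
                log.add("snapshotState CHK-" + e.checkpointId());
                Arrays.fill(blocked, false); // snapshot taken, resume reading every channel
            }
        }
    }

    public static void main(String[] args) {
        replay(scenario()).forEach(System.out::println);
    }
}
```

Running main prints File 1-1, File 1-2, the CHK-1 snapshot, and only then File 2-1. Without the blocking step, the channel preference in this model would have read File 2-1 before File 1-2.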

Based on this sequence, the fix should work when aligned checkpoints are used. I think all bets are off when unaligned checkpoints are used, as in that case the checkpoint barriers can "overtake" normal records.
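
The diff makes the committer buffer write results per checkpoint id (writeResultsSinceLastSnapshot in processElement). Below is a minimal sketch of that kind of buffering. The class and method names are hypothetical, and draining every id up to and including the snapshotted checkpoint is an assumption, because the body of writeToManifestSinceLastSnapshot is cut off in the excerpt. It only illustrates where a result would land if a barrier overtook a CHK-1 file, so that the file reached the committer after snapshotState had already run for CHK-1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the committer's per-checkpoint buffer of write results. */
public class CheckpointKeyedBuffer<R> {
    // Sorted by checkpoint id so "everything up to checkpoint N" is a head map.
    private final NavigableMap<Long, List<R>> pending = new TreeMap<>();

    /** Files a result under the checkpoint id the writer attached to it (cf. processElement). */
    public void add(long checkpointId, R result) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(result);
    }

    /** Assumption: removes and returns every group with an id <= the snapshotted checkpoint. */
    public NavigableMap<Long, List<R>> drainUpTo(long checkpointId) {
        NavigableMap<Long, List<R>> view = pending.headMap(checkpointId, true);
        NavigableMap<Long, List<R>> drained = new TreeMap<>(view);
        view.clear(); // the head map is a live view, so this also removes the entries from 'pending'
        return drained;
    }

    public boolean isEmpty() {
        return pending.isEmpty();
    }

    public static void main(String[] args) {
        CheckpointKeyedBuffer<String> buffer = new CheckpointKeyedBuffer<>();
        buffer.add(1, "File 1-1");
        System.out.println(buffer.drainUpTo(1)); // {1=[File 1-1]}
        // A CHK-1 result that only reaches the committer after snapshotState ran for CHK-1.
        buffer.add(1, "File 1-2");
        buffer.add(2, "File 2-1");
        System.out.println(buffer.drainUpTo(2)); // {1=[File 1-2], 2=[File 2-1]}
    }
}
```

In this sketch the late File 1-2 is not lost, but it is drained together with CHK-2 rather than with its own checkpoint, which is exactly the situation the aligned-checkpoint sequence above rules out.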

@gyfora: Could you please confirm whether my understanding above is correct? If it is, it would be good to state this somewhere in the Iceberg Flink connector documentation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.