AHeise commented on code in PR #26433:
URL: https://github.com/apache/flink/pull/26433#discussion_r2036031020
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java:
##########
@@ -151,24 +149,23 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
     @Override
     public void endInput() throws Exception {
-        endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be committed here
-            commitAndEmitCheckpoints();
+            commitAndEmitCheckpoints(lastCompletedCheckpointId + 1);
         }
     }
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         super.notifyCheckpointComplete(checkpointId);
-        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
-        commitAndEmitCheckpoints();
+        commitAndEmitCheckpoints(Math.max(lastCompletedCheckpointId, checkpointId));
     }
-    private void commitAndEmitCheckpoints() throws IOException, InterruptedException {
-        long completedCheckpointId = endInput ? EOI : lastCompletedCheckpointId;
+    private void commitAndEmitCheckpoints(long checkpointId)
+            throws IOException, InterruptedException {
+        lastCompletedCheckpointId = checkpointId;
Review Comment:
In general, transient state is lost on error. So it doesn't matter whether we
update the field before or after the loop: an exception leads to a fail-over,
and everything is recalculated on recovery. Since everything is called from the
main task thread (mailbox thread), no interleaving is possible between this
call and another call such as `endInput`.
Now in this specific case, `lastCompletedCheckpointId` refers to the id of the
last checkpoint completed by Flink as a whole. Since this value is primarily set
through `notifyCheckpointComplete`, the checkpoint is already completed before
the method starts. So I'd like to keep the assignment as the first statement
because that's easier to read than doing it at the end of the method.
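
To illustrate the fail-over argument, here is a minimal standalone sketch. It
is not the actual `CommitterOperator`: `SketchOperator`, `commitAndEmit`,
`failOnId` and `simulateFailover` are hypothetical names, a fail-over is
modelled as discarding the instance and creating a new one from the restored
checkpoint id, and commits are assumed to be idempotent.

```java
import java.util.ArrayList;
import java.util.List;

public class TransientStateSketch {

    /** Hypothetical stand-in for the operator; only models the transient field. */
    static final class SketchOperator {
        // Transient: never part of a snapshot, so it is rebuilt after a fail-over.
        long lastCompletedCheckpointId;
        final List<Long> committed = new ArrayList<>();

        SketchOperator(long restoredCheckpointId) {
            this.lastCompletedCheckpointId = restoredCheckpointId;
        }

        /** Updates the field first, then commits; failOnId simulates a commit error. */
        void commitAndEmit(long checkpointId, List<Long> pending, long failOnId) {
            lastCompletedCheckpointId = checkpointId;
            for (long id : pending) {
                if (id > checkpointId) {
                    break;
                }
                if (id == failOnId) {
                    throw new IllegalStateException("simulated commit failure at " + id);
                }
                committed.add(id); // assumed idempotent, so re-committing is harmless
            }
        }
    }

    /** Returns {field of failed instance, field after recovery, commits after recovery}. */
    static long[] simulateFailover() {
        List<Long> pending = List.of(1L, 2L, 3L);

        SketchOperator failed = new SketchOperator(0L);
        try {
            failed.commitAndEmit(3L, pending, 2L);
        } catch (IllegalStateException e) {
            // Fail-over: the instance, including its half-updated field, is discarded.
        }

        // Recovery starts from the restored id and replays the same notification.
        SketchOperator recovered = new SketchOperator(0L);
        recovered.commitAndEmit(3L, pending, -1L);

        return new long[] {
            failed.lastCompletedCheckpointId,
            recovered.lastCompletedCheckpointId,
            recovered.committed.size()
        };
    }

    public static void main(String[] args) {
        long[] result = simulateFailover();
        System.out.println("failed instance field:  " + result[0]);
        System.out.println("recovered field:        " + result[1]);
        System.out.println("commits after recovery: " + result[2]);
    }
}
```

The failed instance does hold the new id although its loop never finished, but
nothing reads that instance again, so moving the assignment to the end of the
method would not change what is observable after recovery.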