Myasuka commented on a change in pull request #16582:
URL: https://github.com/apache/flink/pull/16582#discussion_r751027356
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1405,59 +1406,80 @@ private void declineCheckpoint(
}
public void notifyCheckpointComplete(final long checkpointID) {
- final TaskInvokable invokable = this.invokable;
-
- if (executionState == ExecutionState.RUNNING) {
- checkState(invokable instanceof CheckpointableTask, "invokable is
not checkpointable");
- try {
- ((CheckpointableTask)
invokable).notifyCheckpointCompleteAsync(checkpointID);
- } catch (RejectedExecutionException ex) {
- // This may happen if the mailbox is closed. It means that the
task is shutting
- // down, so we just ignore it.
- LOG.debug(
- "Notify checkpoint complete {} for {} ({}) was
rejected by the mailbox",
- checkpointID,
- taskNameWithSubtask,
- executionId);
- } catch (Throwable t) {
- if (getExecutionState() == ExecutionState.RUNNING) {
- // fail task if checkpoint confirmation failed.
- failExternally(new RuntimeException("Error while
confirming checkpoint", t));
- }
- }
- } else {
- LOG.debug(
- "Ignoring checkpoint commit notification for non-running
task {}.",
- taskNameWithSubtask);
- }
+ notifyCheckpoint(
+ checkpointID,
+ CheckpointStoreUtil.INVALID_CHECKPOINT_ID,
+ NotifyCheckpointOperation.COMPLETE);
}
public void notifyCheckpointAborted(
final long checkpointID, final long latestCompletedCheckpointId) {
- final TaskInvokable invokable = this.invokable;
+ notifyCheckpoint(
+ checkpointID, latestCompletedCheckpointId,
NotifyCheckpointOperation.ABORT);
+ }
- if (executionState == ExecutionState.RUNNING) {
+ public void notifyCheckpointSubsumed(long checkpointID) {
+ notifyCheckpoint(
+ checkpointID,
+ CheckpointStoreUtil.INVALID_CHECKPOINT_ID,
+ NotifyCheckpointOperation.SUBSUME);
+ }
+
+ private void notifyCheckpoint(
+ long checkpointId,
+ long latestCompletedCheckpointId,
+ NotifyCheckpointOperation notifyCheckpointOperation) {
+ TaskInvokable invokable = this.invokable;
+
+ if (executionState == ExecutionState.RUNNING && invokable != null) {
checkState(invokable instanceof CheckpointableTask, "invokable is
not checkpointable");
try {
- ((CheckpointableTask) invokable)
- .notifyCheckpointAbortAsync(checkpointID,
latestCompletedCheckpointId);
+ switch (notifyCheckpointOperation) {
+ case ABORT:
+ ((CheckpointableTask) invokable)
+ .notifyCheckpointAbortAsync(
+ checkpointId,
latestCompletedCheckpointId);
+ break;
+ case COMPLETE:
+ ((CheckpointableTask) invokable)
+ .notifyCheckpointCompleteAsync(checkpointId);
+ break;
+ case SUBSUME:
+ ((CheckpointableTask) invokable)
+ .notifyCheckpointSubsumedAsync(checkpointId);
+ }
} catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the
task is shutting
// down, so we just ignore it.
LOG.debug(
- "Notify checkpoint abort {} for {} ({}) was rejected
by the mailbox",
- checkpointID,
+ "Notify checkpoint {}} {} for {} ({}) was rejected by
the mailbox.",
+ notifyCheckpointOperation,
+ checkpointId,
taskNameWithSubtask,
executionId);
} catch (Throwable t) {
- if (getExecutionState() == ExecutionState.RUNNING) {
- // fail task if checkpoint aborted notification failed.
- failExternally(new RuntimeException("Error while aborting
checkpoint", t));
+ switch (notifyCheckpointOperation) {
+ case ABORT:
+ case COMPLETE:
+ if (getExecutionState() == ExecutionState.RUNNING) {
+ failExternally(
+ new RuntimeException(
+ String.format(
+ "Error while notify
checkpoint %s.",
+ notifyCheckpointOperation),
+ t));
+ }
+ break;
+ case SUBSUME:
+ // just rethrow the throwable out as we do not expect
notification of
+ // subsume could fail the task.
+ ExceptionUtils.rethrow(t);
Review comment:
From my point of view, we need to ensure the task fails over directly when a
checkpoint-complete notification fails, to avoid unexpected data loss.
However, I don't think we should apply the same rule to checkpoint-subsumed
notifications, as there is no strict data-completeness requirement there. WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]