gaoyunhaii commented on a change in pull request #16493:
URL: https://github.com/apache/flink/pull/16493#discussion_r670483114
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
}
}
+ private void triggerCheckpointRequest(
+ CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+ if (checkpoint.isDisposed()) {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ checkpoint.getFailureCause()));
+ } else {
+ triggerTasks(request, timestamp, checkpoint)
+ .exceptionally(
Review comment:
I think we might need to execute the callback in the `timer` thread explicitly, since `onTriggerFailure()` needs to run in the `timer` thread, while `acks` would be completed in the Akka thread.
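For illustration, a minimal sketch of what running the callback on the timer thread could look like. It assumes the coordinator's `timer` is a single-threaded executor and simulates the Akka thread with a plain executor; `onTriggerFailure` here is a hypothetical stand-in that only records which thread it ran on. Passing the executor to `whenCompleteAsync` moves the handler onto the timer thread no matter which thread completes the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimerThreadCallback {

    // Stand-in for the coordinator's single-threaded timer (assumption).
    static final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "checkpoint-timer"));

    // Stand-in for the Akka/RPC thread that completes the ack future (assumption).
    static final ExecutorService rpcThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "akka-dispatcher"));

    static volatile String failureHandlerThread;

    // Hypothetical stand-in for onTriggerFailure(): records where it ran.
    static void onTriggerFailure(Throwable cause) {
        failureHandlerThread = Thread.currentThread().getName();
    }

    public static void main(String[] args) throws Exception {
        // The ack future fails on the "Akka" thread.
        CompletableFuture<Void> acks =
                CompletableFuture.runAsync(
                        () -> {
                            throw new IllegalStateException("trigger failed");
                        },
                        rpcThread);

        // Hop back to the timer thread before touching coordinator state.
        CompletableFuture<Void> handled =
                acks.whenCompleteAsync(
                        (ignored, failure) -> {
                            if (failure != null) {
                                onTriggerFailure(failure);
                            }
                        },
                        timer);

        // Wait until the handler has run, ignoring the failure it propagates.
        handled.handle((v, t) -> null).get(5, TimeUnit.SECONDS);
        System.out.println("onTriggerFailure ran on: " + failureHandlerThread);

        timer.shutdown();
        rpcThread.shutdown();
    }
}
```

Running it should print `onTriggerFailure ran on: checkpoint-timer`.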
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
}
}
+ private void triggerCheckpointRequest(
+ CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+ if (checkpoint.isDisposed()) {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ checkpoint.getFailureCause()));
+ } else {
+ triggerTasks(request, timestamp, checkpoint)
+ .exceptionally(
+ failure -> {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ failure));
+ return null;
+ });
+
+ coordinatorsToCheckpoint.forEach(
+ (ctx) -> ctx.afterSourceBarrierInjection(checkpoint.getCheckpointID()));
+ // It is possible that the tasks has finished
+ // checkpointing at this point.
+ // So we need to complete this pending checkpoint.
+ if (maybeCompleteCheckpoint(checkpoint)) {
+ onTriggerSuccess();
+ }
+ }
+ }
+
+ private CompletableFuture<Void> triggerTasks(
+ CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+ // no exception, no discarding, everything is OK
+ final long checkpointId = checkpoint.getCheckpointId();
Review comment:
nit: better to use `getCheckpointID()`, since `getCheckpointId()` is deprecated.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
}
}
+ private void triggerCheckpointRequest(
+ CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+ if (checkpoint.isDisposed()) {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ checkpoint.getFailureCause()));
+ } else {
+ triggerTasks(request, timestamp, checkpoint)
+ .exceptionally(
+ failure -> {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ failure));
+ return null;
+ });
+
+ coordinatorsToCheckpoint.forEach(
+ (ctx) -> ctx.afterSourceBarrierInjection(checkpoint.getCheckpointID()));
+ // It is possible that the tasks has finished
+ // checkpointing at this point.
+ // So we need to complete this pending checkpoint.
+ if (maybeCompleteCheckpoint(checkpoint)) {
+ onTriggerSuccess();
+ }
+ }
+ }
+
+ private CompletableFuture<Void> triggerTasks(
+ CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+ // no exception, no discarding, everything is OK
+ final long checkpointId = checkpoint.getCheckpointId();
+
+ final CheckpointOptions checkpointOptions =
+ CheckpointOptions.forConfig(
+ request.props.getCheckpointType(),
+ checkpoint.getCheckpointStorageLocation().getLocationReference(),
+ isExactlyOnceMode,
+ unalignedCheckpointsEnabled,
+ alignedCheckpointTimeout);
+
+ // send the messages to the tasks that trigger their checkpoint
Review comment:
nit: `that trigger` -> `to trigger`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
}
}
+ private void triggerCheckpointRequest(
+ CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+ if (checkpoint.isDisposed()) {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ checkpoint.getFailureCause()));
+ } else {
+ triggerTasks(request, timestamp, checkpoint)
+ .exceptionally(
+ failure -> {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ failure));
+ return null;
+ });
+
+ coordinatorsToCheckpoint.forEach(
+ (ctx) -> ctx.afterSourceBarrierInjection(checkpoint.getCheckpointID()));
+ // It is possible that the tasks has finished
+ // checkpointing at this point.
+ // So we need to complete this pending checkpoint.
+ if (maybeCompleteCheckpoint(checkpoint)) {
+ onTriggerSuccess();
Review comment:
There might be a problem with `onTriggerSuccess` here: we might first call `onTriggerSuccess` here, and later find that triggering failed and call `onTriggerFailure` in the callback. This could cause problems; for example, it would set `isTriggering = false` and schedule the next trigger task in the middle of the current trigger (which consists of several separate Runnables), and the two calls might also conflict semantically. Perhaps we could change it to one of the following?
1. Also call `onTriggerSuccess()` in the callback that runs once triggering the tasks has finished. However, if the trigger actually fails with a timeout, the trigger result check would be held up by the RPC timeout (10s by default), and during this period we could not trigger new checkpoints.
2. Consider the checkpoint successfully triggered once this line has executed, and if we find that triggering the tasks failed, only call `abortPendingCheckpoint`. If we also execute the callback in the timer thread, there would be no concurrency issue, since `abortPendingCheckpoint` would always be submitted after the current method is done.
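A minimal sketch of the ordering argument in option 2, assuming the timer is a single-threaded executor. `onTriggerSuccess` and `abortPendingCheckpoint` are hypothetical stand-ins that only record events, and the ack future is already failed when the callback is registered, to show the worst case.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AbortAfterTrigger {

    // Single-threaded stand-in for the coordinator's timer (assumption).
    static final ExecutorService timer = Executors.newSingleThreadExecutor();

    static final List<String> events = new CopyOnWriteArrayList<>();

    // Hypothetical stand-ins for the coordinator methods discussed above.
    static void onTriggerSuccess() {
        events.add("onTriggerSuccess");
    }

    static void abortPendingCheckpoint(Throwable cause) {
        events.add("abortPendingCheckpoint");
    }

    static void triggerCheckpointRequest() {
        // Worst case: triggering the tasks has already failed.
        CompletableFuture<Void> acks = new CompletableFuture<>();
        acks.completeExceptionally(new IllegalStateException("task trigger failed"));

        // On failure only abort the pending checkpoint, on the timer thread.
        // The task is queued behind the one currently running this method.
        acks.whenCompleteAsync(
                (ignored, failure) -> {
                    if (failure != null) {
                        abortPendingCheckpoint(failure);
                    }
                },
                timer);

        // The trigger itself counts as successful once we get here.
        onTriggerSuccess();
        events.add("triggerCheckpointRequest returned");
    }

    public static void main(String[] args) throws Exception {
        // Run the trigger on the timer thread and wait for it to finish.
        timer.submit(AbortAfterTrigger::triggerCheckpointRequest).get();
        // The queued abort callback still runs after shutdown() is called.
        timer.shutdown();
        timer.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(events);
    }
}
```

This should print `[onTriggerSuccess, triggerCheckpointRequest returned, abortPendingCheckpoint]`. The abort can only start after the trigger method has returned, because it is queued on the same single thread that is running that method.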
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -672,6 +644,64 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
}
}
+ private void triggerCheckpointRequest(
+ CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
+ if (checkpoint.isDisposed()) {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
+ CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+ checkpoint.getFailureCause()));
+ } else {
+ triggerTasks(request, timestamp, checkpoint)
+ .exceptionally(
+ failure -> {
+ onTriggerFailure(
+ checkpoint,
+ new CheckpointException(
Review comment:
nit: perhaps we do not need to wrap the failure if it is already a `CheckpointException`? If the task does not exist on the TM side, the failure would already be a `CheckpointException`.
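For illustration, a sketch of reusing an existing `CheckpointException` instead of wrapping it again. `CheckpointException` and `CheckpointFailureReason` below are simplified stand-ins, not Flink's classes, and the `TASK_CHECKPOINT_FAILURE` constant is illustrative. A callback on a derived stage (e.g. after `thenRun` or `allOf`) receives a `CompletionException`, so the helper unwraps one level before checking the type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class NoDoubleWrap {

    // Simplified stand-ins for Flink's types (assumption; constants illustrative).
    enum CheckpointFailureReason { TRIGGER_CHECKPOINT_FAILURE, TASK_CHECKPOINT_FAILURE }

    static class CheckpointException extends Exception {
        final CheckpointFailureReason reason;

        CheckpointException(CheckpointFailureReason reason, Throwable cause) {
            super(reason.name(), cause);
            this.reason = reason;
        }
    }

    // Reuse an existing CheckpointException; wrap anything else as a trigger failure.
    static CheckpointException toCheckpointException(Throwable failure) {
        // Failures seen through derived stages arrive wrapped in CompletionException.
        Throwable cause =
                failure instanceof CompletionException && failure.getCause() != null
                        ? failure.getCause()
                        : failure;
        if (cause instanceof CheckpointException) {
            return (CheckpointException) cause;
        }
        return new CheckpointException(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, cause);
    }

    public static void main(String[] args) {
        // Simulate a TM reporting that the task does not exist.
        CompletableFuture<Void> acks = new CompletableFuture<>();
        acks.completeExceptionally(
                new CheckpointException(CheckpointFailureReason.TASK_CHECKPOINT_FAILURE, null));

        acks.thenRun(() -> {})
                .exceptionally(
                        failure -> {
                            // Keeps the original reason instead of TRIGGER_CHECKPOINT_FAILURE.
                            System.out.println(toCheckpointException(failure).reason);
                            return null;
                        });
    }
}
```

Running it should print `TASK_CHECKPOINT_FAILURE`, i.e. the original reason survives instead of being hidden behind a second `TRIGGER_CHECKPOINT_FAILURE` wrapper.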