pnowojski commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r480989742



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                        
request.getOnCompletionFuture()),
                                                timer);
 
-                       final CompletableFuture<?> masterStatesComplete = 
pendingCheckpointCompletableFuture
-                                       .thenCompose(this::snapshotMasterState);
-
                        final CompletableFuture<?> 
coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
                                        .thenComposeAsync((pendingCheckpoint) ->
                                                        
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
+                       // We have to take the snapshot of the master hooks 
after the coordinator checkpoints has completed.
+                       // This is to ensure the tasks are checkpointed after 
the OperatorCoordinators in case
+                       // ExternallyInducedSource is used.
+                       final CompletableFuture<?> masterStatesComplete = 
coordinatorCheckpointsComplete
+                                       .thenComposeAsync(ignored -> {
+                                               PendingCheckpoint checkpoint =
+                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
       > the behavior is guaranteed by CompletableFuture, the assertion here 
would essentially be verifying CompletableFuture,
   
   Not exactly. It would be verifying that you chained a couple of futures and 
callbacks correctly. That the callback `foo()` is using `future1` result and is 
triggered once `future2` completes, and that `future1` and `future2` are 
chained (or am I still mis understanding this code?). Java library doesn't 
guarantee you that, but your code that is chaining the futures does. Which is 
outside of the `foo()`'s control, so from `foo()`s perspective, that's an 
external assumption, and falls under:
   
   > ensure the interface contract with users are not broken.
   
   Where "users" are function's callers.
   
   And as I wrote before. If something violates this assumption, and even if 
some unit test fail, it's a bit easier to understand a `checkState` compared to 
`NPE`. Note performance overhead of one if check doesn't matter here at all. 
Also it's harder of `checkState` to become outdated and misleading over time.
   
   But it's not big issue, so if you have so strong feelings about it, put a 
comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to