becketqin commented on a change in pull request #13044:
URL: https://github.com/apache/flink/pull/13044#discussion_r477159783



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() 
throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task 
checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */

Review comment:
       I haven't had time to update the PR yet. The upcoming update separates 
the PR into two commits and adds test coverage for both cases.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() 
throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task 
checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1,
+                               (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2,
+                               (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = 
getCheckpointCoordinator(jid, vertex1, vertex2);
+                       AtomicReference<Long> checkpointIdRef = new 
AtomicReference<>();
+
+                       OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+                       OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+                       TaskStateSnapshot taskOperatorSubtaskStates1 = 
mock(TaskStateSnapshot.class);
+                       TaskStateSnapshot taskOperatorSubtaskStates2 = 
mock(TaskStateSnapshot.class);
+                       OperatorSubtaskState subtaskState1 = 
mock(OperatorSubtaskState.class);
+                       OperatorSubtaskState subtaskState2 = 
mock(OperatorSubtaskState.class);
+                       
when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
+                       
when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);
+
+                       coord.addMasterHook(new 
MasterTriggerRestoreHook<Integer>() {
+                               @Override
+                               public String getIdentifier() {
+                                       return "anything";
+                               }
+
+                               @Override
+                               public CompletableFuture<Integer> 
triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws 
Exception {
+                                       // Acknowledge the checkpoint in the 
master hooks so the task snapshots complete before
+                                       // the master state snapshot completes.
+                                       checkpointIdRef.set(checkpointId);
+                                       AcknowledgeCheckpoint 
acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
+                                               jid, attemptID1, checkpointId, 
new CheckpointMetrics(), taskOperatorSubtaskStates1);
+                                       AcknowledgeCheckpoint 
acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
+                                               jid, attemptID2, checkpointId, 
new CheckpointMetrics(), taskOperatorSubtaskStates2);
+                                       
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1, 
TASK_MANAGER_LOCATION_INFO);
+                                       
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2, 
TASK_MANAGER_LOCATION_INFO);
+                                       return null;
+                               }
+
+                               @Override
+                               public void restoreCheckpoint(long 
checkpointId, Integer checkpointData) throws Exception {
+
+                               }
+
+                               @Override
+                               public SimpleVersionedSerializer<Integer> 
createCheckpointDataSerializer() {
+                                       return new 
SimpleVersionedSerializer<Integer>() {
+                                               @Override
+                                               public int getVersion() {
+                                                       return 0;
+                                               }
+
+                                               @Override
+                                               public byte[] serialize(Integer 
obj) throws IOException {
+                                                       return new byte[0];
+                                               }
+
+                                               @Override
+                                               public Integer deserialize(int 
version, byte[] serialized) throws IOException {
+                                                       return null;
+                                               }
+                                       };
+                               }
+                       });
+
+                       assertEquals(0, coord.getNumberOfPendingCheckpoints());
+                       assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+                       assertEquals(0, 
manuallyTriggeredScheduledExecutor.getScheduledTasks().size());
+
+                       // trigger the first checkpoint. this should succeed
+                       final CompletableFuture<CompletedCheckpoint> 
checkpointFuture = coord.triggerCheckpoint(false);
+                       manuallyTriggeredScheduledExecutor.triggerAll();
+                       
assertFalse(checkpointFuture.isCompletedExceptionally());

Review comment:
       Technically speaking, `future.get()` does not provide the same check 
here. The future may also not be completed after `triggerAll()` is invoked, 
whether that is expected or not. In that case, `future.get()` would block 
forever.
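
To illustrate the difference, here is a minimal sketch using only the JDK's `CompletableFuture`. The class and helper names (`FutureCheckSketch`, `describe`, `finishesWithin`) are made up for this example and are not part of Flink or of this PR. It shows that `isCompletedExceptionally()` never blocks and returns `false` for a future that is still pending, while an unbounded `get()` on that same future would never return; a bounded `get(timeout, unit)` is one way to avoid hanging.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class for illustration only; not part of Flink.
final class FutureCheckSketch {

    /** Describes the state of a future without ever blocking on it. */
    public static String describe(CompletableFuture<?> future) {
        if (!future.isDone()) {
            return "pending";
        }
        return future.isCompletedExceptionally() ? "failed" : "completed";
    }

    /** Bounded wait: returns false instead of hanging if the future stays pending. */
    public static boolean finishesWithin(CompletableFuture<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still pending; an unbounded get() would block here
        } catch (ExecutionException | CancellationException e) {
            return true; // finished, although not normally
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        // Not an exceptional completion, so an assertFalse on this would pass,
        // even though the future has not completed at all.
        System.out.println(pending.isCompletedExceptionally());
        System.out.println(describe(pending));
        System.out.println(finishesWithin(pending, 50));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describe(failed));
    }
}
```

So the two checks answer different questions: `isCompletedExceptionally()` only rules out a failure that has already happened, whereas `get()` additionally waits for completion.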

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -530,15 +530,22 @@ private void 
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
                                                        
request.getOnCompletionFuture()),
                                                timer);
 
-                       final CompletableFuture<?> masterStatesComplete = 
pendingCheckpointCompletableFuture
-                                       .thenCompose(this::snapshotMasterState);
-
                        final CompletableFuture<?> 
coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
                                        .thenComposeAsync((pendingCheckpoint) ->
                                                        
OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                                                                        
coordinatorsToCheckpoint, pendingCheckpoint, timer),
                                                        timer);
 
+                       // We have to take the snapshot of the master hooks 
after the coordinator checkpoints has completed.
+                       // This is to ensure the tasks are checkpointed after 
the OperatorCoordinators in case
+                       // ExternallyInducedSource is used.
+                       final CompletableFuture<?> masterStatesComplete = 
coordinatorCheckpointsComplete
+                                       .thenComposeAsync(ignored -> {
+                                               PendingCheckpoint checkpoint =
+                                                       
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

Review comment:
       Adding more comments makes sense to me. We can add a comment saying "The 
pending checkpoint will always be non-null. We use FutureUtils here to catch 
checked exceptions to make the compiler happy." 
   
   However, I am not sure if it is a common practice to add `checkState()` to 
verify the behavior of Java library code. 
   
   Assertions are usually used in three cases:
   1. Sanity checks for user-provided arguments / functions / pluggables, etc., 
to ensure the interface contract with users is not broken.
   2. Sanity checks on something that may change outside the control of the 
current code, e.g. a variable related to wall clock time.
   3. Testing the code written by the programmer who puts the assertion there, 
e.g. Java assertions.
   
   The first two cases are essentially sanity checks to make sure the code 
logic is still valid. The last case actually sounds like an anti-pattern to me 
and should be replaced by proper unit tests IMO.
   
   In Flink, `checkState()` is our version of the Java `assert` statement. 
However, Java assertions are designed for development and testing purposes and 
are therefore turned off by default, so they won't run in production. Even in 
that case, the assertions are used to verify the code written by the 
programmers themselves, rather than the code from a third-party library.
   
   In this particular case, the behavior is guaranteed by `CompletableFuture`, 
so the assertion here would essentially be verifying `CompletableFuture`. If we 
do this here, should we also put checks everywhere else that other libraries 
are invoked?
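
As a rough illustration of the guarantee I mean, here is a sketch using only the JDK. `DependentStageSketch`, `Pending`, and `chain` are hypothetical stand-ins, not the actual `CheckpointCoordinator` code. A callback attached to a stage derived from `pendingFuture` runs only after that stage completed normally, which (as long as nobody completes the derived stage by hand) means `pendingFuture` itself has already completed normally. Reading it inside the callback therefore returns immediately with whatever value it was completed with; that value is non-null provided the future was completed with a non-null value.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; names are illustrative and not taken from Flink.
final class DependentStageSketch {

    /** Stand-in for the PendingCheckpoint in the diff. */
    static final class Pending {
        final long id;

        Pending(long id) {
            this.id = id;
        }
    }

    public static CompletableFuture<String> chain(CompletableFuture<Pending> pendingFuture) {
        // Stage derived from pendingFuture, playing the role of
        // coordinatorCheckpointsComplete in the diff.
        CompletableFuture<Void> coordinatorsDone =
                pendingFuture.thenAccept(pending -> { /* coordinator checkpoints would run here */ });

        // This callback runs only after coordinatorsDone completed normally, so
        // pendingFuture is already complete and join() returns without blocking.
        return coordinatorsDone.thenApply(ignored -> {
            Pending pending = pendingFuture.join();
            return "master state for checkpoint " + pending.id;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Pending> pendingFuture = new CompletableFuture<>();
        CompletableFuture<String> masterDone = chain(pendingFuture);

        System.out.println(masterDone.isDone()); // false: nothing has completed yet
        pendingFuture.complete(new Pending(42L));
        System.out.println(masterDone.join()); // master state for checkpoint 42
    }
}
```

In the sketch `join()` is used because it throws no checked exceptions; in the PR, `FutureUtils.getWithoutException` serves the same purpose of keeping the compiler happy.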

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() 
throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task 
checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();
+
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       final ExecutionAttemptID attemptID2 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1,
+                               (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+                       ExecutionVertex vertex2 = 
mockExecutionVertex(attemptID2,
+                               (executionAttemptID, jobId, checkpointId, 
timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
+
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = 
getCheckpointCoordinator(jid, vertex1, vertex2);
+                       AtomicReference<Long> checkpointIdRef = new 
AtomicReference<>();
+
+                       OperatorID opID1 = 
OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+                       OperatorID opID2 = 
OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+                       TaskStateSnapshot taskOperatorSubtaskStates1 = 
mock(TaskStateSnapshot.class);
+                       TaskStateSnapshot taskOperatorSubtaskStates2 = 
mock(TaskStateSnapshot.class);
+                       OperatorSubtaskState subtaskState1 = 
mock(OperatorSubtaskState.class);
+                       OperatorSubtaskState subtaskState2 = 
mock(OperatorSubtaskState.class);
+                       
when(taskOperatorSubtaskStates1.getSubtaskStateByOperatorID(opID1)).thenReturn(subtaskState1);
+                       
when(taskOperatorSubtaskStates2.getSubtaskStateByOperatorID(opID2)).thenReturn(subtaskState2);

Review comment:
       This is a legacy class, so I just followed the existing pattern to keep 
the tests in this class consistent. Personally, I prefer removing Mockito in a 
separate ticket, at the granularity of the whole unit test class rather than 
per test method.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
##########
@@ -2260,6 +2263,121 @@ public void testSavepointScheduledInUnalignedMode() 
throws Exception {
                }
        }
 
+       /**
+        * Test that the checkpoint still behave correctly when the task 
checkpoint is triggered by the
+        * master hooks and finished before the master checkpoint.
+        */
+       @Test
+       public void testTaskCheckpointTriggeredByMasterHooks() {
+               try {
+                       final JobID jid = new JobID();

Review comment:
       I don't have a strong opinion on this. `jid` and `coord` are common 
names used by other test methods in this class, and I also feel they are 
intuitive enough, so I just followed the same convention. 





