Izeren commented on code in PR #27050:
URL: https://github.com/apache/flink/pull/27050#discussion_r2428969813


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
             }
         }
     }
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
+                .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
+                .isNull();
+
+        for (int i = 0; i < 30; i++) {

Review Comment:
   In this case, how would your test distinguish between `Future wasn't 
complete because of the "happens before" condition` and `Future wasn't complete 
because the VM froze and the responsible thread made no progress for more than 
3 seconds`? 
   
   I am less concerned about this one, as it shouldn't introduce flakiness, but 
tested this way you get a weaker guarantee that the "happens before" condition 
is actually being exercised. 
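   
   One way to avoid sampling over time is to record the order of events at the 
moment of completion. The following is a minimal sketch in plain 
`java.util.concurrent`, not the Flink test itself: `HappensBeforeSketch`, 
`reportThenComplete`, and the flag names are hypothetical stand-ins for the 
tracker and the checkpoint future. It assumes the callback is registered before 
the future can complete, so it runs in the completing thread and snapshots 
whether reporting had already finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class HappensBeforeSketch {

    // Hypothetical stand-in for the code under test: report first, then complete.
    static void reportThenComplete(
            AtomicBoolean reportFinished, CompletableFuture<String> completion) {
        reportFinished.set(true);           // "reportCompletedCheckpoint" has finished
        completion.complete("checkpoint");  // only now are listeners notified
    }

    static boolean completionObservedAfterReport() {
        AtomicBoolean reportFinished = new AtomicBoolean(false);
        AtomicBoolean reportFinishedAtCompletion = new AtomicBoolean(false);
        CompletableFuture<String> completion = new CompletableFuture<>();

        // Registered while the future is still incomplete, so the action runs
        // synchronously in whichever thread completes it, at completion time.
        CompletableFuture<Void> observer =
                completion.thenRun(
                        () -> reportFinishedAtCompletion.set(reportFinished.get()));

        CompletableFuture<Void> worker =
                CompletableFuture.runAsync(
                        () -> reportThenComplete(reportFinished, completion));

        // Waiting here only bounds the test; the verdict was captured at completion
        // time, so a frozen VM can delay the result but cannot change it.
        worker.join();
        observer.join();
        return reportFinishedAtCompletion.get();
    }

    public static void main(String[] args) {
        System.out.println(completionObservedAfterReport());
    }
}
```

   If the two statements in `reportThenComplete` were swapped, the snapshot 
would be `false` no matter how slowly the threads ran, which is the 
distinction the polling loop cannot make.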



##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
             }
         }
     }
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))

Review Comment:
   The reason I brought it up is that I got similar feedback from @dmvk in the 
past: he pointed out that a CI VM can freeze for 15 minutes even if the test 
is quick, because multiple tests are running and there is no guarantee that 
your particular test will always be executed quickly. 
   
   My overall view is the following: if an otherwise quick test takes longer 
than 15 minutes, then either it hit something like a deadlock, or the overall 
CI run was impacted by a "bad change" or external factors. Unless the deadlock 
is in your own test, the whole CI run is more likely than not to time out 
anyway, so a long timeout doesn't make things worse. When you do have a 
deadlock, a per-test timeout lets you verify more tests in the failed run, 
which is beneficial, but the benefit is limited to the non-parallel suite that 
fails. 
   
   To sum up, I don't see a big difference between 15 minutes and 1 hour, but 
20 seconds is very likely not enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
