Izeren commented on code in PR #27050:
URL: https://github.com/apache/flink/pull/27050#discussion_r2432769036


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
             }
         }
     }
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
+                .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
+                .isNull();
+
+        for (int i = 0; i < 30; i++) {

Review Comment:
   If rewriting this test without the busy wait is non-trivial, I am happy to
accept the current implementation as is. Most of the time this test will not
be subject to a VM freeze. Even if a bad change is lucky enough to pass CI and
get merged, the test will go red in other runs shortly afterwards, and I
assume we will still be able to pinpoint the issue promptly. I have no
objection to keeping it as is.
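
   For reference, one way a test like this could avoid polling is to hold the
report call open with a latch and check the future once while it is blocked.
The sketch below is self-contained and does not use any Flink classes:
`GatedTracker`, `complete` and `run` are hypothetical stand-ins, not the PR's
`ControllableCheckpointStatsTracker` or the coordinator's real code path. It
only covers the case where reporting and future completion happen on the same
thread; if completion could be raced from another thread, a single check may
not be enough.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class GatedReportSketch {

    /** Hypothetical tracker whose report call blocks until the test releases it. */
    static final class GatedTracker {
        final CountDownLatch reportStarted = new CountDownLatch(1);
        final CountDownLatch allowReportToFinish = new CountDownLatch(1);

        void reportCompleted() throws InterruptedException {
            reportStarted.countDown();     // signal: reporting has begun
            allowReportToFinish.await();   // stay inside the report until released
        }
    }

    /** Hypothetical coordinator step: report statistics first, then complete the future. */
    static void complete(GatedTracker tracker, CompletableFuture<String> completion)
            throws InterruptedException {
        tracker.reportCompleted();
        completion.complete("checkpoint-1");
    }

    /** Returns {doneWhileReporting, doneAfterRelease}. */
    static boolean[] run() throws Exception {
        GatedTracker tracker = new GatedTracker();
        CompletableFuture<String> completion = new CompletableFuture<>();

        CompletableFuture<Void> ackTask =
                CompletableFuture.runAsync(
                        () -> {
                            try {
                                complete(tracker, completion);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        });

        // Wait until the report call is in progress (bounded, so a hang fails fast).
        if (!tracker.reportStarted.await(20, TimeUnit.SECONDS)) {
            throw new IllegalStateException("report never started");
        }

        // The reporting thread is parked on the latch, so one check is enough here.
        boolean doneWhileReporting = completion.isDone();

        // Let the report finish and wait for the acknowledging task to return.
        tracker.allowReportToFinish.countDown();
        ackTask.get(20, TimeUnit.SECONDS);
        boolean doneAfterRelease = completion.isDone();

        return new boolean[] {doneWhileReporting, doneAfterRelease};
    }
}
```

   With this shape, a freeze of the test thread only delays the release of
the latch; it does not change what the single check observes.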



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

