1996fanrui commented on code in PR #27050:
URL: https://github.com/apache/flink/pull/27050#discussion_r2424915961
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
}
}
}
+
+ /**
+ * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+ * finishes. This ensures that when external components are notified via the CompletableFuture
+ * that a checkpoint is complete, all statistics have already been updated.
+ */
+ @Test
+ void testCompletionFutureCompletesAfterReporting() throws Exception {
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointStatsTracker(tracker)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build(graph);
+
+ CompletableFuture<CompletedCheckpoint> checkpointFuture =
+ coordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ CompletableFuture<Void> ackTask =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ ackCheckpoint(
+ 1L,
+ coordinator,
+ jobVertexID,
+ graph,
+ handle(),
+ handle(),
+ handle());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
Review Comment:
I would like to clarify two types of CI timeouts: one is the total timeout of a CI run, and the other is the timeout of a single unit test, or of some logical wait within a unit test.
- For the former, I think 15 minutes or more is reasonable.
- For the latter, if each single test could wait up to 15 minutes, the total CI duration could become terrible, since Flink may have more than 10k tests.
Tests are sometimes unstable, but not that unstable. The default policy of Flink CI is to fail if there is no progress for 15 consecutive minutes. Such stalls are generally caused by bugs, for example a deadlock. It's rare to see a CI process stuck for 15 minutes because of a lack of resources.
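To illustrate the difference between a total timeout and a "no progress" policy, here is a small sketch. `NoProgressWatchdog` is a hypothetical class, not Flink's actual CI tooling; it assumes progress is recorded whenever the build emits output, and it flags a stall only when nothing has happened for a whole window, regardless of how long the build has been running in total.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of a "no progress" watchdog: it does not bound the total
 * run time, it only reports a stall if no progress was recorded for a full window.
 */
public class NoProgressWatchdog {
    private final long windowNanos;
    private final AtomicLong lastProgressNanos;

    public NoProgressWatchdog(Duration window, long nowNanos) {
        this.windowNanos = window.toNanos();
        this.lastProgressNanos = new AtomicLong(nowNanos);
    }

    /** Called whenever the build makes progress, e.g. a test finishes. */
    public void recordProgress(long nowNanos) {
        lastProgressNanos.set(nowNanos);
    }

    /** True if nothing has happened for at least one full window. */
    public boolean isStalled(long nowNanos) {
        return nowNanos - lastProgressNanos.get() >= windowNanos;
    }

    public static void main(String[] args) {
        long minute = Duration.ofMinutes(1).toNanos();
        NoProgressWatchdog watchdog = new NoProgressWatchdog(Duration.ofMinutes(15), 0L);
        // A 100-minute build that keeps making progress is never flagged...
        for (int i = 1; i <= 100; i++) {
            watchdog.recordProgress(i * minute);
        }
        System.out.println(watchdog.isStalled(110 * minute)); // false: 10 min since last progress
        // ...but a build stuck (e.g. deadlocked) for 15 minutes is.
        System.out.println(watchdog.isStalled(115 * minute)); // true
    }
}
```

Time is passed in explicitly so the behavior is deterministic; a real watchdog would read a clock and kill the build once `isStalled` returns true.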
For this case, `tracker.getReportStartedFuture()` completes very quickly locally, always in less than 100 ms, which is why I think 20 seconds is safe here.
For other examples, I checked some similar callers[1][2] in the Flink code base: some of them use 10 seconds, and some use 60 seconds. I could update it from 20 seconds to 60 seconds if you think 20 seconds is not safe enough. TBH, 15 minutes is a little long: if there are bugs, it would delay the exception and the CI result.
[1] https://github.com/apache/flink/blob/cc55c56ace4401c2a4023153d9e17001ab5fcc85/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java#L79
[2] https://github.com/apache/flink/blob/cc55c56ace4401c2a4023153d9e17001ab5fcc85/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java#L245
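To make the bounded-wait reasoning concrete, here is a minimal standalone sketch of the ordering the test checks. `BlockingReporter` and `completeCheckpoint` are hypothetical stand-ins for `ControllableCheckpointStatsTracker` and the coordinator's completion path, not Flink APIs: the reporter signals that reporting has started, then blocks until released, and the checkpoint future is completed only after reporting returns. Each wait uses `CompletableFuture.get(long, TimeUnit)`, so a bug surfaces as a `TimeoutException` after at most 20 seconds rather than as a 15-minute CI stall.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReportBeforeCompleteSketch {

    /** Hypothetical stand-in for a controllable stats tracker. */
    static final class BlockingReporter {
        final CompletableFuture<Void> reportStarted = new CompletableFuture<>();
        final CountDownLatch release = new CountDownLatch(1);

        void report() throws InterruptedException {
            reportStarted.complete(null); // tell the test that reporting began
            release.await();              // block until the test lets it finish
        }
    }

    /** Hypothetical completion path: report first, then complete the future. */
    static void completeCheckpoint(BlockingReporter reporter, CompletableFuture<String> result)
            throws InterruptedException {
        reporter.report();
        result.complete("checkpoint-1");
    }

    public static void main(String[] args) throws Exception {
        BlockingReporter reporter = new BlockingReporter();
        CompletableFuture<String> checkpointFuture = new CompletableFuture<>();

        CompletableFuture<Void> ack = CompletableFuture.runAsync(() -> {
            try {
                completeCheckpoint(reporter, checkpointFuture);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });

        // Bounded wait: throws TimeoutException after 20 s instead of hanging
        // until the CI-wide no-progress watchdog fires.
        reporter.reportStarted.get(20, TimeUnit.SECONDS);
        System.out.println(checkpointFuture.isDone()); // false: reporting is still blocked

        reporter.release.countDown();
        System.out.println(checkpointFuture.get(20, TimeUnit.SECONDS)); // checkpoint-1
        ack.get(20, TimeUnit.SECONDS);

        // A future that never completes becomes a fast, explicit failure.
        try {
            new CompletableFuture<Void>().get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException expected) {
            System.out.println("timed out as expected");
        }
    }
}
```

The latch makes the "reporting in progress" window deterministic, so asserting that the checkpoint future is not yet done does not depend on thread timing; only the upper bound on each wait does.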