Izeren commented on code in PR #27050:
URL: https://github.com/apache/flink/pull/27050#discussion_r2428969813
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
}
}
}
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
+                .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
+                .isNull();
+
+        for (int i = 0; i < 30; i++) {
Review Comment:
In this case, how would your test distinguish between `the future wasn't complete because of the "happens before" condition` and `the future wasn't complete because the VM froze and the responsible thread made no progress for more than 3 seconds`?
I am less concerned about this one since it shouldn't introduce flakiness, but testing it this way gives you weaker guarantees that the "happens before" condition is actually being tested.
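A more deterministic way to check the ordering would be to gate completion on the tracker instead of polling with a timeout. Below is a minimal sketch, reusing `tracker`, `checkpointFuture`, and `ackTask` from the quoted test and assuming `ControllableCheckpointStatsTracker` can keep `reportCompletedCheckpoint` blocked until the test releases it (the `allowReportToFinish()` hook is hypothetical):

```java
// Wait until reportCompletedCheckpoint has started; the controllable tracker
// keeps it blocked at this point.
tracker.getReportStartedFuture().get();

// While reporting is still blocked, the "happens before" condition requires
// that the checkpoint future has not completed yet.
assertThat(checkpointFuture).isNotDone();

// Release the tracker so reportCompletedCheckpoint can finish, then let the
// asynchronous ack task run to completion.
tracker.allowReportToFinish(); // hypothetical release hook
ackTask.get();

// Only after reporting has finished may the checkpoint future complete.
assertThat(checkpointFuture.get()).isNotNull();
```

This way a VM freeze only slows the test down; it cannot make the ordering assertion pass or fail spuriously.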
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
}
}
}
+
+    /**
+     * Tests that Checkpoint CompletableFuture completion happens after reportCompletedCheckpoint
+     * finishes. This ensures that when external components are notified via the CompletableFuture
+     * that a checkpoint is complete, all statistics have already been updated.
+     */
+    @Test
+    void testCompletionFutureCompletesAfterReporting() throws Exception {
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ControllableCheckpointStatsTracker tracker = new ControllableCheckpointStatsTracker();
+
+        CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointStatsTracker(tracker)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .build(graph);
+
+        CompletableFuture<CompletedCheckpoint> checkpointFuture =
+                coordinator.triggerCheckpoint(false);
+        manuallyTriggeredScheduledExecutor.triggerAll();
+
+        CompletableFuture<Void> ackTask =
+                CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                ackCheckpoint(
+                                        1L,
+                                        coordinator,
+                                        jobVertexID,
+                                        graph,
+                                        handle(),
+                                        handle(),
+                                        handle());
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
Review Comment:
The reason I brought this up is that I got similar feedback from @dmvk in the past: a CI VM can freeze for 15 minutes even if the test itself is quick, because multiple tests are running and there is no guarantee that your particular test will always execute quickly.
My overall view is the following: if an otherwise quick test takes longer than 15 minutes for some reason, then either it hit something like a deadlock, or the overall CI run was impacted by a "bad change" or "external factors". Unless you have a deadlock in your own test, the whole CI run is more likely to time out anyway, so a long per-test timeout doesn't make things worse. For the cases where you do have a deadlock, a per-test timeout could let you verify more tests in a failed run, which is beneficial, but the benefit is limited to the non-parallel suite that fails.
To sum up, I don't see a big difference between 15 minutes and 1 hour, but 20 seconds is very likely not enough.
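If the concern is just the tight 20-second bound, one option would be to drop the per-assertion timeout and rely on the overall test/CI timeout. A minimal sketch of the quoted assertion, under that assumption:

```java
// Blocks until reportCompletedCheckpoint has started; relies on the overall
// test/CI timeout instead of an arbitrary 20-second bound.
assertThat(tracker.getReportStartedFuture().get())
        .as("reportCompletedCheckpoint should be started soon when checkpoint is acked.")
        .isNull();
```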