Izeren commented on code in PR #27050:
URL: https://github.com/apache/flink/pull/27050#discussion_r2404605917
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
}
}
}
+
+ /**
+ * Tests that Checkpoint CompletableFuture completion happens after
reportCompletedCheckpoint
+ * finishes. This ensures that when external components are notified via
the CompletableFuture
+ * that a checkpoint is complete, all statistics have already been updated.
+ */
+ @Test
+ void testCompletionFutureCompletesAfterReporting() throws Exception {
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ ControllableCheckpointStatsTracker tracker = new
ControllableCheckpointStatsTracker();
+
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointStatsTracker(tracker)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build(graph);
+
+ CompletableFuture<CompletedCheckpoint> checkpointFuture =
+ coordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ CompletableFuture<Void> ackTask =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ ackCheckpoint(
+ 1L,
+ coordinator,
+ jobVertexID,
+ graph,
+ handle(),
+ handle(),
+ handle());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
+ .as("reportCompletedCheckpoint should be started soon when
checkpoint is acked.")
+ .isNull();
+
+ for (int i = 0; i < 30; i++) {
Review Comment:
Similarly to the above, I am not sure you can tell whether the expected change
did not occur because it was blocked or because the corresponding thread was simply inactive.
It would be better to wait indefinitely here.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
}
}
}
+
+ /**
+ * Tests that Checkpoint CompletableFuture completion happens after
reportCompletedCheckpoint
+ * finishes. This ensures that when external components are notified via
the CompletableFuture
+ * that a checkpoint is complete, all statistics have already been updated.
+ */
+ @Test
+ void testCompletionFutureCompletesAfterReporting() throws Exception {
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ ControllableCheckpointStatsTracker tracker = new
ControllableCheckpointStatsTracker();
+
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointStatsTracker(tracker)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build(graph);
+
+ CompletableFuture<CompletedCheckpoint> checkpointFuture =
+ coordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ CompletableFuture<Void> ackTask =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ ackCheckpoint(
+ 1L,
+ coordinator,
+ jobVertexID,
+ graph,
+ handle(),
+ handle(),
+ handle());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
+ .as("reportCompletedCheckpoint should be started soon when
checkpoint is acked.")
+ .isNull();
+
+ for (int i = 0; i < 30; i++) {
+ assertThat(checkpointFuture)
+ .as(
+ "Checkpoint future should not complete while
reportCompletedCheckpoint is blocked")
+ .isNotDone();
+ Thread.sleep(100);
+ }
+
+ tracker.getReportBlockingFuture().complete(null);
+
+ CompletedCheckpoint result = checkpointFuture.get(5, TimeUnit.SECONDS);
Review Comment:
ditto
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
}
}
}
+
+ /**
+ * Tests that Checkpoint CompletableFuture completion happens after
reportCompletedCheckpoint
+ * finishes. This ensures that when external components are notified via
the CompletableFuture
+ * that a checkpoint is complete, all statistics have already been updated.
+ */
+ @Test
+ void testCompletionFutureCompletesAfterReporting() throws Exception {
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ ControllableCheckpointStatsTracker tracker = new
ControllableCheckpointStatsTracker();
+
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointStatsTracker(tracker)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build(graph);
+
+ CompletableFuture<CompletedCheckpoint> checkpointFuture =
+ coordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ CompletableFuture<Void> ackTask =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ ackCheckpoint(
+ 1L,
+ coordinator,
+ jobVertexID,
+ graph,
+ handle(),
+ handle(),
+ handle());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
Review Comment:
This is likely to end up being a flaky test. A test in CI could freeze for 15 minutes
or more, so a 20-second timeout may not be sufficient in general.
I suggest using an effectively indefinite timeout of at least a few hours.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1385,8 +1385,8 @@ private void completePendingCheckpoint(PendingCheckpoint
pendingCheckpoint)
lastSubsumed = null;
}
-
pendingCheckpoint.getCompletionFuture().complete(completedCheckpoint);
Review Comment:
I have concerns that a change like this could have potential impacts such as:
1. A deadlock / race condition if `reportCompletedCheckpoint` triggers
any handler that also waits on the checkpoint future before its completion (in
general, an unlikely situation, and one that should be caught by existing tests)
2. Checkpoint completion will be slightly delayed, but reporting is a quick
operation, so this doesn't seem to be critical
3. If reporting throws an exception, it will result in the checkpoint being
completed exceptionally. Could we confirm that this behaviour matches the
previous one?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -4409,4 +4410,106 @@ public boolean isDiscarded() {
}
}
}
+
+ /**
+ * Tests that Checkpoint CompletableFuture completion happens after
reportCompletedCheckpoint
+ * finishes. This ensures that when external components are notified via
the CompletableFuture
+ * that a checkpoint is complete, all statistics have already been updated.
+ */
+ @Test
+ void testCompletionFutureCompletesAfterReporting() throws Exception {
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ ControllableCheckpointStatsTracker tracker = new
ControllableCheckpointStatsTracker();
+
+ CheckpointCoordinator coordinator =
+ new CheckpointCoordinatorBuilder()
+ .setCheckpointStatsTracker(tracker)
+ .setTimer(manuallyTriggeredScheduledExecutor)
+ .build(graph);
+
+ CompletableFuture<CompletedCheckpoint> checkpointFuture =
+ coordinator.triggerCheckpoint(false);
+ manuallyTriggeredScheduledExecutor.triggerAll();
+
+ CompletableFuture<Void> ackTask =
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ ackCheckpoint(
+ 1L,
+ coordinator,
+ jobVertexID,
+ graph,
+ handle(),
+ handle(),
+ handle());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(tracker.getReportStartedFuture().get(20, TimeUnit.SECONDS))
+ .as("reportCompletedCheckpoint should be started soon when
checkpoint is acked.")
+ .isNull();
+
+ for (int i = 0; i < 30; i++) {
+ assertThat(checkpointFuture)
+ .as(
+ "Checkpoint future should not complete while
reportCompletedCheckpoint is blocked")
+ .isNotDone();
+ Thread.sleep(100);
+ }
+
+ tracker.getReportBlockingFuture().complete(null);
+
+ CompletedCheckpoint result = checkpointFuture.get(5, TimeUnit.SECONDS);
+ assertThat(result)
+ .as("Checkpoint future should complete after
reportCompletedCheckpoint finishes")
+ .isNotNull();
+
+ ackTask.get(5, TimeUnit.SECONDS);
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]