zentol commented on a change in pull request #17693:
URL: https://github.com/apache/flink/pull/17693#discussion_r744779287
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
##########
@@ -578,6 +580,55 @@ public void testReportTaskFinishedOperators() throws IOException {
contains(ACK_TASKS.get(0).getVertex()));
}
+ /**
+ * We need to trigger disposal prior completing the checkpoint future to avoid a race condition
+ * with the CheckpointCleaner shutdown.
+ */
+ @Test
+ public void testAbortTriggersDisposalPriorCompletingCheckpointFuture() throws Exception {
+ final PendingCheckpoint pending =
+ createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
+ final CompletableFuture<Void> completionFuture =
+ pending.getCompletionFuture()
+ .handle(
+ (result, error) -> {
+ assertNull(result);
+ assertNotNull(error);
+ assertTrue(pending.isDisposed());
+ return null;
+ });
Review comment:
```suggestion
.thenAccept(
result -> assertTrue(pending.isDisposed()));
```
If the future fails, the test will fail later on anyway, so the assertion can focus on whether or not the checkpoint is disposed.
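For reference, a standalone sketch of the plain JDK `CompletableFuture` semantics this suggestion relies on. `FakeCheckpoint` and `ThenAcceptVsHandle` are hypothetical stand-ins, not Flink's `PendingCheckpoint`; the sketch uses `join()` rather than `get()` only to avoid checked exceptions. A `thenAccept` callback runs only on normal completion; if the source future fails, the dependent future fails with the same cause and a later `join()`/`get()` throws, while `handle(...)` runs its callback for both outcomes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical stand-in (not Flink's PendingCheckpoint) that disposes before completing its future. */
class FakeCheckpoint {
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
    private volatile boolean disposed;

    CompletableFuture<Void> getCompletionFuture() {
        return completionFuture;
    }

    boolean isDisposed() {
        return disposed;
    }

    /** Models a successful finalize: dispose first, then complete normally. */
    void finish() {
        disposed = true;
        completionFuture.complete(null);
    }

    /** Models an abort: dispose first, then complete exceptionally. */
    void abort() {
        disposed = true;
        completionFuture.completeExceptionally(new IllegalStateException("aborted"));
    }
}

class ThenAcceptVsHandle {
    /** thenAccept: the callback runs on normal completion and observes the disposal. */
    static boolean thenAcceptSeesDisposedOnFinish() {
        FakeCheckpoint pending = new FakeCheckpoint();
        boolean[] sawDisposed = {false};
        CompletableFuture<Void> f =
                pending.getCompletionFuture()
                        .thenAccept(ignored -> sawDisposed[0] = pending.isDisposed());
        pending.finish();
        f.join(); // completes normally
        return sawDisposed[0];
    }

    /** thenAccept: on exceptional completion the callback is skipped and join() throws. */
    static boolean thenAcceptSkippedOnAbort() {
        FakeCheckpoint pending = new FakeCheckpoint();
        boolean[] ran = {false};
        CompletableFuture<Void> f =
                pending.getCompletionFuture().thenAccept(ignored -> ran[0] = true);
        pending.abort();
        try {
            f.join();
            return false; // not reached: f completed exceptionally
        } catch (CompletionException expected) {
            return !ran[0];
        }
    }

    /** handle: the callback runs for both outcomes, so the disposal check also covers abort. */
    static boolean handleSeesDisposedOnAbort() {
        FakeCheckpoint pending = new FakeCheckpoint();
        CompletableFuture<Boolean> f =
                pending.getCompletionFuture().handle((ignored, error) -> pending.isDisposed());
        pending.abort();
        return f.join();
    }

    public static void main(String[] args) {
        System.out.println(thenAcceptSeesDisposedOnFinish());
        System.out.println(thenAcceptSkippedOnAbort());
        System.out.println(handleSeesDisposedOnAbort());
    }
}
```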
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
##########
@@ -578,6 +580,55 @@ public void testReportTaskFinishedOperators() throws IOException {
contains(ACK_TASKS.get(0).getVertex()));
}
+ /**
+ * We need to trigger disposal prior completing the checkpoint future to avoid a race condition
+ * with the CheckpointCleaner shutdown.
+ */
+ @Test
+ public void testAbortTriggersDisposalPriorCompletingCheckpointFuture() throws Exception {
+ final PendingCheckpoint pending =
+ createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
+ final CompletableFuture<Void> completionFuture =
+ pending.getCompletionFuture()
+ .handle(
+ (result, error) -> {
+ assertNull(result);
+ assertNotNull(error);
+ assertTrue(pending.isDisposed());
+ return null;
+ });
+ abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
+ completionFuture.get();
+ }
+
+ /**
+ * We need to trigger disposal prior completing the checkpoint future to avoid a race condition
+ * with the CheckpointCleaner shutdown.
+ */
+ @Test
+ public void testFinalizeTriggersDisposalPriorCompletingCheckpointFuture() throws Exception {
+ final PendingCheckpoint pending =
+ createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
+ pending.acknowledgeTask(ATTEMPT_ID, null, new CheckpointMetrics(), null);
+ assertTrue(pending.areTasksFullyAcknowledged());
+ final CompletableFuture<Void> completionFuture =
+ pending.getCompletionFuture()
+ .handle(
+ (result, error) -> {
+ assertNotNull(result);
+ assertNull(error);
Review comment:
same as above
##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
##########
@@ -578,6 +580,55 @@ public void testReportTaskFinishedOperators() throws IOException {
contains(ACK_TASKS.get(0).getVertex()));
}
+ /**
+ * We need to trigger disposal prior completing the checkpoint future to avoid a race condition
+ * with the CheckpointCleaner shutdown.
+ */
+ @Test
+ public void testAbortTriggersDisposalPriorCompletingCheckpointFuture() throws Exception {
+ final PendingCheckpoint pending =
+ createPendingCheckpoint(
+ CheckpointProperties.forCheckpoint(
+ CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
+ final CompletableFuture<Void> completionFuture =
+ pending.getCompletionFuture()
+ .handle(
+ (result, error) -> {
+ assertNull(result);
+ assertNotNull(error);
+ assertTrue(pending.isDisposed());
+ return null;
+ });
+ abort(pending, CheckpointFailureReason.CHECKPOINT_DECLINED);
+ completionFuture.get();
+ }
+
+ /**
+ * We need to trigger disposal prior completing the checkpoint future to avoid a race condition
+ * with the CheckpointCleaner shutdown.
Review comment:
It is a bit weird that we mention an issue with the CheckpointCleaner shutdown but then don't actually exercise that shutdown in the test; at least it threw me off a bit.
I'm wondering if we shouldn't add a test for that scenario, essentially just
```
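// assumes cleaner.shutdownAsync() returns a CompletableFuture<Void>;
// handle() makes the shutdown run even though abort() fails the future
final CompletableFuture<Void> f =
        pending.getCompletionFuture()
                .handle((ignored, error) -> null)
                .thenCompose(ignored -> cleaner.shutdownAsync());
pending.abort(...);
f.get();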
```
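A self-contained sketch of that scenario, under loudly stated assumptions: `FakeCleaner`, `OrderedCheckpoint` and `CleanerShutdownRace` are hypothetical stand-ins (not Flink's actual cleaner or `PendingCheckpoint`), and the cleaner is assumed to reject disposal work once it has been shut down. Dependent stages registered on a `CompletableFuture` before it completes run synchronously inside the completing call, so completing the future before disposing lets the shutdown callback win; disposing first avoids that. The shutdown is chained through `handle(...)` so it also runs when `abort()` completes the future exceptionally (a plain `thenRun` stage would be skipped in that case).

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical cleaner (not Flink's API): rejects disposal work once it has been shut down. */
class FakeCleaner {
    private boolean shutDown;

    synchronized void cleanup(Runnable disposeAction) {
        if (shutDown) {
            throw new IllegalStateException("cleaner already shut down");
        }
        disposeAction.run();
    }

    synchronized CompletableFuture<Void> shutdownAsync() {
        shutDown = true;
        return CompletableFuture.completedFuture(null);
    }
}

/** Hypothetical pending checkpoint whose abort ordering can be switched for the experiment. */
class OrderedCheckpoint {
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
    private final FakeCleaner cleaner;
    private final boolean disposeBeforeCompleting;
    private boolean disposed;

    OrderedCheckpoint(FakeCleaner cleaner, boolean disposeBeforeCompleting) {
        this.cleaner = cleaner;
        this.disposeBeforeCompleting = disposeBeforeCompleting;
    }

    CompletableFuture<Void> getCompletionFuture() {
        return completionFuture;
    }

    boolean isDisposed() {
        return disposed;
    }

    void abort() {
        Exception cause = new Exception("checkpoint declined");
        if (disposeBeforeCompleting) {
            cleaner.cleanup(() -> disposed = true);
            completionFuture.completeExceptionally(cause);
        } else {
            // Dependent stages registered earlier run synchronously inside this call,
            // so the cleaner can already be shut down before disposal is handed to it.
            completionFuture.completeExceptionally(cause);
            cleaner.cleanup(() -> disposed = true);
        }
    }
}

class CleanerShutdownRace {
    /** Returns true if the checkpoint was disposed and the cleaner shut down without rejecting it. */
    static boolean run(boolean disposeBeforeCompleting) {
        FakeCleaner cleaner = new FakeCleaner();
        OrderedCheckpoint pending = new OrderedCheckpoint(cleaner, disposeBeforeCompleting);
        // Shut the cleaner down once the checkpoint future completes, whether it succeeded or failed.
        CompletableFuture<Void> f =
                pending.getCompletionFuture()
                        .handle((ignored, error) -> null)
                        .thenCompose(ignored -> cleaner.shutdownAsync());
        try {
            pending.abort();
        } catch (IllegalStateException rejected) {
            return false; // disposal arrived after the shutdown
        }
        f.join();
        return pending.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

With the dispose-first ordering the run succeeds; with the complete-first ordering the disposal is rejected because the shutdown callback already ran inside `completeExceptionally`.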
--
This is an automated message from the Apache Git Service.