mateczagany commented on code in PR #27368:
URL: https://github.com/apache/flink/pull/27368#discussion_r2678510298
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointFailureHandlingITCase.java:
##########
@@ -161,24 +163,34 @@ private void buildGraph(StreamExecutionEnvironment env) {
.sinkTo(new DiscardingSink<>());
}
+ /**
+ * Trigger checkpoints until the first failing checkpoint. The exception should come from {@link
+ * CheckpointStateOutputStream#close()} which should get called by the channel state writer
+ * after catching an exception in {@link CheckpointStateOutputStream#closeAndGetHandle()}. If
+ * there are no records in the state, only {@link CheckpointStateOutputStream#close()} will be
+ * called, so `failOnClose` has to be checked here.
+ */
private void triggerFailingCheckpoint(
- JobID jobID, Class<TestException> expectedException, MiniCluster miniCluster)
+ JobID jobID,
+ Class<TestException> expectedException,
+ SharedReference<AtomicBoolean> failOnCloseRef,
+ MiniCluster miniCluster)
throws InterruptedException, ExecutionException {
- while (true) {
+ boolean foundCheckpointFailure = false;
+ do {
Optional<Throwable> cpFailure =
miniCluster
.triggerCheckpoint(jobID)
.thenApply(ign -> Optional.empty())
.handle((ign, err) -> Optional.ofNullable(err))
.get();
- if (!cpFailure.isPresent()) {
- Thread.sleep(50); // trigger again - in case of no channel data was written
- } else if (isCausedBy(cpFailure.get(), expectedException)) {
- return;
- } else {
- rethrow(cpFailure.get());
+
+ if (cpFailure.isPresent()) {
+ if (isCausedBy(cpFailure.get(), expectedException)) {
+ foundCheckpointFailure = true;
+ }
}
- }
+ } while (!foundCheckpointFailure || failOnCloseRef.get().get());
Review Comment:
I'm not sure exactly why, but this test case became very flaky after this PR.
The issue was that sometimes no channel state records were being written to
the checkpoint when `triggerFailingCheckpoint` was called, so only
`CheckpointStateOutputStream#close()` was invoked. The second call to
`triggerCheckpoint()` then threw another exception, because `failOnClose` had
not been set to false by `CheckpointStateOutputStream#closeAndGetHandle()`,
and that failed the test.
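
To illustrate why the loop condition also checks `failOnClose`, here is a minimal, self-contained sketch. It is a toy model, not the actual test or any Flink API: the class `FailOnCloseLoopSketch`, the stand-in `triggerCheckpoint(boolean, AtomicBoolean)`, and the assumption that only the `closeAndGetHandle()` path clears the flag are all hypothetical and chosen to mirror the behavior described above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class FailOnCloseLoopSketch {

    /** Hypothetical stand-in for one checkpoint attempt; this is not Flink API. */
    static Optional<Throwable> triggerCheckpoint(
            boolean hasChannelData, AtomicBoolean failOnClose) {
        if (hasChannelData) {
            // Assumption: the closeAndGetHandle() path clears the flag, then fails.
            failOnClose.set(false);
            return Optional.of(new IllegalStateException("failed via closeAndGetHandle()"));
        }
        // Assumption: with no channel data only close() runs, and it fails
        // for as long as the flag is still set.
        return failOnClose.get()
                ? Optional.of(new IllegalStateException("failed via close()"))
                : Optional.empty();
    }

    /** Same loop shape as in the diff: stop only once a failure was seen AND the flag is cleared. */
    static int triggerUntilFlagCleared(Iterator<Boolean> channelData, AtomicBoolean failOnClose) {
        int attempts = 0;
        boolean foundCheckpointFailure = false;
        do {
            attempts++;
            if (triggerCheckpoint(channelData.next(), failOnClose).isPresent()) {
                foundCheckpointFailure = true;
            }
        } while (!foundCheckpointFailure || failOnClose.get());
        return attempts;
    }

    public static void main(String[] args) {
        AtomicBoolean failOnClose = new AtomicBoolean(true);
        // The first two attempts see no channel data; the third one does.
        int attempts =
                triggerUntilFlagCleared(List.of(false, false, true).iterator(), failOnClose);
        System.out.println("attempts=" + attempts + ", failOnClose=" + failOnClose.get());
    }
}
```

In this model, returning on the first observed failure would leave `failOnClose` set, so a later checkpoint would fail again unexpectedly. Looping until the flag is cleared avoids that.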
--
This is an automated message from the Apache Git Service.