This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new 3a9d144 [FLINK-22596] Active timeout is not triggered if there were no barriers 3a9d144 is described below commit 3a9d14457f886937b8604607dee10d1046323dc7 Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri May 7 16:51:46 2021 +0200 [FLINK-22596] Active timeout is not triggered if there were no barriers The active timeout did not take effect if it elapsed before the first barrier arrived. The reason is that we did not reset the future that tracks checkpoint completion on barrier announcement. Therefore, we considered the completed status of the previous checkpoint when evaluating the timeout for the current checkpoint. --- .../SingleCheckpointBarrierHandler.java | 2 +- .../checkpointing/AlternatingCheckpointsTest.java | 66 +++++++++++++++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java index 24624fd..238eb33 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java @@ -213,7 +213,6 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { } else { markAlignmentStart(barrier.getTimestamp()); } - allBarriersReceivedFuture = new CompletableFuture<>(); } // we must mark alignment end before calling currentState.barrierReceived which might @@ -308,6 +307,7 @@ public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler { } currentCheckpointId = barrierId; numBarriersReceived = 0; + allBarriersReceivedFuture = new CompletableFuture<>(); return true; } return false; diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java index 6996b83..8da39c2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java @@ -382,6 +382,64 @@ public class AlternatingCheckpointsTest { assertEquals(1, target.getTriggeredCheckpointCounter()); } + /** + * This test tries to make sure that the first time out happens after processing {@link + * EventAnnouncement} but before/during processing the first {@link CheckpointBarrier} of at + * least second checkpoint. + */ + @Test + public void testTimeoutAlignmentOnAnnouncementForSecondCheckpoint() throws Exception { + int numChannels = 2; + ValidatingCheckpointHandler target = new ValidatingCheckpointHandler(); + CheckpointedInputGate gate = + new TestCheckpointedInputGateBuilder( + numChannels, getTestBarrierHandlerFactory(target)) + .withRemoteChannels() + .withMailboxExecutor() + .build(); + + long alignmentTimeout = 100; + performFirstCheckpoint(numChannels, target, gate, alignmentTimeout); + assertEquals(1, target.getTriggeredCheckpointCounter()); + + Buffer checkpointBarrier = withTimeout(2, alignmentTimeout); + + for (int i = 0; i < numChannels; i++) { + (getChannel(gate, i)).onBuffer(dataBuffer(), 1, 0); + (getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(), 2, 0); + } + + assertEquals(1, target.getTriggeredCheckpointCounter()); + for (int i = 0; i < numChannels; i++) { + assertAnnouncement(gate); + } + assertEquals(1, target.getTriggeredCheckpointCounter()); + + clock.advanceTime(alignmentTimeout * 4, TimeUnit.MILLISECONDS); + // the barrier should overtake the data buffers + assertBarrier(gate); + 
assertEquals(2, target.getTriggeredCheckpointCounter()); + } + + private void performFirstCheckpoint( + int numChannels, + ValidatingCheckpointHandler target, + CheckpointedInputGate gate, + long alignmentTimeout) + throws IOException, InterruptedException { + Buffer checkpointBarrier = withTimeout(1, alignmentTimeout); + for (int i = 0; i < numChannels; i++) { + (getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(), 0, 0); + } + assertEquals(0, target.getTriggeredCheckpointCounter()); + for (int i = 0; i < numChannels; i++) { + assertAnnouncement(gate); + } + for (int i = 0; i < numChannels; i++) { + assertBarrier(gate); + } + } + @Test public void testPassiveTimeoutAlignmentOnAnnouncement() throws Exception { int numChannels = 2; @@ -984,8 +1042,14 @@ public class AlternatingCheckpointsTest { } private Buffer withTimeout(long alignmentTimeout) throws IOException { + return withTimeout(1, alignmentTimeout); + } + + private Buffer withTimeout(int checkpointId, long alignmentTimeout) throws IOException { return barrier( - 1, clock.relativeTimeMillis(), alignedWithTimeout(getDefault(), alignmentTimeout)); + checkpointId, + clock.relativeTimeMillis(), + alignedWithTimeout(getDefault(), alignmentTimeout)); } private Buffer barrier(long barrierId, long barrierTimestamp, CheckpointOptions options)