This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 3a9d144  [FLINK-22596] Active timeout is not triggered if there were no barriers
3a9d144 is described below

commit 3a9d14457f886937b8604607dee10d1046323dc7
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Fri May 7 16:51:46 2021 +0200

    [FLINK-22596] Active timeout is not triggered if there were no barriers
    
    The active timeout did not take effect if it elapsed before the first
    barrier arrived. The reason is that we did not reset the future for
    checkpoint completion on barrier announcement. Therefore, we considered
    the completed status of the previous checkpoint when evaluating the
    timeout for the current checkpoint.
---
 .../SingleCheckpointBarrierHandler.java            |  2 +-
 .../checkpointing/AlternatingCheckpointsTest.java  | 66 +++++++++++++++++++++-
 2 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
index 24624fd..238eb33 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.java
@@ -213,7 +213,6 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
             } else {
                 markAlignmentStart(barrier.getTimestamp());
             }
-            allBarriersReceivedFuture = new CompletableFuture<>();
         }
 
         // we must mark alignment end before calling 
currentState.barrierReceived which might
@@ -308,6 +307,7 @@ public class SingleCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
             }
             currentCheckpointId = barrierId;
             numBarriersReceived = 0;
+            allBarriersReceivedFuture = new CompletableFuture<>();
             return true;
         }
         return false;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
index 6996b83..8da39c2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/AlternatingCheckpointsTest.java
@@ -382,6 +382,64 @@ public class AlternatingCheckpointsTest {
         assertEquals(1, target.getTriggeredCheckpointCounter());
     }
 
+    /**
+     * This test tries to make sure that the first timeout happens after processing {@link
+     * EventAnnouncement} but before/during processing the first {@link CheckpointBarrier} of at
+     * least the second checkpoint.
+     */
+    @Test
+    public void testTimeoutAlignmentOnAnnouncementForSecondCheckpoint() throws 
Exception {
+        int numChannels = 2;
+        ValidatingCheckpointHandler target = new ValidatingCheckpointHandler();
+        CheckpointedInputGate gate =
+                new TestCheckpointedInputGateBuilder(
+                                numChannels, 
getTestBarrierHandlerFactory(target))
+                        .withRemoteChannels()
+                        .withMailboxExecutor()
+                        .build();
+
+        long alignmentTimeout = 100;
+        performFirstCheckpoint(numChannels, target, gate, alignmentTimeout);
+        assertEquals(1, target.getTriggeredCheckpointCounter());
+
+        Buffer checkpointBarrier = withTimeout(2, alignmentTimeout);
+
+        for (int i = 0; i < numChannels; i++) {
+            (getChannel(gate, i)).onBuffer(dataBuffer(), 1, 0);
+            (getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(), 
2, 0);
+        }
+
+        assertEquals(1, target.getTriggeredCheckpointCounter());
+        for (int i = 0; i < numChannels; i++) {
+            assertAnnouncement(gate);
+        }
+        assertEquals(1, target.getTriggeredCheckpointCounter());
+
+        clock.advanceTime(alignmentTimeout * 4, TimeUnit.MILLISECONDS);
+        // the barrier should overtake the data buffers
+        assertBarrier(gate);
+        assertEquals(2, target.getTriggeredCheckpointCounter());
+    }
+
+    private void performFirstCheckpoint(
+            int numChannels,
+            ValidatingCheckpointHandler target,
+            CheckpointedInputGate gate,
+            long alignmentTimeout)
+            throws IOException, InterruptedException {
+        Buffer checkpointBarrier = withTimeout(1, alignmentTimeout);
+        for (int i = 0; i < numChannels; i++) {
+            (getChannel(gate, i)).onBuffer(checkpointBarrier.retainBuffer(), 
0, 0);
+        }
+        assertEquals(0, target.getTriggeredCheckpointCounter());
+        for (int i = 0; i < numChannels; i++) {
+            assertAnnouncement(gate);
+        }
+        for (int i = 0; i < numChannels; i++) {
+            assertBarrier(gate);
+        }
+    }
+
     @Test
     public void testPassiveTimeoutAlignmentOnAnnouncement() throws Exception {
         int numChannels = 2;
@@ -984,8 +1042,14 @@ public class AlternatingCheckpointsTest {
     }
 
     private Buffer withTimeout(long alignmentTimeout) throws IOException {
+        return withTimeout(1, alignmentTimeout);
+    }
+
+    private Buffer withTimeout(int checkpointId, long alignmentTimeout) throws 
IOException {
         return barrier(
-                1, clock.relativeTimeMillis(), 
alignedWithTimeout(getDefault(), alignmentTimeout));
+                checkpointId,
+                clock.relativeTimeMillis(),
+                alignedWithTimeout(getDefault(), alignmentTimeout));
     }
 
     private Buffer barrier(long barrierId, long barrierTimestamp, 
CheckpointOptions options)

Reply via email to