This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f9071d930285a895f8a415bdae81d0cb6fb34cf3
Author: DG-Wangtao <taow...@deepglint.com>
AuthorDate: Sun Aug 2 17:33:38 2020 +0800

    [FLINK-18748][Runtime/Checkpointing] trigger checkpoint immediately if it's 
unperiodic
---
 .../flink/runtime/checkpoint/CheckpointRequestDecider.java  | 13 +++++++------
 .../runtime/checkpoint/CheckpointRequestDeciderTest.java    |  4 ++--
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 0850c28..d054561 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -136,6 +136,11 @@ class CheckpointRequestDecider {
                                .map(unused -> queuedRequests.pollFirst());
                }
 
+               CheckpointTriggerRequest first = queuedRequests.first();
+               if (first.isForce() || !first.isPeriodic) {
+                       return Optional.of(queuedRequests.pollFirst());
+               }
+
                long nextTriggerDelayMillis = 
nextTriggerDelayMillis(lastCompletionMs);
                if (nextTriggerDelayMillis > 0) {
                        return onTooEarly(nextTriggerDelayMillis);
@@ -146,15 +151,11 @@ class CheckpointRequestDecider {
 
        private Optional<CheckpointTriggerRequest> onTooEarly(long 
nextTriggerDelayMillis) {
                CheckpointTriggerRequest first = queuedRequests.first();
-               if (first.isForce()) {
-                       return Optional.of(queuedRequests.pollFirst());
-               } else if (first.isPeriodic) {
+               if (first.isPeriodic) {
                        queuedRequests.pollFirst().completeExceptionally(new 
CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
                        rescheduleTrigger.accept(nextTriggerDelayMillis);
-                       return Optional.empty();
-               } else {
-                       return Optional.empty();
                }
+               return Optional.empty();
        }
 
        private long nextTriggerDelayMillis(long 
lastCheckpointCompletionRelativeTime) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 7facf11..f2a3536 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -119,13 +119,13 @@ public class CheckpointRequestDeciderTest {
        public void testSavepointTiming() {
                testTiming(regularSavepoint(), TriggerExpectation.IMMEDIATELY);
                testTiming(periodicSavepoint(), TriggerExpectation.IMMEDIATELY);
-               testTiming(nonForcedSavepoint(), 
TriggerExpectation.AFTER_PAUSE);
+               testTiming(nonForcedSavepoint(), 
TriggerExpectation.IMMEDIATELY);
        }
 
        @Test
        public void testCheckpointTiming() {
                testTiming(regularCheckpoint(), TriggerExpectation.DROPPED);
-               testTiming(manualCheckpoint(), TriggerExpectation.AFTER_PAUSE);
+               testTiming(manualCheckpoint(), TriggerExpectation.IMMEDIATELY);
        }
 
        private enum TriggerExpectation {IMMEDIATELY, AFTER_PAUSE, DROPPED}

Reply via email to