This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit f9071d930285a895f8a415bdae81d0cb6fb34cf3
Author: DG-Wangtao <taow...@deepglint.com>
AuthorDate: Sun Aug 2 17:33:38 2020 +0800

    [FLINK-18748][Runtime/Checkpointing] trigger checkpoint immediately if it's unperiodic
---
 .../flink/runtime/checkpoint/CheckpointRequestDecider.java | 13 +++++++------
 .../runtime/checkpoint/CheckpointRequestDeciderTest.java | 4 ++--
 2 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 0850c28..d054561 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -136,6 +136,11 @@ class CheckpointRequestDecider {
 				.map(unused -> queuedRequests.pollFirst());
 		}
 
+		CheckpointTriggerRequest first = queuedRequests.first();
+		if (first.isForce() || !first.isPeriodic) {
+			return Optional.of(queuedRequests.pollFirst());
+		}
+
 		long nextTriggerDelayMillis = nextTriggerDelayMillis(lastCompletionMs);
 		if (nextTriggerDelayMillis > 0) {
 			return onTooEarly(nextTriggerDelayMillis);
@@ -146,15 +151,11 @@ class CheckpointRequestDecider {
 
 	private Optional<CheckpointTriggerRequest> onTooEarly(long nextTriggerDelayMillis) {
 		CheckpointTriggerRequest first = queuedRequests.first();
-		if (first.isForce()) {
-			return Optional.of(queuedRequests.pollFirst());
-		} else if (first.isPeriodic) {
+		if (first.isPeriodic) {
 			queuedRequests.pollFirst().completeExceptionally(new CheckpointException(MINIMUM_TIME_BETWEEN_CHECKPOINTS));
 			rescheduleTrigger.accept(nextTriggerDelayMillis);
-			return Optional.empty();
-		} else {
-			return Optional.empty();
 		}
+		return Optional.empty();
 	}
 
 	private long nextTriggerDelayMillis(long lastCheckpointCompletionRelativeTime) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 7facf11..f2a3536 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -119,13 +119,13 @@ public class CheckpointRequestDeciderTest {
 	public void testSavepointTiming() {
 		testTiming(regularSavepoint(), TriggerExpectation.IMMEDIATELY);
 		testTiming(periodicSavepoint(), TriggerExpectation.IMMEDIATELY);
-		testTiming(nonForcedSavepoint(), TriggerExpectation.AFTER_PAUSE);
+		testTiming(nonForcedSavepoint(), TriggerExpectation.IMMEDIATELY);
 	}
 
 	@Test
 	public void testCheckpointTiming() {
 		testTiming(regularCheckpoint(), TriggerExpectation.DROPPED);
-		testTiming(manualCheckpoint(), TriggerExpectation.AFTER_PAUSE);
+		testTiming(manualCheckpoint(), TriggerExpectation.IMMEDIATELY);
 	}
 
 	private enum TriggerExpectation {IMMEDIATELY, AFTER_PAUSE, DROPPED}