This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 19468a25a906fd1d8bc18113ee6facd2128d5855
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Wed Aug 12 15:21:52 2020 +0200

    [FLINK-18856][checkpointing] Synchronize access to CheckpointCoordinator.lastCheckpointCompletionRelativeTime
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 29 ++++++++----
 .../checkpoint/CheckpointRequestDecider.java       | 52 +++++++---------------
 .../checkpoint/CheckpointCoordinatorTest.java      | 43 ++++++++++++++++++
 .../checkpoint/CheckpointRequestDeciderTest.java   |  4 +-
 4 files changed, 83 insertions(+), 45 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 76f9efd..d7b5cee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -317,8 +317,7 @@ public class CheckpointCoordinator {
                        this::rescheduleTrigger,
                        this.clock,
                        this.minPauseBetweenCheckpoints,
-                       this.pendingCheckpoints::size,
-                       this.lock);
+                       this.pendingCheckpoints::size);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -498,9 +497,7 @@ public class CheckpointCoordinator {
                }
 
                CheckpointTriggerRequest request = new 
CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, 
advanceToEndOfTime);
-               requestDecider
-                       .chooseRequestToExecute(request, isTriggering, 
lastCheckpointCompletionRelativeTime)
-                       .ifPresent(this::startTriggeringCheckpoint);
+               
chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
                return request.onCompletionPromise;
        }
 
@@ -833,7 +830,19 @@ public class CheckpointCoordinator {
        }
 
        private void executeQueuedRequest() {
-               requestDecider.chooseQueuedRequestToExecute(isTriggering, 
lastCheckpointCompletionRelativeTime).ifPresent(this::startTriggeringCheckpoint);
+               
chooseQueuedRequestToExecute().ifPresent(this::startTriggeringCheckpoint);
+       }
+
+       private Optional<CheckpointTriggerRequest> 
chooseQueuedRequestToExecute() {
+               synchronized (lock) {
+                       return 
requestDecider.chooseQueuedRequestToExecute(isTriggering, 
lastCheckpointCompletionRelativeTime);
+               }
+       }
+
+       private Optional<CheckpointTriggerRequest> 
chooseRequestToExecute(CheckpointTriggerRequest request) {
+               synchronized (lock) {
+                       return requestDecider.chooseRequestToExecute(request, 
isTriggering, lastCheckpointCompletionRelativeTime);
+               }
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -1412,7 +1421,9 @@ public class CheckpointCoordinator {
        @Deprecated
        @VisibleForTesting
        PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
-               return requestDecider.getTriggerRequestQueue();
+               synchronized (lock) {
+                       return requestDecider.getTriggerRequestQueue();
+               }
        }
 
        public boolean isTriggering() {
@@ -1548,7 +1559,9 @@ public class CheckpointCoordinator {
        }
 
        int getNumQueuedRequests() {
-               return requestDecider.getNumQueuedRequests();
+               synchronized (lock) {
+                       return requestDecider.getNumQueuedRequests();
+               }
        }
 
        // 
------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
index 6a9f779..124dc13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
@@ -25,8 +25,6 @@ import org.apache.flink.util.clock.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
-
 import java.util.Comparator;
 import java.util.NavigableSet;
 import java.util.Optional;
@@ -51,8 +49,6 @@ class CheckpointRequestDecider {
        private final Clock clock;
        private final long minPauseBetweenCheckpoints;
        private final Supplier<Integer> pendingCheckpointsSizeSupplier;
-       private final Object lock;
-       @GuardedBy("lock")
        private final NavigableSet<CheckpointTriggerRequest> queuedRequests = 
new TreeSet<>(checkpointTriggerRequestsComparator());
        private final int maxQueuedRequests;
 
@@ -61,15 +57,13 @@ class CheckpointRequestDecider {
                        Consumer<Long> rescheduleTrigger,
                        Clock clock,
                        long minPauseBetweenCheckpoints,
-                       Supplier<Integer> pendingCheckpointsSizeSupplier,
-                       Object lock) {
+                       Supplier<Integer> pendingCheckpointsSizeSupplier) {
                this(
                        maxConcurrentCheckpointAttempts,
                        rescheduleTrigger,
                        clock,
                        minPauseBetweenCheckpoints,
                        pendingCheckpointsSizeSupplier,
-                       lock,
                        DEFAULT_MAX_QUEUED_REQUESTS
                );
        }
@@ -80,7 +74,6 @@ class CheckpointRequestDecider {
                        Clock clock,
                        long minPauseBetweenCheckpoints,
                        Supplier<Integer> pendingCheckpointsSizeSupplier,
-                       Object lock,
                        int maxQueuedRequests) {
                Preconditions.checkArgument(maxConcurrentCheckpointAttempts > 
0);
                Preconditions.checkArgument(maxQueuedRequests > 0);
@@ -89,43 +82,37 @@ class CheckpointRequestDecider {
                this.clock = clock;
                this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
                this.pendingCheckpointsSizeSupplier = 
pendingCheckpointsSizeSupplier;
-               this.lock = lock;
                this.maxQueuedRequests = maxQueuedRequests;
        }
 
        Optional<CheckpointTriggerRequest> 
chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean 
isTriggering, long lastCompletionMs) {
-               synchronized (lock) {
-                       if (queuedRequests.size() >= maxQueuedRequests && 
!queuedRequests.last().isPeriodic) {
-                               // there are only non-periodic (ie 
user-submitted) requests enqueued - retain them and drop the new one
-                               newRequest.completeExceptionally(new 
CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
-                               return Optional.empty();
-                       } else {
-                               queuedRequests.add(newRequest);
-                               if (queuedRequests.size() > maxQueuedRequests) {
-                                       
queuedRequests.pollLast().completeExceptionally(new 
CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
-                               }
-                               Optional<CheckpointTriggerRequest> request = 
chooseRequestToExecute(isTriggering, lastCompletionMs);
-                               
request.ifPresent(CheckpointRequestDecider::logInQueueTime);
-                               return request;
+               if (queuedRequests.size() >= maxQueuedRequests && 
!queuedRequests.last().isPeriodic) {
+                       // there are only non-periodic (ie user-submitted) 
requests enqueued - retain them and drop the new one
+                       newRequest.completeExceptionally(new 
CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
+                       return Optional.empty();
+               } else {
+                       queuedRequests.add(newRequest);
+                       if (queuedRequests.size() > maxQueuedRequests) {
+                               
queuedRequests.pollLast().completeExceptionally(new 
CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS));
                        }
-               }
-       }
-
-       Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean 
isTriggering, long lastCompletionMs) {
-               synchronized (lock) {
                        Optional<CheckpointTriggerRequest> request = 
chooseRequestToExecute(isTriggering, lastCompletionMs);
                        
request.ifPresent(CheckpointRequestDecider::logInQueueTime);
                        return request;
                }
        }
 
+       Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean 
isTriggering, long lastCompletionMs) {
+               Optional<CheckpointTriggerRequest> request = 
chooseRequestToExecute(isTriggering, lastCompletionMs);
+               request.ifPresent(CheckpointRequestDecider::logInQueueTime);
+               return request;
+       }
+
        /**
         * Choose the next {@link CheckpointTriggerRequest request} to execute 
based on the provided candidate and the
         * current state. Acquires a lock and may update the state.
         * @return request to execute, if any.
         */
        private Optional<CheckpointTriggerRequest> 
chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) {
-               Preconditions.checkState(Thread.holdsLock(lock));
                if (isTriggering || queuedRequests.isEmpty()) {
                        return Optional.empty();
                }
@@ -156,22 +143,17 @@ class CheckpointRequestDecider {
        @VisibleForTesting
        @Deprecated
        PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
-               synchronized (lock) {
-                       return new PriorityQueue<>(queuedRequests);
-               }
+               return new PriorityQueue<>(queuedRequests);
        }
 
        void abortAll(CheckpointException exception) {
-               Preconditions.checkState(Thread.holdsLock(lock));
                while (!queuedRequests.isEmpty()) {
                        
queuedRequests.pollFirst().completeExceptionally(exception);
                }
        }
 
        int getNumQueuedRequests() {
-               synchronized (lock) {
-                       return queuedRequests.size();
-               }
+               return queuedRequests.size();
        }
 
        private static Comparator<CheckpointTriggerRequest> 
checkpointTriggerRequestsComparator() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 82f5889..eb96a1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -77,7 +78,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -122,6 +125,46 @@ public class CheckpointCoordinatorTest extends TestLogger {
        }
 
        @Test
+       public void testMinCheckpointPause() throws Exception {
+               // will use a different thread to allow checkpoint triggering 
before exiting from receiveAcknowledgeMessage
+               ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+               try {
+                       int pause = 1000;
+                       JobID jobId = new JobID();
+                       ExecutionAttemptID attemptId = new ExecutionAttemptID();
+                       ExecutionVertex vertex = mockExecutionVertex(attemptId);
+
+                       CheckpointCoordinator coordinator = new 
CheckpointCoordinatorBuilder()
+                               .setTimer(new 
ScheduledExecutorServiceAdapter(executorService))
+                                       
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+                                               .setCheckpointInterval(pause)
+                                               
.setCheckpointTimeout(Long.MAX_VALUE)
+                                               .setMaxConcurrentCheckpoints(1)
+                                               
.setMinPauseBetweenCheckpoints(pause)
+                                               .build())
+                                       .setTasksToTrigger(new 
ExecutionVertex[]{vertex})
+                                       .setTasksToWaitFor(new 
ExecutionVertex[]{vertex})
+                                       .setTasksToCommitTo(new 
ExecutionVertex[]{vertex})
+                                       .setJobId(jobId)
+                               .build();
+                       coordinator.startCheckpointScheduler();
+
+                       coordinator.triggerCheckpoint(true); // trigger, 
execute, and later complete by receiveAcknowledgeMessage
+                       coordinator.triggerCheckpoint(true); // enqueue and 
later see if it gets executed in the middle of receiveAcknowledgeMessage
+                       while (coordinator.getNumberOfPendingCheckpoints() == 
0) { // wait for at least 1 request to be fully processed
+                               Thread.sleep(10);
+                       }
+                       coordinator.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jobId, attemptId, 1L), TASK_MANAGER_LOCATION_INFO);
+                       Thread.sleep(pause / 2);
+                       assertEquals(0, 
coordinator.getNumberOfPendingCheckpoints());
+                       Thread.sleep(pause);
+                       assertEquals(1, 
coordinator.getNumberOfPendingCheckpoints());
+               } finally {
+                       executorService.shutdownNow();
+               }
+       }
+
+       @Test
        public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
                try {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
index 55b19b6..90967b9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java
@@ -169,7 +169,7 @@ public class CheckpointRequestDeciderTest {
        private void testTiming(CheckpointTriggerRequest request, 
TriggerExpectation expectation) {
                final long pause = 10;
                final ManualClock clock = new ManualClock();
-               final CheckpointRequestDecider decider = new 
CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, new Object(), 
Integer.MAX_VALUE);
+               final CheckpointRequestDecider decider = new 
CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, Integer.MAX_VALUE);
 
                final long lastCompletionMs = clock.relativeTimeMillis();
                final boolean isTriggering = false;
@@ -220,7 +220,7 @@ public class CheckpointRequestDeciderTest {
        private CheckpointRequestDecider decider(int maxQueued, int maxPending, 
int minPause, AtomicInteger currentPending) {
                ManualClock clock = new ManualClock();
                clock.advanceTime(1, TimeUnit.DAYS);
-               return new CheckpointRequestDecider(maxPending, NO_OP, clock, 
minPause, currentPending::get, new Object(), maxQueued);
+               return new CheckpointRequestDecider(maxPending, NO_OP, clock, 
minPause, currentPending::get, maxQueued);
        }
 
        private static final Consumer<Long> NO_OP = unused -> {

Reply via email to