This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 19468a25a906fd1d8bc18113ee6facd2128d5855 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Wed Aug 12 15:21:52 2020 +0200 [FLINK-18856][checkpointing] Synchronize access to CheckpointCoordinator.lastCheckpointCompletionRelativeTime --- .../runtime/checkpoint/CheckpointCoordinator.java | 29 ++++++++---- .../checkpoint/CheckpointRequestDecider.java | 52 +++++++--------------- .../checkpoint/CheckpointCoordinatorTest.java | 43 ++++++++++++++++++ .../checkpoint/CheckpointRequestDeciderTest.java | 4 +- 4 files changed, 83 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 76f9efd..d7b5cee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -317,8 +317,7 @@ public class CheckpointCoordinator { this::rescheduleTrigger, this.clock, this.minPauseBetweenCheckpoints, - this.pendingCheckpoints::size, - this.lock); + this.pendingCheckpoints::size); } // -------------------------------------------------------------------------------------------- @@ -498,9 +497,7 @@ public class CheckpointCoordinator { } CheckpointTriggerRequest request = new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic, advanceToEndOfTime); - requestDecider - .chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime) - .ifPresent(this::startTriggeringCheckpoint); + chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint); return request.onCompletionPromise; } @@ -833,7 +830,19 @@ public class CheckpointCoordinator { } private void executeQueuedRequest() { - requestDecider.chooseQueuedRequestToExecute(isTriggering, 
lastCheckpointCompletionRelativeTime).ifPresent(this::startTriggeringCheckpoint); + chooseQueuedRequestToExecute().ifPresent(this::startTriggeringCheckpoint); + } + + private Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute() { + synchronized (lock) { + return requestDecider.chooseQueuedRequestToExecute(isTriggering, lastCheckpointCompletionRelativeTime); + } + } + + private Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest request) { + synchronized (lock) { + return requestDecider.chooseRequestToExecute(request, isTriggering, lastCheckpointCompletionRelativeTime); + } } // -------------------------------------------------------------------------------------------- @@ -1412,7 +1421,9 @@ public class CheckpointCoordinator { @Deprecated @VisibleForTesting PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() { - return requestDecider.getTriggerRequestQueue(); + synchronized (lock) { + return requestDecider.getTriggerRequestQueue(); + } } public boolean isTriggering() { @@ -1548,7 +1559,9 @@ public class CheckpointCoordinator { } int getNumQueuedRequests() { - return requestDecider.getNumQueuedRequests(); + synchronized (lock) { + return requestDecider.getNumQueuedRequests(); + } } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java index 6a9f779..124dc13 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java @@ -25,8 +25,6 @@ import org.apache.flink.util.clock.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.concurrent.GuardedBy; - import java.util.Comparator; import java.util.NavigableSet; import 
java.util.Optional; @@ -51,8 +49,6 @@ class CheckpointRequestDecider { private final Clock clock; private final long minPauseBetweenCheckpoints; private final Supplier<Integer> pendingCheckpointsSizeSupplier; - private final Object lock; - @GuardedBy("lock") private final NavigableSet<CheckpointTriggerRequest> queuedRequests = new TreeSet<>(checkpointTriggerRequestsComparator()); private final int maxQueuedRequests; @@ -61,15 +57,13 @@ class CheckpointRequestDecider { Consumer<Long> rescheduleTrigger, Clock clock, long minPauseBetweenCheckpoints, - Supplier<Integer> pendingCheckpointsSizeSupplier, - Object lock) { + Supplier<Integer> pendingCheckpointsSizeSupplier) { this( maxConcurrentCheckpointAttempts, rescheduleTrigger, clock, minPauseBetweenCheckpoints, pendingCheckpointsSizeSupplier, - lock, DEFAULT_MAX_QUEUED_REQUESTS ); } @@ -80,7 +74,6 @@ class CheckpointRequestDecider { Clock clock, long minPauseBetweenCheckpoints, Supplier<Integer> pendingCheckpointsSizeSupplier, - Object lock, int maxQueuedRequests) { Preconditions.checkArgument(maxConcurrentCheckpointAttempts > 0); Preconditions.checkArgument(maxQueuedRequests > 0); @@ -89,43 +82,37 @@ class CheckpointRequestDecider { this.clock = clock; this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; this.pendingCheckpointsSizeSupplier = pendingCheckpointsSizeSupplier; - this.lock = lock; this.maxQueuedRequests = maxQueuedRequests; } Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) { - synchronized (lock) { - if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) { - // there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one - newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)); - return Optional.empty(); - } else { - queuedRequests.add(newRequest); - if (queuedRequests.size() > maxQueuedRequests) { - 
queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)); - } - Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs); - request.ifPresent(CheckpointRequestDecider::logInQueueTime); - return request; + if (queuedRequests.size() >= maxQueuedRequests && !queuedRequests.last().isPeriodic) { + // there are only non-periodic (ie user-submitted) requests enqueued - retain them and drop the new one + newRequest.completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)); + return Optional.empty(); + } else { + queuedRequests.add(newRequest); + if (queuedRequests.size() > maxQueuedRequests) { + queuedRequests.pollLast().completeExceptionally(new CheckpointException(TOO_MANY_CHECKPOINT_REQUESTS)); } - } - } - - Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) { - synchronized (lock) { Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs); request.ifPresent(CheckpointRequestDecider::logInQueueTime); return request; } } + Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) { + Optional<CheckpointTriggerRequest> request = chooseRequestToExecute(isTriggering, lastCompletionMs); + request.ifPresent(CheckpointRequestDecider::logInQueueTime); + return request; + } + /** * Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the * current state. Acquires a lock and may update the state. * @return request to execute, if any. 
*/ private Optional<CheckpointTriggerRequest> chooseRequestToExecute(boolean isTriggering, long lastCompletionMs) { - Preconditions.checkState(Thread.holdsLock(lock)); if (isTriggering || queuedRequests.isEmpty()) { return Optional.empty(); } @@ -156,22 +143,17 @@ class CheckpointRequestDecider { @VisibleForTesting @Deprecated PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() { - synchronized (lock) { - return new PriorityQueue<>(queuedRequests); - } + return new PriorityQueue<>(queuedRequests); } void abortAll(CheckpointException exception) { - Preconditions.checkState(Thread.holdsLock(lock)); while (!queuedRequests.isEmpty()) { queuedRequests.pollFirst().completeExceptionally(exception); } } int getNumQueuedRequests() { - synchronized (lock) { - return queuedRequests.size(); - } + return queuedRequests.size(); } private static Comparator<CheckpointTriggerRequest> checkpointTriggerRequestsComparator() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 82f5889..eb96a1b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -77,7 +78,9 @@ import java.util.Set; import java.util.UUID; import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -122,6 +125,46 @@ public class CheckpointCoordinatorTest extends TestLogger { } @Test + public void testMinCheckpointPause() throws Exception { + // will use a different thread to allow checkpoint triggering before exiting from receiveAcknowledgeMessage + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + try { + int pause = 1000; + JobID jobId = new JobID(); + ExecutionAttemptID attemptId = new ExecutionAttemptID(); + ExecutionVertex vertex = mockExecutionVertex(attemptId); + + CheckpointCoordinator coordinator = new CheckpointCoordinatorBuilder() + .setTimer(new ScheduledExecutorServiceAdapter(executorService)) + .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder() + .setCheckpointInterval(pause) + .setCheckpointTimeout(Long.MAX_VALUE) + .setMaxConcurrentCheckpoints(1) + .setMinPauseBetweenCheckpoints(pause) + .build()) + .setTasksToTrigger(new ExecutionVertex[]{vertex}) + .setTasksToWaitFor(new ExecutionVertex[]{vertex}) + .setTasksToCommitTo(new ExecutionVertex[]{vertex}) + .setJobId(jobId) + .build(); + coordinator.startCheckpointScheduler(); + + coordinator.triggerCheckpoint(true); // trigger, execute, and later complete by receiveAcknowledgeMessage + coordinator.triggerCheckpoint(true); // enqueue and later see if it gets executed in the middle of receiveAcknowledgeMessage + while (coordinator.getNumberOfPendingCheckpoints() == 0) { // wait for at least 1 request to be fully processed + Thread.sleep(10); + } + coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, 1L), TASK_MANAGER_LOCATION_INFO); + 
Thread.sleep(pause / 2); + assertEquals(0, coordinator.getNumberOfPendingCheckpoints()); + Thread.sleep(pause); + assertEquals(1, coordinator.getNumberOfPendingCheckpoints()); + } finally { + executorService.shutdownNow(); + } + } + + @Test public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() { try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java index 55b19b6..90967b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDeciderTest.java @@ -169,7 +169,7 @@ public class CheckpointRequestDeciderTest { private void testTiming(CheckpointTriggerRequest request, TriggerExpectation expectation) { final long pause = 10; final ManualClock clock = new ManualClock(); - final CheckpointRequestDecider decider = new CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, new Object(), Integer.MAX_VALUE); + final CheckpointRequestDecider decider = new CheckpointRequestDecider(1, NO_OP, clock, pause, () -> 0, Integer.MAX_VALUE); final long lastCompletionMs = clock.relativeTimeMillis(); final boolean isTriggering = false; @@ -220,7 +220,7 @@ public class CheckpointRequestDeciderTest { private CheckpointRequestDecider decider(int maxQueued, int maxPending, int minPause, AtomicInteger currentPending) { ManualClock clock = new ManualClock(); clock.advanceTime(1, TimeUnit.DAYS); - return new CheckpointRequestDecider(maxPending, NO_OP, clock, minPause, currentPending::get, new Object(), maxQueued); + return new CheckpointRequestDecider(maxPending, NO_OP, clock, minPause, currentPending::get, maxQueued); } private static final Consumer<Long> NO_OP = unused -> {