This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8c57e5a [FLINK-12364] Introduce CheckpointFailureManager for centralized checkpoint failure handling 8c57e5a is described below commit 8c57e5aed871b42d51e6218620ce7bd7991338c0 Author: vinoyang <yanghua1...@gmail.com> AuthorDate: Wed Jun 19 17:00:59 2019 +0800 [FLINK-12364] Introduce CheckpointFailureManager for centralized checkpoint failure handling --- flink-end-to-end-tests/test-scripts/common.sh | 2 +- .../jobmanager/JMXJobManagerMetricTest.java | 3 +- .../runtime/checkpoint/CheckpointCoordinator.java | 74 ++-- .../checkpoint/CheckpointFailureManager.java | 135 +++++++ .../checkpoint/CheckpointFailureReason.java | 18 +- .../runtime/checkpoint/CheckpointIDCounter.java | 7 + .../runtime/checkpoint/PendingCheckpoint.java | 2 +- .../checkpoint/StandaloneCheckpointIDCounter.java | 5 + .../checkpoint/ZooKeeperCheckpointIDCounter.java | 21 +- .../decline/AlignmentLimitExceededException.java | 33 -- .../decline/CheckpointDeclineException.java | 35 -- ...pointDeclineOnCancellationBarrierException.java | 32 -- .../CheckpointDeclineSubsumedException.java | 32 -- ...kpointDeclineTaskNotCheckpointingException.java | 32 -- .../CheckpointDeclineTaskNotReadyException.java | 32 -- .../decline/InputEndOfStreamException.java | 32 -- .../runtime/executiongraph/ExecutionGraph.java | 32 +- .../executiongraph/ExecutionGraphBuilder.java | 9 +- .../tasks/CheckpointCoordinatorConfiguration.java | 22 +- .../messages/checkpoint/DeclineCheckpoint.java | 16 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 7 +- .../exceptions/CheckpointException.java | 41 -- .../org/apache/flink/runtime/taskmanager/Task.java | 7 +- .../CheckpointCoordinatorFailureTest.java | 14 +- .../CheckpointCoordinatorMasterHooksTest.java | 18 +- .../checkpoint/CheckpointCoordinatorTest.java | 418 ++++++++++++++++----- .../checkpoint/CheckpointFailureManagerTest.java | 117 ++++++ .../checkpoint/CheckpointIDCounterTest.java | 6 + 
.../CheckpointSettingsSerializableTest.java | 3 +- .../checkpoint/CheckpointStateRestoreTest.java | 43 ++- .../checkpoint/CheckpointStatsTrackerTest.java | 3 +- .../ExecutionGraphCheckpointCoordinatorTest.java | 25 +- .../executiongraph/ArchivedExecutionGraphTest.java | 11 +- ...ncurrentFailoverStrategyExecutionGraphTest.java | 12 +- .../ExecutionGraphDeploymentTest.java | 3 +- .../runtime/executiongraph/FailoverRegionTest.java | 18 +- .../flink/runtime/jobgraph/JobGraphTest.java | 7 +- .../tasks/JobCheckpointingSettingsTest.java | 3 +- .../flink/runtime/jobmaster/JobMasterTest.java | 3 +- .../api/environment/CheckpointConfig.java | 27 +- .../api/graph/StreamingJobGraphGenerator.java | 5 +- .../flink/streaming/runtime/io/BarrierBuffer.java | 31 +- .../flink/streaming/runtime/io/BarrierTracker.java | 6 +- .../io/BarrierBufferAlignmentLimitTest.java | 8 +- .../runtime/io/BarrierBufferTestBase.java | 63 +++- .../tasks/StreamTaskCancellationBarrierTest.java | 11 +- .../jobmaster/JobMasterStopWithSavepointIT.java | 3 +- .../jobmaster/JobMasterTriggerSavepointITCase.java | 3 +- .../test/streaming/runtime/IterateITCase.java | 4 +- 49 files changed, 943 insertions(+), 551 deletions(-) diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 361e49a..4eac9f4 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -361,7 +361,7 @@ function check_logs_for_exceptions { | grep -v "java.io.InvalidClassException: org.apache.flink.formats.avro.typeutils.AvroSerializer" \ | grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \ | grep -v "java.lang.Exception: Artificial failure" \ - | grep -v "org.apache.flink.runtime.checkpoint.decline" \ + | grep -v "org.apache.flink.runtime.checkpoint.CheckpointException" \ | grep -v "org.elasticsearch.ElasticsearchException" \ | grep -v "Elasticsearch exception" \ | grep -ic "exception" || true) diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index b946896..7156adb 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -101,7 +101,8 @@ public class JMXJobManagerMetricTest extends TestLogger { 5, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, - false), + false, + 0), null)); ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index b6f5a81..3dc5c1d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; import org.apache.flink.runtime.checkpoint.hooks.MasterHooks; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; @@ -33,6 +32,7 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import 
org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView; @@ -185,15 +185,13 @@ public class CheckpointCoordinator { private boolean isPreferCheckpointForRecovery; + private final CheckpointFailureManager failureManager; + // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( JobID job, - long baseInterval, - long checkpointTimeout, - long minPauseBetweenCheckpoints, - int maxConcurrentCheckpointAttempts, - CheckpointRetentionPolicy retentionPolicy, + CheckpointCoordinatorConfiguration chkConfig, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, @@ -202,31 +200,29 @@ public class CheckpointCoordinator { StateBackend checkpointStateBackend, Executor executor, SharedStateRegistryFactory sharedStateRegistryFactory, - boolean isPreferCheckpointForRecovery) { + CheckpointFailureManager failureManager) { // sanity checks checkNotNull(checkpointStateBackend); - checkArgument(baseInterval > 0, "Checkpoint base interval must be larger than zero"); - checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero"); - checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0"); - checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1"); // max "in between duration" can be one year - this is to prevent numeric overflows + long minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints(); if (minPauseBetweenCheckpoints > 365L * 24 * 60 * 60 * 1_000) { minPauseBetweenCheckpoints = 365L * 24 * 60 * 60 * 1_000; } // it does not make sense to schedule checkpoints more often then the desired // time between checkpoints + long baseInterval = chkConfig.getCheckpointInterval(); if (baseInterval < 
minPauseBetweenCheckpoints) { baseInterval = minPauseBetweenCheckpoints; } this.job = checkNotNull(job); this.baseInterval = baseInterval; - this.checkpointTimeout = checkpointTimeout; + this.checkpointTimeout = chkConfig.getCheckpointTimeout(); this.minPauseBetweenCheckpointsNanos = minPauseBetweenCheckpoints * 1_000_000; - this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts; + this.maxConcurrentCheckpointAttempts = chkConfig.getMaxConcurrentCheckpoints(); this.tasksToTrigger = checkNotNull(tasksToTrigger); this.tasksToWaitFor = checkNotNull(tasksToWaitFor); this.tasksToCommitTo = checkNotNull(tasksToCommitTo); @@ -236,7 +232,8 @@ public class CheckpointCoordinator { this.executor = checkNotNull(executor); this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory); this.sharedStateRegistry = sharedStateRegistryFactory.create(executor); - this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery; + this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery(); + this.failureManager = checkNotNull(failureManager); this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.masterHooks = new HashMap<>(); @@ -249,7 +246,7 @@ public class CheckpointCoordinator { this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - this.checkpointProperties = CheckpointProperties.forCheckpoint(retentionPolicy); + this.checkpointProperties = CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy()); try { this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job); @@ -342,7 +339,7 @@ public class CheckpointCoordinator { // clear and discard all pending checkpoints for (PendingCheckpoint pending : pendingCheckpoints.values()) { - pending.abort(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN); + failPendingCheckpoint(pending, 
CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN); } pendingCheckpoints.clear(); @@ -439,6 +436,10 @@ public class CheckpointCoordinator { triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); return true; } catch (CheckpointException e) { + long latestGeneratedCheckpointId = getCheckpointIdCounter().get(); + // here we can not get the failed pending checkpoint's id, + // so we pass the negative latest generated checkpoint id as a special flag + failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId); return false; } } @@ -459,7 +460,7 @@ public class CheckpointCoordinator { synchronized (lock) { // abort if the coordinator has been shutdown in the meantime if (shutdown) { - throw new CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN); + throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN); } // Don't allow periodic checkpoint if scheduling has been disabled @@ -599,7 +600,7 @@ public class CheckpointCoordinator { if (!checkpoint.isDiscarded()) { LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job); - checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED); + failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED); pendingCheckpoints.remove(checkpointID); rememberRecentCheckpointId(checkpointID); @@ -614,7 +615,7 @@ public class CheckpointCoordinator { // since we released the lock in the meantime, we need to re-check // that the conditions still hold. 
if (shutdown) { - throw new CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN); + throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN); } else if (!props.forceCheckpoint()) { if (triggerRequestQueued) { @@ -699,7 +700,7 @@ public class CheckpointCoordinator { checkpointID, job, numUnsuccessful, t); if (!checkpoint.isDiscarded()) { - checkpoint.abort(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t); + failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t); } try { @@ -891,11 +892,12 @@ public class CheckpointCoordinator { try { try { completedCheckpoint = pendingCheckpoint.finalizeCheckpoint(); + failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId()); } catch (Exception e1) { // abort the current pending checkpoint if we fails to finalize the pending checkpoint. if (!pendingCheckpoint.isDiscarded()) { - pendingCheckpoint.abort(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1); + failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1); } throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', @@ -1002,7 +1004,7 @@ public class CheckpointCoordinator { // remove all pending checkpoints that are lesser than the current completed checkpoint if (p.getCheckpointId() < checkpointId && p.canBeSubsumed()) { rememberRecentCheckpointId(p.getCheckpointId()); - p.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED); + failPendingCheckpoint(p, CheckpointFailureReason.CHECKPOINT_SUBSUMED); entries.remove(); } } @@ -1275,7 +1277,7 @@ public class CheckpointCoordinator { public void abortPendingCheckpoints(CheckpointException exception) { synchronized (lock) { for (PendingCheckpoint p : pendingCheckpoints.values()) { - p.abort(exception.getCheckpointFailureReason()); + failPendingCheckpoint(p, exception.getCheckpointFailureReason()); } pendingCheckpoints.clear(); @@ -1329,10 +1331,13 @@ 
public class CheckpointCoordinator { LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); - if (cause == null || cause instanceof CheckpointDeclineException) { - pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause); + if (cause == null) { + failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.CHECKPOINT_DECLINED); + } else if (cause instanceof CheckpointException) { + CheckpointException exception = (CheckpointException) cause; + failPendingCheckpoint(pendingCheckpoint, exception.getCheckpointFailureReason(), cause); } else { - pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause); + failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.JOB_FAILURE, cause); } rememberRecentCheckpointId(checkpointId); @@ -1384,4 +1389,21 @@ public class CheckpointCoordinator { }); } } + + private void failPendingCheckpoint( + final PendingCheckpoint pendingCheckpoint, + final CheckpointFailureReason reason, + final Throwable cause) { + + CheckpointException exception = new CheckpointException(reason, cause); + pendingCheckpoint.abort(reason, cause); + failureManager.handleCheckpointException(exception, pendingCheckpoint.getCheckpointId()); + } + + private void failPendingCheckpoint( + final PendingCheckpoint pendingCheckpoint, + final CheckpointFailureReason reason) { + + failPendingCheckpoint(pendingCheckpoint, reason, null); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java new file mode 100644 index 0000000..4a95cdd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The checkpoint failure manager, which centrally manages the checkpoint failure processing logic.
+ */ +public class CheckpointFailureManager { + + private final static int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE; + + private final int tolerableCpFailureNumber; + private final FailJobCallback failureCallback; + private final AtomicInteger continuousFailureCounter; + private final Set<Long> countedCheckpointIds; + + public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) { + checkArgument(tolerableCpFailureNumber >= 0, + "The tolerable checkpoint failure number is illegal, " + + "it must be greater than or equal to 0 ."); + this.tolerableCpFailureNumber = tolerableCpFailureNumber; + this.continuousFailureCounter = new AtomicInteger(0); + this.failureCallback = checkNotNull(failureCallback); + this.countedCheckpointIds = ConcurrentHashMap.newKeySet(); + } + + /** + * Handle checkpoint exception with a handler callback. + * + * @param exception the checkpoint exception. + * @param checkpointId the failed checkpoint id used to count the continuous failure number based on + * the checkpoint id sequence. In the trigger phase, the checkpoint id may not be available when the + * failure happens before the checkpoint id is generated. In this case, the negated latest + * generated checkpoint id is passed as a special flag.
+ */ + public void handleCheckpointException(CheckpointException exception, long checkpointId) { + if (tolerableCpFailureNumber == UNLIMITED_TOLERABLE_FAILURE_NUMBER) { + return; + } + + CheckpointFailureReason reason = exception.getCheckpointFailureReason(); + switch (reason) { + case PERIODIC_SCHEDULER_SHUTDOWN: + case ALREADY_QUEUED: + case TOO_MANY_CONCURRENT_CHECKPOINTS: + case MINIMUM_TIME_BETWEEN_CHECKPOINTS: + case NOT_ALL_REQUIRED_TASKS_RUNNING: + case CHECKPOINT_SUBSUMED: + case CHECKPOINT_COORDINATOR_SUSPEND: + case CHECKPOINT_COORDINATOR_SHUTDOWN: + case JOB_FAILURE: + case JOB_FAILOVER_REGION: + //for compatibility purposes with user job behavior + case CHECKPOINT_DECLINED_TASK_NOT_READY: + case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING: + case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED: + case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER: + case CHECKPOINT_DECLINED_SUBSUMED: + case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM: + + case EXCEPTION: + case CHECKPOINT_EXPIRED: + case TASK_CHECKPOINT_FAILURE: + case TRIGGER_CHECKPOINT_FAILURE: + case FINALIZE_CHECKPOINT_FAILURE: + //ignore + break; + + case CHECKPOINT_DECLINED: + //we should make sure one checkpoint only be counted once + if (countedCheckpointIds.add(checkpointId)) { + continuousFailureCounter.incrementAndGet(); + } + + break; + + default: + throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + reason.name()); + } + + if (continuousFailureCounter.get() > tolerableCpFailureNumber) { + clearCount(); + failureCallback.failJob(); + } + } + + /** + * Handle checkpoint success. + * + * @param checkpointId the id of the successfully completed checkpoint; a success resets the + * continuous failure count.
+ */ + public interface FailJobCallback { + + void failJob(); + + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java index 35f457a..e00cce7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java @@ -23,8 +23,6 @@ package org.apache.flink.runtime.checkpoint; */ public enum CheckpointFailureReason { - COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."), - PERIODIC_SCHEDULER_SHUTDOWN("Periodic checkpoint scheduler is shut down."), ALREADY_QUEUED("Another checkpoint request has already been queued."), @@ -38,13 +36,23 @@ public enum CheckpointFailureReason { EXCEPTION("An Exception occurred while triggering the checkpoint."), - EXPIRED("The checkpoint expired before triggering was complete"), - CHECKPOINT_EXPIRED("Checkpoint expired before completing."), CHECKPOINT_SUBSUMED("Checkpoint has been subsumed."), - CHECKPOINT_DECLINED("Checkpoint was declined (tasks not ready)."), + CHECKPOINT_DECLINED("Checkpoint was declined."), + + CHECKPOINT_DECLINED_TASK_NOT_READY("Checkpoint was declined (tasks not ready)"), + + CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING("Task does not support checkpointing"), + + CHECKPOINT_DECLINED_SUBSUMED("Checkpoint was canceled because a barrier from newer checkpoint was received."), + + CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER("Task received cancellation from one of its inputs"), + + CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED("The checkpoint alignment phase needed to buffer more than the configured maximum bytes"), + + CHECKPOINT_DECLINED_INPUT_END_OF_STREAM("Checkpoint was declined because one input stream is finished"), CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."), diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java index 48cec7d..1af6730 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java @@ -48,6 +48,13 @@ public interface CheckpointIDCounter { long getAndIncrement() throws Exception; /** + * Atomically gets the current checkpoint ID. + * + * @return The current checkpoint ID + */ + long get(); + + /** * Sets the current checkpoint ID. * * @param newId The new ID diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index d03c28f..ac086ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -406,7 +406,7 @@ public class PendingCheckpoint { /** * Aborts a checkpoint with reason and cause. 
*/ - public void abort(CheckpointFailureReason reason, Throwable cause) { + public void abort(CheckpointFailureReason reason, @Nullable Throwable cause) { try { CheckpointException exception = new CheckpointException(reason, cause); onCompletionPromise.completeExceptionally(exception); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java index f43df5a..f63b0d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java @@ -44,6 +44,11 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter { } @Override + public long get() { + return checkpointIdCounter.get(); + } + + @Override public void setCount(long newCount) { checkpointIdCounter.set(newCount); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java index 7d15fab..2d08d58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java @@ -111,11 +111,7 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { @Override public long getAndIncrement() throws Exception { while (true) { - ConnectionState connState = connStateListener.getLastState(); - - if (connState != null) { - throw new IllegalStateException("Connection state: " + connState); - } + checkConnectionState(); VersionedValue<Integer> current = sharedCount.getVersionedValue(); int newCount = current.getValue() + 1; @@ -133,6 +129,13 @@ public class ZooKeeperCheckpointIDCounter implements 
CheckpointIDCounter { } @Override + public long get() { + checkConnectionState(); + + return sharedCount.getVersionedValue().getValue(); + } + + @Override public void setCount(long newId) throws Exception { ConnectionState connState = connStateListener.getLastState(); @@ -149,6 +152,14 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { sharedCount.setCount((int) newId); } + private void checkConnectionState() { + ConnectionState connState = connStateListener.getLastState(); + + if (connState != null) { + throw new IllegalStateException("Connection state: " + connState); + } + } + /** * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link * ConnectionState#LOST} we are not guaranteed to read a current count from ZooKeeper. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java deleted file mode 100644 index 64d57bc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/AlignmentLimitExceededException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.decline; - -/** - * Exception indicating that a checkpoint was declined because too many bytes were - * buffered in the alignment phase. - */ -public final class AlignmentLimitExceededException extends CheckpointDeclineException { - - private static final long serialVersionUID = 1L; - - public AlignmentLimitExceededException(long numBytes) { - super("The checkpoint alignment phase needed to buffer more than the configured maximum (" - + numBytes + " bytes)."); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java deleted file mode 100644 index 8a2802c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineException.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.decline; - -/** - * Base class of all exceptions that indicate a declined checkpoint. 
- */ -public abstract class CheckpointDeclineException extends Exception { - - private static final long serialVersionUID = 1L; - - public CheckpointDeclineException(String message) { - super(message); - } - - public CheckpointDeclineException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java deleted file mode 100644 index 9ae4096..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineOnCancellationBarrierException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.decline; - -/** - * Exception indicating that a checkpoint was declined because a cancellation - * barrier was received. 
- */ -public final class CheckpointDeclineOnCancellationBarrierException extends CheckpointDeclineException { - - private static final long serialVersionUID = 1L; - - public CheckpointDeclineOnCancellationBarrierException() { - super("Task received cancellation from one of its inputs"); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java deleted file mode 100644 index 5380469..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineSubsumedException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.decline; - -/** - * Exception indicating that a checkpoint was declined because a newer checkpoint - * barrier was received on an input before the pending checkpoint's barrier. 
- */ -public final class CheckpointDeclineSubsumedException extends CheckpointDeclineException { - - private static final long serialVersionUID = 1L; - - public CheckpointDeclineSubsumedException(long newCheckpointId) { - super("Checkpoint was canceled because a barrier from newer checkpoint " + newCheckpointId + " was received."); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java deleted file mode 100644 index e5773d1..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotCheckpointingException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.decline; - -/** - * Exception indicating that a checkpoint was declined because a task does not support - * checkpointing. 
- */ -public final class CheckpointDeclineTaskNotCheckpointingException extends CheckpointDeclineException { - - private static final long serialVersionUID = 1L; - - public CheckpointDeclineTaskNotCheckpointingException(String taskName) { - super("Task '" + taskName + "'does not support checkpointing"); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java deleted file mode 100644 index a1214fe..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/CheckpointDeclineTaskNotReadyException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.decline; - -/** - * Exception indicating that a checkpoint was declined because a task was not - * ready to perform a checkpoint. 
- */ -public final class CheckpointDeclineTaskNotReadyException extends CheckpointDeclineException { - - private static final long serialVersionUID = 1L; - - public CheckpointDeclineTaskNotReadyException(String taskName) { - super("Task " + taskName + " was not running"); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java deleted file mode 100644 index 86b29dc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/decline/InputEndOfStreamException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.decline; - -/** - * Exception indicating that a checkpoint was declined because one of the input - * stream reached its end before the alignment was complete. 
- */ -public final class InputEndOfStreamException extends CheckpointDeclineException { - - private static final long serialVersionUID = 1L; - - public InputEndOfStreamException() { - super("Checkpoint was declined because one input stream is finished"); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index fa4a393..0c91276 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -34,8 +34,8 @@ import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; @@ -72,6 +72,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.types.Either; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedThrowable; @@ -110,7 +111,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.stream.Collectors; -import static org.apache.flink.util.Preconditions.checkArgument; import static 
org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -517,11 +517,7 @@ public class ExecutionGraph implements AccessExecutionGraph { } public void enableCheckpointing( - long interval, - long checkpointTimeout, - long minPauseBetweenCheckpoints, - int maxConcurrentCheckpoints, - CheckpointRetentionPolicy retentionPolicy, + CheckpointCoordinatorConfiguration chkConfig, List<ExecutionJobVertex> verticesToTrigger, List<ExecutionJobVertex> verticesToWaitFor, List<ExecutionJobVertex> verticesToCommitTo, @@ -529,12 +525,7 @@ public class ExecutionGraph implements AccessExecutionGraph { CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, - CheckpointStatsTracker statsTracker, - boolean isPreferCheckpointForRecovery) { - - // simple sanity checks - checkArgument(interval >= 10, "checkpoint interval must not be below 10ms"); - checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms"); + CheckpointStatsTracker statsTracker) { checkState(state == JobStatus.CREATED, "Job must be in CREATED state"); checkState(checkpointCoordinator == null, "checkpointing already enabled"); @@ -545,14 +536,15 @@ public class ExecutionGraph implements AccessExecutionGraph { checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker"); + CheckpointFailureManager failureManager = new CheckpointFailureManager(chkConfig.getTolerableCheckpointFailureNumber(), () -> + getJobMasterMainThreadExecutor().execute(() -> + failGlobal(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold.")) + )); + // create the coordinator that triggers and commits checkpoints and holds the state checkpointCoordinator = new CheckpointCoordinator( jobInformation.getJobId(), - interval, - checkpointTimeout, - minPauseBetweenCheckpoints, - maxConcurrentCheckpoints, - retentionPolicy, + chkConfig, tasksToTrigger, tasksToWaitFor, 
tasksToCommitTo, @@ -561,7 +553,7 @@ public class ExecutionGraph implements AccessExecutionGraph { checkpointStateBackend, ioExecutor, SharedStateRegistry.DEFAULT_FACTORY, - isPreferCheckpointForRecovery); + failureManager); // register the master hooks on the checkpoint coordinator for (MasterTriggerRestoreHook<?> hook : masterHooks) { @@ -574,7 +566,7 @@ public class ExecutionGraph implements AccessExecutionGraph { // interval of max long value indicates disable periodic checkpoint, // the CheckpointActivatorDeactivator should be created only if the interval is not max value - if (interval != Long.MAX_VALUE) { + if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) { // the periodic checkpoint scheduler is activated and deactivated as a result of // job status changes (running -> on, all other states -> off) registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 117b7b2..fa194e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -289,11 +289,7 @@ public class ExecutionGraphBuilder { final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration(); executionGraph.enableCheckpointing( - chkConfig.getCheckpointInterval(), - chkConfig.getCheckpointTimeout(), - chkConfig.getMinPauseBetweenCheckpoints(), - chkConfig.getMaxConcurrentCheckpoints(), - chkConfig.getCheckpointRetentionPolicy(), + chkConfig, triggerVertices, ackVertices, confirmVertices, @@ -301,8 +297,7 @@ public class ExecutionGraphBuilder { checkpointIdCounter, completedCheckpoints, rootBackend, - checkpointStatsTracker, - chkConfig.isPreferCheckpointForRecovery()); + 
checkpointStatsTracker); } // create all the metrics for the Execution Graph diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java index 9e57d12..cff5777 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java @@ -42,6 +42,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { private final int maxConcurrentCheckpoints; + private final int tolerableCheckpointFailureNumber; + /** Settings for what to do with checkpoints when a job finishes. */ private final CheckpointRetentionPolicy checkpointRetentionPolicy; @@ -63,11 +65,13 @@ public class CheckpointCoordinatorConfiguration implements Serializable { int maxConcurrentCheckpoints, CheckpointRetentionPolicy checkpointRetentionPolicy, boolean isExactlyOnce, - boolean isPerfetCheckpointForRecovery) { + boolean isPerfetCheckpointForRecovery, + int tolerableCpFailureNumber) { // sanity checks - if (checkpointInterval < 1 || checkpointTimeout < 1 || - minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) { + if (checkpointInterval < 10 || checkpointTimeout < 10 || + minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1 || + tolerableCpFailureNumber < 0) { throw new IllegalArgumentException(); } @@ -78,6 +82,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy); this.isExactlyOnce = isExactlyOnce; this.isPreferCheckpointForRecovery = isPerfetCheckpointForRecovery; + this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber; } public long getCheckpointInterval() { @@ -108,6 +113,10 @@ public class 
CheckpointCoordinatorConfiguration implements Serializable { return isPreferCheckpointForRecovery; } + public int getTolerableCheckpointFailureNumber() { + return tolerableCheckpointFailureNumber; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -123,7 +132,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { maxConcurrentCheckpoints == that.maxConcurrentCheckpoints && isExactlyOnce == that.isExactlyOnce && checkpointRetentionPolicy == that.checkpointRetentionPolicy && - isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery; + isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery && + tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber; } @Override @@ -135,7 +145,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { maxConcurrentCheckpoints, checkpointRetentionPolicy, isExactlyOnce, - isPreferCheckpointForRecovery); + isPreferCheckpointForRecovery, + tolerableCheckpointFailureNumber); } @Override @@ -146,6 +157,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { ", minPauseBetweenCheckpoints=" + minPauseBetweenCheckpoints + ", maxConcurrentCheckpoints=" + maxConcurrentCheckpoints + ", checkpointRetentionPolicy=" + checkpointRetentionPolicy + + ", tolerableCheckpointFailureNumber=" + tolerableCheckpointFailureNumber + '}'; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java index 43bbc21..40fcf2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/DeclineCheckpoint.java @@ -19,12 +19,7 @@ package org.apache.flink.runtime.messages.checkpoint; import org.apache.flink.api.common.JobID; -import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException; -import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.util.SerializedThrowable; @@ -48,14 +43,7 @@ public class DeclineCheckpoint extends AbstractCheckpointMessage implements java public DeclineCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, Throwable reason) { super(job, taskExecutionId, checkpointId); - if (reason == null || - reason.getClass() == AlignmentLimitExceededException.class || - reason.getClass() == CheckpointDeclineOnCancellationBarrierException.class || - reason.getClass() == CheckpointDeclineSubsumedException.class || - reason.getClass() == CheckpointDeclineTaskNotCheckpointingException.class || - reason.getClass() == CheckpointDeclineTaskNotReadyException.class || - reason.getClass() == InputEndOfStreamException.class) { - // null or known common exceptions that cannot reference any dynamically loaded code + if (reason == null || reason instanceof CheckpointException) { this.reason = reason; } else { // some other exception. 
replace with a serialized throwable, to be on the safe side diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 4238d71..4153182 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.TransientBlobCache; import org.apache.flink.runtime.blob.TransientBlobKey; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; @@ -78,7 +80,6 @@ import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.state.TaskLocalStateStore; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImpl; -import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException; import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; @@ -700,7 +701,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.'; log.debug(message); - return FutureUtils.completedExceptionally(new CheckpointException(message)); + return FutureUtils.completedExceptionally(new 
CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE)); } } @@ -721,7 +722,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { final String message = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.'; log.debug(message); - return FutureUtils.completedExceptionally(new CheckpointException(message)); + return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java deleted file mode 100644 index 80f2aa0..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/CheckpointException.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskexecutor.exceptions; - -import org.apache.flink.runtime.taskexecutor.TaskExecutor; - -/** - * Exception indicating a problem with checkpointing on the {@link TaskExecutor} side. 
- */ -public class CheckpointException extends TaskManagerException { - - private static final long serialVersionUID = 3366394086880327955L; - - public CheckpointException(String message) { - super(message); - } - - public CheckpointException(String message, Throwable cause) { - super(message, cause); - } - - public CheckpointException(Throwable cause) { - super(cause); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 5512c54..2084819 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -33,9 +33,10 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -1121,7 +1122,7 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid if (!success) { checkpointResponder.declineCheckpoint( getJobID(), getExecutionId(), checkpointID, - new CheckpointDeclineTaskNotReadyException(taskName)); + new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY)); } } catch (Throwable t) { @@ -1149,7 +1150,7 @@ public class Task implements 
Runnable, TaskActions, PartitionProducerStateProvid // send back a message that we did not do the checkpoint checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID, - new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask)); + new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 1709fae..2edbb1e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -65,14 +66,21 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { final long triggerTimestamp = 1L; + CheckpointFailureManager failureManager = new CheckpointFailureManager(0, () -> {}); + // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 
chkConfig, new ExecutionVertex[]{vertex}, new ExecutionVertex[]{vertex}, new ExecutionVertex[]{vertex}, @@ -81,7 +89,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); coord.triggerCheckpoint(triggerTimestamp, false); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index 762b805..9990772 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.memory.MemoryStateBackend; @@ -421,13 +422,18 @@ public class CheckpointCoordinatorMasterHooksTest { // ------------------------------------------------------------------------ private static CheckpointCoordinator instantiateCheckpointCoordinator(JobID jid, ExecutionVertex... 
ackVertices) { + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( + 10000000L, + 600000L, + 0L, + 1, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); return new CheckpointCoordinator( jid, - 10000000L, - 600000L, - 0L, - 1, - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + chkConfig, new ExecutionVertex[0], ackVertices, new ExecutionVertex[0], @@ -436,7 +442,7 @@ public class CheckpointCoordinatorMasterHooksTest { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + new CheckpointFailureManager(0, () -> {})); } private static <T> T mockGeneric(Class<?> clazz) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 1676b01..fcd7150 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; @@ -63,6 +64,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -117,9 +119,16 @@ public class 
CheckpointCoordinatorTest extends TestLogger { private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location"; + private CheckpointFailureManager failureManager; + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Before + public void setUp() throws Exception { + failureManager = new CheckpointFailureManager(0, () -> {}); + } + @Test public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() { try { @@ -137,13 +146,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, @@ -152,7 +166,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -199,13 +213,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { 
triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, @@ -214,7 +233,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -252,13 +271,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex ackVertex2 = mock(ExecutionVertex.class); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, @@ -267,7 +291,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -288,6 +312,77 @@ public class CheckpointCoordinatorTest extends TestLogger { } } + @Test + public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() { + final JobID jid = new JobID(); + final long timestamp = System.currentTimeMillis(); + + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + final ExecutionAttemptID attemptID2 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + ExecutionVertex vertex2 = mockExecutionVertex(attemptID2); + + final String errorMsg = 
"Exceeded checkpoint failure tolerance number!"; + + CheckpointFailureManager checkpointFailureManager = new CheckpointFailureManager(0, () -> { + throw new RuntimeException(errorMsg); + }); + + // set up the coordinator + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( + 600000, + 600000, + 0, + Integer.MAX_VALUE, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new ExecutionVertex[] { vertex1, vertex2 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + new MemoryStateBackend(), + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY, + checkpointFailureManager); + + try { + // trigger the checkpoint. this should succeed + assertTrue(coord.triggerCheckpoint(timestamp, false)); + + long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId); + + // acknowledge from one of the tasks + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId), TASK_MANAGER_LOCATION_INFO); + assertFalse(checkpoint.isDiscarded()); + assertFalse(checkpoint.isFullyAcknowledged()); + + // decline checkpoint from the other task + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId), TASK_MANAGER_LOCATION_INFO); + + fail("Test failed."); + } + catch (Exception e) { + //expected + assertTrue(e instanceof RuntimeException); + assertEquals(errorMsg, e.getMessage()); + } finally { + try { + coord.shutdown(JobStatus.FINISHED); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + } + /** * This test triggers a checkpoint and then sends a decline checkpoint message from * one of the tasks. 
The expected behaviour is that said checkpoint is discarded and a new @@ -306,13 +401,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex vertex2 = mockExecutionVertex(attemptID2); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, @@ -321,7 +421,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -410,13 +510,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex vertex2 = mockExecutionVertex(attemptID2); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, @@ -425,7 +530,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertEquals(0, 
coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -531,13 +636,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex vertex2 = mockExecutionVertex(attemptID2); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, @@ -546,7 +656,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -700,13 +810,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, new ExecutionVertex[] { commitVertex }, @@ -715,7 +830,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), 
SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -832,13 +947,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, new ExecutionVertex[] { commitVertex }, @@ -847,7 +967,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -997,14 +1117,18 @@ public class CheckpointCoordinatorTest extends TestLogger { // set up the coordinator // the timeout for the checkpoint is a 200 milliseconds - - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 200, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] { commitVertex }, @@ -1013,7 +1137,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), 
Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // trigger a checkpoint, partially acknowledged assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1077,13 +1201,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2); ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 200000, 200000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] { commitVertex }, @@ -1092,7 +1221,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1142,13 +1271,18 @@ public class CheckpointCoordinatorTest extends TestLogger { final long timestamp = 1L; - CheckpointCoordinator coord = new CheckpointCoordinator( - jobId, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 20000L, 20000L, 0L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jobId, + chkConfig, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] {triggerVertex, ackVertex1, ackVertex2}, new ExecutionVertex[0], @@ -1157,7 +1291,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ 
-1276,13 +1410,18 @@ public class CheckpointCoordinatorTest extends TestLogger { } }).when(execution).triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class)); - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 10, // periodic interval is 10 ms 200000, // timeout is very long (200 s) 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, @@ -1291,8 +1430,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); - + failureManager); coord.startCheckpointScheduler(); @@ -1367,13 +1505,18 @@ public class CheckpointCoordinatorTest extends TestLogger { final long delay = 50; + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( + 12, // periodic interval is 12 ms + 200_000, // timeout is very long (200 s) + delay, // 50 ms delay between checkpoints + 1, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); final CheckpointCoordinator coord = new CheckpointCoordinator( jid, - 2, // periodic interval is 2 ms - 200_000, // timeout is very long (200 s) - delay, // 50 ms delay between checkpoints - 1, - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + chkConfig, new ExecutionVertex[] { vertex }, new ExecutionVertex[] { vertex }, new ExecutionVertex[] { vertex }, @@ -1382,7 +1525,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); try { coord.startCheckpointScheduler(); @@ -1442,13 +1585,18 @@ public class 
CheckpointCoordinatorTest extends TestLogger { ExecutionVertex vertex2 = mockExecutionVertex(attemptID2); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, @@ -1457,7 +1605,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1595,13 +1743,18 @@ public class CheckpointCoordinatorTest extends TestLogger { StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter(); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, @@ -1610,7 +1763,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -1690,13 +1843,18 @@ public class 
CheckpointCoordinatorTest extends TestLogger { return null; }).when(execution).notifyCheckpointComplete(anyLong(), anyLong()); - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 10, // periodic interval is 10 ms 200000, // timeout is very long (200 s) 0L, // no extra delay maxConcurrentAttempts, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, @@ -1705,7 +1863,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); coord.startCheckpointScheduler(); @@ -1765,13 +1923,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID); ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID); - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 10, // periodic interval is 10 ms 200000, // timeout is very long (200 s) 0L, // no extra delay maxConcurrentAttempts, // max two concurrent checkpoints CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, @@ -1780,7 +1943,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); coord.startCheckpointScheduler(); @@ -1843,13 +2006,18 @@ public class 
CheckpointCoordinatorTest extends TestLogger { final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED); when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(invocation -> currentState.get()); - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 10, // periodic interval is 10 ms 200000, // timeout is very long (200 s) 0L, // no extra delay 2, // max two concurrent checkpoints CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, @@ -1858,7 +2026,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); coord.startCheckpointScheduler(); @@ -1897,13 +2065,18 @@ public class CheckpointCoordinatorTest extends TestLogger { StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter(); - CheckpointCoordinator coord = new CheckpointCoordinator( - jobId, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 100000, 200000, 0L, 1, // max one checkpoint at a time => should not affect savepoints CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jobId, + chkConfig, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, @@ -1912,7 +2085,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); List<CompletableFuture<CompletedCheckpoint>> 
savepointFutures = new ArrayList<>(); @@ -1952,13 +2125,18 @@ public class CheckpointCoordinatorTest extends TestLogger { final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); - CheckpointCoordinator coord = new CheckpointCoordinator( - jobId, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 100000, 200000, 100000000L, // very long min delay => should not affect savepoints 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jobId, + chkConfig, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, @@ -1967,7 +2145,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -2016,13 +2194,18 @@ public class CheckpointCoordinatorTest extends TestLogger { CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, @@ -2031,7 +2214,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2132,13 +2315,18 @@ public class CheckpointCoordinatorTest 
extends TestLogger { ExecutionVertex[] arrayExecutionVertices = allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, @@ -2147,7 +2335,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2262,13 +2450,18 @@ public class CheckpointCoordinatorTest extends TestLogger { CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(2); - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + isPreferCheckpoint, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { stateful1, stateless1 }, new ExecutionVertex[] { stateful1, stateless1 }, new ExecutionVertex[] { stateful1, stateless1 }, @@ -2277,7 +2470,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - isPreferCheckpoint); + failureManager); //trigger a checkpoint and wait to become a completed checkpoint assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -2423,13 +2616,18 @@ public class CheckpointCoordinatorTest extends TestLogger { 
allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, @@ -2438,7 +2636,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2709,13 +2907,18 @@ public class CheckpointCoordinatorTest extends TestLogger { when(standaloneCompletedCheckpointStore.getLatestCheckpoint(false)).thenReturn(completedCheckpoint); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, newJobVertex1.getTaskVertices(), newJobVertex1.getTaskVertices(), newJobVertex1.getTaskVertices(), @@ -2724,7 +2927,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); coord.restoreLatestCheckpointedState(tasks, false, true); @@ -2862,13 +3065,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); // set up the coordinator and validate the initial 
state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.RETAIN_ON_FAILURE, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, @@ -2877,7 +3085,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -3298,13 +3506,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, @@ -3313,7 +3526,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // Periodic try { @@ -3520,13 +3733,18 @@ public class CheckpointCoordinatorTest extends TestLogger { ExecutionVertex vertex1 = mockExecutionVertex(new ExecutionAttemptID()); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, 
Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, new ExecutionVertex[]{vertex1}, new ExecutionVertex[]{vertex1}, new ExecutionVertex[]{vertex1}, @@ -3535,7 +3753,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); coord.setCheckpointStatsTracker(tracker); @@ -3560,13 +3778,18 @@ public class CheckpointCoordinatorTest extends TestLogger { StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(1); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, new ExecutionVertex[]{vertex1}, new ExecutionVertex[]{vertex1}, new ExecutionVertex[]{vertex1}, @@ -3575,7 +3798,7 @@ public class CheckpointCoordinatorTest extends TestLogger { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); store.addCheckpoint(new CompletedCheckpoint( new JobID(), @@ -3624,13 +3847,18 @@ public class CheckpointCoordinatorTest extends TestLogger { final List<SharedStateRegistry> createdSharedStateRegistries = new ArrayList<>(2); // set up the coordinator and validate the initial state - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 600000, 600000, 0, Integer.MAX_VALUE, 
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, @@ -3643,7 +3871,7 @@ public class CheckpointCoordinatorTest extends TestLogger { createdSharedStateRegistries.add(instance); return instance; }, - false); + failureManager); final int numCheckpoints = 3; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java new file mode 100644 index 0000000..2f9c151 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManagerTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the checkpoint failure manager. 
+ */ +public class CheckpointFailureManagerTest extends TestLogger { + + @Test + public void testContinuousFailure() { + TestFailJobCallback callback = new TestFailJobCallback(); + CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback); + + failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1); + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2); + + //ignore this + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3); + + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 4); + assertEquals(1, callback.getInvokeCounter()); + } + + @Test + public void testBreakContinuousFailure() { + TestFailJobCallback callback = new TestFailJobCallback(); + CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback); + + failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.EXCEPTION), 1); + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2); + + //ignore this + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3); + + //reset + failureManager.handleCheckpointSuccess(4); + + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED), 5); + assertEquals(0, callback.getInvokeCounter()); + } + + @Test + public void testTotalCountValue() { + TestFailJobCallback callback = new TestFailJobCallback(); + CheckpointFailureManager failureManager = new CheckpointFailureManager(0, callback); + for (CheckpointFailureReason reason : CheckpointFailureReason.values()) { + failureManager.handleCheckpointException(new CheckpointException(reason), -1); + } + + assertEquals(1, 
callback.getInvokeCounter()); + } + + @Test + public void testIgnoreOneCheckpointRepeatedlyCountMultiTimes() { + TestFailJobCallback callback = new TestFailJobCallback(); + CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback); + + failureManager.handleCheckpointException(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 1); + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2); + + //ignore this + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION), 3); + + //ignore repeatedly report from one checkpoint + failureManager.handleCheckpointException( + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED), 2); + assertEquals(0, callback.getInvokeCounter()); + } + + /** + * A failure handler callback for testing. + */ + private static class TestFailJobCallback implements CheckpointFailureManager.FailJobCallback { + + private int invokeCounter = 0; + + @Override + public void failJob() { + invokeCounter++; + } + + public int getInvokeCounter() { + return invokeCounter; + } + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java index 9ece607..fa89018 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java @@ -116,8 +116,11 @@ public abstract class CheckpointIDCounterTest extends TestLogger { counter.start(); assertEquals(1, counter.getAndIncrement()); + assertEquals(2, counter.get()); assertEquals(2, counter.getAndIncrement()); + assertEquals(3, counter.get()); assertEquals(3, counter.getAndIncrement()); + assertEquals(4, counter.get()); assertEquals(4, counter.getAndIncrement()); } 
finally { @@ -177,6 +180,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger { } // The final count + assertEquals(expectedTotal + 1, counter.get()); assertEquals(expectedTotal + 1, counter.getAndIncrement()); } finally { @@ -198,7 +202,9 @@ public abstract class CheckpointIDCounterTest extends TestLogger { // Test setCount counter.setCount(1337); + assertEquals(1337, counter.get()); assertEquals(1337, counter.getAndIncrement()); + assertEquals(1338, counter.get()); assertEquals(1338, counter.getAndIncrement()); counter.shutdown(JobStatus.FINISHED); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index bf438aa..24e0b85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -91,7 +91,8 @@ public class CheckpointSettingsSerializableTest extends TestLogger { 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, - false), + false, + 0), new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)), serHooks); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 8f4f607..08a7a8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import 
org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -37,6 +38,7 @@ import org.apache.flink.util.SerializableObject; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import org.mockito.hamcrest.MockitoHamcrest; @@ -61,6 +63,13 @@ public class CheckpointStateRestoreTest { private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location"; + private CheckpointFailureManager failureManager; + + @Before + public void setUp() throws Exception { + failureManager = new CheckpointFailureManager(0, () -> {}); + } + /** * Tests that on restore the task state is reset for each stateful task. */ @@ -97,13 +106,18 @@ public class CheckpointStateRestoreTest { map.put(statefulId, stateful); map.put(statelessId, stateless); - CheckpointCoordinator coord = new CheckpointCoordinator( - jid, + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 200000L, 200000L, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + chkConfig, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[0], @@ -112,7 +126,7 @@ public class CheckpointStateRestoreTest { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // create ourselves a checkpoint with state final long timestamp = 34623786L; @@ -176,13 +190,18 @@ public class CheckpointStateRestoreTest { @Test public void testNoCheckpointAvailable() 
{ try { - CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 200000L, 200000L, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, new ExecutionVertex[] { mock(ExecutionVertex.class) }, new ExecutionVertex[] { mock(ExecutionVertex.class) }, new ExecutionVertex[0], @@ -191,7 +210,7 @@ public class CheckpointStateRestoreTest { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); try { coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false); @@ -235,13 +254,19 @@ public class CheckpointStateRestoreTest { tasks.put(jobVertexId1, jobVertex1); tasks.put(jobVertexId2, jobVertex2); - CheckpointCoordinator coord = new CheckpointCoordinator( - new JobID(), + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( Integer.MAX_VALUE, Integer.MAX_VALUE, 0, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + + CheckpointCoordinator coord = new CheckpointCoordinator( + new JobID(), + chkConfig, new ExecutionVertex[] {}, new ExecutionVertex[] {}, new ExecutionVertex[] {}, @@ -250,7 +275,7 @@ public class CheckpointStateRestoreTest { new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, - false); + failureManager); // --- (2) Checkpoint misses state for a jobVertex (should work) --- Map<OperatorID, OperatorState> checkpointTaskStates = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index fce0f3a..69d8997 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -65,7 +65,8 @@ public class CheckpointStatsTrackerTest { 123, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, - false + false, + 0 ), null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index bdb9975..267b685 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -153,12 +154,18 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger { executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( + 100, + 100, + 100, + 1, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + executionGraph.enableCheckpointing( - 100, - 100, - 100, - 1, - CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + chkConfig, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), @@ -166,8 
+173,7 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger { counter, store, new MemoryStateBackend(), - CheckpointStatsTrackerTest.createTestTracker(), - false); + CheckpointStatsTrackerTest.createTestTracker()); JobVertex jobVertex = new JobVertex("MockVertex"); jobVertex.setInvokableClass(AbstractInvokable.class); @@ -199,6 +205,11 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger { } @Override + public long get() { + throw new UnsupportedOperationException("Not implemented."); + } + + @Override public void setCount(long newId) { throw new UnsupportedOperationException("Not implemented."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 4098643..0d481d6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -127,12 +127,18 @@ public class ArchivedExecutionGraphTest extends TestLogger { mock(CheckpointCoordinatorConfiguration.class), new UnregisteredMetricsGroup()); - runtimeGraph.enableCheckpointing( + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( 100, 100, 100, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + true, + false, + 0); + + runtimeGraph.enableCheckpointing( + chkConfig, Collections.<ExecutionJobVertex>emptyList(), Collections.<ExecutionJobVertex>emptyList(), Collections.<ExecutionJobVertex>emptyList(), @@ -140,8 +146,7 @@ public class ArchivedExecutionGraphTest extends TestLogger { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), - statsTracker, - false); + statsTracker); runtimeGraph.setJsonPlan("{}"); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java index 9675631..64c9ae4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java @@ -363,7 +363,8 @@ public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger { 3, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, - false); + false, + 0); final ExecutionGraph graph = createSampleGraph( jid, @@ -386,11 +387,7 @@ public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger { final StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter(); graph.enableCheckpointing( - checkpointCoordinatorConfiguration.getCheckpointInterval(), - checkpointCoordinatorConfiguration.getCheckpointTimeout(), - checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(), - checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(), - checkpointCoordinatorConfiguration.getCheckpointRetentionPolicy(), + checkpointCoordinatorConfiguration, allVertices, allVertices, allVertices, @@ -402,8 +399,7 @@ public class ConcurrentFailoverStrategyExecutionGraphTest extends TestLogger { 1, allVertices, checkpointCoordinatorConfiguration, - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()), - false); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup())); final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java 
index fac040a..85d3258 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -803,7 +803,8 @@ public class ExecutionGraphDeploymentTest extends TestLogger { 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, - false), + false, + 0), null)); final Time timeout = Time.seconds(10L); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java index c953b5d..7575dcd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java @@ -605,12 +605,17 @@ public class FailoverRegionTest extends TestLogger { private static void enableCheckpointing(ExecutionGraph eg) { ArrayList<ExecutionJobVertex> jobVertices = new ArrayList<>(eg.getAllVertices().values()); + CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration( + 1000, + 100, + 0, + 1, + CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, + true, + false, + 0); eg.enableCheckpointing( - 1000, - 100, - 0, - 1, - CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, + chkConfig, jobVertices, jobVertices, jobVertices, @@ -622,8 +627,7 @@ public class FailoverRegionTest extends TestLogger { 0, jobVertices, mock(CheckpointCoordinatorConfiguration.class), - new UnregisteredMetricsGroup()), - false); + new UnregisteredMetricsGroup())); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index d778cf0..02bf4c5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -324,9 +324,9 @@ public class JobGraphTest extends TestLogger { } @Test - public void checkpointingIsEnabledIfIntervalIsPositive() { + public void checkpointingIsEnabledIfIntervalIsqAndLegal() { final JobGraph jobGraph = new JobGraph(); - jobGraph.setSnapshotSettings(createCheckpointSettingsWithInterval(1)); + jobGraph.setSnapshotSettings(createCheckpointSettingsWithInterval(10)); assertTrue(jobGraph.isCheckpointingEnabled()); } @@ -347,7 +347,8 @@ public class JobGraphTest extends TestLogger { Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, - false); + false, + 0); return new JobCheckpointingSettings( Collections.emptyList(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java index 4665a63..45f5773 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java @@ -50,7 +50,8 @@ public class JobCheckpointingSettingsTest { 12, CheckpointRetentionPolicy.RETAIN_ON_FAILURE, false, - false), + false, + 0), new SerializedValue<>(new MemoryStateBackend())); JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index c2b0bac..58de718 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1789,7 +1789,8 @@ public class JobMasterTest extends TestLogger { 1, 
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, - false); + false, + 0); final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings( Collections.emptyList(), Collections.emptyList(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 8b9ad55..c2c3536 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -75,6 +75,9 @@ public class CheckpointConfig implements java.io.Serializable { /** Determines if a job will fallback to checkpoint when there is a more recent savepoint. **/ private boolean preferCheckpointForRecovery = false; + /** Determines the threshold that we tolerance checkpoint failure number. */ + private int tolerableCheckpointFailureNumber = 0; + // ------------------------------------------------------------------------ /** @@ -125,8 +128,8 @@ public class CheckpointConfig implements java.io.Serializable { * @param checkpointInterval The checkpoint interval, in milliseconds. */ public void setCheckpointInterval(long checkpointInterval) { - if (checkpointInterval <= 0) { - throw new IllegalArgumentException("Checkpoint interval must be larger than zero"); + if (checkpointInterval < 10) { + throw new IllegalArgumentException("Checkpoint interval must be larger than or equal to 10ms"); } this.checkpointInterval = checkpointInterval; } @@ -146,8 +149,8 @@ public class CheckpointConfig implements java.io.Serializable { * @param checkpointTimeout The checkpoint timeout, in milliseconds. 
*/ public void setCheckpointTimeout(long checkpointTimeout) { - if (checkpointTimeout <= 0) { - throw new IllegalArgumentException("Checkpoint timeout must be larger than zero"); + if (checkpointTimeout < 10) { + throw new IllegalArgumentException("Checkpoint timeout must be larger than or equal to 10ms"); } this.checkpointTimeout = checkpointTimeout; } @@ -253,6 +256,22 @@ public class CheckpointConfig implements java.io.Serializable { } /** + * Get the tolerable checkpoint failure number which used by the checkpoint failure manager + * to determine when we need to fail the job. + */ + public int getTolerableCheckpointFailureNumber() { + return tolerableCheckpointFailureNumber; + } + + /** + * Set the tolerable checkpoint failure number, the default value is 0 that means + * we do not tolerance any checkpoint failure. + */ + public void setTolerableCheckpointFailureNumber(int tolerableCheckpointFailureNumber) { + this.tolerableCheckpointFailureNumber = tolerableCheckpointFailureNumber; + } + + /** * Enables checkpoints to be persisted externally. * * <p>Externalized checkpoints write their meta data out to persistent diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index edf0314..e191dea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -606,7 +606,7 @@ public class StreamingJobGraphGenerator { CheckpointConfig cfg = streamGraph.getCheckpointConfig(); long interval = cfg.getCheckpointInterval(); - if (interval > 0) { + if (interval >= 10) { ExecutionConfig executionConfig = streamGraph.getExecutionConfig(); // propagate the expected behaviour for checkpoint errors to task. 
executionConfig.setFailTaskOnCheckpointError(cfg.isFailOnCheckpointingErrors()); @@ -724,7 +724,8 @@ public class StreamingJobGraphGenerator { cfg.getMaxConcurrentCheckpoints(), retentionAfterTermination, isExactlyOnce, - cfg.isPreferCheckpointForRecovery()), + cfg.isPreferCheckpointForRecovery(), + cfg.getTolerableCheckpointFailureNumber()), serializedStateBackend, serializedHooks); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index b507b38..63fa1ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -19,13 +19,10 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; -import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -277,7 +274,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler { currentCheckpointId); // let the task 
know we are not completing this - notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); + notifyAbort(currentCheckpointId, + new CheckpointException( + "Barrier id: " + barrierId, + CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)); // abort the current checkpoint releaseBlocksAndResetBarriers(); @@ -359,7 +359,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { startOfAlignmentTimestamp = 0L; latestAlignmentDurationNanos = 0L; - notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId)); + notifyAbort(currentCheckpointId, + new CheckpointException( + "Barrier id: " + barrierId, + CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED + )); notifyAbortOnCancellationBarrier(barrierId); } @@ -394,7 +398,8 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (numBarriersReceived > 0) { // let the task know we skip a checkpoint - notifyAbort(currentCheckpointId, new InputEndOfStreamException()); + notifyAbort(currentCheckpointId, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); // no chance to complete this checkpoint releaseBlocksAndResetBarriers(); @@ -420,10 +425,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception { - notifyAbort(checkpointId, new CheckpointDeclineOnCancellationBarrierException()); + notifyAbort(checkpointId, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); } - private void notifyAbort(long checkpointId, CheckpointDeclineException cause) throws Exception { + private void notifyAbort(long checkpointId, CheckpointException cause) throws Exception { if (toNotifyOnCheckpoint != null) { toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause); } @@ -438,7 +444,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler { maxBufferedBytes); 
releaseBlocksAndResetBarriers(); - notifyAbort(currentCheckpointId, new AlignmentLimitExceededException(maxBufferedBytes)); + notifyAbort(currentCheckpointId, + new CheckpointException( + "Max buffered bytes: " + maxBufferedBytes, + CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED)); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java index ee0b3a2..49d2991 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java @@ -19,10 +19,11 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; @@ -290,7 +291,8 @@ public class BarrierTracker implements CheckpointBarrierHandler { private void notifyAbort(long checkpointId) throws Exception { if (toNotifyOnCheckpoint != null) { toNotifyOnCheckpoint.abortCheckpointOnBarrier( - checkpointId, new CheckpointDeclineOnCancellationBarrierException()); + checkpointId, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); } } diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java index 2e4ba51..0a284e1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -20,10 +20,10 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -136,7 +136,8 @@ public class BarrierBufferAlignmentLimitTest { // trying to pull the next makes the alignment overflow - so buffered buffers are replayed check(sequence[5], buffer.pollNext().get()); validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), + argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED))); // playing back buffered events check(sequence[7], buffer.pollNext().get()); @@ -231,7 +232,8 @@ public class BarrierBufferAlignmentLimitTest { // checkpoint alignment aborted due to too much data check(sequence[4], buffer.pollNext().get()); 
validateAlignmentTime(startTs, buffer); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), + argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED))); // replay buffered data - in the middle, the alignment for checkpoint 4 starts check(sequence[6], buffer.pollNext().get()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java index 6475bfc..c9981b5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java @@ -20,12 +20,11 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; -import org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -50,7 +49,6 @@ import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; @@ -498,7 +496,8 @@ public abstract class BarrierBufferTestBase { check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); assertEquals(3L, buffer.getCurrentCheckpointId()); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); - verify(toNotify).abortCheckpointOnBarrier(eq(2L), isA(CheckpointDeclineSubsumedException.class)); + verify(toNotify).abortCheckpointOnBarrier(eq(2L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED))); check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 alignment in progress @@ -506,7 +505,8 @@ public abstract class BarrierBufferTestBase { // checkpoint 3 aborted (end of partition) check(sequence[20], buffer.pollNext().get(), PAGE_SIZE); - verify(toNotify).abortCheckpointOnBarrier(eq(3L), isA(InputEndOfStreamException.class)); + verify(toNotify).abortCheckpointOnBarrier(eq(3L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM))); // replay buffered data from checkpoint 3 check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); @@ -854,13 +854,15 @@ public abstract class BarrierBufferTestBase { check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); assertEquals(5L, buffer.getCurrentCheckpointId()); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), + argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); assertEquals(6L, buffer.getCurrentCheckpointId()); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); assertEquals(0L, buffer.getAlignmentDurationNanos()); } @@ -923,7 +925,8 @@ public abstract class BarrierBufferTestBase { // canceled checkpoint on last barrier startTs = System.nanoTime(); check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); @@ -938,7 +941,8 @@ public abstract class BarrierBufferTestBase { // this checkpoint gets immediately canceled check(sequence[24], buffer.pollNext().get(), PAGE_SIZE); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); assertEquals(0L, buffer.getAlignmentDurationNanos()); // some buffers @@ -954,7 +958,8 @@ public abstract class BarrierBufferTestBase { 
check(sequence[33], buffer.pollNext().get(), PAGE_SIZE); check(sequence[37], buffer.pollNext().get(), PAGE_SIZE); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); assertEquals(0L, buffer.getAlignmentDurationNanos()); } @@ -1008,7 +1013,8 @@ public abstract class BarrierBufferTestBase { // re-read the queued cancellation barriers check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); @@ -1025,7 +1031,8 @@ public abstract class BarrierBufferTestBase { // no further checkpoint (abort) notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); - verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); } /** @@ -1085,7 +1092,8 @@ public abstract class BarrierBufferTestBase { // cancelled by cancellation barrier check(sequence[4], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); - verify(toNotify).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class)); + 
verify(toNotify).abortCheckpointOnBarrier(eq(1L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); // the next checkpoint alignment starts now startTs = System.nanoTime(); @@ -1160,7 +1168,8 @@ public abstract class BarrierBufferTestBase { // future barrier aborts checkpoint startTs = System.nanoTime(); check(sequence[3], buffer.pollNext().get(), PAGE_SIZE); - verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), + argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED))); check(sequence[4], buffer.pollNext().get(), PAGE_SIZE); // alignment of next checkpoint @@ -1329,4 +1338,28 @@ public abstract class BarrierBufferTestBase { description.appendText("CheckpointMetaData - id = " + checkpointId); } } + + /** + * A validation matcher for checkpoint exception against failure reason. 
+ */ + public static class CheckpointExceptionMatcher extends BaseMatcher<CheckpointException> { + + private final CheckpointFailureReason failureReason; + + public CheckpointExceptionMatcher(CheckpointFailureReason failureReason) { + this.failureReason = failureReason; + } + + @Override + public boolean matches(Object o) { + return o != null && + o.getClass() == CheckpointException.class && + ((CheckpointException) o).getCheckpointFailureReason().equals(failureReason); + } + + @Override + public void describeTo(Description description) { + description.appendText("CheckpointException - reason = " + failureReason); + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java index d1b3697..56c3889 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationBarrierTest.java @@ -20,9 +20,9 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import 
org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.operators.co.CoStreamMap; +import org.apache.flink.streaming.runtime.io.BarrierBufferTestBase; import org.junit.Test; @@ -38,11 +39,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; /** * Test checkpoint cancellation barrier. @@ -108,7 +109,8 @@ public class StreamTaskCancellationBarrierTest { testHarness.waitForInputProcessing(); // the decline call should go to the coordinator - verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(environment, times(1)).declineCheckpoint(eq(2L), + argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); // a cancellation barrier should be downstream Object result = testHarness.getOutput().poll(); @@ -152,7 +154,8 @@ public class StreamTaskCancellationBarrierTest { testHarness.waitForInputProcessing(); // the decline call should go to the coordinator - verify(environment, times(1)).declineCheckpoint(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(environment, times(1)).declineCheckpoint(eq(2L), + argThat(new BarrierBufferTestBase.CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); // a cancellation barrier should be downstream Object result = testHarness.getOutput().poll(); diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java index 57a5121..1e383f8 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java @@ -245,7 +245,8 @@ public class JobMasterStopWithSavepointIT extends AbstractTestBase { 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, - false), + false, + 0), null)); clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java index 6635e31..9a1b4c1 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java @@ -107,7 +107,8 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase { 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, - false), + false, + 0), null)); clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java index 332584d..bd9cd75 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java @@ -609,7 +609,7 @@ public class IterateITCase extends AbstractTestBase { // Test force checkpointing try { - env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false); + env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE, false); env.execute(); // this statement should never be 
reached @@ -618,7 +618,7 @@ public class IterateITCase extends AbstractTestBase { // expected behaviour } - env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true); + env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE, true); env.getStreamGraph().getJobGraph(); break; // success