This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c4b0e8f [FLINK-10724] Refactor failure handling in check point coordinator c4b0e8f is described below commit c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df Author: vinoyang <yanghua1...@gmail.com> AuthorDate: Mon Apr 29 15:56:04 2019 +0800 [FLINK-10724] Refactor failure handling in check point coordinator This closes #7571. --- .../runtime/checkpoint/CheckpointCoordinator.java | 89 +++++++++++---------- .../runtime/checkpoint/CheckpointException.java | 30 +++++-- ...ineReason.java => CheckpointFailureReason.java} | 28 ++++++- .../checkpoint/CheckpointTriggerException.java | 42 ---------- .../checkpoint/CheckpointTriggerResult.java | 92 ---------------------- .../runtime/checkpoint/PendingCheckpoint.java | 51 +++--------- .../executiongraph/failover/FailoverRegion.java | 5 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 12 +-- .../checkpoint/CheckpointCoordinatorTest.java | 38 +++++---- .../runtime/checkpoint/PendingCheckpointTest.java | 30 +++---- .../jobmaster/JobMasterTriggerSavepointITCase.java | 4 +- .../test/streaming/runtime/TimestampITCase.java | 4 +- 12 files changed, 160 insertions(+), 265 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e6cc5d3..c7f59a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -338,7 +338,7 @@ public class CheckpointCoordinator { // clear and discard all pending checkpoints for (PendingCheckpoint pending : pendingCheckpoints.values()) { - pending.abortError(new Exception("Checkpoint Coordinator is shutting down")); + pending.abort(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN); } pendingCheckpoints.clear(); @@ -405,17 +405,17 @@ public class CheckpointCoordinator { checkNotNull(checkpointProperties); - CheckpointTriggerResult triggerResult = triggerCheckpoint( - timestamp, - checkpointProperties, - targetLocation, - false, - advanceToEndOfEventTime); - - if (triggerResult.isSuccess()) { - return triggerResult.getPendingCheckpoint().getCompletionFuture(); - } else { - Throwable cause = new CheckpointTriggerException("Failed to trigger savepoint.", triggerResult.getFailureReason()); + try { + PendingCheckpoint pendingCheckpoint = triggerCheckpoint( + timestamp, + checkpointProperties, + targetLocation, + false, + advanceToEndOfEventTime); + + return pendingCheckpoint.getCompletionFuture(); + } catch (CheckpointException e) { + Throwable cause = new CheckpointException("Failed to trigger savepoint.", e.getCheckpointFailureReason()); return FutureUtils.completedExceptionally(cause); } } @@ -431,16 +431,21 @@ public class CheckpointCoordinator { * @return <code>true</code> if triggering the checkpoint succeeded. */ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { - return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false).isSuccess(); + try { + triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); + return true; + } catch (CheckpointException e) { + return false; + } } @VisibleForTesting - public CheckpointTriggerResult triggerCheckpoint( + public PendingCheckpoint triggerCheckpoint( long timestamp, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic, - boolean advanceToEndOfTime) { + boolean advanceToEndOfTime) throws CheckpointException { if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) { throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX."); @@ -450,12 +455,12 @@ public class CheckpointCoordinator { synchronized (lock) { // abort if the coordinator has been shutdown in the meantime if (shutdown) { - return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); + throw new CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN); } // Don't allow periodic checkpoint if scheduling has been disabled if (isPeriodic && !periodicScheduling) { - return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN); + throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN); } // validate whether the checkpoint can be triggered, with respect to the limit of @@ -465,7 +470,7 @@ public class CheckpointCoordinator { // sanity check: there should never be more than one trigger request queued if (triggerRequestQueued) { LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job); - return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); + throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED); } // if too many checkpoints are currently in progress, we need to mark that a request is queued @@ -475,7 +480,7 @@ public class CheckpointCoordinator { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } - return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); + throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } // make sure the minimum interval between checkpoints has passed @@ -492,7 +497,7 @@ public class CheckpointCoordinator { new ScheduledTrigger(), durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); - return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); + throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } } @@ -506,7 +511,7 @@ public class CheckpointCoordinator { LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.", tasksToTrigger[i].getTaskNameWithSubtaskIndex(), job); - return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); + throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } else if (ee.getState() == ExecutionState.RUNNING) { executions[i] = ee; } else { @@ -515,7 +520,7 @@ public class CheckpointCoordinator { job, ExecutionState.RUNNING, ee.getState()); - return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); + throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } @@ -531,7 +536,7 @@ public class CheckpointCoordinator { LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.", ev.getTaskNameWithSubtaskIndex(), job); - return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); + throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } @@ -561,7 +566,7 @@ public class CheckpointCoordinator { job, numUnsuccessful, t); - return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); + throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t); } final PendingCheckpoint checkpoint = new PendingCheckpoint( @@ -590,7 +595,7 @@ public class CheckpointCoordinator { if (!checkpoint.isDiscarded()) { LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job); - checkpoint.abortExpired(); + checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED); pendingCheckpoints.remove(checkpointID); rememberRecentCheckpointId(checkpointID); @@ -605,12 +610,12 @@ public class CheckpointCoordinator { // since we released the lock in the meantime, we need to re-check // that the conditions still hold. if (shutdown) { - return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); + throw new CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN); } else if (!props.forceCheckpoint()) { if (triggerRequestQueued) { LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job); - return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); + throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED); } if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { @@ -619,7 +624,7 @@ public class CheckpointCoordinator { currentPeriodicTrigger.cancel(false); currentPeriodicTrigger = null; } - return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS); + throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS); } // make sure the minimum interval between checkpoints has passed @@ -637,7 +642,7 @@ public class CheckpointCoordinator { new ScheduledTrigger(), durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS); - return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); + throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS); } } @@ -677,7 +682,7 @@ public class CheckpointCoordinator { } numUnsuccessfulCheckpointsTriggers.set(0); - return new CheckpointTriggerResult(checkpoint); + return checkpoint; } catch (Throwable t) { // guard the map against concurrent modifications @@ -690,7 +695,7 @@ public class CheckpointCoordinator { checkpointID, job, numUnsuccessful, t); if (!checkpoint.isDiscarded()) { - checkpoint.abortError(new Exception("Failed to trigger checkpoint", t)); + checkpoint.abort(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t); } try { @@ -700,7 +705,7 @@ public class CheckpointCoordinator { LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2); } - return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); + throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t); } } // end trigger lock @@ -879,10 +884,11 @@ public class CheckpointCoordinator { catch (Exception e1) { // abort the current pending checkpoint if we fails to finalize the pending checkpoint. if (!pendingCheckpoint.isDiscarded()) { - pendingCheckpoint.abortError(e1); + pendingCheckpoint.abort(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1); } - throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1); + throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', + CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1); } // the pending checkpoint must be discarded after the finalization @@ -903,7 +909,8 @@ public class CheckpointCoordinator { } }); - throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception); + throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', + CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception); } } finally { pendingCheckpoints.remove(checkpointId); @@ -984,7 +991,7 @@ public class CheckpointCoordinator { // remove all pending checkpoints that are lesser than the current completed checkpoint if (p.getCheckpointId() < checkpointId && p.canBeSubsumed()) { rememberRecentCheckpointId(p.getCheckpointId()); - p.abortSubsumed(); + p.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED); entries.remove(); } } @@ -1244,7 +1251,7 @@ public class CheckpointCoordinator { currentPeriodicTrigger = null; } - abortPendingCheckpoints(new Exception("Checkpoint Coordinator is suspending.")); + abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND)); numUnsuccessfulCheckpointsTriggers.set(0); } @@ -1254,10 +1261,10 @@ public class CheckpointCoordinator { * Aborts all the pending checkpoints due to en exception. * @param exception The exception. */ - public void abortPendingCheckpoints(Exception exception) { + public void abortPendingCheckpoints(CheckpointException exception) { synchronized (lock) { for (PendingCheckpoint p : pendingCheckpoints.values()) { - p.abortError(exception); + p.abort(exception.getCheckpointFailureReason()); } pendingCheckpoints.clear(); @@ -1312,9 +1319,9 @@ public class CheckpointCoordinator { LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause); if (cause == null || cause instanceof CheckpointDeclineException) { - pendingCheckpoint.abortDeclined(); + pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause); } else { - pendingCheckpoint.abortError(cause); + pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause); } rememberRecentCheckpointId(checkpointId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java index 707878c..c0bc2d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java @@ -18,18 +18,38 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.util.Preconditions; + /** * Base class for checkpoint related exceptions. */ public class CheckpointException extends Exception { - private static final long serialVersionUID = -4341865597039002540L; + private static final long serialVersionUID = 3257526119022486948L; + + private final CheckpointFailureReason checkpointFailureReason; + + public CheckpointException(CheckpointFailureReason failureReason) { + super(failureReason.message()); + this.checkpointFailureReason = Preconditions.checkNotNull(failureReason); + } + + public CheckpointException(String message, CheckpointFailureReason failureReason) { + super(message + " Failure reason: " + failureReason.message()); + this.checkpointFailureReason = Preconditions.checkNotNull(failureReason); + } + + public CheckpointException(CheckpointFailureReason failureReason, Throwable cause) { + super(failureReason.message(), cause); + this.checkpointFailureReason = Preconditions.checkNotNull(failureReason); + } - public CheckpointException(String message, Throwable cause) { - super(message, cause); + public CheckpointException(String message, CheckpointFailureReason failureReason, Throwable cause) { + super(message + " Failure reason: " + failureReason.message(), cause); + this.checkpointFailureReason = Preconditions.checkNotNull(failureReason); } - public CheckpointException(String message) { - super(message); + public CheckpointFailureReason getCheckpointFailureReason() { + return checkpointFailureReason; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java similarity index 69% rename from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java index 41c50cc0..35f457a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java @@ -19,9 +19,9 @@ package org.apache.flink.runtime.checkpoint; /** - * Various reasons why a checkpoint was declined. + * Various reasons why a checkpoint was failure. */ -public enum CheckpointDeclineReason { +public enum CheckpointFailureReason { COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."), @@ -38,13 +38,33 @@ public enum CheckpointDeclineReason { EXCEPTION("An Exception occurred while triggering the checkpoint."), - EXPIRED("The checkpoint expired before triggering was complete"); + EXPIRED("The checkpoint expired before triggering was complete"), + + CHECKPOINT_EXPIRED("Checkpoint expired before completing."), + + CHECKPOINT_SUBSUMED("Checkpoint has been subsumed."), + + CHECKPOINT_DECLINED("Checkpoint was declined (tasks not ready)."), + + CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."), + + CHECKPOINT_COORDINATOR_SUSPEND("Checkpoint Coordinator is suspending."), + + JOB_FAILURE("The job has failed."), + + JOB_FAILOVER_REGION("FailoverRegion is restarting."), + + TASK_CHECKPOINT_FAILURE("Task local checkpoint failure."), + + FINALIZE_CHECKPOINT_FAILURE("Failure to finalize checkpoint."), + + TRIGGER_CHECKPOINT_FAILURE("Trigger checkpoint failure."); // ------------------------------------------------------------------------ private final String message; - CheckpointDeclineReason(String message) { + CheckpointFailureReason(String message) { this.message = message; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java deleted file mode 100644 index cb0402a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint; - -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; - -/** - * Exceptions which indicate that a checkpoint triggering has failed. - * - */ -public class CheckpointTriggerException extends FlinkException { - - private static final long serialVersionUID = -3330160816161901752L; - - private final CheckpointDeclineReason checkpointDeclineReason; - - public CheckpointTriggerException(String message, CheckpointDeclineReason checkpointDeclineReason) { - super(message + " Decline reason: " + checkpointDeclineReason.message()); - this.checkpointDeclineReason = Preconditions.checkNotNull(checkpointDeclineReason); - } - - public CheckpointDeclineReason getCheckpointDeclineReason() { - return checkpointDeclineReason; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java deleted file mode 100644 index 8689f72..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The result of triggering a checkpoint. May either be a declined checkpoint - * trigger attempt, or a pending checkpoint. - */ -public class CheckpointTriggerResult { - - /** If success, the pending checkpoint created after the successfully trigger, otherwise null */ - private final PendingCheckpoint success; - - /** If failure, the reason why the triggering was declined, otherwise null. */ - private final CheckpointDeclineReason failure; - - // ------------------------------------------------------------------------ - - /** - * Creates a successful checkpoint trigger result. - * - * @param success The pending checkpoint created after the successfully trigger. - */ - CheckpointTriggerResult(PendingCheckpoint success) { - this.success = checkNotNull(success); - this.failure = null; - } - - /** - * Creates a failed checkpoint trigger result. - * - * @param failure The reason why the checkpoint could not be triggered. - */ - CheckpointTriggerResult(CheckpointDeclineReason failure) { - this.success = null; - this.failure = checkNotNull(failure); - } - - // ------------------------------------------------------------------------ - - public boolean isSuccess() { - return success != null; - } - - public boolean isFailure() { - return failure != null; - } - - public PendingCheckpoint getPendingCheckpoint() { - if (success != null) { - return success; - } else { - throw new IllegalStateException("Checkpoint triggering failed"); - } - } - - public CheckpointDeclineReason getFailureReason() { - if (failure != null) { - return failure; - } else { - throw new IllegalStateException("Checkpoint triggering was successful"); - } - } - - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "CheckpointTriggerResult(" + - (isSuccess() ? - ("success: " + success) : - ("failure: " + failure.message())) + ")"; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 1bc6b0e..d03c28f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -34,7 +34,6 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; @@ -405,54 +404,30 @@ public class PendingCheckpoint { // ------------------------------------------------------------------------ /** - * Aborts a checkpoint because it expired (took too long). + * Aborts a checkpoint with reason and cause. */ - public void abortExpired() { + public void abort(CheckpointFailureReason reason, Throwable cause) { try { - Exception cause = new Exception("Checkpoint expired before completing"); - onCompletionPromise.completeExceptionally(cause); - reportFailedCheckpoint(cause); + CheckpointException exception = new CheckpointException(reason, cause); + onCompletionPromise.completeExceptionally(exception); + reportFailedCheckpoint(exception); + assertAbortSubsumedForced(reason); } finally { dispose(true); } } /** - * Aborts the pending checkpoint because a newer completed checkpoint subsumed it. + * Aborts a checkpoint with reason and cause. */ - public void abortSubsumed() { - try { - Exception cause = new Exception("Checkpoints has been subsumed"); - onCompletionPromise.completeExceptionally(cause); - reportFailedCheckpoint(cause); - - if (props.forceCheckpoint()) { - throw new IllegalStateException("Bug: forced checkpoints must never be subsumed"); - } - } finally { - dispose(true); - } - } - - - public void abortDeclined() { - abortWithCause(new Exception("Checkpoint was declined (tasks not ready)")); - } - - /** - * Aborts the pending checkpoint due to an error. - * @param cause The error's exception. - */ - public void abortError(@Nonnull Throwable cause) { - abortWithCause(new Exception("Checkpoint failed: " + cause.getMessage(), cause)); + public void abort(CheckpointFailureReason reason) { + abort(reason, null); } - private void abortWithCause(@Nonnull Exception cause) { - try { - onCompletionPromise.completeExceptionally(cause); - reportFailedCheckpoint(cause); - } finally { - dispose(true); + private void assertAbortSubsumedForced(CheckpointFailureReason reason) { + if (props.forceCheckpoint() && reason == CheckpointFailureReason.CHECKPOINT_SUBSUMED) { + throw new IllegalStateException("Bug: forced checkpoints must never be subsumed, " + + "the abort reason is : " + reason.message()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java index 635a7f5..4cc2dc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.executiongraph.failover; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.Execution; @@ -210,7 +212,8 @@ public class FailoverRegion { // we restart the checkpoint scheduler for // i) enable new checkpoint could be triggered without waiting for last checkpoint expired. // ii) ensure the EXACTLY_ONCE semantics if needed. - executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(new Exception("FailoverRegion is restarting.")); + executionGraph.getCheckpointCoordinator().abortPendingCheckpoints( + new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION)); executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState( tasks, false, true); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 1bbfb62..f912f45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -31,9 +31,9 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; @@ -1508,13 +1508,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast (String savepointPath, Throwable throwable) -> { if (throwable != null) { final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - if (strippedThrowable instanceof CheckpointTriggerException) { - final CheckpointTriggerException checkpointTriggerException = (CheckpointTriggerException) strippedThrowable; + if (strippedThrowable instanceof CheckpointException) { + final CheckpointException checkpointException = (CheckpointException) strippedThrowable; - if (checkpointTriggerException.getCheckpointDeclineReason() == CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { + if (checkpointException.getCheckpointFailureReason() == CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING) { return lastInternalSavepoint; } else { - throw new CompletionException(checkpointTriggerException); + throw new CompletionException(checkpointException); } } else { throw new CompletionException(strippedThrowable); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 437b084..e1f8f9c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -3139,25 +3139,29 @@ public class CheckpointCoordinatorTest extends TestLogger { SharedStateRegistry.DEFAULT_FACTORY); // Periodic - CheckpointTriggerResult triggerResult = coord.triggerCheckpoint( - System.currentTimeMillis(), - CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - null, - true, - false); - - assertTrue(triggerResult.isFailure()); - assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason()); + try { + coord.triggerCheckpoint( + System.currentTimeMillis(), + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + null, + true, + false); + fail("The triggerCheckpoint call expected an exception"); + } catch (CheckpointException e) { + assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, e.getCheckpointFailureReason()); + } // Not periodic - triggerResult = coord.triggerCheckpoint( - System.currentTimeMillis(), - CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), - null, - false, - false); - - assertFalse(triggerResult.isFailure()); + try { + coord.triggerCheckpoint( + System.currentTimeMillis(), + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + null, + false, + false); + } catch (CheckpointException e) { + fail("Unexpected exception : " + e.getCheckpointFailureReason().message()); + } } private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java index 7554d4a..6cce28f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java @@ -93,7 +93,7 @@ public class PendingCheckpointTest { assertFalse(pending.canBeSubsumed()); try { - pending.abortSubsumed(); + pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED); fail("Did not throw expected Exception"); } catch (IllegalStateException ignored) { // Expected @@ -113,7 +113,7 @@ public class PendingCheckpointTest { assertFalse(pending.canBeSubsumed()); try { - pending.abortSubsumed(); + pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED); fail("Did not throw expected Exception"); } catch (IllegalStateException ignored) { // Expected @@ -133,7 +133,7 @@ public class PendingCheckpointTest { CompletableFuture<CompletedCheckpoint> future = pending.getCompletionFuture(); assertFalse(future.isDone()); - pending.abortDeclined(); + pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED); assertTrue(future.isDone()); // Abort expired @@ -141,7 +141,7 @@ public class PendingCheckpointTest { future = pending.getCompletionFuture(); assertFalse(future.isDone()); - pending.abortExpired(); + pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED); assertTrue(future.isDone()); // Abort subsumed @@ -149,7 +149,7 @@ public class PendingCheckpointTest { future = pending.getCompletionFuture(); assertFalse(future.isDone()); - pending.abortSubsumed(); + pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED); assertTrue(future.isDone()); // Finalize (all ACK'd) @@ -191,7 +191,7 @@ public class PendingCheckpointTest { PendingCheckpoint pending = createPendingCheckpoint(props, executor); setTaskState(pending, state); - pending.abortDeclined(); + pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED); // execute asynchronous discard operation executor.runQueuedCommands(); verify(state, times(1)).discardState(); @@ -202,7 +202,7 @@ public class PendingCheckpointTest { pending = createPendingCheckpoint(props, executor); setTaskState(pending, state); - pending.abortError(new Exception("Expected Test Exception")); + pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, new Exception("Expected Test Exception")); // execute asynchronous discard operation executor.runQueuedCommands(); verify(state, times(1)).discardState(); @@ -213,7 +213,7 @@ public class PendingCheckpointTest { pending = createPendingCheckpoint(props, executor); setTaskState(pending, state); - pending.abortExpired(); + pending.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED); // execute asynchronous discard operation executor.runQueuedCommands(); verify(state, times(1)).discardState(); @@ -224,7 +224,7 @@ public class PendingCheckpointTest { pending = createPendingCheckpoint(props, executor); setTaskState(pending, state); - pending.abortSubsumed(); + pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED); // execute asynchronous discard operation executor.runQueuedCommands(); verify(state, times(1)).discardState(); @@ -256,7 +256,7 @@ public class PendingCheckpointTest { CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); pending.setStatsCallback(callback); - pending.abortSubsumed(); + pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED); verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); } @@ -267,7 +267,7 @@ public class PendingCheckpointTest { CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); pending.setStatsCallback(callback); - pending.abortDeclined(); + pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED); verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); } @@ -278,7 +278,7 @@ public class PendingCheckpointTest { CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); pending.setStatsCallback(callback); - pending.abortError(new Exception("Expected test error")); + pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED, new Exception("Expected test error")); verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); } @@ -289,7 +289,7 @@ public class PendingCheckpointTest { CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION)); pending.setStatsCallback(callback); - pending.abortExpired(); + pending.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED); verify(callback, times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class)); } } @@ -327,7 +327,7 @@ public class PendingCheckpointTest { final CheckpointProperties props = new CheckpointProperties(false, CheckpointType.CHECKPOINT, true, true, true, true, true); PendingCheckpoint aborted = createPendingCheckpoint(props); - aborted.abortDeclined(); + aborted.abort(CheckpointFailureReason.CHECKPOINT_DECLINED); assertTrue(aborted.isDiscarded()); assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class))); @@ -335,7 +335,7 @@ public class PendingCheckpointTest { ScheduledFuture<?> canceller = mock(ScheduledFuture.class); assertTrue(pending.setCancellerHandle(canceller)); - pending.abortDeclined(); + pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED); verify(canceller).cancel(false); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java index b781e41..75e9ed1 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java @@ -20,11 +20,11 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; -import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.Environment; @@ -160,7 +160,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase { try { cancelWithSavepoint(); } catch (Exception e) { - assertThat(ExceptionUtils.findThrowable(e, CheckpointTriggerException.class).isPresent(), equalTo(true)); + assertThat(ExceptionUtils.findThrowable(e, CheckpointException.class).isPresent(), equalTo(true)); } final JobStatus jobStatus = clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 2665d09..2a1b824 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -27,7 +27,7 @@ import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.MultiShotLatch; -import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -198,7 +198,7 @@ public class TimestampITCase extends TestLogger { } catch (Exception e) { if ( - !(e.getCause() instanceof CheckpointTriggerException) || + !(e.getCause() instanceof CheckpointException) || !e.getCause().getMessage().contains("Not all required tasks are currently running.") ) { throw e;