This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b0e8f  [FLINK-10724] Refactor failure handling in check point 
coordinator
c4b0e8f is described below

commit c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df
Author: vinoyang <yanghua1...@gmail.com>
AuthorDate: Mon Apr 29 15:56:04 2019 +0800

    [FLINK-10724] Refactor failure handling in check point coordinator
    
    This closes #7571.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 89 +++++++++++----------
 .../runtime/checkpoint/CheckpointException.java    | 30 +++++--
 ...ineReason.java => CheckpointFailureReason.java} | 28 ++++++-
 .../checkpoint/CheckpointTriggerException.java     | 42 ----------
 .../checkpoint/CheckpointTriggerResult.java        | 92 ----------------------
 .../runtime/checkpoint/PendingCheckpoint.java      | 51 +++---------
 .../executiongraph/failover/FailoverRegion.java    |  5 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 12 +--
 .../checkpoint/CheckpointCoordinatorTest.java      | 38 +++++----
 .../runtime/checkpoint/PendingCheckpointTest.java  | 30 +++----
 .../jobmaster/JobMasterTriggerSavepointITCase.java |  4 +-
 .../test/streaming/runtime/TimestampITCase.java    |  4 +-
 12 files changed, 160 insertions(+), 265 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e6cc5d3..c7f59a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -338,7 +338,7 @@ public class CheckpointCoordinator {
 
                                // clear and discard all pending checkpoints
                                for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
-                                       pending.abortError(new 
Exception("Checkpoint Coordinator is shutting down"));
+                                       
pending.abort(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
                                }
                                pendingCheckpoints.clear();
 
@@ -405,17 +405,17 @@ public class CheckpointCoordinator {
 
                checkNotNull(checkpointProperties);
 
-               CheckpointTriggerResult triggerResult = triggerCheckpoint(
-                               timestamp,
-                               checkpointProperties,
-                               targetLocation,
-                               false,
-                               advanceToEndOfEventTime);
-
-               if (triggerResult.isSuccess()) {
-                       return 
triggerResult.getPendingCheckpoint().getCompletionFuture();
-               } else {
-                       Throwable cause = new 
CheckpointTriggerException("Failed to trigger savepoint.", 
triggerResult.getFailureReason());
+               try {
+                       PendingCheckpoint pendingCheckpoint = triggerCheckpoint(
+                                       timestamp,
+                                       checkpointProperties,
+                                       targetLocation,
+                                       false,
+                                       advanceToEndOfEventTime);
+
+                       return pendingCheckpoint.getCompletionFuture();
+               } catch (CheckpointException e) {
+                       Throwable cause = new CheckpointException("Failed to 
trigger savepoint.", e.getCheckpointFailureReason());
                        return FutureUtils.completedExceptionally(cause);
                }
        }
@@ -431,16 +431,21 @@ public class CheckpointCoordinator {
         * @return <code>true</code> if triggering the checkpoint succeeded.
         */
        public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
-               return triggerCheckpoint(timestamp, checkpointProperties, null, 
isPeriodic, false).isSuccess();
+               try {
+                       triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
+                       return true;
+               } catch (CheckpointException e) {
+                       return false;
+               }
        }
 
        @VisibleForTesting
-       public CheckpointTriggerResult triggerCheckpoint(
+       public PendingCheckpoint triggerCheckpoint(
                        long timestamp,
                        CheckpointProperties props,
                        @Nullable String externalSavepointLocation,
                        boolean isPeriodic,
-                       boolean advanceToEndOfTime) {
+                       boolean advanceToEndOfTime) throws CheckpointException {
 
                if (advanceToEndOfTime && !(props.isSynchronous() && 
props.isSavepoint())) {
                        throw new IllegalArgumentException("Only synchronous 
savepoints are allowed to advance the watermark to MAX.");
@@ -450,12 +455,12 @@ public class CheckpointCoordinator {
                synchronized (lock) {
                        // abort if the coordinator has been shutdown in the 
meantime
                        if (shutdown) {
-                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
+                               throw new 
CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN);
                        }
 
                        // Don't allow periodic checkpoint if scheduling has 
been disabled
                        if (isPeriodic && !periodicScheduling) {
-                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
+                               throw new 
CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
                        }
 
                        // validate whether the checkpoint can be triggered, 
with respect to the limit of
@@ -465,7 +470,7 @@ public class CheckpointCoordinator {
                                // sanity check: there should never be more 
than one trigger request queued
                                if (triggerRequestQueued) {
                                        LOG.warn("Trying to trigger another 
checkpoint for job {} while one was queued already.", job);
-                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
+                                       throw new 
CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
                                }
 
                                // if too many checkpoints are currently in 
progress, we need to mark that a request is queued
@@ -475,7 +480,7 @@ public class CheckpointCoordinator {
                                                
currentPeriodicTrigger.cancel(false);
                                                currentPeriodicTrigger = null;
                                        }
-                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+                                       throw new 
CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                                }
 
                                // make sure the minimum interval between 
checkpoints has passed
@@ -492,7 +497,7 @@ public class CheckpointCoordinator {
                                                        new ScheduledTrigger(),
                                                        durationTillNextMillis, 
baseInterval, TimeUnit.MILLISECONDS);
 
-                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
+                                       throw new 
CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                }
                        }
                }
@@ -506,7 +511,7 @@ public class CheckpointCoordinator {
                                LOG.info("Checkpoint triggering task {} of job 
{} is not being executed at the moment. Aborting checkpoint.",
                                                
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                                                job);
-                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+                               throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                        } else if (ee.getState() == ExecutionState.RUNNING) {
                                executions[i] = ee;
                        } else {
@@ -515,7 +520,7 @@ public class CheckpointCoordinator {
                                                job,
                                                ExecutionState.RUNNING,
                                                ee.getState());
-                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+                               throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                        }
                }
 
@@ -531,7 +536,7 @@ public class CheckpointCoordinator {
                                LOG.info("Checkpoint acknowledging task {} of 
job {} is not being executed at the moment. Aborting checkpoint.",
                                                
ev.getTaskNameWithSubtaskIndex(),
                                                job);
-                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
+                               throw new 
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                        }
                }
 
@@ -561,7 +566,7 @@ public class CheckpointCoordinator {
                                                job,
                                                numUnsuccessful,
                                                t);
-                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+                               throw new 
CheckpointException(CheckpointFailureReason.EXCEPTION, t);
                        }
 
                        final PendingCheckpoint checkpoint = new 
PendingCheckpoint(
@@ -590,7 +595,7 @@ public class CheckpointCoordinator {
                                        if (!checkpoint.isDiscarded()) {
                                                LOG.info("Checkpoint {} of job 
{} expired before completing.", checkpointID, job);
 
-                                               checkpoint.abortExpired();
+                                               
checkpoint.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
                                                
pendingCheckpoints.remove(checkpointID);
                                                
rememberRecentCheckpointId(checkpointID);
 
@@ -605,12 +610,12 @@ public class CheckpointCoordinator {
                                        // since we released the lock in the 
meantime, we need to re-check
                                        // that the conditions still hold.
                                        if (shutdown) {
-                                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
+                                               throw new 
CheckpointException(CheckpointFailureReason.COORDINATOR_SHUTDOWN);
                                        }
                                        else if (!props.forceCheckpoint()) {
                                                if (triggerRequestQueued) {
                                                        LOG.warn("Trying to 
trigger another checkpoint for job {} while one was queued already.", job);
-                                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
+                                                       throw new 
CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
                                                }
 
                                                if (pendingCheckpoints.size() 
>= maxConcurrentCheckpointAttempts) {
@@ -619,7 +624,7 @@ public class CheckpointCoordinator {
                                                                
currentPeriodicTrigger.cancel(false);
                                                                
currentPeriodicTrigger = null;
                                                        }
-                                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
+                                                       throw new 
CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                                                }
 
                                                // make sure the minimum 
interval between checkpoints has passed
@@ -637,7 +642,7 @@ public class CheckpointCoordinator {
                                                                        new 
ScheduledTrigger(),
                                                                        
durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
 
-                                                       return new 
CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
+                                                       throw new 
CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                                }
                                        }
 
@@ -677,7 +682,7 @@ public class CheckpointCoordinator {
                                }
 
                                numUnsuccessfulCheckpointsTriggers.set(0);
-                               return new CheckpointTriggerResult(checkpoint);
+                               return checkpoint;
                        }
                        catch (Throwable t) {
                                // guard the map against concurrent 
modifications
@@ -690,7 +695,7 @@ public class CheckpointCoordinator {
                                                checkpointID, job, 
numUnsuccessful, t);
 
                                if (!checkpoint.isDiscarded()) {
-                                       checkpoint.abortError(new 
Exception("Failed to trigger checkpoint", t));
+                                       
checkpoint.abort(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
                                }
 
                                try {
@@ -700,7 +705,7 @@ public class CheckpointCoordinator {
                                        LOG.warn("Cannot dispose failed 
checkpoint storage location {}", checkpointStorageLocation, t2);
                                }
 
-                               return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
+                               throw new 
CheckpointException(CheckpointFailureReason.EXCEPTION, t);
                        }
 
                } // end trigger lock
@@ -879,10 +884,11 @@ public class CheckpointCoordinator {
                        catch (Exception e1) {
                                // abort the current pending checkpoint if we 
fails to finalize the pending checkpoint.
                                if (!pendingCheckpoint.isDiscarded()) {
-                                       pendingCheckpoint.abortError(e1);
+                                       
pendingCheckpoint.abort(CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, 
e1);
                                }
 
-                               throw new CheckpointException("Could not 
finalize the pending checkpoint " + checkpointId + '.', e1);
+                               throw new CheckpointException("Could not 
finalize the pending checkpoint " + checkpointId + '.',
+                                       
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
                        }
 
                        // the pending checkpoint must be discarded after the 
finalization
@@ -903,7 +909,8 @@ public class CheckpointCoordinator {
                                        }
                                });
 
-                               throw new CheckpointException("Could not 
complete the pending checkpoint " + checkpointId + '.', exception);
+                               throw new CheckpointException("Could not 
complete the pending checkpoint " + checkpointId + '.',
+                                       
CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
                        }
                } finally {
                        pendingCheckpoints.remove(checkpointId);
@@ -984,7 +991,7 @@ public class CheckpointCoordinator {
                        // remove all pending checkpoints that are lesser than 
the current completed checkpoint
                        if (p.getCheckpointId() < checkpointId && 
p.canBeSubsumed()) {
                                rememberRecentCheckpointId(p.getCheckpointId());
-                               p.abortSubsumed();
+                               
p.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
                                entries.remove();
                        }
                }
@@ -1244,7 +1251,7 @@ public class CheckpointCoordinator {
                                currentPeriodicTrigger = null;
                        }
 
-                       abortPendingCheckpoints(new Exception("Checkpoint 
Coordinator is suspending."));
+                       abortPendingCheckpoints(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND));
 
                        numUnsuccessfulCheckpointsTriggers.set(0);
                }
@@ -1254,10 +1261,10 @@ public class CheckpointCoordinator {
         * Aborts all the pending checkpoints due to en exception.
         * @param exception The exception.
         */
-       public void abortPendingCheckpoints(Exception exception) {
+       public void abortPendingCheckpoints(CheckpointException exception) {
                synchronized (lock) {
                        for (PendingCheckpoint p : pendingCheckpoints.values()) 
{
-                               p.abortError(exception);
+                               p.abort(exception.getCheckpointFailureReason());
                        }
 
                        pendingCheckpoints.clear();
@@ -1312,9 +1319,9 @@ public class CheckpointCoordinator {
                LOG.info("Discarding checkpoint {} of job {}.", checkpointId, 
job, cause);
 
                if (cause == null || cause instanceof 
CheckpointDeclineException) {
-                       pendingCheckpoint.abortDeclined();
+                       
pendingCheckpoint.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, cause);
                } else {
-                       pendingCheckpoint.abortError(cause);
+                       
pendingCheckpoint.abort(CheckpointFailureReason.JOB_FAILURE, cause);
                }
 
                rememberRecentCheckpointId(checkpointId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
index 707878c..c0bc2d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointException.java
@@ -18,18 +18,38 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * Base class for checkpoint related exceptions.
  */
 public class CheckpointException extends Exception {
 
-       private static final long serialVersionUID = -4341865597039002540L;
+       private static final long serialVersionUID = 3257526119022486948L;
+
+       private final CheckpointFailureReason checkpointFailureReason;
+
+       public CheckpointException(CheckpointFailureReason failureReason) {
+               super(failureReason.message());
+               this.checkpointFailureReason = 
Preconditions.checkNotNull(failureReason);
+       }
+
+       public CheckpointException(String message, CheckpointFailureReason 
failureReason) {
+               super(message + " Failure reason: " + failureReason.message());
+               this.checkpointFailureReason = 
Preconditions.checkNotNull(failureReason);
+       }
+
+       public CheckpointException(CheckpointFailureReason failureReason, 
Throwable cause) {
+               super(failureReason.message(), cause);
+               this.checkpointFailureReason = 
Preconditions.checkNotNull(failureReason);
+       }
 
-       public CheckpointException(String message, Throwable cause) {
-               super(message, cause);
+       public CheckpointException(String message, CheckpointFailureReason 
failureReason, Throwable cause) {
+               super(message + " Failure reason: " + failureReason.message(), 
cause);
+               this.checkpointFailureReason = 
Preconditions.checkNotNull(failureReason);
        }
 
-       public CheckpointException(String message) {
-               super(message);
+       public CheckpointFailureReason getCheckpointFailureReason() {
+               return checkpointFailureReason;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
similarity index 69%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 41c50cc0..35f457a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.checkpoint;
 
 /**
- * Various reasons why a checkpoint was declined.
+ * Various reasons why a checkpoint failed.
  */
-public enum CheckpointDeclineReason {
+public enum CheckpointFailureReason {
 
        COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."),
 
@@ -38,13 +38,33 @@ public enum CheckpointDeclineReason {
 
        EXCEPTION("An Exception occurred while triggering the checkpoint."),
 
-       EXPIRED("The checkpoint expired before triggering was complete");
+       EXPIRED("The checkpoint expired before triggering was complete"),
+
+       CHECKPOINT_EXPIRED("Checkpoint expired before completing."),
+
+       CHECKPOINT_SUBSUMED("Checkpoint has been subsumed."),
+
+       CHECKPOINT_DECLINED("Checkpoint was declined (tasks not ready)."),
+
+       CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
+
+       CHECKPOINT_COORDINATOR_SUSPEND("Checkpoint Coordinator is suspending."),
+
+       JOB_FAILURE("The job has failed."),
+
+       JOB_FAILOVER_REGION("FailoverRegion is restarting."),
+
+       TASK_CHECKPOINT_FAILURE("Task local checkpoint failure."),
+
+       FINALIZE_CHECKPOINT_FAILURE("Failure to finalize checkpoint."),
+
+       TRIGGER_CHECKPOINT_FAILURE("Trigger checkpoint failure.");
 
        // 
------------------------------------------------------------------------
 
        private final String message;
 
-       CheckpointDeclineReason(String message) {
+       CheckpointFailureReason(String message) {
                this.message = message;
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
deleted file mode 100644
index cb0402a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Exceptions which indicate that a checkpoint triggering has failed.
- *
- */
-public class CheckpointTriggerException extends FlinkException {
-
-       private static final long serialVersionUID = -3330160816161901752L;
-
-       private final CheckpointDeclineReason checkpointDeclineReason;
-
-       public CheckpointTriggerException(String message, 
CheckpointDeclineReason checkpointDeclineReason) {
-               super(message + " Decline reason: " + 
checkpointDeclineReason.message());
-               this.checkpointDeclineReason = 
Preconditions.checkNotNull(checkpointDeclineReason);
-       }
-
-       public CheckpointDeclineReason getCheckpointDeclineReason() {
-               return checkpointDeclineReason;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
deleted file mode 100644
index 8689f72..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointTriggerResult.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The result of triggering a checkpoint. May either be a declined checkpoint
- * trigger attempt, or a pending checkpoint.
- */
-public class CheckpointTriggerResult {
-
-       /** If success, the pending checkpoint created after the successfully 
trigger, otherwise null */
-       private final PendingCheckpoint success;
-
-       /** If failure, the reason why the triggering was declined, otherwise 
null. */
-       private final CheckpointDeclineReason failure;
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a successful checkpoint trigger result.
-        * 
-        * @param success The pending checkpoint created after the successfully 
trigger.
-        */
-       CheckpointTriggerResult(PendingCheckpoint success) {
-               this.success = checkNotNull(success);
-               this.failure = null;
-       }
-
-       /**
-        * Creates a failed checkpoint trigger result. 
-        * 
-        * @param failure The reason why the checkpoint could not be triggered.
-        */
-       CheckpointTriggerResult(CheckpointDeclineReason failure) {
-               this.success = null;
-               this.failure = checkNotNull(failure);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       public boolean isSuccess() {
-               return success != null;
-       }
-
-       public boolean isFailure() {
-               return failure != null;
-       }
-
-       public PendingCheckpoint getPendingCheckpoint() {
-               if (success != null) {
-                       return success;
-               } else {
-                       throw new IllegalStateException("Checkpoint triggering 
failed");
-               }
-       }
-
-       public CheckpointDeclineReason getFailureReason() {
-               if (failure != null) {
-                       return failure;
-               } else {
-                       throw new IllegalStateException("Checkpoint triggering 
was successful");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "CheckpointTriggerResult(" +
-                               (isSuccess() ?
-                                               ("success: " + success) :
-                                               ("failure: " + 
failure.message())) + ")";
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 1bc6b0e..d03c28f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -34,7 +34,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -405,54 +404,30 @@ public class PendingCheckpoint {
        // 
------------------------------------------------------------------------
 
        /**
-        * Aborts a checkpoint because it expired (took too long).
+        * Aborts a checkpoint with reason and cause.
         */
-       public void abortExpired() {
+       public void abort(CheckpointFailureReason reason, Throwable cause) {
                try {
-                       Exception cause = new Exception("Checkpoint expired 
before completing");
-                       onCompletionPromise.completeExceptionally(cause);
-                       reportFailedCheckpoint(cause);
+                       CheckpointException exception = new 
CheckpointException(reason, cause);
+                       onCompletionPromise.completeExceptionally(exception);
+                       reportFailedCheckpoint(exception);
+                       assertAbortSubsumedForced(reason);
                } finally {
                        dispose(true);
                }
        }
 
        /**
-        * Aborts the pending checkpoint because a newer completed checkpoint 
subsumed it.
+        * Aborts a checkpoint with the given reason and no underlying cause.
         */
-       public void abortSubsumed() {
-               try {
-                       Exception cause = new Exception("Checkpoints has been 
subsumed");
-                       onCompletionPromise.completeExceptionally(cause);
-                       reportFailedCheckpoint(cause);
-
-                       if (props.forceCheckpoint()) {
-                               throw new IllegalStateException("Bug: forced 
checkpoints must never be subsumed");
-                       }
-               } finally {
-                       dispose(true);
-               }
-       }
-
-
-       public void abortDeclined() {
-               abortWithCause(new Exception("Checkpoint was declined (tasks 
not ready)"));
-       }
-
-       /**
-        * Aborts the pending checkpoint due to an error.
-        * @param cause The error's exception.
-        */
-       public void abortError(@Nonnull Throwable cause) {
-               abortWithCause(new Exception("Checkpoint failed: " + 
cause.getMessage(), cause));
+       public void abort(CheckpointFailureReason reason) {
+               abort(reason, null);
        }
 
-       private void abortWithCause(@Nonnull Exception cause) {
-               try {
-                       onCompletionPromise.completeExceptionally(cause);
-                       reportFailedCheckpoint(cause);
-               } finally {
-                       dispose(true);
+       private void assertAbortSubsumedForced(CheckpointFailureReason reason) {
+               if (props.forceCheckpoint() && reason == 
CheckpointFailureReason.CHECKPOINT_SUBSUMED) {
+                       throw new IllegalStateException("Bug: forced 
checkpoints must never be subsumed, " +
+                               "the abort reason is : " + reason.message());
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 635a7f5..4cc2dc6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.executiongraph.failover;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -210,7 +212,8 @@ public class FailoverRegion {
                                        // we restart the checkpoint scheduler 
for
                                        // i) enable new checkpoint could be 
triggered without waiting for last checkpoint expired.
                                        // ii) ensure the EXACTLY_ONCE 
semantics if needed.
-                                       
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(new 
Exception("FailoverRegion is restarting."));
+                                       
executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+                                               new 
CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
 
                                        
executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
                                                tasks, false, true);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1bbfb62..f912f45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -31,9 +31,9 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -1508,13 +1508,13 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                (String savepointPath, Throwable throwable) -> {
                                        if (throwable != null) {
                                                final Throwable 
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
-                                               if (strippedThrowable 
instanceof CheckpointTriggerException) {
-                                                       final 
CheckpointTriggerException checkpointTriggerException = 
(CheckpointTriggerException) strippedThrowable;
+                                               if (strippedThrowable 
instanceof CheckpointException) {
+                                                       final 
CheckpointException checkpointException = (CheckpointException) 
strippedThrowable;
 
-                                                       if 
(checkpointTriggerException.getCheckpointDeclineReason() == 
CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
+                                                       if 
(checkpointException.getCheckpointFailureReason() == 
CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
                                                                return 
lastInternalSavepoint;
                                                        } else {
-                                                               throw new 
CompletionException(checkpointTriggerException);
+                                                               throw new 
CompletionException(checkpointException);
                                                        }
                                                } else {
                                                        throw new 
CompletionException(strippedThrowable);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 437b084..e1f8f9c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -3139,25 +3139,29 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        SharedStateRegistry.DEFAULT_FACTORY);
 
                // Periodic
-               CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
-                               System.currentTimeMillis(),
-                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               null,
-                               true,
-                               false);
-
-               assertTrue(triggerResult.isFailure());
-               
assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, 
triggerResult.getFailureReason());
+               try {
+                       coord.triggerCheckpoint(
+                                       System.currentTimeMillis(),
+                                       
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                                       null,
+                                       true,
+                                       false);
+                       fail("The triggerCheckpoint call expected an 
exception");
+               } catch (CheckpointException e) {
+                       
assertEquals(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN, 
e.getCheckpointFailureReason());
+               }
 
                // Not periodic
-               triggerResult = coord.triggerCheckpoint(
-                               System.currentTimeMillis(),
-                               
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
-                               null,
-                               false,
-                               false);
-
-               assertFalse(triggerResult.isFailure());
+               try {
+                       coord.triggerCheckpoint(
+                                       System.currentTimeMillis(),
+                                       
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                                       null,
+                                       false,
+                                       false);
+               } catch (CheckpointException e) {
+                       fail("Unexpected exception : " + 
e.getCheckpointFailureReason().message());
+               }
        }
 
        private void testCreateKeyGroupPartitions(int maxParallelism, int 
parallelism) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 7554d4a..6cce28f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -93,7 +93,7 @@ public class PendingCheckpointTest {
                assertFalse(pending.canBeSubsumed());
 
                try {
-                       pending.abortSubsumed();
+                       
pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
                        fail("Did not throw expected Exception");
                } catch (IllegalStateException ignored) {
                        // Expected
@@ -113,7 +113,7 @@ public class PendingCheckpointTest {
                assertFalse(pending.canBeSubsumed());
 
                try {
-                       pending.abortSubsumed();
+                       
pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
                        fail("Did not throw expected Exception");
                } catch (IllegalStateException ignored) {
                        // Expected
@@ -133,7 +133,7 @@ public class PendingCheckpointTest {
                CompletableFuture<CompletedCheckpoint> future = 
pending.getCompletionFuture();
 
                assertFalse(future.isDone());
-               pending.abortDeclined();
+               pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
                assertTrue(future.isDone());
 
                // Abort expired
@@ -141,7 +141,7 @@ public class PendingCheckpointTest {
                future = pending.getCompletionFuture();
 
                assertFalse(future.isDone());
-               pending.abortExpired();
+               pending.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
                assertTrue(future.isDone());
 
                // Abort subsumed
@@ -149,7 +149,7 @@ public class PendingCheckpointTest {
                future = pending.getCompletionFuture();
 
                assertFalse(future.isDone());
-               pending.abortSubsumed();
+               pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
                assertTrue(future.isDone());
 
                // Finalize (all ACK'd)
@@ -191,7 +191,7 @@ public class PendingCheckpointTest {
                PendingCheckpoint pending = createPendingCheckpoint(props, 
executor);
                setTaskState(pending, state);
 
-               pending.abortDeclined();
+               pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
                // execute asynchronous discard operation
                executor.runQueuedCommands();
                verify(state, times(1)).discardState();
@@ -202,7 +202,7 @@ public class PendingCheckpointTest {
                pending = createPendingCheckpoint(props, executor);
                setTaskState(pending, state);
 
-               pending.abortError(new Exception("Expected Test Exception"));
+               pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, new 
Exception("Expected Test Exception"));
                // execute asynchronous discard operation
                executor.runQueuedCommands();
                verify(state, times(1)).discardState();
@@ -213,7 +213,7 @@ public class PendingCheckpointTest {
                pending = createPendingCheckpoint(props, executor);
                setTaskState(pending, state);
 
-               pending.abortExpired();
+               pending.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
                // execute asynchronous discard operation
                executor.runQueuedCommands();
                verify(state, times(1)).discardState();
@@ -224,7 +224,7 @@ public class PendingCheckpointTest {
                pending = createPendingCheckpoint(props, executor);
                setTaskState(pending, state);
 
-               pending.abortSubsumed();
+               pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
                // execute asynchronous discard operation
                executor.runQueuedCommands();
                verify(state, times(1)).discardState();
@@ -256,7 +256,7 @@ public class PendingCheckpointTest {
                                        
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
                        pending.setStatsCallback(callback);
 
-                       pending.abortSubsumed();
+                       
pending.abort(CheckpointFailureReason.CHECKPOINT_SUBSUMED);
                        verify(callback, 
times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
                }
 
@@ -267,7 +267,7 @@ public class PendingCheckpointTest {
                                        
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
                        pending.setStatsCallback(callback);
 
-                       pending.abortDeclined();
+                       
pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
                        verify(callback, 
times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
                }
 
@@ -278,7 +278,7 @@ public class PendingCheckpointTest {
                                        
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
                        pending.setStatsCallback(callback);
 
-                       pending.abortError(new Exception("Expected test 
error"));
+                       
pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED, new 
Exception("Expected test error"));
                        verify(callback, 
times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
                }
 
@@ -289,7 +289,7 @@ public class PendingCheckpointTest {
                                        
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
                        pending.setStatsCallback(callback);
 
-                       pending.abortExpired();
+                       
pending.abort(CheckpointFailureReason.CHECKPOINT_EXPIRED);
                        verify(callback, 
times(1)).reportFailedCheckpoint(anyLong(), any(Exception.class));
                }
        }
@@ -327,7 +327,7 @@ public class PendingCheckpointTest {
                final CheckpointProperties props = new 
CheckpointProperties(false, CheckpointType.CHECKPOINT, true, true, true, true, 
true);
 
                PendingCheckpoint aborted = createPendingCheckpoint(props);
-               aborted.abortDeclined();
+               aborted.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
                assertTrue(aborted.isDiscarded());
                
assertFalse(aborted.setCancellerHandle(mock(ScheduledFuture.class)));
 
@@ -335,7 +335,7 @@ public class PendingCheckpointTest {
                ScheduledFuture<?> canceller = mock(ScheduledFuture.class);
 
                assertTrue(pending.setCancellerHandle(canceller));
-               pending.abortDeclined();
+               pending.abort(CheckpointFailureReason.CHECKPOINT_DECLINED);
                verify(canceller).cancel(false);
        }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
index b781e41..75e9ed1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
@@ -160,7 +160,7 @@ public class JobMasterTriggerSavepointITCase extends 
AbstractTestBase {
                try {
                        cancelWithSavepoint();
                } catch (Exception e) {
-                       assertThat(ExceptionUtils.findThrowable(e, 
CheckpointTriggerException.class).isPresent(), equalTo(true));
+                       assertThat(ExceptionUtils.findThrowable(e, 
CheckpointException.class).isPresent(), equalTo(true));
                }
 
                final JobStatus jobStatus = 
clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 2665d09..2a1b824 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -27,7 +27,7 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -198,7 +198,7 @@ public class TimestampITCase extends TestLogger {
                                                }
                                                catch (Exception e) {
                                                        if (
-                                                                       
!(e.getCause() instanceof CheckpointTriggerException) ||
+                                                                       
!(e.getCause() instanceof CheckpointException) ||
                                                                        
!e.getCause().getMessage().contains("Not all required tasks are currently 
running.")
                                                        ) {
                                                                throw e;

Reply via email to