http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html index faeac34..d21349c 100644 --- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html +++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html @@ -50,6 +50,10 @@ limitations under the License. <p><strong>Average:</strong><span> {{ jobCheckpointStats['size']['avg'] | humanizeBytes }}</span></p> </td> </tr> + <tr ng-if="jobCheckpointStats['external-path']"> + <td colspan="4"><strong>Latest Checkpoint Path:</strong> {{ jobCheckpointStats['external-path'] }} + </td> + </tr> </tbody> </table> <div ng-if="!showHistory && jobCheckpointStats && jobCheckpointStats['history'].length > 0"><a ng-click="toggleHistory()" class="btn btn-default"><strong>Show history</strong> ({{ jobCheckpointStats['history'].length }}) <i class="fa fa-chevron-down"></i></a></div>
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e95afe0..ab4bde7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -18,17 +18,20 @@ package org.apache.flink.runtime.checkpoint; -import akka.dispatch.Futures; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; @@ -43,7 +46,6 @@ import org.apache.flink.runtime.state.StreamStateHandle; import 
org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.concurrent.Future; import java.util.ArrayDeque; import java.util.ArrayList; @@ -70,7 +72,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class CheckpointCoordinator { - protected static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class); + private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class); /** The number of recent checkpoints whose IDs are remembered */ private static final int NUM_GHOST_CHECKPOINT_IDS = 16; @@ -106,9 +108,9 @@ public class CheckpointCoordinator { * accessing this don't block the job manager actor and run asynchronously. */ private final CompletedCheckpointStore completedCheckpointStore; - /** Store for savepoints. */ - private final SavepointStore savepointStore; - + /** Default directory for persistent checkpoints; <code>null</code> if none configured. */ + private final String checkpointDirectory; + /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */ private final ArrayDeque<Long> recentPendingCheckpoints; @@ -157,6 +159,9 @@ public class CheckpointCoordinator { /** Helper for tracking checkpoint statistics */ private final CheckpointStatsTracker statsTracker; + /** Default checkpoint properties **/ + private final CheckpointProperties checkpointProperties; + // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( @@ -165,12 +170,13 @@ public class CheckpointCoordinator { long checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpointAttempts, + ExternalizedCheckpointSettings externalizeSettings, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, - SavepointStore savepointStore, + String 
checkpointDirectory, CheckpointStatsTracker statsTracker) { // sanity checks @@ -179,6 +185,12 @@ public class CheckpointCoordinator { checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0"); checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1"); + if (externalizeSettings.externalizeCheckpoints() && checkpointDirectory == null) { + throw new IllegalStateException("CheckpointConfig says to persist periodic " + + "checkpoints, but no checkpoint directory has been configured. You can " + + "configure configure one via key '" + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'."); + } + // it does not make sense to schedule checkpoints more often then the desired // time between checkpoints if (baseInterval < minPauseBetweenCheckpoints) { @@ -196,12 +208,19 @@ public class CheckpointCoordinator { this.pendingCheckpoints = new LinkedHashMap<>(); this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.completedCheckpointStore = checkNotNull(completedCheckpointStore); - this.savepointStore = checkNotNull(savepointStore); + this.checkpointDirectory = checkpointDirectory; this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.statsTracker = checkNotNull(statsTracker); this.timer = new Timer("Checkpoint Timer", true); + if (externalizeSettings.externalizeCheckpoints()) { + LOG.info("Persisting periodic checkpoints externally at {}.", checkpointDirectory); + checkpointProperties = CheckpointProperties.forExternalizedCheckpoint(externalizeSettings.deleteOnCancellation()); + } else { + checkpointProperties = CheckpointProperties.forStandardCheckpoint(); + } + try { // Make sure the checkpoint ID enumerator is running. Possibly // issues a blocking call to ZooKeeper. @@ -219,33 +238,9 @@ public class CheckpointCoordinator { * Shuts down the checkpoint coordinator. 
* * <p>After this method has been called, the coordinator does not accept - * and further messages and cannot trigger any further checkpoints. All - * checkpoint state is discarded. - */ - public void shutdown() throws Exception { - shutdown(true); - } - - /** - * Suspends the checkpoint coordinator. - * - * <p>After this method has been called, the coordinator does not accept * and further messages and cannot trigger any further checkpoints. - * - * <p>The difference to shutdown is that checkpoint state in the store - * and counter is kept around if possible to recover later. */ - public void suspend() throws Exception { - shutdown(false); - } - - /** - * Shuts down the checkpoint coordinator. - * - * @param shutdownStoreAndCounter Depending on this flag the checkpoint - * state services are shut down or suspended. - */ - private void shutdown(boolean shutdownStoreAndCounter) throws Exception { + public void shutdown(JobStatus jobStatus) throws Exception { synchronized (lock) { if (!shutdown) { shutdown = true; @@ -263,13 +258,8 @@ public class CheckpointCoordinator { } pendingCheckpoints.clear(); - if (shutdownStoreAndCounter) { - completedCheckpointStore.shutdown(); - checkpointIdCounter.shutdown(); - } else { - completedCheckpointStore.suspend(); - checkpointIdCounter.suspend(); - } + completedCheckpointStore.shutdown(jobStatus); + checkpointIdCounter.shutdown(jobStatus); } } } @@ -282,29 +272,49 @@ public class CheckpointCoordinator { // Handling checkpoints and messages // -------------------------------------------------------------------------------------------- - public Future<String> triggerSavepoint(long timestamp) throws Exception { - CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint()); + /** + * Triggers a savepoint with the given savepoint directory as a target. + * + * @param timestamp The timestamp for the savepoint. + * @param targetDirectory Target directory for the savepoint. 
+ * @return A future to the completed checkpoint + * @throws IllegalStateException If no savepoint directory has been + * specified and no default savepoint directory has been + * configured + * @throws Exception Failures during triggering are forwarded + */ + public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception { + checkNotNull(targetDirectory, "Savepoint target directory"); + + CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); + CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory); if (result.isSuccess()) { - PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint(); - return savepoint.getCompletionFuture(); - } - else { - return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message())); + return result.getPendingCheckpoint().getCompletionFuture(); + } else { + Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()); + Future<CompletedCheckpoint> failed = FlinkCompletableFuture.completedExceptionally(cause); + return failed; } } /** - * Triggers a new checkpoint and uses the given timestamp as the checkpoint + * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint * timestamp. * * @param timestamp The timestamp for the checkpoint. + * @return <code>true</code> if triggering the checkpoint succeeded. 
*/ public boolean triggerCheckpoint(long timestamp) throws Exception { - return triggerCheckpoint(timestamp, CheckpointProperties.forStandardCheckpoint()).isSuccess(); + return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory).isSuccess(); } - CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props) throws Exception { + CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, String targetDirectory) throws Exception { + // Sanity check + if (props.externalizeCheckpoint() && targetDirectory == null) { + throw new IllegalStateException("No target directory specified to persist checkpoint to."); + } + // make some eager pre-checks synchronized (lock) { // abort if the coordinator has been shutdown in the meantime @@ -315,7 +325,7 @@ public class CheckpointCoordinator { // validate whether the checkpoint can be triggered, with respect to the limit of // concurrent checkpoints, and the minimum time between checkpoints. // these checks are not relevant for savepoints - if (!props.isSavepoint()) { + if (!props.forceCheckpoint()) { // sanity check: there should never be more than one trigger request queued if (triggerRequestQueued) { LOG.warn("Trying to trigger another checkpoint while one was queued already"); @@ -402,9 +412,13 @@ public class CheckpointCoordinator { return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION); } - final PendingCheckpoint checkpoint = props.isSavepoint() ? 
- new PendingSavepoint(job, checkpointID, timestamp, ackTasks, savepointStore) : - new PendingCheckpoint(job, checkpointID, timestamp, ackTasks); + final PendingCheckpoint checkpoint = new PendingCheckpoint( + job, + checkpointID, + timestamp, + ackTasks, + props, + targetDirectory); // schedule the timer that will clean up the expired checkpoints TimerTask canceller = new TimerTask() { @@ -439,7 +453,7 @@ public class CheckpointCoordinator { if (shutdown) { return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN); } - else if (!props.isSavepoint()) { + else if (!props.forceCheckpoint()) { if (triggerRequestQueued) { LOG.warn("Trying to trigger another checkpoint while one was queued already"); return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED); @@ -566,7 +580,7 @@ public class CheckpointCoordinator { } if (!haveMoreRecentPending && !triggerRequestQueued) { LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId); - triggerCheckpoint(System.currentTimeMillis()); + triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory()); } else if (!haveMoreRecentPending) { LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId); triggerQueuedRequests(); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java index 76af4be..48cec7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java @@ -18,29 +18,27 @@ package 
org.apache.flink.runtime.checkpoint; +import org.apache.flink.runtime.jobgraph.JobStatus; + /** * A checkpoint ID counter. */ public interface CheckpointIDCounter { /** - * Starts the {@link CheckpointIDCounter} service. + * Starts the {@link CheckpointIDCounter} service. */ void start() throws Exception; /** - * Shuts the {@link CheckpointIDCounter} service down and frees all created - * resources. - */ - void shutdown() throws Exception; - - /** - * Suspends the counter. + * Shuts the {@link CheckpointIDCounter} service down. + * + * <p>The job status is forwarded and used to decide whether state should + * actually be discarded or kept. * - * <p>If the implementation allows recovery, the counter state needs to be - * kept. Otherwise, this acts as shutdown. + * @param jobStatus Job state on shut down */ - void suspend() throws Exception; + void shutdown(JobStatus jobStatus) throws Exception; /** * Atomically increments the current checkpoint ID. http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java index 7ea645a..e4856cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java @@ -18,44 +18,252 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.runtime.jobgraph.JobStatus; + +import java.io.Serializable; + /** * The configuration of a checkpoint, such as whether * <ul> - * <li>The checkpoint is a savepoint</li> - * <li>The checkpoint must be full, or may be incremental</li> - * <li>The checkpoint format must be the common (cross backend) format, or
may be state-backend specific</li> + * <li>The checkpoint should be persisted</li> + * <li>The checkpoint must be full, or may be incremental (not yet implemented)</li> + * <li>The checkpoint format must be the common (cross backend) format, + * or may be state-backend specific (not yet implemented)</li> + * <li>when the checkpoint should be garbage collected</li> * </ul> */ -public class CheckpointProperties { +public class CheckpointProperties implements Serializable { + + private static final long serialVersionUID = -8835900655844879469L; + + private final boolean forced; + + private final boolean externalize; - private final boolean isSavepoint; + private final boolean discardSubsumed; + private final boolean discardFinished; + private final boolean discardCancelled; + private final boolean discardFailed; + private final boolean discardSuspended; - private CheckpointProperties(boolean isSavepoint) { - this.isSavepoint = isSavepoint; + CheckpointProperties( + boolean forced, + boolean externalize, + boolean discardSubsumed, + boolean discardFinished, + boolean discardCancelled, + boolean discardFailed, + boolean discardSuspended) { + + this.forced = forced; + this.externalize = externalize; + this.discardSubsumed = discardSubsumed; + this.discardFinished = discardFinished; + this.discardCancelled = discardCancelled; + this.discardFailed = discardFailed; + this.discardSuspended = discardSuspended; + + // Not persisted, but needs manual clean up + if (!externalize && !(discardSubsumed && discardFinished && discardCancelled + && discardFailed && discardSuspended)) { + throw new IllegalStateException("CheckpointProperties say to *not* persist the " + + "checkpoint, but the checkpoint requires manual cleanup."); + } } // ------------------------------------------------------------------------ - public boolean isSavepoint() { - return isSavepoint; + /** + * Returns whether the checkpoint should be forced. 
+ * + * <p>Forced checkpoints ignore the configured maximum number of concurrent + * checkpoints and minimum time between checkpoints. Furthermore, they are + * not subsumed by more recent checkpoints as long as they are pending. + * + * @return <code>true</code> if the checkpoint should be forced; + * <code>false</code> otherwise. + * + * @see CheckpointCoordinator + * @see PendingCheckpoint + */ + public boolean forceCheckpoint() { + return forced; + } + + /** + * Returns whether the checkpoint should be persisted externally. + * + * @return <code>true</code> if the checkpoint should be persisted + * externally; <code>false</code> otherwise. + * + * @see PendingCheckpoint + */ + public boolean externalizeCheckpoint() { + return externalize; } // ------------------------------------------------------------------------ + // Garbage collection behaviour + // ------------------------------------------------------------------------ + + /** + * Returns whether the checkpoint should be discarded when it is subsumed. + * + * <p>A checkpoint is subsumed when the maximum number of retained + * checkpoints is reached and a more recent checkpoint completes. + * + * @return <code>true</code> if the checkpoint should be discarded when it + * is subsumed; <code>false</code> otherwise. + * + * @see CompletedCheckpointStore + */ + public boolean discardOnSubsumed() { + return discardSubsumed; + } + + /** + * Returns whether the checkpoint should be discarded when the owning job + * reaches the {@link JobStatus#FINISHED} state. + * + * @return <code>true</code> if the checkpoint should be discarded when the + * owning job reaches the {@link JobStatus#FINISHED} state; <code>false</code> + * otherwise. + * + * @see CompletedCheckpointStore + */ + public boolean discardOnJobFinished() { + return discardFinished; + } + + /** + * Returns whether the checkpoint should be discarded when the owning job + * reaches the {@link JobStatus#CANCELED} state.
+ * + * @return <code>true</code> if the checkpoint should be discarded when the + * owning job reaches the {@link JobStatus#CANCELED} state; <code>false</code> + * otherwise. + * + * @see CompletedCheckpointStore + */ + public boolean discardOnJobCancelled() { + return discardCancelled; + } + + /** + * Returns whether the checkpoint should be discarded when the owning job + * reaches the {@link JobStatus#FAILED} state. + * + * @return <code>true</code> if the checkpoint should be discarded when the + * owning job reaches the {@link JobStatus#FAILED} state; <code>false</code> + * otherwise. + * + * @see CompletedCheckpointStore + */ + public boolean discardOnJobFailed() { + return discardFailed; + } + + /** + * Returns whether the checkpoint should be discarded when the owning job + * reaches the {@link JobStatus#SUSPENDED} state. + * + * @return <code>true</code> if the checkpoint should be discarded when the + * owning job reaches the {@link JobStatus#SUSPENDED} state; <code>false</code> + * otherwise. + * + * @see CompletedCheckpointStore + */ + public boolean discardOnJobSuspended() { + return discardSuspended; + } + + // ------------------------------------------------------------------------ + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + CheckpointProperties that = (CheckpointProperties) o; + return forced == that.forced && + externalize == that.externalize && + discardSubsumed == that.discardSubsumed && + discardFinished == that.discardFinished && + discardCancelled == that.discardCancelled && + discardFailed == that.discardFailed && + discardSuspended == that.discardSuspended; + } + + @Override + public int hashCode() { + int result = (forced ? 1 : 0); + result = 31 * result + (externalize ? 1 : 0); + result = 31 * result + (discardSubsumed ? 1 : 0); + result = 31 * result + (discardFinished ? 
1 : 0); + result = 31 * result + (discardCancelled ? 1 : 0); + result = 31 * result + (discardFailed ? 1 : 0); + result = 31 * result + (discardSuspended ? 1 : 0); + return result; + } @Override public String toString() { - return "CheckpointProperties {" + - "isSavepoint=" + isSavepoint + + return "CheckpointProperties{" + + "forced=" + forced + + ", externalize=" + externalizeCheckpoint() + + ", discardSubsumed=" + discardSubsumed + + ", discardFinished=" + discardFinished + + ", discardCancelled=" + discardCancelled + + ", discardFailed=" + discardFailed + + ", discardSuspended=" + discardSuspended + '}'; } // ------------------------------------------------------------------------ + /** + * Creates the checkpoint properties for a (manually triggered) savepoint. + * + * <p>Savepoints are forced and persisted externally. They have to be + * garbage collected manually. + * + * @return Checkpoint properties for a (manually triggered) savepoint. + */ public static CheckpointProperties forStandardSavepoint() { - return new CheckpointProperties(true); + return new CheckpointProperties(true, true, false, false, false, false, false); } + /** + * Creates the checkpoint properties for a regular checkpoint. + * + * <p>Regular checkpoints are not forced and not persisted externally. They + * are garbage collected automatically. + * + * @return Checkpoint properties for a regular checkpoint. + */ public static CheckpointProperties forStandardCheckpoint() { - return new CheckpointProperties(false); + return new CheckpointProperties(false, false, true, true, true, true, true); + } + + /** + * Creates the checkpoint properties for an external checkpoint. + * + * <p>External checkpoints are not forced, but persisted externally. They + * are garbage collected automatically, except when the owning job + * terminates in state {@link JobStatus#FAILED}. The user is required to + * configure the clean up behaviour on job cancellation. 
+ * + * @param deleteOnCancellation Flag indicating whether to discard on cancellation. + * + * @return Checkpoint properties for an external checkpoint. + */ + public static CheckpointProperties forExternalizedCheckpoint(boolean deleteOnCancellation) { + return new CheckpointProperties(false, true, true, true, deleteOnCancellation, false, true); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 0d279f1..e135272 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -19,11 +19,15 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.Serializable; import java.util.Map; import java.util.Objects; @@ -34,7 +38,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state) * and that is considered completed. 
*/ -public class CompletedCheckpoint implements StateObject { +public class CompletedCheckpoint implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(CompletedCheckpoint.class); private static final long serialVersionUID = -8360248179615702014L; @@ -51,9 +57,11 @@ public class CompletedCheckpoint implements StateObject { /** States of the different task groups belonging to this checkpoint */ private final Map<JobVertexID, TaskState> taskStates; - /** Flag to indicate whether the completed checkpoint data should be deleted when this - * handle to the checkpoint is disposed */ - private final boolean deleteStateWhenDisposed; + /** Properties for this checkpoint. */ + private final CheckpointProperties props; + + /** External path if persisted checkpoint; <code>null</code> otherwise. */ + private final String externalPath; // ------------------------------------------------------------------------ @@ -63,7 +71,8 @@ public class CompletedCheckpoint implements StateObject { long timestamp, long completionTimestamp, Map<JobVertexID, TaskState> taskStates, - boolean deleteStateWhenDisposed) { + CheckpointProperties props, + String externalPath) { checkArgument(checkpointID >= 0); checkArgument(timestamp >= 0); @@ -74,7 +83,13 @@ public class CompletedCheckpoint implements StateObject { this.timestamp = timestamp; this.duration = completionTimestamp - timestamp; this.taskStates = checkNotNull(taskStates); - this.deleteStateWhenDisposed = deleteStateWhenDisposed; + this.props = checkNotNull(props); + this.externalPath = externalPath; + + if (props.externalizeCheckpoint() && externalPath == null) { + throw new NullPointerException("Checkpoint properties say that the checkpoint " + + "should have been persisted, but missing external path."); + } } // ------------------------------------------------------------------------ @@ -95,19 +110,50 @@ public class CompletedCheckpoint implements StateObject { return duration; } - @Override - public 
void discardState() throws Exception { - if (deleteStateWhenDisposed) { + public CheckpointProperties getProperties() { + return props; + } - try { - StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); - } finally { - taskStates.clear(); + public boolean subsume() throws Exception { + if (props.discardOnSubsumed()) { + discard(); + return true; + } + + return false; + } + + public boolean discard(JobStatus jobStatus) throws Exception { + if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() || + jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() || + jobStatus == JobStatus.FAILED && props.discardOnJobFailed() || + jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended()) { + + discard(); + return true; + } else { + if (externalPath != null) { + LOG.info("Persistent checkpoint with ID {} at '{}' not discarded.", + checkpointID, + externalPath); } + + return false; + } + } + + private void discard() throws Exception { + try { + if (externalPath != null) { + SavepointStore.removeSavepoint(externalPath); + } + + StateUtil.bestEffortDiscardAllStateObjects(taskStates.values()); + } finally { + taskStates.clear(); } } - @Override public long getStateSize() throws IOException { long result = 0L; @@ -126,6 +172,10 @@ public class CompletedCheckpoint implements StateObject { return taskStates.get(jobVertexID); } + public String getExternalPath() { + return externalPath; + } + // -------------------------------------------------------------------------------------------- @Override @@ -153,4 +203,5 @@ public class CompletedCheckpoint implements StateObject { public String toString() { return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job); } + } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index c52fc25..d2c0f6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.runtime.jobgraph.JobStatus; + import java.util.List; /** @@ -38,7 +40,7 @@ public interface CompletedCheckpointStore { * * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of * retained checkpoints, the oldest one will be discarded via {@link - * CompletedCheckpoint#discard(ClassLoader)}. + * CompletedCheckpoint#subsume()}. */ void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception; @@ -49,17 +51,14 @@ public interface CompletedCheckpointStore { CompletedCheckpoint getLatestCheckpoint() throws Exception; /** - * Shuts down the store and discards all checkpoint instances. - */ - void shutdown() throws Exception; - - /** - * Suspends the store. + * Shuts down the store. + * + * <p>The job status is forwarded and used to decide whether state should + * actually be discarded or kept. * - * <p>If the implementation allows recovery, checkpoint state needs to be - * kept around. Otherwise, this should act like shutdown. + * @param jobStatus Job state on shut down */ - void suspend() throws Exception; + void shutdown(JobStatus jobStatus) throws Exception; /** * Returns all {@link CompletedCheckpoint} instances.
http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 2ca9d69..983f1d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -19,6 +19,11 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -28,6 +33,8 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; @@ -46,6 +53,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class PendingCheckpoint { + private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class); + private final Object lock = new Object(); private final JobID jobId; @@ -58,7 +67,17 @@ public class PendingCheckpoint { private final Map<ExecutionAttemptID, ExecutionVertex> 
notYetAcknowledgedTasks; - private final boolean disposeWhenSubsumed; + /** + * The checkpoint properties. If the checkpoint should be persisted + * externally, it happens in {@link #finalizeCheckpoint()}. + */ + private final CheckpointProperties props; + + /** Target directory to potentially persist checkpoint to; <code>null</code> if none configured. */ + private final String targetDirectory; + + /** The promise to fulfill once the checkpoint has been completed. */ + private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise = new FlinkCompletableFuture<>(); private int numAcknowledgedTasks; @@ -70,23 +89,21 @@ public class PendingCheckpoint { JobID jobId, long checkpointId, long checkpointTimestamp, - Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm) { - this(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, true); - } - - PendingCheckpoint( - JobID jobId, - long checkpointId, - long checkpointTimestamp, Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm, - boolean disposeWhenSubsumed) - { + CheckpointProperties props, + String targetDirectory) { this.jobId = checkNotNull(jobId); this.checkpointId = checkpointId; this.checkpointTimestamp = checkpointTimestamp; this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm); - this.disposeWhenSubsumed = disposeWhenSubsumed; this.taskStates = new HashMap<>(); + this.props = checkNotNull(props); + this.targetDirectory = targetDirectory; + + // Sanity check + if (props.externalizeCheckpoint() && targetDirectory == null) { + throw new NullPointerException("No target directory specified to persist checkpoint to."); + } checkArgument(verticesToConfirm.size() > 0, "Checkpoint needs at least one vertex that commits the checkpoint"); @@ -137,33 +154,71 @@ public class PendingCheckpoint { * @return True if the checkpoint can be subsumed, false otherwise. */ public boolean canBeSubsumed() { - return true; + // If the checkpoint is forced, it cannot be subsumed. 
+ return !props.forceCheckpoint(); + } + + CheckpointProperties getProps() { + return props; + } + + String getTargetDirectory() { + return targetDirectory; } // ------------------------------------------------------------------------ // Progress and Completion // ------------------------------------------------------------------------ + /** + * Returns the completion future. + * + * @return A future to the completed checkpoint + */ + public Future<CompletedCheckpoint> getCompletionFuture() { + return onCompletionPromise; + } + public CompletedCheckpoint finalizeCheckpoint() throws Exception { synchronized (lock) { - if (discarded) { - throw new IllegalStateException("pending checkpoint is discarded"); - } - if (notYetAcknowledgedTasks.isEmpty()) { - CompletedCheckpoint completed = new CompletedCheckpoint( - jobId, - checkpointId, - checkpointTimestamp, - System.currentTimeMillis(), - new HashMap<>(taskStates), - disposeWhenSubsumed); - - dispose(false); - - return completed; - } - else { - throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged"); + try { + if (discarded) { + throw new IllegalStateException("pending checkpoint is discarded"); + } + if (notYetAcknowledgedTasks.isEmpty()) { + // Persist if required + String externalPath = null; + if (props.externalizeCheckpoint()) { + try { + Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values()); + externalPath = SavepointStore.storeSavepoint( + targetDirectory, + savepoint); + } catch (Throwable t) { + LOG.error("Failed to persist checkpoints " + checkpointId + ".", t); + } + } + + CompletedCheckpoint completed = new CompletedCheckpoint( + jobId, + checkpointId, + checkpointTimestamp, + System.currentTimeMillis(), + new HashMap<>(taskStates), + props, + externalPath); + + onCompletionPromise.complete(completed); + + dispose(false); + + return completed; + } else { + throw new IllegalStateException("Cannot complete checkpoint while not all tasks are 
acknowledged"); + } + } catch (Throwable t) { + onCompletionPromise.completeExceptionally(t); + throw t; } } } @@ -180,7 +235,6 @@ public class PendingCheckpoint { ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID); if (vertex != null) { - if (checkpointStateHandles != null) { List<KeyGroupsStateHandle> keyGroupsState = checkpointStateHandles.getKeyGroupsStateHandle(); ChainedStateHandle<StreamStateHandle> nonPartitionedState = @@ -256,18 +310,36 @@ public class PendingCheckpoint { * Aborts a checkpoint because it expired (took too long). */ public void abortExpired() throws Exception { - dispose(true); + try { + onCompletionPromise.completeExceptionally(new Exception("Checkpoint expired before completing")); + } finally { + dispose(true); + } } /** * Aborts the pending checkpoint because a newer completed checkpoint subsumed it. */ public void abortSubsumed() throws Exception { - dispose(true); + try { + if (props.forceCheckpoint()) { + onCompletionPromise.completeExceptionally(new Exception("Bug: forced checkpoints must never be subsumed")); + + throw new IllegalStateException("Bug: forced checkpoints must never be subsumed"); + } else { + onCompletionPromise.completeExceptionally(new Exception("Checkpoints has been subsumed")); + } + } finally { + dispose(true); + } } public void abortDeclined() throws Exception { - dispose(true); + try { + onCompletionPromise.completeExceptionally(new Exception("Checkpoint was declined (tasks not ready)")); + } finally { + dispose(true); + } } /** @@ -275,10 +347,14 @@ public class PendingCheckpoint { * @param cause The error's exception. 
*/ public void abortError(Throwable cause) throws Exception { - dispose(true); + try { + onCompletionPromise.completeExceptionally(new Exception("Checkpoint failed: " + cause.getMessage(), cause)); + } finally { + dispose(true); + } } - protected void dispose(boolean releaseState) throws Exception { + private void dispose(boolean releaseState) throws Exception { synchronized (lock) { try { discarded = true; http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java deleted file mode 100644 index 0bb6a91..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.checkpoint; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.savepoint.Savepoint; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.util.ExceptionUtils; -import org.slf4j.Logger; -import scala.concurrent.Future; -import scala.concurrent.Promise; - -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A pending savepoint is like a pending checkpoint, but it additionally performs some - * actions upon completion, like notifying the triggerer. - */ -public class PendingSavepoint extends PendingCheckpoint { - - private static final Logger LOG = CheckpointCoordinator.LOG; - - private final SavepointStore store; - - /** The promise to fulfill once the savepoint is complete */ - private final Promise<String> onCompletionPromise; - - // -------------------------------------------------------------------------------------------- - - public PendingSavepoint( - JobID jobId, - long checkpointId, - long checkpointTimestamp, - Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm, - SavepointStore store) - { - super(jobId, checkpointId, checkpointTimestamp, verticesToConfirm, false); - - this.store = checkNotNull(store); - this.onCompletionPromise = new scala.concurrent.impl.Promise.DefaultPromise<>(); - } - - // -------------------------------------------------------------------------------------------- - // Savepoint completion - // -------------------------------------------------------------------------------------------- - - public Future<String> getCompletionFuture() { - return onCompletionPromise.future(); - } - - @Override - public CompletedCheckpoint finalizeCheckpoint() throws Exception { - // 
finalize checkpoint (this also disposes this pending checkpoint) - CompletedCheckpoint completedCheckpoint = super.finalizeCheckpoint(); - - // now store the checkpoint externally as a savepoint - try { - Savepoint savepoint = new SavepointV1( - completedCheckpoint.getCheckpointID(), - completedCheckpoint.getTaskStates().values()); - - String path = store.storeSavepoint(savepoint); - onCompletionPromise.success(path); - } - catch (Throwable t) { - LOG.warn("Failed to store savepoint.", t); - onCompletionPromise.failure(t); - - ExceptionUtils.rethrow(t, "Failed to store savepoint."); - } - - return completedCheckpoint; - } - - // ------------------------------------------------------------------------ - // Cancellation / Disposal - // ------------------------------------------------------------------------ - - @Override - public boolean canBeSubsumed() { - return false; - } - - @Override - public void abortSubsumed() throws Exception { - try { - Exception e = new Exception("Bug: Savepoints must never be subsumed"); - onCompletionPromise.failure(e); - throw e; - } - finally { - dispose(true); - } - } - - @Override - public void abortExpired() throws Exception { - try { - LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " expired before completing."); - onCompletionPromise.failure(new Exception("Savepoint expired before completing")); - } - finally { - dispose(true); - } - } - - @Override - public void abortDeclined() throws Exception { - try { - LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " was declined (tasks not ready)."); - onCompletionPromise.failure(new Exception("Savepoint was declined (tasks not ready)")); - } - finally { - dispose(true); - } - } - - @Override - public void abortError(Throwable cause) throws Exception { - try { - LOG.info("Savepoint with checkpoint ID " + getCheckpointId() + " failed due to an error", cause); - onCompletionPromise.failure( - new Exception("Savepoint could not be completed: " + 
cause.getMessage(), cause)); - } - finally { - dispose(true); - } - } - - // -------------------------------------------------------------------------------------------- - - @Override - public String toString() { - return String.format("Pending Savepoint %d @ %d - confirmed=%d, pending=%d", - getCheckpointId(), getCheckpointTimestamp(), - getNumberOfAcknowledgedTasks(), getNumberOfNonAcknowledgedTasks()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java index 84cbe19..e4ed996 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import java.util.concurrent.atomic.AtomicLong; @@ -35,10 +36,7 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter { public void start() throws Exception {} @Override - public void shutdown() throws Exception {} - - @Override - public void suspend() throws Exception {} + public void shutdown(JobStatus jobStatus) throws Exception {} @Override public long getAndIncrement() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java index aecb51e..a9624fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java @@ -41,7 +41,7 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa throws Exception { return new StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory - .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader); + .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 5e03988..082bca9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; -import org.apache.flink.runtime.state.StateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayDeque; import java.util.ArrayList; @@ -32,6 +34,8 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointStore { + private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class); + /** The maximum number of checkpoints to retain (at least 1). */ private final int maxNumberOfCheckpointsToRetain; @@ -44,16 +48,10 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at * least 1). Adding more checkpoints than this results * in older checkpoints being discarded. - * @param userClassLoader The user class loader used to discard checkpoints */ - public StandaloneCompletedCheckpointStore( - int maxNumberOfCheckpointsToRetain, - ClassLoader userClassLoader) { - + public StandaloneCompletedCheckpointStore(int maxNumberOfCheckpointsToRetain) { checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); - this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; - this.checkpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); } @@ -64,9 +62,9 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt @Override public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - checkpoints.addLast(checkpoint); + checkpoints.add(checkpoint); if (checkpoints.size() > maxNumberOfCheckpointsToRetain) { - checkpoints.removeFirst().discardState(); + checkpoints.remove().subsume(); } } @@ -86,17 +84,16 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt } @Override - public void shutdown() throws Exception { + public void shutdown(JobStatus jobStatus) throws Exception { try { - StateUtil.bestEffortDiscardAllStateObjects(checkpoints); + LOG.info("Shutting down"); + + for (CompletedCheckpoint checkpoint : checkpoints) { + checkpoint.discard(jobStatus); + } } finally { checkpoints.clear(); } } - @Override - public void suspend() throws Exception { - // Do a regular shutdown, because we can't recovery 
anything - shutdown(); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java index 0bceb8b..80e79b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java @@ -23,6 +23,7 @@ import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,30 +92,17 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter { } @Override - public void shutdown() throws Exception { + public void shutdown(JobStatus jobStatus) throws Exception { synchronized (startStopLock) { if (isStarted) { LOG.info("Shutting down."); sharedCount.close(); client.getConnectionStateListenable().removeListener(connStateListener); - LOG.info("Removing {} from ZooKeeper", counterPath); - client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath); - - isStarted = false; - } - } - } - - @Override - public void suspend() throws Exception { - synchronized (startStopLock) { - if (isStarted) { - LOG.info("Suspending."); - sharedCount.close(); - client.getConnectionStateListenable().removeListener(connStateListener); - - // Don't remove any state + if 
(jobStatus.isGloballyTerminalState()) { + LOG.info("Removing {} from ZooKeeper", counterPath); + client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath); + } isStarted = false; } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java index 55a0bbb..f47012d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java @@ -55,7 +55,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac throws Exception { return ZooKeeperUtils.createCompletedCheckpoints(client, config, jobId, - NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, userClassLoader); + NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index b826d9f..4f67921 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -24,9 +24,10 @@ import org.apache.curator.framework.api.CuratorEvent; 
import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.utils.ZKPaths; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; -import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.List; +import java.util.concurrent.Callable; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -75,9 +77,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto /** The maximum number of checkpoints to retain (at least 1). */ private final int maxNumberOfCheckpointsToRetain; - /** User class loader for discarding {@link CompletedCheckpoint} instances. */ - private final ClassLoader userClassLoader; - /** Local completed checkpoints. */ private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles; @@ -88,7 +87,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto * least 1). Adding more checkpoints than this results * in older checkpoints being discarded. On recovery, * we will only start with a single checkpoint. 
- * @param userClassLoader The user class loader used to discard checkpoints * @param client The Curator ZooKeeper client * @param checkpointsPath The ZooKeeper path for the checkpoints (needs to * start with a '/') @@ -98,7 +96,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto */ public ZooKeeperCompletedCheckpointStore( int maxNumberOfCheckpointsToRetain, - ClassLoader userClassLoader, CuratorFramework client, String checkpointsPath, RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception { @@ -107,7 +104,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto checkNotNull(stateStorage, "State storage"); this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; - this.userClassLoader = checkNotNull(userClassLoader, "User class loader"); checkNotNull(client, "Curator client"); checkNotNull(checkpointsPath, "Checkpoints path"); @@ -172,7 +168,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) { try { - removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i)); + removeSubsumed(initialCheckpoints.get(i)); } catch (Exception e) { LOG.error("Failed to discard checkpoint", e); @@ -200,7 +196,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto // Everything worked, let's remove a previous checkpoint if necessary. 
if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { - removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst()); + removeSubsumed(checkpointStateHandles.removeFirst()); } LOG.debug("Added {} to {}.", checkpoint, path); @@ -233,68 +229,90 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto } @Override - public void shutdown() throws Exception { - LOG.info("Shutting down"); + public void shutdown(JobStatus jobStatus) throws Exception { + if (jobStatus.isGloballyTerminalState()) { + LOG.info("Shutting down"); - for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) { - try { - removeFromZooKeeperAndDiscardCheckpoint(checkpoint); - } - catch (Exception e) { - LOG.error("Failed to discard checkpoint.", e); + for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) { + try { + removeShutdown(checkpoint, jobStatus); + } catch (Exception e) { + LOG.error("Failed to discard checkpoint.", e); + } } + + checkpointStateHandles.clear(); + + String path = "/" + client.getNamespace(); + + LOG.info("Removing {} from ZooKeeper", path); + ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true); + } else { + LOG.info("Suspending"); + + // Clear the local handles, but don't remove any state + checkpointStateHandles.clear(); } + } - checkpointStateHandles.clear(); + // ------------------------------------------------------------------------ - String path = "/" + client.getNamespace(); + private void removeSubsumed(final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception { + Callable<Void> action = new Callable<Void>() { + @Override + public Void call() throws Exception { + stateHandleAndPath.f0.retrieveState().subsume(); + return null; + } + }; - LOG.info("Removing {} from ZooKeeper", path); - ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), 
path, true); + remove(stateHandleAndPath, action); } - @Override - public void suspend() throws Exception { - LOG.info("Suspending"); + private void removeShutdown( + final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath, + final JobStatus jobStatus) throws Exception { - // Clear the local handles, but don't remove any state - checkpointStateHandles.clear(); + Callable<Void> action = new Callable<Void>() { + @Override + public Void call() throws Exception { + CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState(); + checkpoint.discard(jobStatus); + return null; + } + }; + + remove(stateHandleAndPath, action); } /** * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle. */ - private void removeFromZooKeeperAndDiscardCheckpoint( - final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception { + private void remove( + final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath, + final Callable<Void> action) throws Exception { - final BackgroundCallback callback = new BackgroundCallback() { + BackgroundCallback callback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { try { if (event.getType() == CuratorEventType.DELETE) { if (event.getResultCode() == 0) { - // The checkpoint try { - CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState(); - checkpoint.discardState(); - // Discard the checkpoint - LOG.debug("Discarded " + checkpoint); + action.call(); } finally { // Discard the state handle stateHandleAndPath.f0.discardState(); } - } - else { + } else { throw new IllegalStateException("Unexpected result code " + event.getResultCode() + " in '" + event + "' callback."); } - } - else { + } else { throw new IllegalStateException("Unexpected event type " + event.getType() + " in '" + event + "' callback."); } - } - catch (Exception 
e) { + } catch (Exception e) { LOG.error("Failed to discard checkpoint.", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java deleted file mode 100644 index 49f51be..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.util.FileUtils; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A file system based {@link SavepointStore}. - * - * <p>Stored savepoints have the following format: - * <pre> - * MagicNumber SavepointVersion Savepoint - * - MagicNumber => int - * - SavepointVersion => int (returned by Savepoint#getVersion()) - * - Savepoint => bytes (serialized via version-specific SavepointSerializer) - * </pre> - */ -public class FsSavepointStore implements SavepointStore { - - private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class); - - /** Magic number for sanity checks against stored savepoints. */ - int MAGIC_NUMBER = 0x4960672d; - - /** Root path for savepoints. */ - private final Path rootPath; - - /** Prefix for savepoint files. */ - private final String prefix; - - /** File system to use for file access. */ - private final FileSystem fileSystem; - - /** - * Creates a new file system based {@link SavepointStore}. 
- * - * @param rootPath Root path for savepoints - * @param prefix Prefix for savepoint files - * @throws IOException On failure to access root path - */ - FsSavepointStore(String rootPath, String prefix) throws IOException { - this.rootPath = new Path(checkNotNull(rootPath, "Root path")); - this.prefix = checkNotNull(prefix, "Prefix"); - - this.fileSystem = FileSystem.get(this.rootPath.toUri()); - } - - @Override - public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException { - Preconditions.checkNotNull(savepoint, "Savepoint"); - - Exception latestException = null; - Path path = null; - FSDataOutputStream fdos = null; - - // Try to create a FS output stream - for (int attempt = 0; attempt < 10; attempt++) { - path = new Path(rootPath, FileUtils.getRandomFilename(prefix)); - try { - fdos = fileSystem.create(path, false); - break; - } catch (Exception e) { - latestException = e; - } - } - - if (fdos == null) { - throw new IOException("Failed to create file output stream at " + path, latestException); - } - - boolean success = false; - try (DataOutputStream dos = new DataOutputStream(fdos)) { - // Write header - dos.writeInt(MAGIC_NUMBER); - dos.writeInt(savepoint.getVersion()); - - // Write savepoint - SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint); - serializer.serialize(savepoint, dos); - success = true; - } finally { - if (!success && fileSystem.exists(path)) { - if (!fileSystem.delete(path, true)) { - LOG.warn("Failed to delete file " + path + " after failed write."); - } - } - } - - return path.toString(); - } - - @Override - public Savepoint loadSavepoint(String path) throws IOException { - Preconditions.checkNotNull(path, "Path"); - - try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) { - int magicNumber = dis.readInt(); - - if (magicNumber == MAGIC_NUMBER) { - int version = dis.readInt(); - - SavepointSerializer<?> serializer = 
SavepointSerializers.getSerializer(version); - return serializer.deserialize(dis); - } else { - throw new RuntimeException("Unexpected magic number. This is most likely " + - "caused by trying to load a Flink 1.0 savepoint. You cannot load a " + - "savepoint triggered by Flink 1.0 with this version of Flink. If it is " + - "_not_ a Flink 1.0 savepoint, this error indicates that the specified " + - "file is not a proper savepoint or the file has been corrupted."); - } - } - } - - @Override - public void disposeSavepoint(String path) throws Exception { - Preconditions.checkNotNull(path, "Path"); - - try { - Savepoint savepoint = loadSavepoint(path); - LOG.info("Disposing savepoint: " + path); - savepoint.dispose(); - - Path filePath = new Path(path); - - if (fileSystem.exists(filePath)) { - if (!fileSystem.delete(filePath, true)) { - throw new IOException("Failed to delete " + filePath + "."); - } - } else { - throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'."); - } - } catch (Throwable t) { - throw new IOException("Failed to dispose savepoint " + path + ".", t); - } - } - - @Override - public void shutdown() throws Exception { - // Nothing to do, because the savepoint life-cycle is independent of - // the cluster life-cycle. - } - - private FSDataInputStream createFsInputStream(Path path) throws IOException { - if (fileSystem.exists(path)) { - return fileSystem.open(path); - } else { - throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'."); - } - } - - /** - * Returns the savepoint root path. 
- * - * @return Savepoint root path - */ - Path getRootPath() { - return rootPath; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java deleted file mode 100644 index 2cf8f31..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Heap-backed savepoint store. - * - * <p>The life-cycle of savepoints is bound to the life-cycle of the cluster. 
- */ -public class HeapSavepointStore implements SavepointStore { - - private static final Logger LOG = LoggerFactory.getLogger(HeapSavepointStore.class); - - private final Object shutDownLock = new Object(); - - /** Stored savepoints. */ - private final Map<String, Savepoint> savepoints = new HashMap<>(1); - - /** ID counter to identify savepoints. */ - private final AtomicInteger currentId = new AtomicInteger(); - - /** Flag indicating whether state store has been shut down. */ - private boolean shutDown; - - /** Shut down hook. */ - private final Thread shutdownHook; - - /** - * Creates a heap-backed savepoint store. - * - * <p>Savepoint are discarded on {@link #shutdown()}. - */ - public HeapSavepointStore() { - this.shutdownHook = new Thread(new Runnable() { - @Override - public void run() { - try { - shutdown(); - } catch (Throwable t) { - LOG.warn("Failure during shut down hook.", t); - } - } - }); - - try { - Runtime.getRuntime().addShutdownHook(shutdownHook); - } catch (IllegalStateException ignored) { - // JVM is already shutting down - } catch (Throwable t) { - LOG.warn("Failed to register shutdown hook."); - } - } - - @Override - public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException { - Preconditions.checkNotNull(savepoint, "Savepoint"); - - synchronized (shutDownLock) { - if (shutDown) { - throw new IllegalStateException("Shut down"); - } else { - String path = "jobmanager://savepoints/" + currentId.incrementAndGet(); - savepoints.put(path, savepoint); - return path; - } - } - } - - @Override - public Savepoint loadSavepoint(String path) throws IOException { - Preconditions.checkNotNull(path, "Path"); - - Savepoint savepoint; - synchronized (shutDownLock) { - savepoint = savepoints.get(path); - } - - if (savepoint != null) { - return savepoint; - } else { - throw new IllegalArgumentException("Invalid path '" + path + "'."); - } - } - - @Override - public void disposeSavepoint(String path) throws Exception { - 
Preconditions.checkNotNull(path, "Path"); - - Savepoint savepoint; - synchronized (shutDownLock) { - savepoint = savepoints.remove(path); - } - - if (savepoint != null) { - savepoint.dispose(); - } else { - throw new IllegalArgumentException("Invalid path '" + path + "'."); - } - } - - @Override - public void shutdown() throws Exception { - synchronized (shutDownLock) { - // This is problematic as the user code class loader is not - // available at this point. - for (Savepoint savepoint : savepoints.values()) { - try { - savepoint.dispose(); - } catch (Throwable t) { - LOG.warn("Failed to dispose savepoint " + savepoint.getCheckpointId(), t); - } - } - - savepoints.clear(); - - // Remove shutdown hook to prevent resource leaks, unless this is - // invoked by the shutdown hook itself. - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } catch (IllegalStateException ignored) { - // Race, JVM is in shutdown already, we can safely ignore this - } catch (Throwable t) { - LOG.warn("Failed to unregister shut down hook."); - } - } - - shutDown = true; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java index 47917b4..845008d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java @@ -19,16 +19,18 @@ package org.apache.flink.runtime.checkpoint.savepoint; import org.apache.flink.api.common.JobID; +import 
org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskState; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import java.io.IOException; import java.util.HashMap; import java.util.Map; /** - * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it. + * The SavepointLoader is a utility to load and verify a Savepoint, and to create a checkpoint from it. */ public class SavepointLoader { @@ -39,7 +41,6 @@ public class SavepointLoader { * * @param jobId The JobID of the job to load the savepoint for. * @param tasks Tasks that will possibly be reset - * @param savepointStore The store that holds the savepoint. * @param savepointPath The path of the savepoint to rollback to * * @throws IllegalStateException If mismatch between program and savepoint state @@ -48,13 +49,12 @@ public class SavepointLoader { public static CompletedCheckpoint loadAndValidateSavepoint( JobID jobId, Map<JobVertexID, ExecutionJobVertex> tasks, - SavepointStore savepointStore, - String savepointPath) throws Exception { + String savepointPath) throws IOException { // (1) load the savepoint - Savepoint savepoint = savepointStore.loadSavepoint(savepointPath); + Savepoint savepoint = SavepointStore.loadSavepoint(savepointPath); final Map<JobVertexID, TaskState> taskStates = new HashMap<>(savepoint.getTaskStates().size()); - + // (2) validate it (parallelism, etc) for (TaskState taskState : savepoint.getTaskStates()) { ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID()); @@ -66,12 +66,12 @@ public class SavepointLoader { else { String msg = String.format("Failed to rollback to savepoint %s. " + "Max parallelism mismatch between savepoint state and new program. 
" + - "Cannot map operator %s with parallelism %d to new program with " + + "Cannot map operator %s with max parallelism %d to new program with " + "parallelism %d. This indicates that the program has been changed " + "in a non-compatible way after the savepoint.", savepoint, taskState.getJobVertexID(), - taskState.getParallelism(), + taskState.getMaxParallelism(), executionJobVertex.getParallelism()); throw new IllegalStateException(msg); @@ -87,8 +87,9 @@ public class SavepointLoader { } // (3) convert to checkpoint so the system can fall back to it - return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, false); - } + CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); + return new CompletedCheckpoint(jobId, savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath); + } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java index 211209c..6a55b33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java @@ -25,7 +25,7 @@ import java.io.IOException; /** * Serializer for {@link Savepoint} instances. * - * <p>This serializer is used to read/write a savepoint via {@link FsSavepointStore}. + * <p>This serializer is used to read/write a savepoint via {@link SavepointStore}. * * <p>Version-specific serializers are accessed via the {@link SavepointSerializers} helper. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java index 68b88d2..4b65418 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java @@ -18,23 +18,105 @@ package org.apache.flink.runtime.checkpoint.savepoint; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * Savepoint store used to persist {@link Savepoint} instances. + * A file system based savepoint store. * - * <p>The main implementation is the {@link FsSavepointStore}. We also have the - * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0). 
+ * <p>Stored savepoints have the following format: + * <pre> + * MagicNumber SavepointVersion Savepoint + * - MagicNumber => int + * - SavepointVersion => int (returned by Savepoint#getVersion()) + * - Savepoint => bytes (serialized via version-specific SavepointSerializer) + * </pre> */ -public interface SavepointStore { +public class SavepointStore { + + private static final Logger LOG = LoggerFactory.getLogger(SavepointStore.class); + + /** Magic number for sanity checks against stored savepoints. */ + private static final int MAGIC_NUMBER = 0x4960672d; + + /** Prefix for savepoint files. */ + private static final String prefix = "savepoint-"; /** * Stores the savepoint. * + * @param targetDirectory Target directory to store savepoint in * @param savepoint Savepoint to be stored * @param <T> Savepoint type * @return Path of stored savepoint * @throws Exception Failures during store are forwarded */ - <T extends Savepoint> String storeSavepoint(T savepoint) throws Exception; + public static <T extends Savepoint> String storeSavepoint( + String targetDirectory, + T savepoint) throws IOException { + + checkNotNull(targetDirectory, "Target directory"); + checkNotNull(savepoint, "Savepoint"); + + Exception latestException = null; + Path path = null; + FSDataOutputStream fdos = null; + + FileSystem fs = null; + + // Try to create a FS output stream + for (int attempt = 0; attempt < 10; attempt++) { + path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix)); + + if (fs == null) { + fs = FileSystem.get(path.toUri()); + } + + try { + fdos = fs.create(path, false); + break; + } catch (Exception e) { + latestException = e; + } + } + + if (fdos == null) { + throw new IOException("Failed to create file output stream at " + path, latestException); + } + + boolean success = false; + try (DataOutputStream dos = new DataOutputStream(fdos)) { + // Write header + dos.writeInt(MAGIC_NUMBER); + dos.writeInt(savepoint.getVersion()); + + // Write savepoint + 
SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint); + serializer.serialize(savepoint, dos); + success = true; + } finally { + if (!success && fs.exists(path)) { + if (!fs.delete(path, true)) { + LOG.warn("Failed to delete file {} after failed write.", path); + } + } + } + + return path.toString(); + } /** * Loads the savepoint at the specified path. @@ -43,24 +125,62 @@ public interface SavepointStore { * @return The loaded savepoint * @throws Exception Failures during load are forwared */ - Savepoint loadSavepoint(String path) throws Exception; + public static Savepoint loadSavepoint(String path) throws IOException { + Preconditions.checkNotNull(path, "Path"); - /** - * Disposes the savepoint at the specified path. - * - * @param path Path of savepoint to dispose - * @throws Exception Failures during diposal are forwarded - */ - void disposeSavepoint(String path) throws Exception; + try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) { + int magicNumber = dis.readInt(); + + if (magicNumber == MAGIC_NUMBER) { + int version = dis.readInt(); + + SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version); + return serializer.deserialize(dis); + } else { + throw new RuntimeException("Unexpected magic number. This is most likely " + + "caused by trying to load a Flink 1.0 savepoint. You cannot load a " + + "savepoint triggered by Flink 1.0 with this version of Flink. If it is " + + "_not_ a Flink 1.0 savepoint, this error indicates that the specified " + + "file is not a proper savepoint or the file has been corrupted."); + } + } + } /** - * Shut downs the savepoint store. + * Removes the savepoint meta data w/o loading and disposing it. * - * <p>Only necessary for implementations where the savepoint life-cycle is - * bound to the cluster life-cycle. 
- * - * @throws Exception Failures during shut down are forwarded + * @param path Path of savepoint to remove + * @throws Exception Failures during disposal are forwarded */ - void shutdown() throws Exception; + public static void removeSavepoint(String path) throws IOException { + Preconditions.checkNotNull(path, "Path"); + + try { + LOG.info("Removing savepoint: {}.", path); + + Path filePath = new Path(path); + FileSystem fs = FileSystem.get(filePath.toUri()); + + if (fs.exists(filePath)) { + if (!fs.delete(filePath, true)) { + throw new IOException("Failed to delete " + filePath + "."); + } + } else { + throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'."); + } + } catch (Throwable t) { + throw new IOException("Failed to dispose savepoint " + path + ".", t); + } + } + + private static FSDataInputStream createFsInputStream(Path path) throws IOException { + FileSystem fs = FileSystem.get(path.toUri()); + + if (fs.exists(path)) { + return fs.open(path); + } else { + throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'."); + } + } }