GodfreyJohnson closed pull request #6630: [FLINK-9043] restore from the latest job's completed checkpoint with … URL: https://github.com/apache/flink/pull/6630
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e936b246222..165377e269e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.SharedStateRegistryFactory; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; +import org.apache.flink.runtime.util.HDFSUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -180,6 +181,9 @@ /** Registry that tracks state which is shared across (incremental) checkpoints */ private SharedStateRegistry sharedStateRegistry; + /** StateBackend to get the external checkpoint base dir */ + private StateBackend stateBackend; + // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( @@ -228,6 +232,7 @@ public CheckpointCoordinator( this.checkpointIdCounter = checkNotNull(checkpointIDCounter); this.completedCheckpointStore = checkNotNull(completedCheckpointStore); this.executor = checkNotNull(executor); + this.stateBackend = checkpointStateBackend; this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory); this.sharedStateRegistry = sharedStateRegistryFactory.create(executor); @@ -1076,6 +1081,157 @@ public boolean restoreLatestCheckpointedState( } } + /** + * Restores the latest checkpointed state. 
+ * + * @param tasks Map of job vertices to restore. State for these vertices is + * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}. + * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to + * restore from. + * @param allowNonRestoredState Allow checkpoint state that cannot be mapped + * to any job vertex in tasks. + * @param userCodeLoader classloader + * @return <code>true</code> if state was restored, <code>false</code> otherwise. + * @throws IllegalStateException If the CheckpointCoordinator is shut down. + * @throws IllegalStateException If no completed checkpoint is available and + * the <code>failIfNoCheckpoint</code> flag has been set. + * @throws IllegalStateException If the checkpoint contains state that cannot be + * mapped to any job vertex in <code>tasks</code> and the + * <code>allowNonRestoredState</code> flag has not been set. + * @throws IllegalStateException If the max parallelism changed for an operator + * that restores state from this checkpoint. + * @throws IllegalStateException If the parallelism changed for an operator + * that restores <i>non-partitioned</i> state from this + * checkpoint. + */ + public boolean restoreLatestCheckpointedState( + Map<JobVertexID, ExecutionJobVertex> tasks, + boolean errorIfNoCheckpoint, + boolean allowNonRestoredState, + ClassLoader userCodeLoader) throws Exception { + + synchronized (lock) { + if (shutdown) { + throw new IllegalStateException("CheckpointCoordinator is shut down"); + } + + // We create a new shared state registry object, so that all pending async disposal requests from previous + // runs will go against the old object (were they can do no harm). + // This must happen under the checkpoint lock. 
+ sharedStateRegistry.close(); + sharedStateRegistry = sharedStateRegistryFactory.create(executor); + + // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery + completedCheckpointStore.recover(); + + // Now, we re-register all (shared) states from the checkpoint store with the new registry + for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) { + completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry); + } + + LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry); + LOG.info("Current job completed checkpoint size {}", completedCheckpointStore.getAllCheckpoints().size()); + + // Restore from the latest checkpoint + CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); + + if (latest == null) { + LOG.info("Current job latest completed checkpoint is null"); + if (errorIfNoCheckpoint) { + throw new IllegalStateException("No completed checkpoint available"); + } else { + String metadataPath = getOldJobCheckpointMetadataPath(); + + if (metadataPath != null && !metadataPath.isEmpty()) { + // restore from checkpoing with hdfs path + LOG.info("The old job already has completed checkpoint," + + " new job will recovery from completed checkpoint"); + restoreSavepoint(metadataPath, allowNonRestoredState, tasks, userCodeLoader); + return true; + } else { + LOG.debug("Resetting the master hooks."); + MasterHooks.reset(masterHooks.values(), LOG); + + return false; + } + + } + } + + LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest); + + // re-assign the task states + final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates(); + + StateAssignmentOperation stateAssignmentOperation = + new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState); + + stateAssignmentOperation.assignStates(); + + // call master hooks 
for restore + + MasterHooks.restoreMasterHooks( + masterHooks, + latest.getMasterHookStates(), + latest.getCheckpointID(), + allowNonRestoredState, + LOG); + + // update metrics + + if (statsTracker != null) { + long restoreTimestamp = System.currentTimeMillis(); + RestoredCheckpointStats restored = new RestoredCheckpointStats( + latest.getCheckpointID(), + latest.getProperties(), + restoreTimestamp, + latest.getExternalPointer()); + + statsTracker.reportRestoredCheckpoint(restored); + } + + return true; + } + } + + public String getOldJobCheckpointMetadataPath() throws Exception { + String stateBackendString = stateBackend.toString(); + LOG.info("stateBackendString: {}", stateBackendString); + + String externalCheckpointBaseDir = null; + + if (stateBackendString != null && !stateBackendString.isEmpty() && + (stateBackendString.contains(HDFSUtils.HDFS_PREFIX) || + stateBackendString.contains(HDFSUtils.VIEWFS_PREFIX))) { + // just for hdfs + String[] tmp1 = stateBackendString.split(","); + if(tmp1 != null && tmp1.length >= 1) { + String[] tmp2 = tmp1[0].split(":"); + if (tmp2 != null){ + if (tmp2.length == 2) { + externalCheckpointBaseDir = tmp2[1].trim().replaceAll("'", ""); + } else if (tmp2.length == 3){ + externalCheckpointBaseDir = tmp2[1].trim().replaceAll("'", "")+":"+tmp2[2].trim().replaceAll("'", ""); + } + } + } + } else { + return null; + } + + LOG.info("externalCheckpointBaseDir: {}", externalCheckpointBaseDir); + + String metadataPath = HDFSUtils.getFullPathForLatestJobCompletedCheckpointMeta(externalCheckpointBaseDir); + + if(metadataPath != null && metadataPath.isEmpty()) { + LOG.info("checkpoint already exist with metadata path: {}", metadataPath); + } else { + LOG.info("not found the completed checkpoint from the old job"); + } + + return metadataPath; + } + /** * Restore the state with given savepoint. 
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 01cb2b6b099..edbecda04e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1138,7 +1138,8 @@ private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup c if (!checkpointCoordinator.restoreLatestCheckpointedState( newExecutionGraph.getAllVertices(), false, - false)) { + false, + userCodeLoader)) { // check whether we can restore from a savepoint tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/HDFSUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/HDFSUtils.java new file mode 100644 index 00000000000..1ee6cc40980 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/HDFSUtils.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.CHECKPOINT_DIR_PREFIX; +import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.METADATA_FILE_NAME; + +public class HDFSUtils { + private static final Logger log = LoggerFactory.getLogger(HDFSUtils.class); + + public static String HDFS_PREFIX = "hdfs"; + + public static String VIEWFS_PREFIX = "viewfs"; + + public static String getFullPathForLatestJobCompletedCheckpointMeta(String path) throws Exception { + // get job directory list + FileStatus[] jobList = getFileListByHDFSPath(path); + + if(jobList != null && jobList.length >= 2) { + // get the latest job by modified time + String latestJobDir = getLatestJobDirectory(jobList); + log.info("Latest job directory: {}", latestJobDir); + + if(latestJobDir != null && !latestJobDir.isEmpty()) { + FileStatus[] jobSubList = getFileListByHDFSPath(path + "/" + latestJobDir); + if(jobSubList != null && jobSubList.length > 1) { + for(FileStatus fileStatus2: jobSubList) { + log.info("Sub directory {} for latest job {}", fileStatus2.getPath().getName(), latestJobDir); + if(fileStatus2.getPath().getName().contains(CHECKPOINT_DIR_PREFIX)) { + return path + "/" + latestJobDir + "/" + fileStatus2.getPath().getName() + "/" + METADATA_FILE_NAME; + } + } + } + } + } + + return null; + } + + private static String getLatestJobDirectory(FileStatus[] jobList) { + String result = null; + // the max modify time directory is for the current job + long maxModifyTime = 0L; + // the latest job directory + long secondModifyTime = 0L; + for (FileStatus fileStatus: jobList) { + if(fileStatus.getModificationTime() > maxModifyTime) { + maxModifyTime = 
fileStatus.getModificationTime(); + } + } + + log.info("Max modify time: {}", maxModifyTime); + + for (FileStatus fileStatus: jobList) { + if(fileStatus.getModificationTime() != maxModifyTime && fileStatus.getModificationTime() > secondModifyTime) { + secondModifyTime = fileStatus.getModificationTime(); + result = fileStatus.getPath().getName(); + log.info("Set second directory to {} with time {}", result, secondModifyTime); + } + } + return result; + } + + public static FileStatus[] getFileListByHDFSPath(String path) throws Exception { + if (path == null || path.isEmpty()) { + throw new Exception("HDFS path is null"); + } + + FileStatus[] status = null; + log.info("Get file list by hdfs path: {}", path); + + try { + FileSystem fs = FileSystem.get(new Configuration()); + status = fs.listStatus(new Path(path)); + log.info("File List for path {}", path); + for (FileStatus fileStatus: status) { + log.info("name: {}, modify time: {}", fileStatus.getPath().getName(), fileStatus.getModificationTime()); + } + } catch (Exception e) { + e.printStackTrace(); + } + return status; + } +} diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0988730689a..c99033e085f 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1341,11 +1341,21 @@ class JobManager( // because it is a blocking operation future { try { + + val checkpointMetadataDir = executionGraph.getCheckpointCoordinator.getOldJobCheckpointMetadataPath + + log.info("legacy mode with old job checkpoint metadata path: " + checkpointMetadataDir) + if (isRecovery) { // this is a recovery of a master failure (this master takes over) executionGraph.restoreLatestCheckpointedState(false, false) - } - else { + } else if (checkpointMetadataDir != null && 
!checkpointMetadataDir.isEmpty) { + log.info("legacy mode restore from old job completed checkpoint: " + checkpointMetadataDir) + executionGraph.getCheckpointCoordinator.restoreSavepoint(checkpointMetadataDir, + false, + executionGraph.getAllVertices, + executionGraph.getUserClassLoader) + } else { // load a savepoint only if this is not starting from a newer checkpoint // as part of an master failure recovery val savepointSettings = jobGraph.getSavepointRestoreSettings ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services