[ 
https://issues.apache.org/jira/browse/FLINK-9043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596981#comment-16596981
 ] 

ASF GitHub Bot commented on FLINK-9043:
---------------------------------------

GodfreyJohnson closed pull request #6630: [FLINK-9043] restore from the latest 
job's completed checkpoint with …
URL: https://github.com/apache/flink/pull/6630
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e936b246222..165377e269e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -41,6 +41,7 @@
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
+import org.apache.flink.runtime.util.HDFSUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -180,6 +181,9 @@
        /** Registry that tracks state which is shared across (incremental) 
checkpoints */
        private SharedStateRegistry sharedStateRegistry;
 
+       /** StateBackend to get the external checkpoint base dir */
+       private StateBackend stateBackend;
+
        // 
--------------------------------------------------------------------------------------------
 
        public CheckpointCoordinator(
@@ -228,6 +232,7 @@ public CheckpointCoordinator(
                this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.executor = checkNotNull(executor);
+               this.stateBackend = checkpointStateBackend;
                this.sharedStateRegistryFactory = 
checkNotNull(sharedStateRegistryFactory);
                this.sharedStateRegistry = 
sharedStateRegistryFactory.create(executor);
 
@@ -1076,6 +1081,157 @@ public boolean restoreLatestCheckpointedState(
                }
        }
 
+       /**
+        * Restores the latest checkpointed state.
+        *
+        * @param tasks Map of job vertices to restore. State for these 
vertices is
+        * restored via {@link 
Execution#setInitialState(JobManagerTaskRestore)}.
+        * @param errorIfNoCheckpoint Fail if no completed checkpoint is 
available to
+        * restore from.
+        * @param allowNonRestoredState Allow checkpoint state that cannot be 
mapped
+        * to any job vertex in tasks.
+        * @param userCodeLoader classloader
+        * @return <code>true</code> if state was restored, <code>false</code> 
otherwise.
+        * @throws IllegalStateException If the CheckpointCoordinator is shut 
down.
+        * @throws IllegalStateException If no completed checkpoint is 
available and
+        *                               the <code>failIfNoCheckpoint</code> 
flag has been set.
+        * @throws IllegalStateException If the checkpoint contains state that 
cannot be
+        *                               mapped to any job vertex in 
<code>tasks</code> and the
+        *                               <code>allowNonRestoredState</code> 
flag has not been set.
+        * @throws IllegalStateException If the max parallelism changed for an 
operator
+        *                               that restores state from this 
checkpoint.
+        * @throws IllegalStateException If the parallelism changed for an 
operator
+        *                               that restores <i>non-partitioned</i> 
state from this
+        *                               checkpoint.
+        */
+       public boolean restoreLatestCheckpointedState(
+               Map<JobVertexID, ExecutionJobVertex> tasks,
+               boolean errorIfNoCheckpoint,
+               boolean allowNonRestoredState,
+               ClassLoader userCodeLoader) throws Exception {
+
+               synchronized (lock) {
+                       if (shutdown) {
+                               throw new 
IllegalStateException("CheckpointCoordinator is shut down");
+                       }
+
+                       // We create a new shared state registry object, so 
that all pending async disposal requests from previous
+                       // runs will go against the old object (were they can 
do no harm).
+                       // This must happen under the checkpoint lock.
+                       sharedStateRegistry.close();
+                       sharedStateRegistry = 
sharedStateRegistryFactory.create(executor);
+
+                       // Recover the checkpoints, TODO this could be done 
only when there is a new leader, not on each recovery
+                       completedCheckpointStore.recover();
+
+                       // Now, we re-register all (shared) states from the 
checkpoint store with the new registry
+                       for (CompletedCheckpoint completedCheckpoint : 
completedCheckpointStore.getAllCheckpoints()) {
+                               
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+                       }
+
+                       LOG.debug("Status of the shared state registry of job 
{} after restore: {}.", job, sharedStateRegistry);
+                       LOG.info("Current job completed checkpoint size {}", 
completedCheckpointStore.getAllCheckpoints().size());
+
+                       // Restore from the latest checkpoint
+                       CompletedCheckpoint latest = 
completedCheckpointStore.getLatestCheckpoint();
+
+                       if (latest == null) {
+                               LOG.info("Current job latest completed 
checkpoint is null");
+                               if (errorIfNoCheckpoint) {
+                                       throw new IllegalStateException("No 
completed checkpoint available");
+                               } else {
+                                       String metadataPath = 
getOldJobCheckpointMetadataPath();
+
+                                       if (metadataPath != null && 
!metadataPath.isEmpty()) {
+                                               // restore from checkpoing with 
hdfs path
+                                               LOG.info("The old job already 
has completed checkpoint," +
+                                                       " new job will recovery 
from completed checkpoint");
+                                               restoreSavepoint(metadataPath, 
allowNonRestoredState, tasks, userCodeLoader);
+                                               return true;
+                                       } else {
+                                               LOG.debug("Resetting the master 
hooks.");
+                                               
MasterHooks.reset(masterHooks.values(), LOG);
+
+                                               return false;
+                                       }
+
+                               }
+                       }
+
+                       LOG.info("Restoring job {} from latest valid 
checkpoint: {}.", job, latest);
+
+                       // re-assign the task states
+                       final Map<OperatorID, OperatorState> operatorStates = 
latest.getOperatorStates();
+
+                       StateAssignmentOperation stateAssignmentOperation =
+                               new 
StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, 
allowNonRestoredState);
+
+                       stateAssignmentOperation.assignStates();
+
+                       // call master hooks for restore
+
+                       MasterHooks.restoreMasterHooks(
+                               masterHooks,
+                               latest.getMasterHookStates(),
+                               latest.getCheckpointID(),
+                               allowNonRestoredState,
+                               LOG);
+
+                       // update metrics
+
+                       if (statsTracker != null) {
+                               long restoreTimestamp = 
System.currentTimeMillis();
+                               RestoredCheckpointStats restored = new 
RestoredCheckpointStats(
+                                       latest.getCheckpointID(),
+                                       latest.getProperties(),
+                                       restoreTimestamp,
+                                       latest.getExternalPointer());
+
+                               statsTracker.reportRestoredCheckpoint(restored);
+                       }
+
+                       return true;
+               }
+       }
+
+       public String getOldJobCheckpointMetadataPath() throws Exception {
+               String stateBackendString = stateBackend.toString();
+               LOG.info("stateBackendString: {}", stateBackendString);
+
+               String externalCheckpointBaseDir = null;
+
+               if (stateBackendString != null && !stateBackendString.isEmpty() 
&&
+                       (stateBackendString.contains(HDFSUtils.HDFS_PREFIX) ||
+                               
stateBackendString.contains(HDFSUtils.VIEWFS_PREFIX))) {
+                       // just for hdfs
+                       String[] tmp1 = stateBackendString.split(",");
+                       if(tmp1 != null && tmp1.length >= 1) {
+                               String[] tmp2 = tmp1[0].split(":");
+                               if (tmp2 != null){
+                                       if (tmp2.length == 2) {
+                                               externalCheckpointBaseDir = 
tmp2[1].trim().replaceAll("'", "");
+                                       } else if (tmp2.length == 3){
+                                               externalCheckpointBaseDir = 
tmp2[1].trim().replaceAll("'", "")+":"+tmp2[2].trim().replaceAll("'", "");
+                                       }
+                               }
+                       }
+               } else {
+                       return null;
+               }
+
+               LOG.info("externalCheckpointBaseDir: {}", 
externalCheckpointBaseDir);
+
+               String metadataPath = 
HDFSUtils.getFullPathForLatestJobCompletedCheckpointMeta(externalCheckpointBaseDir);
+
+               if(metadataPath != null && metadataPath.isEmpty()) {
+                       LOG.info("checkpoint already exist with metadata path: 
{}", metadataPath);
+               } else {
+                       LOG.info("not found the completed checkpoint from the 
old job");
+               }
+
+               return metadataPath;
+       }
+
        /**
         * Restore the state with given savepoint.
         *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 01cb2b6b099..edbecda04e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1138,7 +1138,8 @@ private ExecutionGraph 
createAndRestoreExecutionGraph(JobManagerJobMetricGroup c
                        if 
(!checkpointCoordinator.restoreLatestCheckpointedState(
                                newExecutionGraph.getAllVertices(),
                                false,
-                               false)) {
+                               false,
+                               userCodeLoader)) {
 
                                // check whether we can restore from a savepoint
                                
tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, 
jobGraph.getSavepointRestoreSettings());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/HDFSUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/HDFSUtils.java
new file mode 100644
index 00000000000..1ee6cc40980
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/HDFSUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.CHECKPOINT_DIR_PREFIX;
+import static 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.METADATA_FILE_NAME;
+
+public class HDFSUtils {
+       private static final Logger log = 
LoggerFactory.getLogger(HDFSUtils.class);
+
+       public static String HDFS_PREFIX = "hdfs";
+
+       public static String VIEWFS_PREFIX = "viewfs";
+
+       public static String 
getFullPathForLatestJobCompletedCheckpointMeta(String path) throws Exception {
+               // get job directory list
+               FileStatus[] jobList = getFileListByHDFSPath(path);
+
+               if(jobList != null && jobList.length >= 2) {
+                       // get the latest job by modified time
+                       String latestJobDir = getLatestJobDirectory(jobList);
+                       log.info("Latest job directory: {}", latestJobDir);
+
+                       if(latestJobDir != null && !latestJobDir.isEmpty()) {
+                               FileStatus[] jobSubList = 
getFileListByHDFSPath(path + "/" + latestJobDir);
+                               if(jobSubList != null && jobSubList.length > 1) 
{
+                                       for(FileStatus fileStatus2: jobSubList) 
{
+                                               log.info("Sub directory {} for 
latest job {}", fileStatus2.getPath().getName(), latestJobDir);
+                                               
if(fileStatus2.getPath().getName().contains(CHECKPOINT_DIR_PREFIX)) {
+                                                       return path + "/" + 
latestJobDir + "/" + fileStatus2.getPath().getName() + "/" + METADATA_FILE_NAME;
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               return null;
+       }
+
+       private static String getLatestJobDirectory(FileStatus[] jobList) {
+               String result = null;
+               // the max modify time directory is for the current job
+               long maxModifyTime = 0L;
+               // the latest job directory
+               long secondModifyTime = 0L;
+               for (FileStatus fileStatus: jobList) {
+                       if(fileStatus.getModificationTime() > maxModifyTime) {
+                               maxModifyTime = 
fileStatus.getModificationTime();
+                       }
+               }
+
+               log.info("Max modify time: {}", maxModifyTime);
+
+               for (FileStatus fileStatus: jobList) {
+                       if(fileStatus.getModificationTime() != maxModifyTime && 
fileStatus.getModificationTime() > secondModifyTime) {
+                               secondModifyTime = 
fileStatus.getModificationTime();
+                               result = fileStatus.getPath().getName();
+                               log.info("Set second directory to {} with time 
{}", result, secondModifyTime);
+                       }
+               }
+               return result;
+       }
+
+       public static FileStatus[] getFileListByHDFSPath(String path) throws 
Exception {
+               if (path == null || path.isEmpty()) {
+                       throw new Exception("HDFS path is null");
+               }
+
+               FileStatus[] status = null;
+               log.info("Get file list by hdfs path: {}", path);
+
+               try {
+                       FileSystem fs = FileSystem.get(new Configuration());
+                       status = fs.listStatus(new Path(path));
+                       log.info("File List for path {}", path);
+                       for (FileStatus fileStatus: status) {
+                               log.info("name: {}, modify time: {}", 
fileStatus.getPath().getName(), fileStatus.getModificationTime());
+                       }
+               } catch (Exception e) {
+                       e.printStackTrace();
+               }
+               return status;
+       }
+}
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0988730689a..c99033e085f 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1341,11 +1341,21 @@ class JobManager(
       // because it is a blocking operation
       future {
         try {
+
+          val checkpointMetadataDir = 
executionGraph.getCheckpointCoordinator.getOldJobCheckpointMetadataPath
+
+          log.info("legacy mode with old job checkpoint metadata path: " + 
checkpointMetadataDir)
+
           if (isRecovery) {
             // this is a recovery of a master failure (this master takes over)
             executionGraph.restoreLatestCheckpointedState(false, false)
-          }
-          else {
+          } else if (checkpointMetadataDir != null && 
!checkpointMetadataDir.isEmpty) {
+            log.info("legacy mode restore from old job completed checkpoint: " 
+ checkpointMetadataDir)
+            
executionGraph.getCheckpointCoordinator.restoreSavepoint(checkpointMetadataDir,
+              false,
+              executionGraph.getAllVertices,
+              executionGraph.getUserClassLoader)
+          } else {
             // load a savepoint only if this is not starting from a newer 
checkpoint
             // as part of an master failure recovery
             val savepointSettings = jobGraph.getSavepointRestoreSettings


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Introduce a friendly way to resume the job from externalized checkpoints 
> automatically
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-9043
>                 URL: https://issues.apache.org/jira/browse/FLINK-9043
>             Project: Flink
>          Issue Type: New Feature
>            Reporter: godfrey johnson
>            Assignee: Sihua Zhou
>            Priority: Major
>              Labels: pull-request-available
>
> I know a flink job can reovery from checkpoint with restart strategy, but can 
> not recovery as spark streaming jobs when job is starting.
> Every time, the submitted flink job is regarded as a new job, while , in the 
> spark streaming  job, which can detect the checkpoint directory first,  and 
> then recovery from the latest succeed one. However, Flink only can recovery 
> until the job failed first, then retry with strategy.
>  
> So, would flink support to recover from the checkpoint directly in a new job?
> h2. New description by [~sihuazhou]
> Currently, it's quite a bit not friendly for users to recover job from the 
> externalized checkpoint, user need to find the dedicate dir for the job which 
> is not a easy thing when there are too many jobs. This ticket attend to 
> introduce a more friendly way to allow the user to use the externalized 
> checkpoint to do recovery.
> The implementation steps are copied from the comments of [~StephanEwen]:
>  - We could make this an option where you pass a flag (-r) to automatically 
> look for the latest checkpoint in a given directory.
>  - If more than one jobs checkpointed there before, this operation would fail.
>  - We might also need a way to have jobs not create the UUID subdirectory, 
> otherwise the scanning for the latest checkpoint would not easily work.
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to