GitHub user uce opened a pull request: https://github.com/apache/flink/pull/1153

[FLINK-2354] Add job graph and checkpoint recovery

## tl;dr

This PR introduces `JobGraph` and `SuccessfulCheckpoint` recovery for submitted programs in case of JobManager failures.

## General Idea

The general idea is to persist job graphs and successful checkpoints in ZooKeeper. We introduced JobManager high availability via ZooKeeper in #1016. This PR builds on top of that and adds initial support for program recovery: both programs and successful checkpoints can be recovered after a JobManager failure as soon as a standby job manager is granted leadership.

ZooKeeper's sweet spot is rather small data (in the KB range), but job graph and checkpoint state can grow larger. Therefore, we don't persist the actual metadata directly, but use the state backend as a layer of indirection: we create state handles for the job graph and completed checkpoints and persist only those. A state handle acts as a pointer to the actual data. At the moment, only the file system state backend is supported for this, and the state handles need to be accessible from both task and job managers (e.g. via a DFS).
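To make the indirection concrete, here is a minimal, hypothetical sketch (class and method names are illustrative, not the PR's actual classes): the potentially large payload is written to the configured recovery directory, and only a small serialized pointer goes into the ZNode via Curator. For brevity, the local `java.nio` file API stands in for a real DFS client.

```java
// Illustrative sketch only -- not the PR's actual implementation.
// Assumes a started CuratorFramework client and a recovery directory
// that is reachable by all job and task managers.
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.curator.framework.CuratorFramework;

public class StateHandlePointerStore {

    /** Small "pointer" object; this is all that ends up in ZooKeeper. */
    public static class FilePointer implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String filePath;

        public FilePointer(String filePath) {
            this.filePath = filePath;
        }
    }

    private final CuratorFramework client;
    private final String recoveryDir; // e.g. state.backend.fs.dir.recovery

    public StateHandlePointerStore(CuratorFramework client, String recoveryDir) {
        this.client = client;
        this.recoveryDir = recoveryDir;
    }

    /** Writes the payload to the file system, then stores only the pointer in ZooKeeper. */
    public void add(String zkPath, Serializable payload) throws Exception {
        Path file = Paths.get(recoveryDir, UUID.randomUUID().toString());
        Files.write(file, SerializationUtils.serialize(payload));

        byte[] pointerBytes = SerializationUtils.serialize(new FilePointer(file.toString()));
        client.create().creatingParentsIfNeeded().forPath(zkPath, pointerBytes);
    }

    /** Reads the pointer from ZooKeeper and resolves it to the actual data. */
    public byte[] resolve(String zkPath) throws Exception {
        FilePointer pointer = SerializationUtils.deserialize(client.getData().forPath(zkPath));
        return Files.readAllBytes(Paths.get(pointer.filePath));
    }
}
```

On recovery, a standby JobManager only needs the small pointer from ZooKeeper to locate and read the full data from the shared file system.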
## Configuration

The minimal required configuration:

```bash
recovery.mode: ZOOKEEPER
ha.zookeeper.quorum: <ZooKeeper quorum peers>
state.backend: FILESYSTEM
state.backend.fs.dir.recovery: /path/to/recovery
```

I don't like the current configuration keys. Until the next release, I would like a more consistent naming, e.g. prefix everything with `recovery.zookeeper`.

## ZooKeeper Nodes

Overview of ZNodes and the components managing them:

```bash
O- /flink
|
+----O /flink/jobgraphs (SubmittedJobGraphs)
|    |
|    +----O /flink/jobgraphs/<job-id>
|
+----O /flink/checkpoints (CompletedCheckpoints)
|    |
|    +----O /flink/checkpoints/<job-id>
|    .    |
|    .    +----O /flink/checkpoints/<job-id>/1
|    .    |
|    .    +----O /flink/checkpoints/<job-id>/N
|
+----O /flink/checkpoint-counter (CheckpointIDCounter)
     |
     +----O /flink/checkpoint-counter/<job-id>
```

## Implementation

### Submission vs. Recovery (JobManager and SubmittedJobGraphs)

- `ZooKeeperSubmittedJobGraphs` manages `SubmittedJobGraph` state handles in ZooKeeper.
- Submission and recovery follow mostly the same code paths (see `JobManager#submitJob()`).
- On (initial) submission:
  - After writing to ZooKeeper, the JobManager synchronously checks whether it is still the leader.
  - If not, the job is not scheduled for execution, but kept in ZooKeeper. Future leading JobManagers need to recover it. The client currently sees this as a successful submission. The job is not removed in this case, because another JobManager might recover it between the write and the remove. In that case, a job would be running without being in ZooKeeper and without being acknowledged to the client.
- On recovery:
  - Recovery is triggered when leadership is granted, after the configured execution retry delay.
  - All available jobs are scheduled for execution.
  - The job graphs ZNode is monitored for modifications during operations, so that a JobManager can (eventually) detect when another JobManager adds or removes a job and react to it.

### CompletedCheckpoints

- `ZooKeeperCompletedCheckpoints` manages `SuccessfulCheckpoint` state handles in ZooKeeper (per job). Note that a `SuccessfulCheckpoint` usually has pointers to further state handles, in which case we add another layer of indirection.
- Every completed checkpoint is added to ZooKeeper and identified by its checkpoint ID.
- On recovery, the latest checkpoint is recovered. If more than one checkpoint is available, we still recover only one, in order to keep the history of checkpoints consistent in corner cases where multiple JobManagers run the same job with checkpointing for some time (we currently retain only one checkpoint anyway, but this matters if we ever choose to retain more).

### CheckpointIDCounter

- `ZooKeeperCheckpointIDCounter` manages a shared counter in ZooKeeper (per job).
- The `Checkpointed` interface requires ascending checkpoint IDs for each checkpoint.
- We use a shared counter (per job) via a Curator recipe for this, along the lines of the sketch below.
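As a rough illustration, a shared counter of this kind could look like the following hypothetical sketch based on Curator's `SharedCount` recipe with a compare-and-set retry loop (again, names are illustrative and not the PR's actual implementation):

```java
// Illustrative sketch only -- assumes a started CuratorFramework client
// and one counter ZNode per job.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.curator.framework.recipes.shared.VersionedValue;

public class SharedCheckpointIdCounter implements AutoCloseable {

    private final SharedCount sharedCount;

    public SharedCheckpointIdCounter(CuratorFramework client, String jobId) throws Exception {
        // One counter ZNode per job, e.g. /flink/checkpoint-counter/<job-id>, seeded with 1.
        this.sharedCount = new SharedCount(client, "/flink/checkpoint-counter/" + jobId, 1);
        this.sharedCount.start();
    }

    /** Returns the next ascending checkpoint ID, retrying on concurrent updates. */
    public int getAndIncrement() throws Exception {
        while (true) {
            VersionedValue<Integer> current = sharedCount.getVersionedValue();
            // trySetCount() returns false if another JobManager updated the
            // counter in the meantime; in that case, read again and retry.
            if (sharedCount.trySetCount(current, current.getValue() + 1)) {
                return current.getValue();
            }
        }
    }

    @Override
    public void close() throws Exception {
        sharedCount.close();
    }
}
```

Because the counter value lives in ZooKeeper, it survives JobManager failures, and the compare-and-set loop keeps IDs strictly ascending even if two JobManagers briefly run the same job.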
### Akka messages

- This PR introduces two new JobManager message types:
  - `RecoverAllJobs`
  - `RecoverJob(JobID)`
- The ZooKeeper operations are blocking, so all JobManager actor calls need to make sure *not* to block the JobManager. I've tried to cover all cases where a ZooKeeper operation is triggered.
- For tests, I didn't manage to stop the JobManager actor without running the `postStop` method. Because this method has some cleanup logic (removing job graphs and checkpoints), all JobManager recovery tests run the JobManager as a separate `JobManagerProcess`. This is quite heavyweight. If someone knows a way to stop the actor without `postStop` being called, it would be great to refactor this.

## Next Steps

- Recovering via a fixed delay is too simplistic.
- The client is not fully integrated and submits jobs in detached mode if the recovery mode is set to ZooKeeper.

## Tests

There was a Travis/AWS outage yesterday, so I couldn't run as many builds as we should yet. I would like to get a couple of runs in before we merge this.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink recovery-2354

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1153.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1153

----

commit aa0be0a27b7077fcdb303d99db40a4fb85acf82a
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-03T13:13:28Z

    [runtime] Add type parameter to ByteStreamStateHandle

commit f37041bd705e71a3d7b2897e498fbbe625b38217
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-19T17:53:18Z

    [clients] Submit job detached if recovery enabled

commit 83523771621eb8446a365e769f7b525d6430bcbb
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-20T11:08:24Z

    [FLINK-2652] [tests] Temporary ignore flakey PartitionRequestClientFactoryTest

commit ad9b6572b73229ed92a6b3a0eee08d36a8e8bc6e
Author: Ufuk Celebi <u...@apache.org>
Date: 2015-09-01T15:25:46Z

    [FLINK-2354] [runtime] Add job graph and checkpoint recovery