GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/1153

    [FLINK-2354] Add job graph and checkpoint recovery

    ## tl;dr
    
    This PR introduces `JobGraph` and `SuccessfulCheckpoint` recovery for 
submitted programs in case of JobManager failures.
    
    ## General Idea
    
    The general idea is to persist job graphs and successful checkpoints in 
ZooKeeper.
    
    We have introduced JobManager high availability via ZooKeeper in #1016. My 
PR builds on top of this and adds initial support for program recovery. We can 
recover both programs and successful checkpoints in case of a JobManager 
failure as soon as a standby job manager is granted leadership.
    
    ZooKeeper's sweet spot is rather small data (in KB range), but job graph 
and checkpoint state can grow larger. Therefore we don't directly persist the 
actual metadata, but use the state backend as a layer of indirection. We create 
state handles for the job graph and completed checkpoints and persist those. 
The state handle acts as a pointer to the actual data.
    
    At the moment, only the file system state backend is supported for this. 
The state handles need to be accessible from both task and job managers (e.g. a 
DFS).
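
    To make the indirection concrete, here is a minimal, hypothetical sketch (the class and helper names are illustrative, not the PR's actual API): the large payload goes to the DFS-backed state backend, and only the small, serialized handle ends up in the ZNode.

    ```java
    import org.apache.curator.framework.CuratorFramework;

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    /** Illustrative sketch of the state handle indirection (not the PR's actual classes). */
    class StateHandleIndirectionSketch {

        /** Pointer to data stored in the file system state backend, e.g. a path in a DFS. */
        static class FileStateHandle implements Serializable {
            final String filePath;

            FileStateHandle(String filePath) {
                this.filePath = filePath;
            }
        }

        /** Writes the (potentially large) payload to the state backend and stores only the handle in ZooKeeper. */
        static void persist(CuratorFramework client, String zNodePath, byte[] payload) throws Exception {
            // 1. Write the payload to the DFS-backed state backend (helper assumed here).
            FileStateHandle handle = writeToStateBackend(payload);

            // 2. Serialize the small handle; this is all that goes into ZooKeeper.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(handle);
            }

            // 3. The ZNode only contains the pointer, keeping ZooKeeper payloads small.
            client.create().creatingParentsIfNeeded().forPath(zNodePath, bos.toByteArray());
        }

        private static FileStateHandle writeToStateBackend(byte[] payload) {
            // DFS write omitted in this sketch.
            return new FileStateHandle("/path/to/recovery/blob");
        }
    }
    ```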
    
    ## Configuration
    
    The minimal required configuration:
    
    ```bash
    recovery.mode: ZOOKEEPER
    ha.zookeeper.quorum: <ZooKeeper quorum peers>
    state.backend: FILESYSTEM
    state.backend.fs.dir.recovery: /path/to/recovery
    ```
    
    I don't like the current configuration keys. Before the next release, I would like to move to a more consistent naming, e.g. prefixing everything with `recovery.zookeeper` (see the purely illustrative example below).
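
    For illustration only (these are not keys that exist in this PR), such a prefixed naming could look like:

    ```bash
    recovery.mode: zookeeper
    recovery.zookeeper.quorum: <ZooKeeper quorum peers>
    recovery.zookeeper.storageDir: /path/to/recovery
    ```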
    
    ## ZooKeeper Nodes Overview
    
    Overview of ZNodes and components managing them:
    
    ```bash
    O- /flink
    |
    +----O /flink/jobgraphs (SubmittedJobGraphs)
    |    |
    |    +----O /flink/jobgraphs/<job-id>
    |
    +----O /flink/checkpoints  (CompletedCheckpoints)
    |    |
    |    +----O /flink/checkpoints/<job-id>
    |    .    |
    |    .    +----O /flink/checkpoints/<job-id>/1
    |    .    |
    |    .    +----O /flink/checkpoints/<job-id>/N
    |
    +----O /flink/checkpoint-counter (CheckpointIDCounter)
         |
         +----O /flink/checkpoint-counter/<job-id>
    ```
    
    ## Implementation
    
    ### Submission vs. Recovery (JobManager and SubmittedJobGraphs)
    
    - `ZooKeeperSubmittedJobGraphs` manages `SubmittedJobGraph` state handles 
in ZooKeeper
    - Submission and recovery follow mostly the same code paths (see 
`JobManager#submitJob()`).
    - On (initial) submission:
      - After writing to ZooKeeper, the JobManager synchronously checks whether it is still the leader.
      - If not, the job is not scheduled for execution, but kept in ZooKeeper. A future leading JobManager needs to recover it. The client currently sees this as a successful submission. The job is not removed in this case, because another JobManager might recover it between the write and the removal. In that case, a job would be running without being in ZooKeeper and without being acknowledged to the client. (See the sketch after this list.)
    - On recovery:
      - Recovery is triggered once leadership is granted, with the configured delay between retries.
      - All available jobs are scheduled for execution.
    - The ZNode for job graphs is monitored for modifications during 
operations. This way, a job manager can (eventually) detect if another job 
manager adds/removes a job and react to it.
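
    The submission path sketched below is a hypothetical illustration of the behaviour described above; the interface and method names (`JobGraphStore`, `hasLeadership()`, `scheduleForExecution()`) are placeholders, not the PR's actual API.

    ```java
    /** Illustrative sketch of the submission vs. recovery path; all names are hypothetical. */
    class SubmissionSketch {

        interface JobGraphStore {
            void putJobGraph(Object jobGraph) throws Exception;
        }

        interface LeaderElection {
            boolean hasLeadership();
        }

        private final JobGraphStore jobGraphStore;
        private final LeaderElection leaderElection;

        SubmissionSketch(JobGraphStore jobGraphStore, LeaderElection leaderElection) {
            this.jobGraphStore = jobGraphStore;
            this.leaderElection = leaderElection;
        }

        void submitJob(Object jobGraph, boolean isRecovery) throws Exception {
            if (!isRecovery) {
                // 1. Persist the job graph (as a state handle) in ZooKeeper first.
                jobGraphStore.putJobGraph(jobGraph);

                // 2. Synchronously check whether this JobManager is still the leader.
                if (!leaderElection.hasLeadership()) {
                    // Leadership was lost between submission and the check: keep the job
                    // in ZooKeeper for a future leader to recover, but do not schedule it
                    // here. It is not removed, because another JobManager might already
                    // have recovered it between our write and a removal.
                    return;
                }
            }

            // 3. Leader (or recovery path): schedule the job for execution.
            scheduleForExecution(jobGraph);
        }

        private void scheduleForExecution(Object jobGraph) {
            // Actual scheduling omitted in this sketch.
        }
    }
    ```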
    
    ### CompletedCheckpoints
    
    - `ZooKeeperCompletedCheckpoints` manages `SuccessfulCheckpoint` state handles in ZooKeeper (per job). Note that a `SuccessfulCheckpoint` usually has pointers to further state handles, so this effectively adds another layer of indirection.
    - Every completed checkpoint is added to ZooKeeper and identified by its 
checkpoint ID.
    - On recovery, the latest checkpoint is recovered. If more than one checkpoint is available, we still recover only the latest one, to keep the checkpoint history consistent in corner cases where multiple job managers run the same job with checkpointing for some time (currently we retain only one checkpoint anyway, but this matters if we ever choose to retain more). See the sketch after this list.
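
    A minimal sketch of the "recover only the latest checkpoint" logic, assuming the layout shown in the ZNode overview above (checkpoints named by their ID under the job's node); names are illustrative:

    ```java
    import org.apache.curator.framework.CuratorFramework;

    import java.util.List;

    /** Illustrative sketch: find the latest completed checkpoint for a job. */
    class LatestCheckpointSketch {

        /** Returns the ZNode path of the checkpoint with the highest ID, or null if none exists. */
        static String latestCheckpointPath(CuratorFramework client, String jobPath) throws Exception {
            List<String> children = client.getChildren().forPath(jobPath);
            if (children.isEmpty()) {
                return null;
            }

            // Children are named by checkpoint ID (e.g. ".../<job-id>/1" ... ".../<job-id>/N"),
            // so the numerically largest child is the latest completed checkpoint.
            long maxId = Long.MIN_VALUE;
            for (String child : children) {
                maxId = Math.max(maxId, Long.parseLong(child));
            }

            // The data stored at this path is a serialized state handle that points
            // to the actual SuccessfulCheckpoint in the file system state backend.
            return jobPath + "/" + maxId;
        }
    }
    ```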
    
    ### CheckpointIDCounter
    
    - `ZooKeeperCheckpointIDCounter` manages a shared counter in ZooKeeper (per 
job).
    - The `Checkpointed` interface requires ascending checkpoint IDs for each 
checkpoint.
    - We use a shared counter (per job) via a Curator recipe for this (see the sketch below).
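
    A minimal sketch of a per-job shared counter using Curator's `SharedCount` recipe (the connection string, ZNode path, and job ID below are placeholders):

    ```java
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.shared.SharedCount;
    import org.apache.curator.framework.recipes.shared.VersionedValue;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    /** Minimal sketch of a per-job checkpoint ID counter backed by Curator's SharedCount recipe. */
    class CheckpointIDCounterSketch {

        public static void main(String[] args) throws Exception {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    "localhost:2181", new ExponentialBackoffRetry(1000, 3));
            client.start();

            // One counter ZNode per job, e.g. /flink/checkpoint-counter/<job-id>.
            SharedCount counter = new SharedCount(client, "/flink/checkpoint-counter/my-job-id", 1);
            counter.start();

            // Atomically reserve the next checkpoint ID via compare-and-set,
            // retrying if another JobManager updated the counter concurrently.
            long nextId;
            while (true) {
                VersionedValue<Integer> current = counter.getVersionedValue();
                nextId = current.getValue();
                if (counter.trySetCount(current, current.getValue() + 1)) {
                    break;
                }
            }
            System.out.println("Next checkpoint ID: " + nextId);

            counter.close();
            client.close();
        }
    }
    ```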
    
    ### Akka messages
    
    - This PR introduces two new JobManager message types:
      - `RecoverAllJobs`
      - `RecoverJob(JobID)`
    - The ZooKeeper operations are blocking, and all JobManager actor calls need to make sure *not* to block the JobManager. I've tried to cover all cases where a ZooKeeper operation is triggered (see the sketch after this list).
    - For tests, I didn't manage to stop the JobManager actor w/o running the `postStop` method. Because this method has some cleanup logic (removing job graphs and checkpoints), all JobManager recovery tests run the JobManager as a separate `JobManagerProcess`. This is quite heavyweight. If someone knows a way to stop the actor w/o `postStop` being called, it would be great to refactor this.
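
    The general pattern looks roughly like the following hypothetical sketch: the blocking ZooKeeper/DFS access runs on a separate executor, and the actor is only notified once the result is available (method names are illustrative).

    ```java
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    /** Illustrative sketch of keeping blocking ZooKeeper operations off the actor thread. */
    class NonBlockingRecoverySketch {

        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        void onRecoverJob(final String jobId) {
            // Run the blocking ZooKeeper + DFS read on the executor instead of the
            // actor's message-processing thread.
            executor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    Object jobGraph = recoverJobGraph(jobId); // blocking read
                    notifyJobRecovered(jobGraph);             // e.g. send a message back to the actor
                    return null;
                }
            });
        }

        private Object recoverJobGraph(String jobId) throws Exception {
            return null; // ZooKeeper + state backend access omitted in this sketch.
        }

        private void notifyJobRecovered(Object jobGraph) {
            // In the actual JobManager this would be an actor message.
        }
    }
    ```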
    
    ## Next Steps
    
    - The recovery behaviour (fixed delay) is too simplistic.
    - The client is not fully integrated yet and submits jobs in detached mode if the recovery mode is set to ZooKeeper.
    
    ## Tests
    
    There was a Travis/AWS outage yesterday, so I couldn't run as many builds as I would have liked. I would like to do a couple more runs before we merge this.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink recovery-2354

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1153
    
----
commit aa0be0a27b7077fcdb303d99db40a4fb85acf82a
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-03T13:13:28Z

    [runtime] Add type parameter to ByteStreamStateHandle

commit f37041bd705e71a3d7b2897e498fbbe625b38217
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-19T17:53:18Z

    [clients] Submit job detached if recovery enabled

commit 83523771621eb8446a365e769f7b525d6430bcbb
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-20T11:08:24Z

    [FLINK-2652] [tests] Temporary ignore flakey 
PartitionRequestClientFactoryTest

commit ad9b6572b73229ed92a6b3a0eee08d36a8e8bc6e
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-09-01T15:25:46Z

    [FLINK-2354] [runtime] Add job graph and checkpoint recovery

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
