[ https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313670#comment-16313670 ]
ASF GitHub Bot commented on FLINK-5823:
---------------------------------------

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5248

[FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata

## What is the purpose of the change

This pull request puts the State Backends in charge of persisting the checkpoint metadata. This has the following advantages:

- Checkpoints become conceptually independent of File Systems. We can in the future implement state backends purely backed by non-file systems like databases or message queues.
- We can simplify or drop the extra code paths implemented for externalized checkpoints and for persisting checkpoint metadata in HA cases.
- The checkpoint and savepoint configurations go purely through the state backends, making testing much simpler.
- Because the configuration goes through the state backends only, it is simple to let the state backends pick up a mix of configuration from the application code (in-code config) and configuration from the cluster setup. For example, a programmer can pick the state backend, and the cluster can have default limits or checkpoint directories configured. To support that, state backends may implement an additional interface which lets them pick up configuration values from the cluster configuration.
- As a followup, this will allow us to implement more efficient ways of dropping checkpoint state (recursive directory delete) as well as logic to scavenge left-over checkpoint data.

## Altered user-facing Behavior

- All checkpoints are always "externalized", meaning that the metadata is always persisted. The notion of externalized checkpoints is dropped.
- Checkpoints have no "externalization setting", but a **retention policy**, like
  - `RETAIN_ON_CANCELLATION`: Keep checkpoints when the user manually cancels the job, similar to the corresponding setting for externalized checkpoints.
  - `RETAIN_ON_FAILURE`: Retain when the job reaches a terminal failure.
    For compatibility, this is automatically picked when the user calls the now deprecated method to activate externalized checkpoints.
  - `NEVER_RETAIN_AFTER_TERMINATION`: Conceptually similar to the behavior when no externalized checkpoints were configured.
- The `MemoryStateBackend` is viewed as a FileSystem-based State Backend that does not create separate files for state, but just holds state inline with the checkpoint metadata. In the Metadata and Savepoint handling, there is no distinction between the `MemoryStateBackend` and the `FsStateBackend`.
  - As a special case, the `MemoryStateBackend` may choose not to durably persist the metadata (when no storage location is configured, which is the default), in which case it will not be able to support an HA mode (there is an eager check for that). That is merely there to support no-config getting-started experiences and simpler in-IDE development setups.

## Followup work

To make sure there is no extra persisting of the checkpoint metadata by the HA store (it simply references the regular persisted checkpoint metadata), we need some changes to the `ZooKeeperCompletedCheckpointStore`.

Once we start storing shared checkpoint state (incremental checkpoints) and task-owned state (write-ahead sinks) in different locations, we can start optimizing checkpoint directory cleanup, and can start implementing scavengers for left-over state.

## Brief change log

- The state backends introduce the concept of a `CheckpointStorage` (storage of bytes) and `CheckpointStorageLocation` (specific location for the bytes of a checkpoint/savepoint). That makes the separation of concerns in the state backend clear: `KeyedStateBackend` and `OperatorStateBackend` define how to hold and checkpoint the state, while `CheckpointStorage` defines how to persist bytes (data and metadata).
- The `CheckpointStorage` is responsible for storing the checkpoint metadata. There is no longer an implicit assumption that the checkpoint metadata is stored in a file system.
- All checkpoint directory / savepoint directory specific config settings are now part of the state backends. The Checkpoint Coordinator simply calls the relevant methods on the state backends to store metadata.
- All checkpoints are addressable via a "pointer", which is interpreted by the state backend to find the checkpoint. For file-system based state backends (all state backends in Flink currently), this pointer is the file path.

## Verifying this change

This change adds new tests and adjusts many existing tests to verify the behavior.

Manual verification can happen by starting a regular Flink cluster, enabling checkpoints, and checking that metadata files always get persisted.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? Kind of, it removes the concept of externalized checkpoints.
- If yes, how is the feature documented? Not documented yet. If approved, will add docs as part of the next steps.
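
The "pointer" idea from the change log can be illustrated with a minimal sketch. It is not the code in this pull request: `SketchCheckpointStorage`, `InMemorySketchStorage`, `SketchCoordinator`, and the `mem://` pointer scheme are all hypothetical names made up for illustration, and the real `CheckpointStorage` interface may look quite different. The sketch only shows how a coordinator could hand metadata bytes to a storage abstraction and get back an opaque pointer, without assuming a file system.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the storage abstraction; NOT Flink's actual interface. */
interface SketchCheckpointStorage {
    /** Persists the metadata bytes and returns an opaque pointer to them. */
    String storeMetadata(long checkpointId, byte[] metadata);

    /** Resolves a pointer previously returned by this storage. */
    byte[] resolve(String pointer);
}

/** Non-file-based storage: pointers are map keys rather than file paths (assumed scheme). */
final class InMemorySketchStorage implements SketchCheckpointStorage {
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    @Override
    public String storeMetadata(long checkpointId, byte[] metadata) {
        String pointer = "mem://chk-" + checkpointId;
        store.put(pointer, metadata.clone()); // defensive copy of the caller's array
        return pointer;
    }

    @Override
    public byte[] resolve(String pointer) {
        byte[] bytes = store.get(pointer);
        if (bytes == null) {
            throw new IllegalArgumentException("Unknown checkpoint pointer: " + pointer);
        }
        return bytes.clone();
    }
}

/** Hypothetical coordinator: it only talks to the storage, never to a file system. */
final class SketchCoordinator {
    private final SketchCheckpointStorage storage;

    SketchCoordinator(SketchCheckpointStorage storage) {
        this.storage = storage;
    }

    /** Stores the metadata of a completed checkpoint and returns its pointer. */
    String completeCheckpoint(long checkpointId, String metadata) {
        return storage.storeMetadata(checkpointId, metadata.getBytes(StandardCharsets.UTF_8));
    }
}

final class CheckpointPointerSketch {
    public static void main(String[] args) {
        SketchCheckpointStorage storage = new InMemorySketchStorage();
        SketchCoordinator coordinator = new SketchCoordinator(storage);

        String pointer = coordinator.completeCheckpoint(7L, "metadata-of-checkpoint-7");
        String restored = new String(storage.resolve(pointer), StandardCharsets.UTF_8);

        System.out.println(pointer + " -> " + restored);
    }
}
```

In this sketch, a file-system based storage would be a second implementation of the same interface that returns the file path as the pointer; the coordinator code would stay unchanged.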

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink backend2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5248

----
commit dfdfb7e9bba925035dcf7eecffddbdf341b90f7e
Author: Stephan Ewen <sewen@...>
Date:   2017-10-25T11:23:46Z

    [FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator

commit a47bdd2b9d8cc8c62b47b2ffec5c418e038f7c20
Author: Stephan Ewen <sewen@...>
Date:   2017-10-25T15:30:14Z

    [FLINK-7925] [checkpoints] Add CheckpointingOptions

    The CheckpointingOptions consolidate all checkpointing and state backend-related
    settings that were previously split across different classes.

commit a550e115be1616587c19d7c21225ed1467801ddf
Author: Stephan Ewen <sewen@...>
Date:   2017-10-25T15:32:17Z

    [hotfix] [core] Fix broken JavaDoc links in ConfigConstants

commit b94a261fed8233a9f7adbaf2a813e1953c9a1a28
Author: Stephan Ewen <sewen@...>
Date:   2017-10-25T17:04:10Z

    [FLINK-5823] [checkpoints] State backends define checkpoint and savepoint directories, improved configuration

commit 712d5b962bffeef1ec3f5a803ceb08d6409fe12e
Author: Stephan Ewen <sewen@...>
Date:   2017-10-26T12:55:28Z

    [hotfix] [rocksdb] Clean up RocksDB state backend code

      - arrange variables to properly express configuration (client side) versus runtime (task manager side)
      - make all runtime-only fields properly transient
      - fix confusing variable name for local directories

commit a7510cf56c259ba0daa1230be27e34040004d413
Author: Stephan Ewen <sewen@...>
Date:   2017-10-26T13:43:23Z

    [FLINK-5823] [checkpoints] Make RocksDB state backend configurable

commit ef5bd0c7ef3825607274ff78aa9e0ac8159828b5
Author: Stephan Ewen <sewen@...>
Date:   2017-10-26T19:26:00Z

    [FLINK-5823] [checkpoints] State backends now also handle the checkpoint metadata
----

> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> -----------------------------------------------------------------------
>
>                 Key: FLINK-5823
>                 URL: https://issues.apache.org/jira/browse/FLINK-5823
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.5.0
>
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)