GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/5396

    [FLINK-5820] [state backends] Split shared/exclusive state and properly 
handle disposal

    ## What is the purpose of the change
    
    This PR contains the final changes needed for [FLINK-5820]. Checkpoint 
directories are now disposed of properly on all file system types (previously 
this did not work for some S3 connectors), and with fewer calls to the file 
systems. Shared and exclusive state are split into different directories, to 
help implement cleanup safety nets.
    
    ## Brief change log
    
      1. TaskManagers use the `CheckpointStorage` to create 
`CheckpointStreamFactories`. Previously, these stream factories were created by 
the `StateBackend`. This completes the separation of the "storage" aspect of 
the `StateBackend` into the `CheckpointStorage`.
    
      2. The location in which to store state is communicated between the 
`CheckpointCoordinator` (instantiating the original `CheckpointStorageLocation` 
for a checkpoint/savepoint) and the Tasks in a unified manner. Tasks always 
obtain their `CheckpointStreamFactories` transparently and in the same way, 
regardless of whether they write state for checkpoints or savepoints.
    
      3. Checkpoint state now has the scope `EXCLUSIVE` or `SHARED`, which may 
be stored differently. The current file system based backends put shared state 
into a */shared* directory, while exclusive state goes into the */chk-1234* 
directory.
    
      4. Tasks can directly write *task-owned state* to a checkpoint storage. 
That state neither belongs specifically to one checkpoint, nor is it shared and 
eventually released by the Checkpoint Coordinator. Only the tasks themselves 
may release the state. An example of that type of state is the *write-ahead 
logs* created by some sinks.
    
      5. When a checkpoint is finalized, its storage is described by a 
`CompletedCheckpointStorageLocation`. That object gives access to addressing 
and metadata, and handles location disposal. This allows us to drop the 
*"delete parent if empty"* logic in File State Handles and fixes the issue that 
checkpoint directories are currently left over on S3.
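
    The directory split from item 3 can be illustrated with a minimal sketch. The class `CheckpointLayoutSketch`, the method `directoryFor`, and the base path are hypothetical names chosen for illustration; they are not the classes introduced by this PR. Only the `shared` and `chk-<id>` directory names and the `EXCLUSIVE`/`SHARED` scopes come from the description above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical sketch of the shared/exclusive directory split; not Flink's actual classes. */
final class CheckpointLayoutSketch {

    /** Scope of a piece of checkpoint state, mirroring the EXCLUSIVE/SHARED split. */
    enum Scope { EXCLUSIVE, SHARED }

    /**
     * Resolves the directory for state of the given scope.
     * Shared state goes to "shared"; exclusive state goes to "chk-" followed by the checkpoint id.
     */
    static Path directoryFor(Path checkpointBaseDir, long checkpointId, Scope scope) {
        switch (scope) {
            case SHARED:
                return checkpointBaseDir.resolve("shared");
            case EXCLUSIVE:
                return checkpointBaseDir.resolve("chk-" + checkpointId);
            default:
                throw new IllegalArgumentException("Unknown scope: " + scope);
        }
    }

    public static void main(String[] args) {
        // The base directory is an assumed example path.
        Path base = Paths.get("checkpoints", "job-a");
        System.out.println(directoryFor(base, 1234L, Scope.EXCLUSIVE));
        System.out.println(directoryFor(base, 1234L, Scope.SHARED));
    }
}
```

    Because every checkpoint resolves shared state to the same directory, an exclusive `chk-<id>` directory can be dropped without touching state that later checkpoints may still reference.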
    
    **Future Work**
    
      - In the future, the `CompletedCheckpointStorageLocation` should also be 
used as a way to handle relative addressing of checkpoints, to allow users to 
move them to different directories without breaking the internal paths.
    
      - We can now implement disposal fast paths, such as dropping a directory 
as a whole rather than dropping each state object separately. However, one 
would still need to release shared state objects individually. Finishing these 
fast paths is currently blocked on some rework of the shared state handles, to 
make their selective release easier and more robust.
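
    As a rough illustration of such a fast path, the sketch below drops an exclusive checkpoint directory in one operation and releases shared objects individually. It uses `java.nio.file` on a local file system as a stand-in for Flink's file system abstraction; the class and method names (`DisposalFastPathSketch`, `dropExclusiveDirectory`, `releaseSharedObject`) are assumptions made for this example and are not part of the PR.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical sketch of a disposal fast path on a local file system; not Flink code. */
final class DisposalFastPathSketch {

    /** Drops an exclusive checkpoint directory (for example chk-1) and everything below it. */
    static void dropExclusiveDirectory(Path checkpointDir) throws IOException {
        if (!Files.exists(checkpointDir)) {
            return; // nothing to dispose
        }
        try (Stream<Path> paths = Files.walk(checkpointDir)) {
            // Reverse order visits children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    /** Shared state objects are still released one by one. */
    static void releaseSharedObject(Path sharedFile) throws IOException {
        Files.deleteIfExists(sharedFile);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("checkpoints");
        Path chk = Files.createDirectories(base.resolve("chk-1"));
        Path shared = Files.createDirectories(base.resolve("shared"));
        Files.createFile(chk.resolve("state-a"));
        Path sharedFile = Files.createFile(shared.resolve("state-b"));

        dropExclusiveDirectory(chk);
        System.out.println("chk-1 exists: " + Files.exists(chk));
        System.out.println("shared file exists: " + Files.exists(sharedFile));

        releaseSharedObject(sharedFile);
    }
}
```

    On a real object store the "drop directory" step would map to whatever bulk delete the connector offers; the sketch only shows the division between whole-directory disposal for exclusive state and per-object release for shared state.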
    
    ## Verifying this change
    
    This change can be verified by running a Flink cluster with a checkpointed 
program and 
    
    This PR also adds and adjusts various unit tests to guard the new behavior.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / no)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
      - The serializers: (yes / no / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
      - The S3 file system connector: (yes / no / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? *Somewhat* (it changes 
the state backend directory layouts)
      - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink locations

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5396.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5396
    
----
commit ec8e552a7b50b8c605bb2609713cb2dd50245118
Author: Stephan Ewen <sewen@...>
Date:   2018-01-30T14:53:46Z

    [hotfix] [checkpointing] Cleanup: Fix Nullability and names in checkpoint 
stats.

commit cf18831b69bd909d6491eb73d3294d3295ddd930
Author: Stephan Ewen <sewen@...>
Date:   2018-01-30T10:57:30Z

    [hotfix] [tests] Drop obsolete CheckpointExternalResumeTest.
    
    Because all checkpoints are now externalized (write their metadata) this is 
an obsolete test.

commit a46acdd0f7142e40eee8c742e17eefaa6c7da3da
Author: Stephan Ewen <sewen@...>
Date:   2018-01-29T22:24:24Z

    [hotfix] [checkpoints] Clean up CompletedCheckpoint, grouping related 
methods together

commit f3eb9511718fe8526a6602b547ce6b4f1fd9e2fa
Author: Stephan Ewen <sewen@...>
Date:   2018-01-29T22:54:57Z

    [hotfix] [checkpoints] Drop ill-defined hashCode() and equals() from 
CompletedCheckpoint.

commit 8d198c7eb7c0f23f179f45b1cdc8b862027076b7
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T09:56:39Z

    [hotfix] [tests] Clean up HeapKeyedStateBackendAsyncByDefaultTest

commit 59c917eb5dc5329a233c760367ffbef2ffe98beb
Author: Stephan Ewen <sewen@...>
Date:   2018-01-10T16:16:03Z

    [FLINK-8531] [checkpoints] (part 1) Pull CheckpointType into its own class.

commit e9ed622588190008cc9bfaa03578a55bdba4c09d
Author: Stephan Ewen <sewen@...>
Date:   2018-01-19T14:18:57Z

    [FLINK-8531] [checkpoints] (part 2) Add CheckpointType to 
CheckpointProperties

commit f38eb542684365837474303a10d49eb50eabb378
Author: Stephan Ewen <sewen@...>
Date:   2018-01-10T17:02:27Z

    [FLINK-8531] [checkpoints] (part 3) Rework ExternalizedCheckpointITCase

commit 2551f0df0ed0b89d25f4442f101fa453f3ae697e
Author: Stephan Ewen <sewen@...>
Date:   2018-01-10T17:13:50Z

    [FLINK-8531] [checkpoints] (part 4) rename forCheckpoint() to 
forCheckpointWithDefaultLocation()

commit 09dab449f665e5b2eea08912b7d3ecfa32449e53
Author: Stephan Ewen <sewen@...>
Date:   2018-01-19T12:37:08Z

    [FLINK-8531] [checkpoints] (part 5) Introduce 
CheckpointStorageLocationReference instead of String to communicate the location

commit 828873847ca710a562f96a86c1caf058347a3406
Author: Stephan Ewen <sewen@...>
Date:   2018-01-10T16:14:06Z

    [FLINK-8531] [checkpoints] (part 6) Tasks resolve CheckpointStreamFactory 
from CheckpointStorage and Checkpoint Location Reference to persist checkpoint 
data.

commit 950e7fd2987cd12d14829be573c3f8c54056e965
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T10:20:59Z

    [FLINK-8531] [checkpoints] (part 7) Move tests specific to Checkpoint 
Storage and Checkpoint Stream to separate tests suites

commit 19d6d1603841d321e5c509c8d054d6056f8ff243
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T11:06:44Z

    [FLINK-8531] [checkpoints] (part 8) Add tests for the FsCheckpointStorage 
and MemoryBackendCheckpointStorage.

commit 62f635dbec3ee4d7b151e8ae0887a4cf84427752
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T17:37:42Z

    [FLINK-8531] [checkpoints] (part 9) Introduce EXCLUSIVE and SHARED scope 
for states

commit 3c5440d3dd2f0fe3a0ef6c2855cefef0fc028ed4
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T09:42:34Z

    [hotfix] [runtime] Fix checkstyle for 'runtime/io/network/api'.

commit 4f2770f4a9cd880218a832676deb2f33a690284c
Author: Stephan Ewen <sewen@...>
Date:   2018-01-31T15:54:58Z

    [FLINK-8539] [checkpointing] (part 1) Introduce 
CompletedCheckpointStorageLocation to properly handle disposal of checkpoints.
    
    That concept allows us to properly handle deletion of a checkpoint storage, 
for example deleting checkpoint
    directories, or the dropping of a checkpoint specific table.
    
    This replaces the current workaround for file systems, where every file 
disposal checks if the parent
    directory is now empty, and deletes it if that is the case. That is not 
only inefficient, but
    prohibitively expensive on some systems, like Amazon S3.

commit 99ca08650b1de215e0079eed04d513fecd445299
Author: Stephan Ewen <sewen@...>
Date:   2018-01-31T15:57:45Z

    [FLINK-8539] [checkpointing] (part 2) Modify all tests to use 
CompletedCheckpointStorageLocation.

commit cf26d3875eb6db3818c73dc1f42cc4ecaad98103
Author: Stephan Ewen <sewen@...>
Date:   2018-01-31T16:01:33Z

    [FLINK-8539] [checkpointing] (part 3) Rename FixFileFsStateOutputStream to 
FsCheckpointMetadataOutputStream
    
    The new name captures the proper use and meaning of the class in a better 
way.

commit 51691e0d1ff5b334b67b87e700c89b69657f28a5
Author: Stephan Ewen <sewen@...>
Date:   2018-01-26T11:46:44Z

    [FLINK-8540] [checkpointing] FileStateHandles no longer attempt to clean up 
their parent directory.
    
    Performing directory contents checks and cleaning up the parent directory 
in the state handle disposal
    has previously led to excessive file system metadata requests, which 
especially on systems like
    Amazon S3 is prohibitively expensive.

----

