[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16311347#comment-16311347
 ] 

ASF GitHub Bot commented on FLINK-8360:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/5239

    [FLINK-8360] Implement task-local state recovery

    ## What is the purpose of the change
    
    *This changes introduces the task-local recovery feature. The main idea is 
to have a secondary, local copy of the checkpointed state, while there is still 
a primary copy in DFS that we report to the checkpoint coordinator.
    
    Recovery can attempt to restore from the secondary local copy, if 
available, to save network bandwidth. This requires that the assignment from 
tasks to slots is as sticky is possible.
    
    For starters, we will implement this feature for all managed keyed states 
and can easily enhance it to all other state types (e.g. operator state) later, 
because the basic infrastructure is already in place. This PR is on top of 
#4745.*
    
    
    ## Brief change log
    
      - *Introduced `TaskExecutorLocalStateStoresManager` per task manager. 
This class manages one `TaskLocalStateStore` for each task running on the task 
manager.*
      - *`TaskLocalStateStore` stores and provides the local state for one 
task. Reporting of checkpointed states goes through this class. The primary 
state handles are forwarded to the checkpoint coordinator, the optional 
secondary (local) state is stored in the local store.*
      - *`LocalRecoveryDirectoryProvider` is used by `TaskLocalStateStore` to 
manage the local state directory/ies.*
    - *`StreamTaskStateManager` uses the `TaskLocalStateStore` to restore state 
for its operators.*
    - *File-based local state is created through 
`DuplicatingCheckpointOutputStream`, a stream that duplicates writes into two 
internal streams - typically one primary against a DFS and one secondary 
against local FS.*
    -*RocksDB's incremental checkpoints are not just based on one file, but on 
a directory. As we do not require reference counting for local files (we can 
use hardlinks), we can deal with checkpoint directories as a whole. We 
introduced `IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle` 
for this purpose.*
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *If we activate the local recovery feature on the state backend (via 
`#setLocalRecoveryMode(...)`, and introduce task failure by user-code 
exception, we should observe that managed keyed state is recovered from the 
local FS through the logs.*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
      - The serializers: (yes, slightly and in a way that should not matter.)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (Documentation pending)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink task-local-recovery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5239
    
----
commit 769aa8619f7da4512e51bfc13110459b50d67c0c
Author: Stefan Richter <s.richter@...>
Date:   2017-08-21T12:31:38Z

    [FLINK-7719] [checkpoints] Send checkpoint id to task as part of deployment 
descriptor when resuming

commit 194d289eb5d67494fb7242dda3589171e0bcd4f6
Author: Stefan Richter <s.richter@...>
Date:   2017-09-27T13:10:52Z

    [FLINK-7720] [checkpoints] Centralize creation of backends and state 
related resources

commit 4beb98837fbb1c99d608b6b5ceea1fba1989fbd8
Author: Stefan Richter <s.richter@...>
Date:   2017-11-28T17:05:41Z

    smaller refactorings of method and class names

commit 2dc9f30bcad35ef2a63a4014e2fa7f9b6411d811
Author: Stefan Richter <s.richter@...>
Date:   2017-11-28T17:06:05Z

    introduce state object collection for better OO.

commit 3c9d39d8ca4c1370467d5596f2c3ef51b9aff043
Author: Stefan Richter <s.richter@...>
Date:   2017-11-29T09:59:05Z

    Split generic type in Snapshotable interface

commit 6cf67e5fc04997873d2d9f4abfaa6be962bb464f
Author: Stefan Richter <s.richter@...>
Date:   2017-11-29T14:33:32Z

    Introduce explicit snapshot result type with 2 locations

commit 5d829f235fa433af4311ca0c943843b879ed4627
Author: Stefan Richter <s.richter@...>
Date:   2017-11-30T11:00:48Z

    Deep compare handle broken. TODO could use matcher instead

commit 238ea9881900f26c6e78af093e96fad348cab002
Author: Stefan Richter <s.richter@...>
Date:   2017-12-01T11:13:43Z

    javadoc on new classes.

commit 5d9c414cec2bc9410ed0972d37390200e915e325
Author: Stefan Richter <s.richter@...>
Date:   2017-12-04T14:12:00Z

    Preparing, test related refactorings

commit adbe507c24eda44088745159444d1ae5011e2bda
Author: Stefan Richter <s.richter@...>
Date:   2017-12-04T14:12:12Z

    Additional unit test for forwarding to/from TaskStateManager and 
StateObjectCollection.

commit 5a8b43e33918b84aec98904a77061d69ec1ff51c
Author: Stefan Richter <s.richter@...>
Date:   2017-12-05T17:46:07Z

    Introduce configuration for local recovery.

commit 0c77cf022fd10ee60c2a5ffe649f4a3cec2fb5fa
Author: Stefan Richter <s.richter@...>
Date:   2017-12-06T11:00:12Z

    Introduce snapshot strategy and some connected refactorings.

commit 48ac41b78cfede2b3bc34f0f9d61f9e2374aff3b
Author: Stefan Richter <s.richter@...>
Date:   2017-12-06T16:52:59Z

    file based local recovery for heap keyed backend

commit f4946175060b0b05b463b13b2e94f8cbd5457392
Author: Stefan Richter <s.richter@...>
Date:   2017-12-11T11:01:54Z

    [BUGFIX] Lifecycle of operator close/dispose is of w.r.t finished flag.

commit e4bcc7f4aa67a9f91874d5ae765517ab8aa9166f
Author: Stefan Richter <s.richter@...>
Date:   2017-12-11T14:08:29Z

    Integration fixup.

commit 95f60dff6407f9f5e59ed975b61ece3071c3daae
Author: Stefan Richter <s.richter@...>
Date:   2017-12-12T16:07:08Z

    introduce disposable interface

commit 37eb9b7ce99171a20c07ece1dc11b2fc3122292d
Author: Stefan Richter <s.richter@...>
Date:   2017-12-14T10:07:51Z

    Implement restore path for file-based local snapshots

commit a12c1c207e1a93db43bb4077646fbd8eb6c6b13b
Author: Stefan Richter <s.richter@...>
Date:   2017-12-14T18:04:21Z

    Cleanup organization improvements

commit 9724491fe4c8ec0b1e318e24a76db2e96541a94e
Author: Stefan Richter <s.richter@...>
Date:   2017-12-14T18:05:00Z

    RocksDB local incremental snapshots

commit e9811c0735809021af7c8175bacab3ce6c046a22
Author: Stefan Richter <s.richter@...>
Date:   2017-12-15T15:01:55Z

    Recovery path for RocksDB local incremental snapshots.

commit 588f32fd5d91db22cbf7b14c4726a64408defd2a
Author: Stefan Richter <s.richter@...>
Date:   2017-12-18T16:15:55Z

    Additional unit tests (WIP).

commit c1b5a8337944bcae3ac1fc012e8c16bc0e6265a7
Author: Stefan Richter <s.richter@...>
Date:   2017-12-30T19:23:48Z

    Abandon old OperatorStateHandles (replace by newer OperatorSubtaskState).

commit e7e602768497b3cad0567ad1edf58c4bf67d1018
Author: Stefan Richter <s.richter@...>
Date:   2018-01-02T13:21:04Z

    Additional unit tests (WIP) - 4.

commit 3fff0889dafefdb7cb0920b692ed9fdb3645d3b4
Author: Stefan Richter <s.richter@...>
Date:   2018-01-02T16:38:33Z

    Change directory structure so that we can delete local state directories 
based on allocation id in the (flip-6) future.

commit 958baaa5c118b431c18df298f3f138133ac42466
Author: Stefan Richter <s.richter@...>
Date:   2018-01-02T17:30:48Z

    Make checkstyle happy :-)

----


> Implement task-local state recovery
> -----------------------------------
>
>                 Key: FLINK-8360
>                 URL: https://issues.apache.org/jira/browse/FLINK-8360
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>             Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky is possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to