[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16451976#comment-16451976 ]
ASF GitHub Bot commented on FLINK-9182:
---------------------------------------

GitHub user makeyang opened a pull request:

    https://github.com/apache/flink/pull/5908

    [FLINK-9182]async checkpoints for timer service

    ## What is the purpose of the change

    This PR is WIP; its unit tests, which are marked as TODO, still need to be finished.
    It is opened to collect feedback on a proposed solution for FLINK-9182.

    ## Brief change log

    1. add one more state called rawkeyedstatemeta, which helps store the version info of the timer service and the timer size
    2. make the timer state snapshot async

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/makeyang/flink FLINK-9182

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5908.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5908

----
commit 7eca3bebd2b92ffb53a2058d10df966ffd3d4875
Author: makeyang <makeyang@...>
Date:   2018-04-25T09:24:26Z

    [FLINK-9182]async checkpoints for timer service

    There are some unit tests that still need fixing, which I marked as TODO.
    The package has passed the integration test in my test environment, so please
    take a look at the code to verify the initial idea first.

----


> async checkpoints for timer service
> -----------------------------------
>
>                 Key: FLINK-9182
>                 URL: https://issues.apache.org/jira/browse/FLINK-9182
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: makeyang
>            Assignee: makeyang
>            Priority: Minor
>             Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
> ## as the number of 'InternalTimer' objects increases, checkpoints become
> slower and slower
> # improvement design
> ## maintain a stateTableVersion and snapshotVersions in
> InternalTimeServiceManager, each playing exactly the same role as its
> counterpart in CopyOnWriteStateTable. In addition, a read-write lock is used
> to protect snapshotVersions and stateTableVersion.
> ## for each InternalTimer, add 2 more properties, create version and delete
> version, beside the 3 existing properties: timestamp, key and namespace. Each
> time a timer is registered in the timer service, it is created with
> stateTableVersion as its create version, while its delete version is -1. Each
> time a timer is deleted in the timer service, it is only marked as deleted by
> giving it a delete version equal to stateTableVersion; it is not physically
> deleted from the timer service.
> ## each time a snapshot of the timers is started, InternalTimeServiceManager
> increases its stateTableVersion and adds this stateTableVersion to
> snapshotVersions. These 2 operations are protected by the write lock of
> InternalTimeServiceManager. The resulting stateTableVersion is taken as the
> snapshot version of this snapshot.
> ## shallow copy the <String,HeapInternalTimerService> tuples
> ## then use another thread to asynchronously snapshot everything: the key
> serializer, the namespace serializer and the timers. A timer that is not
> deleted (delete version is -1) and whose create version is less than the
> snapshot version is serialized. A timer whose delete version is not -1 and is
> greater than or equal to the snapshot version is serialized. Any other timer
> is not serialized by this snapshot.
> ## when everything is serialized, remove the snapshot version from
> snapshotVersions. This still happens in the other thread and is guarded by
> the write lock.
> ## last thing: physical deletion of timers. There are 2 places where timers
> are physically deleted. First, as described above, each time a timer is
> deleted in the timer service it is marked as deleted by giving it a delete
> version equal to stateTableVersion; after that, check whether the size of
> snapshotVersions is 0 (which means there is no running snapshot) and, if so,
> physically delete the timer. The other place is while iterating over the
> timers during a snapshot: a timer whose delete version is less than the
> minimum value in snapshotVersions has been deleted and no running snapshot
> needs to keep it.
> ## some more additions: processingTimeTimers and eventTimeTimers for each
> key group used to be a HashSet; each is now changed to a ConcurrentHashMap
> with key+namespace+timestamp as its hash key.
> # related mail list thread
> ##
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
> # github pull request
> ## //coming soon



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
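
The versioning rules in the improvement design above can be sketched as two small predicates. This is only an illustration under stated assumptions, not the code from the pull request: the class `VersionedTimerSketch`, the methods `includedInSnapshot` and `canPhysicallyDelete`, and the constant `NOT_DELETED` are hypothetical names, and versions are modeled as plain `int` values. The serialization rule follows the issue text literally, so the deleted-timer case checks only the delete version.

```java
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical sketch of the versioning rules; not the pull request's code. */
public class VersionedTimerSketch {

    /** Delete version of a timer that has not been deleted, per the issue text. */
    static final int NOT_DELETED = -1;

    /**
     * Decides whether a timer is serialized by the snapshot that was started
     * with the given snapshot version.
     */
    static boolean includedInSnapshot(int createVersion, int deleteVersion, int snapshotVersion) {
        if (deleteVersion == NOT_DELETED) {
            // Live timer: visible only if it was registered before the snapshot started.
            return createVersion < snapshotVersion;
        }
        // Logically deleted timer: still visible if the deletion happened
        // at or after the start of this snapshot.
        return deleteVersion >= snapshotVersion;
    }

    /**
     * Decides whether a logically deleted timer may be physically removed,
     * given the versions of all snapshots that are still running.
     */
    static boolean canPhysicallyDelete(int deleteVersion, SortedSet<Integer> snapshotVersions) {
        if (deleteVersion == NOT_DELETED) {
            return false; // not deleted at all
        }
        // No running snapshot, or every running snapshot started after the deletion.
        return snapshotVersions.isEmpty() || deleteVersion < snapshotVersions.first();
    }

    public static void main(String[] args) {
        SortedSet<Integer> running = new TreeSet<>();
        running.add(3); // one snapshot with version 3 is still running

        // Timer registered at version 2 and deleted at version 3, after the snapshot started.
        System.out.println(includedInSnapshot(2, 3, 3));
        System.out.println(canPhysicallyDelete(3, running));
    }
}
```

In this sketch a timer deleted while a snapshot is running stays in memory until that snapshot's version is removed from `snapshotVersions`, which matches the two physical-deletion points described in the issue.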