[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462197#comment-16462197 ]
ASF GitHub Bot commented on FLINK-9182:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185729269

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -49,13 +50,13 @@
     	/**
     	 * Processing time timers that are currently in-flight.
     	 */
    -	private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    +	private final Map<String, InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    --- End diff --

    It seems that you switched from a set to a map because you want to use `ConcurrentHashMap`, but you can simply have a set that is backed by a `ConcurrentHashMap`, either from `Collections.newSetFromMap(...)` or from `ConcurrentHashMap.newKeySet()`.


> async checkpoints for timer service
> -----------------------------------
>
>                 Key: FLINK-9182
>                 URL: https://issues.apache.org/jira/browse/FLINK-9182
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: makeyang
>            Assignee: makeyang
>            Priority: Minor
>             Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
> ## as the number of 'InternalTimer' objects increases, checkpoints become slower and slower
> # improvement design
> ## maintain a stateTableVersion and a snapshotVersions collection in InternalTimeServiceManager; both serve exactly the same purpose as their counterparts in CopyOnWriteStateTable. one more thing: a read-write lock, which protects snapshotVersions and stateTableVersion
> ## for each InternalTimer, add 2 more properties, create version and delete version, beside the 3 existing properties: timestamp, key and namespace. each time a timer is registered in the timer service, it is created with the current stateTableVersion as its create version, while its delete version is -1. each time a timer is deleted in the timer service, it is only marked as deleted by setting its delete version to the current stateTableVersion; it is not physically deleted from the timer service.
> ## each time timers are snapshotted, InternalTimeServiceManager increases its stateTableVersion and adds this stateTableVersion to snapshotVersions. these 2 operations are protected by the write lock of InternalTimeServiceManager. this stateTableVersion is taken as the snapshot version of this snapshot
> ## shallow copy the <String,HeapInternalTimerService> tuples
> ## then use another thread to asynchronously snapshot everything: key serializer, namespace serializer and timers. a timer that is not deleted (delete version is -1) and whose create version is less than the snapshot version is serialized. a timer whose delete version is not -1 and is greater than or equal to the snapshot version is serialized. otherwise, the timer is not serialized by this snapshot.
> ## when everything is serialized, remove the snapshot version from snapshotVersions. this still happens in the other thread and is guarded by the write lock.
> ## last thing: physical deletion of timers. there are 2 places where timers are physically deleted. first, each time a timer is deleted in the timer service and has been marked with its delete version as described above, check whether the size of snapshotVersions is 0 (which means there is no running snapshot) and, if so, delete the timer. second, during the snapshot's iteration over timers: when a timer's delete version is less than the min value of snapshotVersions, the timer is deleted and no running snapshot needs to keep it, so it can be removed.
> ## some more additions: processingTimeTimers and eventTimeTimers for each key group used to be a HashSet and are now changed to a ConcurrentHashMap with key+namespace+timestamp as its hash key.
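
The version rules in the design list above can be sketched roughly as follows. This is only an illustration of the rules as they are worded in the description, not code from the pull request. The class `VersionedTimerSketch`, the nested `VersionedTimer`, and the method names are hypothetical. Note that the second serialization rule, as written, does not look at the create version, and the sketch follows that wording literally.

```java
import java.util.SortedSet;

/** Hypothetical sketch of the create/delete version rules; not Flink code. */
final class VersionedTimerSketch {

    /** Delete version of a timer that has not been deleted, as in the description. */
    static final int NOT_DELETED = -1;

    /** Illustrative timer carrying the two extra version properties. */
    static final class VersionedTimer {
        final long timestamp;
        final int createVersion;
        int deleteVersion = NOT_DELETED;

        VersionedTimer(long timestamp, int createVersion) {
            this.timestamp = timestamp;
            this.createVersion = createVersion;
        }
    }

    /**
     * Serialization rule: a live timer is written if it was created before the
     * snapshot version; a logically deleted timer is written if it was deleted
     * at or after the snapshot version.
     */
    static boolean includedInSnapshot(VersionedTimer timer, int snapshotVersion) {
        if (timer.deleteVersion == NOT_DELETED) {
            return timer.createVersion < snapshotVersion;
        }
        return timer.deleteVersion >= snapshotVersion;
    }

    /**
     * Physical-deletion rule: a logically deleted timer can be removed when no
     * snapshot is running, or when it was deleted before the oldest running one.
     */
    static boolean canPhysicallyDelete(VersionedTimer timer, SortedSet<Integer> snapshotVersions) {
        if (timer.deleteVersion == NOT_DELETED) {
            return false;
        }
        return snapshotVersions.isEmpty() || timer.deleteVersion < snapshotVersions.first();
    }
}
```

For example, with a snapshot version of 4, a live timer created at version 3 is serialized, a timer created at version 4 is skipped, and a timer deleted at version 4 is still serialized and cannot be physically removed until version 4 leaves snapshotVersions.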
> # related mail list thread
> ## http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
> # github pull request
> ## //coming soon



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
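
On the review comment about keeping a `Set` instead of a `Map<String, ...>`: a minimal sketch of the two JDK options it names is shown here. The class `ConcurrentTimerSetSketch` and the element type `TimerId` are hypothetical stand-ins, and the sketch assumes the element type defines `equals` and `hashCode` over timestamp, key and namespace, so that no separate string hash key is needed.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a concurrent set backed by ConcurrentHashMap. */
final class ConcurrentTimerSetSketch {

    /** Illustrative element type; equality covers timestamp, key and namespace. */
    static final class TimerId {
        final long timestamp;
        final String key;
        final String namespace;

        TimerId(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TimerId)) {
                return false;
            }
            TimerId other = (TimerId) o;
            return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key, namespace);
        }
    }

    /** Option 1 (Java 8+): a key-set view over a fresh ConcurrentHashMap. */
    static <T> Set<T> viaNewKeySet() {
        return ConcurrentHashMap.newKeySet();
    }

    /** Option 2: wrap an empty ConcurrentHashMap as a set. */
    static <T> Set<T> viaNewSetFromMap() {
        return Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
    }
}
```

Either factory returns a thread-safe `Set`, so under these assumptions the field in the diff could keep its original `Set<InternalTimer<K, N>>[]` type.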