[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480054#comment-16480054 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on the issue:

https://github.com/apache/flink/pull/5908

@StefanRRichter should the JIRA issue be closed too?

> async checkpoints for timer service
> ---
>
> Key: FLINK-9182
> URL: https://issues.apache.org/jira/browse/FLINK-9182
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: makeyang
> Assignee: makeyang
> Priority: Minor
> Fix For: 1.4.3, 1.5.1
>
> # problem description:
> ## As the number of 'InternalTimer' objects grows, checkpoints become slower and slower.
> # improvement design
> ## In InternalTimeServiceManager, maintain a stateTableVersion and a set of snapshotVersions, exactly as CopyOnWriteStateTable does, plus a read-write lock that protects both snapshotVersions and stateTableVersion.
> ## Give each InternalTimer two more properties, a create version and a delete version, in addition to the three existing ones: timestamp, key and namespace. Each time a timer is registered in the timer service, it is created with the current stateTableVersion as its create version and -1 as its delete version. Each time a timer is deleted from the timer service, it is marked as deleted by setting its delete version to the current stateTableVersion, without physically removing it.
> ## Each time timers are snapshotted, InternalTimeServiceManager increments its stateTableVersion and adds the new value to snapshotVersions. These two operations are protected by the write lock of InternalTimeServiceManager. The new stateTableVersion is used as the snapshot version of this snapshot.
> ## Make a shallow copy of the tuples.
> ## Then use another thread to snapshot everything asynchronously: the key serializer, the namespace serializer and the timers. A timer that is not deleted (delete version is -1) and whose create version is less than the snapshot version is serialized. A timer whose delete version is not -1 and is greater than or equal to the snapshot version is also serialized. Any other timer is not serialized by this snapshot.
> ## When everything is serialized, remove the snapshot version from snapshotVersions. This also happens on the asynchronous thread and is guarded by the write lock.
> ## Last, physical deletion of timers happens in two places. First, when a timer is deleted in the timer service, it is marked as deleted by setting its delete version to the current stateTableVersion; if snapshotVersions is then empty (no snapshot is running), the timer is physically deleted. Second, while a snapshot iterates over the timers, a timer whose delete version is less than the minimum value in snapshotVersions is physically deleted, because it has been deleted and no running snapshot needs it.
> ## In addition, processingTimeTimers and eventTimeTimers for each key group, which used to be HashSets, are changed to ConcurrentHashMaps with key + namespace + timestamp as the hash key.
> # related mail list thread
> ## http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
> # github pull request
> ## //coming soon

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
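The versioning scheme in the issue description can be sketched in Java roughly as follows. This is a minimal, hypothetical illustration, not Flink's code: `VersionedTimerService`, `TimerId`, `VersionedTimer` and their methods are invented for this example, keys and namespaces are reduced to `String`s, and it assumes that timers are registered and deleted by a single task thread while the asynchronous snapshot runs on another thread. Two details deliberately go beyond the proposal. The visibility check also requires `createVersion < snapshotVersion` for deleted timers, so a timer that is both created and deleted after a snapshot starts is not written into it. Versioned entries are also kept in a separate concurrent set next to the key + namespace + timestamp index, so that re-registering a deleted timer cannot overwrite a version that a running snapshot still needs.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of the FLINK-9182 versioned-timer proposal; not Flink's API. */
public class VersionedTimerService {

    /** Timer identity: key + namespace + timestamp (Strings stand in for real key/namespace types). */
    static final class TimerId {
        final String key, namespace;
        final long timestamp;

        TimerId(String key, String namespace, long timestamp) {
            this.key = key; this.namespace = namespace; this.timestamp = timestamp;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof TimerId)) return false;
            TimerId t = (TimerId) o;
            return timestamp == t.timestamp && key.equals(t.key) && namespace.equals(t.namespace);
        }

        @Override public int hashCode() { return Objects.hash(key, namespace, timestamp); }

        @Override public String toString() { return key + "/" + namespace + "@" + timestamp; }
    }

    /** One version of a timer; deleteVersion == -1 means "not deleted". Uses identity equality. */
    static final class VersionedTimer {
        final TimerId id;
        final long createVersion;
        volatile long deleteVersion = -1;

        VersionedTimer(TimerId id, long createVersion) { this.id = id; this.createVersion = createVersion; }
    }

    private final Map<TimerId, VersionedTimer> live = new HashMap<>();          // task thread only
    private final Set<VersionedTimer> versions = ConcurrentHashMap.newKeySet(); // also read by snapshot thread
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeSet<Long> snapshotVersions = new TreeSet<>();             // guarded by lock
    private long stateTableVersion = 0;                                         // guarded by lock

    /** Registers a timer with the current version as create version (no-op if already live). */
    public void register(String key, String namespace, long timestamp) {
        TimerId id = new TimerId(key, namespace, timestamp);
        lock.readLock().lock();
        try {
            if (!live.containsKey(id)) {
                VersionedTimer t = new VersionedTimer(id, stateTableVersion);
                versions.add(t);
                live.put(id, t);
            }
        } finally { lock.readLock().unlock(); }
    }

    /** Marks a timer deleted; removes it physically right away only if no snapshot is running. */
    public void delete(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            VersionedTimer t = live.remove(new TimerId(key, namespace, timestamp));
            if (t == null) return;
            t.deleteVersion = stateTableVersion;
            if (snapshotVersions.isEmpty()) versions.remove(t);
        } finally { lock.readLock().unlock(); }
    }

    /** Synchronous part: bump the version and record it as a running snapshot. */
    public long startSnapshot() {
        lock.writeLock().lock();
        try {
            snapshotVersions.add(++stateTableVersion);
            return stateTableVersion;
        } finally { lock.writeLock().unlock(); }
    }

    /** Visible if created before the snapshot and not deleted before it. */
    static boolean visible(long createVersion, long deleteVersion, long snapshotVersion) {
        return createVersion < snapshotVersion && (deleteVersion == -1 || deleteVersion >= snapshotVersion);
    }

    /** Asynchronous part: collect visible timers, purge unneeded deleted ones, release the version. */
    public List<TimerId> writeSnapshot(long snapshotVersion) {
        long minRunning;
        lock.readLock().lock();
        try { minRunning = snapshotVersions.first(); } finally { lock.readLock().unlock(); }
        // The real minimum can only grow while we iterate, so purging with this value stays safe.
        List<TimerId> out = new ArrayList<>();
        for (VersionedTimer t : versions) { // weakly consistent iterator: no ConcurrentModificationException
            long del = t.deleteVersion;
            if (visible(t.createVersion, del, snapshotVersion)) out.add(t.id);
            else if (del != -1 && del < minRunning) versions.remove(t);
        }
        lock.writeLock().lock();
        try { snapshotVersions.remove(snapshotVersion); } finally { lock.writeLock().unlock(); }
        return out;
    }
}
```

As a usage example: register timers `a` and `b`, call `startSnapshot()`, then delete `a` and register `c`. Calling `writeSnapshot` with that version returns `a` and `b` but not `c`, because `a` still existed when the snapshot started and `c` did not. The deleted entry for `a` is purged by the next snapshot, once no running snapshot can see it.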
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480049#comment-16480049 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang closed the pull request at:

https://github.com/apache/flink/pull/5908
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16479183#comment-16479183 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5908

@makeyang can you also please close this PR?
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478440#comment-16478440 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on the issue:

https://github.com/apache/flink/pull/6019

@StefanRRichter & @sihuazhou thank you guys
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478441#comment-16478441 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang closed the pull request at:

https://github.com/apache/flink/pull/6019
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477365#comment-16477365 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6019

Yes, the only reason I did not start with this is that I first wanted to wait for the completion and merge of the RocksDB timer service to have a complete picture.
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477360#comment-16477360 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6019

@StefanRRichter I definitely agree with your point! Treating the timers as keyed state is a beautiful way to go!
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477353#comment-16477353 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6019

@sihuazhou my plan was to integrate the timer service more closely with the keyed state backends, starting from the point that we are merging the PR for timers in RocksDB. I think timers should just be considered as keyed state and eventually become part of the keyed state backend's snapshot.

@makeyang given the above comment, this essentially means that, with the work on the RocksDB timer service, we are planning a larger rewrite of how snapshotting for timers works anyway. I also have concrete plans and work for doing this for the heap timer service, including an efficient way to delete timers. Unfortunately, all this does not match too well with this PR. I suggest that it can be closed, because we will address this issue for the next release in a somewhat more holistic way.
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477009#comment-16477009 ]

ASF GitHub Bot commented on FLINK-9182:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6019

I wonder if you could introduce a `HeapState` scoped to a `key group` to support the timer service. This way the timer service would be backed by the keyed state backend, which looks like a beautiful thing.
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476884#comment-16476884 ] ASF GitHub Bot commented on FLINK-9182:
---
GitHub user makeyang opened a pull request:

https://github.com/apache/flink/pull/6019

[FLINK-9182]async checkpoints for timer service

## What is the purpose of the change

This PR adds asynchronous checkpoints for the timer service. The whole idea is based on the discussion in the previous PR for FLINK-9182: https://github.com/apache/flink/pull/5908

## Brief change log

- sync part: make a flat copy of the internal array of the priority queue
- async part: partition the copied timers by key group and write them out per key group

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/makeyang/flink FLINK-9182-version2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6019.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #6019

commit 82799922203bd6cb959c11336f71aee4def431d7
Author: makeyang
Date: 2018-05-16T05:44:16Z

[FLINK-9182]async checkpoints for timer service

The whole idea is based on the discussion on GitHub: https://github.com/apache/flink/pull/5908
The idea was proposed by StefanRRichter as below:
"Second, I would probably suggest a simpler model for the async snapshots.
You dropped the idea of making flat copies, but I wonder if this was premature. I can see that calling set.toArray(...) per keygroup could (maybe) turn out a bit slow because it has to potentially iterate and flatten linked entries. However, with async snapshots, we could get rid of the key-group partitioning of sets, and instead do a flat copy of the internal array of the priority queue. This would translate to just a single memcopy call internally, which is very efficient. In the async part, we can still partition the timers by key-group in a similar way as the copy-on-write state table does. This would avoid slowing down the event processing path (in fact improving it by unifying the sets) and also keep the approach very straightforward and less invasive."
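The flat-copy model quoted above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's code: `SimpleTimer` and `FlatCopySnapshot` are hypothetical names, the namespace is omitted, and the modulo-hash key-group assignment is a stand-in (Flink itself assigns key groups through `KeyGroupRangeAssignment`). The synchronous part is a single `Arrays.copyOf` of the heap's backing array; the asynchronous part buckets the copied timers by key group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified timer: immutable key and timestamp (namespace omitted).
final class SimpleTimer {
    final String key;
    final long timestamp;

    SimpleTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

// Hypothetical helper illustrating the sync/async split.
final class FlatCopySnapshot {

    private FlatCopySnapshot() {
    }

    // Synchronous part: one shallow copy of the heap's backing array (references only).
    // The task thread may keep mutating the heap afterwards without affecting the copy.
    static SimpleTimer[] syncCopy(SimpleTimer[] heapArray, int size) {
        return Arrays.copyOf(heapArray, size);
    }

    // Hypothetical key-group assignment; Flink uses KeyGroupRangeAssignment instead.
    static int keyGroupOf(String key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    // Asynchronous part: bucket the copied timers by key group before writing them out.
    static List<List<SimpleTimer>> partitionByKeyGroup(SimpleTimer[] copy, int numKeyGroups) {
        List<List<SimpleTimer>> groups = new ArrayList<>(numKeyGroups);
        for (int i = 0; i < numKeyGroups; i++) {
            groups.add(new ArrayList<>());
        }
        for (SimpleTimer timer : copy) {
            groups.get(keyGroupOf(timer.key, numKeyGroups)).add(timer);
        }
        return groups;
    }
}
```

Copying only references is enough here because the timer objects are immutable; the per-key-group work moves entirely off the event-processing path.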
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471429#comment-16471429 ] ASF GitHub Bot commented on FLINK-9182:
---
Github user makeyang commented on the issue:

https://github.com/apache/flink/pull/5908

@StefanRRichter I definitely like your idea of flat-copying the priority queue in the sync part and then handling the key-group partitioning in the async part. So I'll drop my solution, try to implement yours, and then open a new PR.
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470209#comment-16470209 ] ASF GitHub Bot commented on FLINK-9182:
---
Github user makeyang commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r187297365

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
@@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
 	 *
 	 * @param context context that provides information and means required for taking a snapshot
 	 */
-	public void snapshotState(StateSnapshotContext context) throws Exception {
+	public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception {
 		if (getKeyedStateBackend() != null) {
 			KeyedStateCheckpointOutputStream out;
-
+			OperatorStateCheckpointOutputStream metaOut;
 			try {
 				out = context.getRawKeyedOperatorStateOutput();
 			} catch (Exception exception) {
 				throw new Exception("Could not open raw keyed operator state stream for " +
 					getOperatorName() + '.', exception);
 			}
 			try {
-				KeyGroupsList allKeyGroups = out.getKeyGroupList();
-				for (int keyGroupIdx : allKeyGroups) {
-					out.startNewKeyGroup(keyGroupIdx);
-
-					timeServiceManager.snapshotStateForKeyGroup(
-						new DataOutputViewStreamWrapper(out), keyGroupIdx);
-				}
+				metaOut = context.getRawKeyedOperatorStateMetaOutput();
 			} catch (Exception exception) {
-				throw new Exception("Could not write timer service of " + getOperatorName() +
-					" to checkpoint state stream.", exception);
-			} finally {
-				try {
-					out.close();
-				} catch (Exception closeException) {
-					LOG.warn("Could not close raw keyed operator state stream for {}. This " +
-						"might have prevented deleting some state data.", getOperatorName(), closeException);
-				}
+				throw new Exception("Could not open raw operator state stream for " +
+					getOperatorName() + '.', exception);
 			}
+			final Tuple4, Integer, TreeSet> ret = timeServiceManager.startOneSnapshotState();
+			final int currentSnapshotVersion = ret.f0;
+			final Map timerServices = ret.f1;
+			final Integer stateTableVersion = ret.f2;
+			final TreeSet<Integer> snapshotVersions = ret.f3;
+			LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
+			Callable<Boolean> snapshotTimerCallable = new Callable<Boolean>() {
+				@Override
+				public Boolean call() {
+					try {
+						KeyGroupsList allKeyGroups = out.getKeyGroupList();
+						metaOut.startNewPartition();
+						DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
+						metaWrapper.writeInt(stateTableVersion);
+						if (snapshotVersions.size() > 0) {
+							metaWrapper.writeInt(snapshotVersions.size());
+							for (Integer i : snapshotVersions) {
+								metaWrapper.writeInt(i);
+							}
+						}
+						else {
+							metaWrapper.writeInt(0);
+						}
+						int keyGroupCount = allKeyGroups.getNumberOfKeyGroups();
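The metadata header written at the start of the callable can be read back symmetrically. A minimal round-trip sketch, assuming plain `java.io` data streams in place of Flink's `DataOutputViewStreamWrapper`; `TimerSnapshotMeta` and `roundTrip` are hypothetical names. The layout mirrors the `writeInt` calls in the diff: the state table version, the number of in-flight snapshot versions, then each version. Writing `size()` unconditionally also covers the empty case, so the diff's `if/else` is not needed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.TreeSet;

// Hypothetical holder for the timer-snapshot metadata header.
final class TimerSnapshotMeta {
    final int stateTableVersion;
    final TreeSet<Integer> snapshotVersions;

    TimerSnapshotMeta(int stateTableVersion, TreeSet<Integer> snapshotVersions) {
        this.stateTableVersion = stateTableVersion;
        this.snapshotVersions = snapshotVersions;
    }

    // Layout: version, count, then each version (count is 0 when the set is empty).
    void write(DataOutputStream out) throws IOException {
        out.writeInt(stateTableVersion);
        out.writeInt(snapshotVersions.size());
        for (int v : snapshotVersions) {
            out.writeInt(v);
        }
    }

    // Reads the fields back in exactly the order they were written.
    static TimerSnapshotMeta read(DataInputStream in) throws IOException {
        int stateTableVersion = in.readInt();
        int count = in.readInt();
        TreeSet<Integer> versions = new TreeSet<>();
        for (int i = 0; i < count; i++) {
            versions.add(in.readInt());
        }
        return new TimerSnapshotMeta(stateTableVersion, versions);
    }

    // Illustration only: serialize to memory and immediately deserialize.
    static TimerSnapshotMeta roundTrip(TimerSnapshotMeta meta) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        meta.write(new DataOutputStream(bytes));
        return read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    }
}
```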
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470206#comment-16470206 ] ASF GitHub Bot commented on FLINK-9182:
---
Github user makeyang commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r187296892

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
@@ -97,6 +105,32 @@ public String toString() {
 				'}';
 	}
+	public String buildHashKey() {
+		return this.hashKey;
+	}
+
+	public static String buildHashKey(String key, String namespace, long timestamp) {
+		return key + ":" + namespace + ":" + timestamp;
+	}
+
+	public void markDelete(int deleteVersion) {
+		synchronized (this.deleteVersion) {
--- End diff --

I agree with you that this synchronization can be replaced by making the field volatile.
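Why `volatile` fits here: `synchronized (this.deleteVersion)` needs an object reference, so the field is presumably a boxed `Integer`, and the statement locks whichever object the field references at that moment. Once `markDelete` assigns a new value, later callers lock a different object, so the block gives no real mutual exclusion. What the design needs is only that the task thread's write becomes visible to the async snapshot thread. A minimal sketch, assuming `markDelete` is only ever called from the task thread; `VersionedTimer` is a hypothetical stand-in for `InternalTimer`, and the `-1` sentinel comes from the issue description.

```java
// Hypothetical stand-in for InternalTimer carrying the two version fields from the issue.
final class VersionedTimer {
    static final int NOT_DELETED = -1;

    final long timestamp;
    final String key;
    final String namespace;
    final int createVersion;

    // volatile: a write by the task thread is visible to the async snapshot thread.
    private volatile int deleteVersion = NOT_DELETED;

    VersionedTimer(long timestamp, String key, String namespace, int createVersion) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.createVersion = createVersion;
    }

    // Assumes a single writer (the task thread): a plain volatile write, no lock needed.
    void markDelete(int version) {
        this.deleteVersion = version;
    }

    int getDeleteVersion() {
        return deleteVersion;
    }

    boolean isDeleted() {
        return deleteVersion != NOT_DELETED;
    }
}
```

If several threads could call `markDelete` concurrently, `volatile` alone would still be enough for a plain overwrite, but any read-modify-write logic would need an `AtomicInteger` or a proper lock instead.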
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470189#comment-16470189 ] ASF GitHub Bot commented on FLINK-9182:
---
Github user makeyang commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r187294255

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -49,13 +50,13 @@
 	/**
 	 * Processing time timers that are currently in-flight.
 	 */
-	private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
+	private final Map<String, InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
--- End diff --

@StefanRRichter I switched from a set to a map because in the deleteProcessingTimeTimer/deleteEventTimeTimer methods I have to check whether there is a timer with the current key, namespace and time, while I don't have its version info. If I used the Set interface, I would have to iterate over the set to fetch the exact timer object, which is not as convenient as a Map.
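The point about the `Set` is that `contains` only answers yes or no, while deletion here needs the stored instance, because that instance carries the version fields. A keyed map hands that instance back directly. A minimal sketch with hypothetical names throughout; as a swapped-in alternative it uses a small value class `TimerId` as the map key instead of the PR's `key:namespace:timestamp` string. The value key avoids building a string per lookup and avoids ambiguity when a key or namespace itself contains `:`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key: identifies a timer independently of its version fields.
final class TimerId {
    final String key;
    final String namespace;
    final long timestamp;

    TimerId(String key, String namespace, long timestamp) {
        this.key = key;
        this.namespace = namespace;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TimerId)) {
            return false;
        }
        TimerId other = (TimerId) o;
        return timestamp == other.timestamp && key.equals(other.key) && namespace.equals(other.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, namespace, timestamp);
    }
}

// Hypothetical timer entry carrying mutable version info.
final class TimerEntry {
    final TimerId id;
    final int createVersion;
    int deleteVersion = -1;

    TimerEntry(TimerId id, int createVersion) {
        this.id = id;
        this.createVersion = createVersion;
    }
}

// Hypothetical per-key-group timer index.
final class KeyGroupTimers {
    private final Map<TimerId, TimerEntry> timers = new HashMap<>();

    void register(TimerEntry entry) {
        timers.putIfAbsent(entry.id, entry);
    }

    // Delete only knows (key, namespace, timestamp); the map returns the stored instance
    // so its version fields can be updated. A Set would have to be scanned to find it.
    TimerEntry lookup(String key, String namespace, long timestamp) {
        return timers.get(new TimerId(key, namespace, timestamp));
    }
}
```

For example, key `"a:b"` with namespace `"c"` and key `"a"` with namespace `"b:c"` both concatenate to `"a:b:c:1"` at timestamp 1, but they are distinct `TimerId`s.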
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470191#comment-16470191 ] ASF GitHub Bot commented on FLINK-9182:
---
Github user makeyang commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r187294295

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
@@ -39,11 +40,18 @@
 	private final long timestamp;
 	private final K key;
 	private final N namespace;
+	private final String hashKey;
--- End diff --

Same as above.
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469988#comment-16469988 ] ASF GitHub Bot commented on FLINK-9182:
---
Github user makeyang commented on the issue:

https://github.com/apache/flink/pull/5908

@StefanRRichter sorry for the late answer; I had surgery a few days ago and have just come back. I'll take a close look at your comments and then reply.
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462236#comment-16462236 ] ASF GitHub Bot commented on FLINK-9182:
---
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r185749819

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
@@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, long time) {
 	@Override
 	public void registerEventTimeTimer(N namespace, long time) {
-		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
-		Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
-		if (timerSet.add(timer)) {
+		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace,
+			this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
+		Map<String, InternalTimer<K, N>> timerMap = getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
+		InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), timer);
+		if (prev == null) {
--- End diff --

What happens if we find a `prev != null` that was marked as deleted? It looks like no timer will be inserted even though one should be.
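A minimal model of the problem raised here, assuming a plain `HashMap` and `PriorityQueue` in place of the PR's per-key-group map and timer queue; all class and method names are hypothetical. Deleting during a snapshot leaves a tombstone in the map, so a later `put` for the same hash key returns a non-null `prev` and the new timer never reaches the queue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical timer with a delete version (-1 means "not deleted", as in the issue).
final class TombstoneTimer {
    final long timestamp;
    int deleteVersion = -1;

    TombstoneTimer(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Hypothetical minimal model of the register/delete logic in the diff.
final class RegisterModel {
    final Map<String, TombstoneTimer> timerMap = new HashMap<>();
    final PriorityQueue<TombstoneTimer> queue =
        new PriorityQueue<>((TombstoneTimer a, TombstoneTimer b) -> Long.compare(a.timestamp, b.timestamp));

    // Mirrors the diff: put() replaces the map entry, but the new timer is only
    // enqueued when there was no previous entry at all, tombstone or not.
    void registerAsInDiff(String hashKey, long timestamp) {
        TombstoneTimer timer = new TombstoneTimer(timestamp);
        TombstoneTimer prev = timerMap.put(hashKey, timer);
        if (prev == null) {
            queue.add(timer);
        }
    }

    // Mirrors a delete while a snapshot is running: mark, dequeue, keep in the map.
    void deleteDuringSnapshot(String hashKey, int currentVersion) {
        TombstoneTimer timer = timerMap.get(hashKey);
        if (timer != null) {
            timer.deleteVersion = currentVersion;
            queue.remove(timer);
        }
    }
}
```

After register, delete-during-snapshot, and register again, the map holds a live timer while the queue is empty, so that timer would never fire. Also enqueuing when `prev` is a tombstone would fix firing, but a complete fix would likely also have to keep the tombstone visible to any in-flight snapshot; overwriting it in the map can drop a timer that existed at snapshot time.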
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462232#comment-16462232 ] ASF GitHub Bot commented on FLINK-9182: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5908#discussion_r185749320 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, long time) { @Override public void registerEventTimeTimer(N namespace, long time) { - InternalTimertimer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - Set > timerSet = getEventTimeTimerSetForTimer(timer); - if (timerSet.add(timer)) { + InternalTimer timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace, + this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1); + Map > timerMap = getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey()); + InternalTimer prev = timerMap.put(timer.buildHashKey(), timer); + if (prev == null) { eventTimeTimersQueue.add(timer); } } @Override public void deleteProcessingTimeTimer(N namespace, long time) { - InternalTimer timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - Set > timerSet = getProcessingTimeTimerSetForTimer(timer); - if (timerSet.remove(timer)) { + Map > timerMap = getProcessingTimeTimerSetForTimer((K) keyContext.getCurrentKey()); + String key = InternalTimer.buildHashKey(keyContext.getCurrentKey().toString(), namespace.toString(), time); + InternalTimer timer = timerMap.get(key); + if (timer != null) { + timer.markDelete(this.knInternalTimeServiceManager.getStateTableVersion().intValue()); processingTimeTimersQueue.remove(timer); } + this.knInternalTimeServiceManager.getReadLock().lock(); + try { + if (this.knInternalTimeServiceManager.getSnapshotVersions().size() == 0) { + timerMap.remove(key); --- End diff -- This 
looks like it could take a very long time (until the timer triggers) before a timer is truly removed if the removal happened while a snapshot was ongoing? This could potentially accumulate a lot of deleted timers.
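To make the concern concrete, here is a minimal sketch of the mark-delete scheme from the issue description, under stated assumptions: `TombstonePurge`, `VersionedTimer`, `visibleIn` and `canPurge` are hypothetical names, not the PR's classes, and locking and key groups are left out. A tombstone can only be dropped once its delete version is below the smallest running snapshot version (or no snapshot is running), so while snapshots keep overlapping the tombstones stay in the map. This sketch also checks the create version for deleted timers, so a timer created and deleted after a snapshot started is not written by that snapshot.

```java
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the versioned mark-delete scheme; not the PR's actual classes.
final class TombstonePurge {

    /** Drops every tombstone that no running snapshot can still see; returns how many were dropped. */
    static int purge(Map<String, VersionedTimer> timers, NavigableSet<Integer> runningSnapshots) {
        int before = timers.size();
        timers.values().removeIf(t -> t.canPurge(runningSnapshots));
        return before - timers.size();
    }

    public static void main(String[] args) {
        Map<String, VersionedTimer> timers = new ConcurrentHashMap<>();
        VersionedTimer timer = new VersionedTimer(1_000L, 1);
        timers.put("user-1:window-0:1000", timer);

        NavigableSet<Integer> runningSnapshots = new TreeSet<>();
        runningSnapshots.add(2);   // snapshot 2 started, stateTableVersion is now 2
        timer.markDelete(2);       // deleted while snapshot 2 is still running

        System.out.println(timer.visibleIn(2));              // true: snapshot 2 must still write it
        System.out.println(purge(timers, runningSnapshots)); // 0: the tombstone has to stay
        runningSnapshots.remove(2);                          // snapshot 2 completed
        System.out.println(purge(timers, runningSnapshots)); // 1: now it can be dropped
    }
}

final class VersionedTimer {
    static final int NOT_DELETED = -1;

    final long timestamp;
    final int createVersion;
    private volatile int deleteVersion = NOT_DELETED;

    VersionedTimer(long timestamp, int createVersion) {
        this.timestamp = timestamp;
        this.createVersion = createVersion;
    }

    void markDelete(int version) {
        deleteVersion = version;
    }

    /** True if the snapshot taken at snapshotVersion has to serialize this timer. */
    boolean visibleIn(int snapshotVersion) {
        boolean createdBefore = createVersion < snapshotVersion;
        boolean notDeletedBefore = deleteVersion == NOT_DELETED || deleteVersion >= snapshotVersion;
        return createdBefore && notDeletedBefore;
    }

    /** True once the timer is deleted and every running snapshot started after the deletion. */
    boolean canPurge(NavigableSet<Integer> runningSnapshots) {
        int deleted = deleteVersion;
        if (deleted == NOT_DELETED) {
            return false;
        }
        return runningSnapshots.isEmpty() || deleted < runningSnapshots.first();
    }
}
```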
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462214#comment-16462214 ] ASF GitHub Bot commented on FLINK-9182: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5908 Let me add a bit more. First, about introducing a separate new state handle. Our long-term plan is actually to integrate timers more closely with the backends, so that we can also have a timer service in RocksDB. In general, I would aim to reduce the number of handles (e.g. timers could then also be considered managed keyed state) instead of adding more. Second, I would probably suggest a simpler model for the async snapshots. You dropped the idea of making flat copies, but I wonder if this was premature. I can see that calling `set.toArray(...)` per key group could turn out a bit slow, because it potentially has to iterate and flatten linked entries. However, with async snapshots, we could get rid of the key-group partitioning of sets and instead do a flat copy of the internal array of the priority queue. This would translate to just a single memcopy call internally, which is very efficient. In the async part, we can still partition the timers by key group in a similar way as the copy-on-write state table does. This would avoid slowing down the event processing path (in fact, it would improve it by unifying the sets) and also keep the approach very straightforward and less invasive.
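The flat-copy proposal can be sketched in two phases: a synchronous copy of the priority queue's backing array, then an asynchronous pass that groups the copied timers by key group before they are written. This is a minimal sketch under assumptions: `FlatCopySnapshotSketch`, its `Timer` class and the `keyGroupOf` function are hypothetical, the heap is modeled as a plain array plus a size, and `Math.floorMod(hashCode, n)` in the demo is a simplification, not Flink's key-group assignment. The shallow copy is only safe because timers are not mutated after creation; `InternalTimer`'s `timestamp`, `key` and `namespace` are `final`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.ToIntFunction;

final class FlatCopySnapshotSketch {

    /** Hypothetical immutable timer; immutability is what makes the shallow copy safe. */
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Synchronous part: one array copy of the heap's first `size` slots. */
    static <T> T[] flatCopy(T[] heapArray, int size) {
        return Arrays.copyOf(heapArray, size);
    }

    /** Asynchronous part: bucket the copied timers by key group before writing them out. */
    static <T> List<List<T>> partitionByKeyGroup(T[] copy, int numKeyGroups, ToIntFunction<T> keyGroupOf) {
        List<List<T>> groups = new ArrayList<>(numKeyGroups);
        for (int i = 0; i < numKeyGroups; i++) {
            groups.add(new ArrayList<>());
        }
        for (T timer : copy) {
            groups.get(keyGroupOf.applyAsInt(timer)).add(timer);
        }
        return groups;
    }

    public static void main(String[] args) {
        Timer[] heap = new Timer[8]; // backing array with spare capacity
        heap[0] = new Timer("user-1", 1_000L);
        heap[1] = new Timer("user-2", 2_000L);
        heap[2] = new Timer("user-3", 3_000L);
        int size = 3;

        Timer[] snapshot = flatCopy(heap, size); // cheap, done on the task thread
        heap[0] = null;                          // later changes to the heap do not affect the copy

        int numKeyGroups = 4;
        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        try {
            List<List<Timer>> groups = CompletableFuture.supplyAsync(
                    () -> partitionByKeyGroup(snapshot, numKeyGroups,
                            t -> Math.floorMod(t.key.hashCode(), numKeyGroups)), // simplified assignment
                    asyncPool).join();
            for (int g = 0; g < groups.size(); g++) {
                System.out.println("key group " + g + ": " + groups.get(g).size() + " timer(s)");
            }
        } finally {
            asyncPool.shutdown();
        }
    }
}
```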
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462199#comment-16462199 ] ASF GitHub Bot commented on FLINK-9182: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5908#discussion_r185729732 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java --- @@ -39,11 +40,18 @@ private final long timestamp; private final K key; private final N namespace; + private final String hashKey; --- End diff -- Why do we need this string key and not just use the normal hash code? If the answer is because you want to use it in `ConcurrentHashMap`, there are other ways to accomplish this.
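A minimal sketch of the alternative the review hints at, assuming the timer class has value-based `equals`/`hashCode` over timestamp, key and namespace: the timer itself can serve as the `ConcurrentHashMap` key, and where the stored instance has to be fetched (as the PR does before calling `markDelete`), a map from timer to timer does that without building a `String` per lookup. `TimerAsKeySketch` and `TimerKey` are hypothetical names; `TimerKey` only stands in for `InternalTimer`. The demo also shows a pitfall of the `key + ":" + namespace + ":" + timestamp` scheme: a key or namespace whose `toString()` contains `:` can yield the same string for different timers.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; TimerKey stands in for InternalTimer and is not Flink's class.
final class TimerAsKeySketch {

    /** The string-key scheme from the PR: key + ":" + namespace + ":" + timestamp. */
    static String stringKey(Object key, Object namespace, long timestamp) {
        return key + ":" + namespace + ":" + timestamp;
    }

    public static void main(String[] args) {
        ConcurrentMap<TimerKey<String, String>, TimerKey<String, String>> timers = new ConcurrentHashMap<>();
        TimerKey<String, String> stored = new TimerKey<>(5L, "user-1", "window-0");
        timers.putIfAbsent(stored, stored);

        // An equal probe finds the stored instance, so no String has to be built per lookup.
        TimerKey<String, String> found = timers.get(new TimerKey<>(5L, "user-1", "window-0"));
        System.out.println(found == stored); // true

        // Concatenated keys collide when a key or namespace contains ':'; value equality does not.
        System.out.println(stringKey("a:b", "c", 5L).equals(stringKey("a", "b:c", 5L))); // true
        System.out.println(new TimerKey<>(5L, "a:b", "c").equals(new TimerKey<>(5L, "a", "b:c"))); // false
    }
}

/** Immutable timer identity with value-based equals/hashCode. */
final class TimerKey<K, N> {
    final long timestamp;
    final K key;
    final N namespace;

    TimerKey(long timestamp, K key, N namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TimerKey)) {
            return false;
        }
        TimerKey<?, ?> other = (TimerKey<?, ?>) o;
        return timestamp == other.timestamp
                && Objects.equals(key, other.key)
                && Objects.equals(namespace, other.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }
}
```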
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462200#comment-16462200 ] ASF GitHub Bot commented on FLINK-9182: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5908#discussion_r185734334 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times * * @param context context that provides information and means required for taking a snapshot */ - public void snapshotState(StateSnapshotContext context) throws Exception { + public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception { if (getKeyedStateBackend() != null) { KeyedStateCheckpointOutputStream out; - + OperatorStateCheckpointOutputStream metaOut; try { out = context.getRawKeyedOperatorStateOutput(); } catch (Exception exception) { throw new Exception("Could not open raw keyed operator state stream for " + getOperatorName() + '.', exception); } - try { - KeyGroupsList allKeyGroups = out.getKeyGroupList(); - for (int keyGroupIdx : allKeyGroups) { - out.startNewKeyGroup(keyGroupIdx); - - timeServiceManager.snapshotStateForKeyGroup( - new DataOutputViewStreamWrapper(out), keyGroupIdx); - } + metaOut = context.getRawKeyedOperatorStateMetaOutput(); } catch (Exception exception) { - throw new Exception("Could not write timer service of " + getOperatorName() + - " to checkpoint state stream.", exception); - } finally { - try { - out.close(); - } catch (Exception closeException) { - LOG.warn("Could not close raw keyed operator state stream for {}. 
This " + "might have prevented deleting some state data.", getOperatorName(), closeException); - } + throw new Exception("Could not open raw operator state stream for " + + getOperatorName() + '.', exception); } + final Tuple4, Integer, TreeSet> ret = timeServiceManager.startOneSnapshotState(); + final int currentSnapshotVersion = ret.f0; + final Map timerServices = ret.f1; + final Integer stateTableVersion = ret.f2; + final TreeSet<Integer> snapshotVersions = ret.f3; + LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString()); + Callable<Boolean> snapshotTimerCallable = new Callable<Boolean>() { + @Override + public Boolean call() { + try { + KeyGroupsList allKeyGroups = out.getKeyGroupList(); + metaOut.startNewPartition(); + DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut); + metaWrapper.writeInt(stateTableVersion); + if (snapshotVersions.size() > 0) { + metaWrapper.writeInt(snapshotVersions.size()); + for (Integer i : snapshotVersions) { + metaWrapper.writeInt(i); + } + } + else { + metaWrapper.writeInt(0); + } + int keyGroupCount =
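The metadata header that the callable writes before the (truncated) per-key-group part can be sketched as a write/read round trip. Assumptions: plain `java.io` streams stand in for Flink's `DataOutputViewStreamWrapper`; `TimerMetaSketch`, `writeMeta` and `readMeta` are hypothetical names; and the set of snapshot versions is copied before it is handed to the async thread, because `java.util.TreeSet` is not thread-safe and the diff does not show whether `ret.f3` is already a copy. Writing `snapshotVersions.size()` unconditionally covers the diff's `else` branch, which writes `0` for an empty set.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the timer-snapshot metadata layout; not Flink's API.
final class TimerMetaSketch {

    /** Writes stateTableVersion, then the count and values of the running snapshot versions. */
    static byte[] writeMeta(int stateTableVersion, TreeSet<Integer> snapshotVersions) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(stateTableVersion);
            out.writeInt(snapshotVersions.size()); // 0 when empty, same as the diff's else branch
            for (int v : snapshotVersions) {
                out.writeInt(v);
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the header back: element 0 is stateTableVersion, the rest are snapshot versions. */
    static List<Integer> readMeta(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            List<Integer> result = new ArrayList<>();
            result.add(in.readInt());
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                result.add(in.readInt());
            }
            return result;
        }
    }

    public static void main(String[] args) throws Exception {
        TreeSet<Integer> live = new TreeSet<>();
        live.add(7);
        live.add(8);
        // Copy on the calling thread: TreeSet is not safe to share with the async thread.
        final TreeSet<Integer> versions = new TreeSet<>(live);
        final int stateTableVersion = 8;

        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        try {
            Callable<byte[]> snapshotMeta = () -> writeMeta(stateTableVersion, versions);
            Future<byte[]> done = asyncPool.submit(snapshotMeta);
            System.out.println(readMeta(done.get())); // [8, 7, 8]
        } finally {
            asyncPool.shutdown();
        }
    }
}
```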
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462197#comment-16462197 ] ASF GitHub Bot commented on FLINK-9182: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5908#discussion_r185729269 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java --- @@ -49,13 +50,13 @@ /** * Processing time timers that are currently in-flight. */ - private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup; + private final Map<String, InternalTimer<K, N>>[] processingTimeTimersByKeyGroup; --- End diff -- It seems that you switch from set to map because you want to use `ConcurrentHashMap`, but you can simply have a set that is backed by `ConcurrentHashMap`, either from `Collections.newSetFromMap(...)` or `ConcurrentHashMap.newKeySet()`.
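Both alternatives named in the review exist in the JDK: `ConcurrentHashMap.newKeySet()` (Java 8+) and `Collections.newSetFromMap(...)`. A minimal sketch, assuming one set per key group as in the original `processingTimeTimersByKeyGroup` array; `ConcurrentTimerSetsSketch` and `newKeyGroupSets` are hypothetical names, and `Long` timestamps stand in for timer objects. `add` and `remove` on these sets report whether the set actually changed, just like the `timerSet.add(timer)` and `timerSet.remove(timer)` checks in the original code.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; Long timestamps stand in for timer objects.
final class ConcurrentTimerSetsSketch {

    /** One concurrent set per key group, mirroring the per-key-group array of sets. */
    @SuppressWarnings("unchecked")
    static <T> Set<T>[] newKeyGroupSets(int numKeyGroups) {
        Set<T>[] sets = (Set<T>[]) new Set<?>[numKeyGroups];
        for (int i = 0; i < numKeyGroups; i++) {
            sets[i] = ConcurrentHashMap.newKeySet();
        }
        return sets;
    }

    public static void main(String[] args) {
        // Java 8+: a Set view over the keys of a ConcurrentHashMap.
        Set<Long> viaKeySet = ConcurrentHashMap.newKeySet();
        // Works with any empty Map<E, Boolean>, here a ConcurrentHashMap.
        Set<Long> viaSetFromMap = Collections.newSetFromMap(new ConcurrentHashMap<>());

        viaKeySet.add(1_000L);
        viaSetFromMap.add(1_000L);

        // add/remove report whether the set changed, like the original timerSet.add/remove checks.
        System.out.println(viaKeySet.add(1_000L));        // false: already present
        System.out.println(viaSetFromMap.remove(1_000L)); // true: it was present

        Set<Long>[] byKeyGroup = newKeyGroupSets(4);
        byKeyGroup[2].add(2_000L);
        System.out.println(byKeyGroup[2].contains(2_000L)); // true
    }
}
```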
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462198#comment-16462198 ] ASF GitHub Bot commented on FLINK-9182: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5908#discussion_r185730033 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java --- @@ -97,6 +105,32 @@ public String toString() { '}'; } + public String buildHashKey() { + return this.hashKey; + } + + public static String buildHashKey(String key, String namespace, long timestamp) { + return key + ":" + namespace + ":" + timestamp; + } + + public void markDelete(int deleteVersion) { + synchronized (this.deleteVersion) { --- End diff -- What is the use of this synchronization on a non-final field that cannot be achieved with a volatile field?
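For context: `synchronized (this.deleteVersion)` locks whatever object the field currently refers to (presumably an `Integer`, since `synchronized` needs a reference), so once the field is reassigned, two threads can end up holding locks on different objects; small `Integer` values are also cached and shared across the JVM. If `markDelete` is a single write on the task thread that the async snapshot thread only reads, a `volatile int` already gives the needed visibility. A minimal sketch under that assumption; `DeleteMarkSketch` is a hypothetical name, not the PR's `InternalTimer`.

```java
// Hypothetical sketch; not the PR's InternalTimer.
final class DeleteMarkSketch {
    static final int NOT_DELETED = -1;

    // volatile: a write on the task thread becomes visible to the snapshot thread without a lock.
    private volatile int deleteVersion = NOT_DELETED;

    void markDelete(int version) {
        deleteVersion = version;
    }

    boolean isDeleted() {
        return deleteVersion != NOT_DELETED;
    }

    int getDeleteVersion() {
        return deleteVersion;
    }

    public static void main(String[] args) throws InterruptedException {
        DeleteMarkSketch timer = new DeleteMarkSketch();
        Thread snapshotThread = new Thread(() -> {
            // Without volatile, this loop would be allowed to never observe the write.
            while (!timer.isDeleted()) {
                Thread.yield();
            }
            // prints "snapshot thread sees deleteVersion=3"
            System.out.println("snapshot thread sees deleteVersion=" + timer.getDeleteVersion());
        });
        snapshotThread.start();
        timer.markDelete(3);
        snapshotThread.join();
    }
}
```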
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451976#comment-16451976 ] ASF GitHub Bot commented on FLINK-9182: --- GitHub user makeyang opened a pull request: https://github.com/apache/flink/pull/5908 [FLINK-9182]async checkpoints for timer service ## What is the purpose of the change This PR is a work in progress; the unit tests marked as TODO still need to be finished. It is opened to collect feedback on a proposed solution for FLINK-9182. ## Brief change log 1. Add one more state, called rawkeyedstatemeta, which stores the version info of the timer service and the timer size. 2. Make the timer state snapshot asynchronous. ## Does this pull request potentially affect one of the following parts: Dependencies (does it add or upgrade a dependency): (yes / **no**) The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / **no**) The serializers: (yes / **no** / don't know) The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) The S3 file system connector: (yes / **no** / don't know) ## Documentation Does this pull request introduce a new feature? (yes / **no**) If yes, how is the feature documented?
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/makeyang/flink FLINK-9182 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5908.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5908 commit 7eca3bebd2b92ffb53a2058d10df966ffd3d4875 Author: makeyang Date: 2018-04-25T09:24:26Z [FLINK-9182]async checkpoints for timer service There are still some unit tests to fix, which I marked as TODO. The package has passed the integration tests in my test environment, so please take a look at the code to verify the initial approach first.