[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480054#comment-16480054
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on the issue:

https://github.com/apache/flink/pull/5908
  
@StefanRRichter should the JIRA issue be closed too?


> async checkpoints for timer service
> ---
>
> Key: FLINK-9182
> URL: https://issues.apache.org/jira/browse/FLINK-9182
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0, 1.4.2
> Reporter: makeyang
> Assignee: makeyang
> Priority: Minor
> Fix For: 1.4.3, 1.5.1
>
>
> # Problem description:
>  ## As the number of 'InternalTimer' objects increases, checkpoints become
> slower and slower.
>  # Improvement design
>  ## Maintain a stateTableVersion and a snapshotVersions in
> InternalTimeServiceManager; each is exactly the same thing as its
> counterpart in CopyOnWriteStateTable. One more thing: a read-write lock,
> which is used to protect snapshotVersions and stateTableVersion.
>  ## For each InternalTimer, add 2 more properties, create version and delete
> version, beside the 3 existing properties: timestamp, key and namespace. Each
> time a timer is registered in the timer service, it is created with
> stateTableVersion as its create version and -1 as its delete version. Each
> time a timer is deleted in the timer service, it is marked as deleted by
> setting its delete version to stateTableVersion, without physically deleting
> it from the timer service.
>  ## Each time a snapshot of the timers is taken, InternalTimeServiceManager
> increases its stateTableVersion and adds this stateTableVersion to
> snapshotVersions. These 2 operations are protected by the write lock of
> InternalTimeServiceManager. The current stateTableVersion is taken as the
> snapshot version of this snapshot.
>  ## Shallow copy  tuples
>  ## Then use another thread to asynchronously snapshot everything: the key
> serializer, the namespace serializer and the timers. A timer that is not
> deleted (delete version is -1) and whose create version is less than the
> snapshot version is serialized. A timer whose delete version is not -1 and is
> greater than or equal to the snapshot version is also serialized. Any other
> timer is not serialized by this snapshot.
>  ## When everything is serialized, remove the snapshot version from
> snapshotVersions. This still happens in the other thread and is guarded by
> the write lock.
>  ## Last thing: physical deletion of timers. There are 2 places where timers
> are physically deleted. First, each time a timer is deleted in the timer
> service, it is marked as deleted by setting its delete version to
> stateTableVersion, without physically deleting it; after this, check whether
> the size of snapshotVersions is 0 (which means there is no running snapshot)
> and, if so, delete the timer. Second, during a snapshot's iteration over the
> timers: when a timer's delete version is less than the minimum value in
> snapshotVersions, the timer is deleted and no running snapshot needs to keep
> it, so it is removed.
>  ## Some more additions: processingTimeTimers and eventTimeTimers for each
> group used to be a HashSet and are now changed to a ConcurrentHashMap with
> key+namespace+timestamp as the hash key.
>  # Related mailing list thread
>  ##
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # GitHub pull request
>  ## //coming soon
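
The versioning scheme in the description above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `VersionedTimerStore`, `Timer`, `Snapshot`, `writeSnapshot` and `physicalSize` are hypothetical names, timers are "serialized" as plain strings, and re-registering a timer that is only logically deleted is not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of the proposal; not Flink's actual classes.
class VersionedTimerStore {

    static final class Timer {
        final String key;
        final String namespace;
        final long timestamp;
        final int createVersion;
        volatile int deleteVersion = -1; // -1 means "not deleted"

        Timer(String key, String namespace, long timestamp, int createVersion) {
            this.key = key;
            this.namespace = namespace;
            this.timestamp = timestamp;
            this.createVersion = createVersion;
        }

        String id() { return key + "+" + namespace + "+" + timestamp; }
    }

    static final class Snapshot {
        final int version;
        final List<Timer> timers; // shallow copy taken in the synchronous part

        Snapshot(int version, List<Timer> timers) {
            this.version = version;
            this.timers = timers;
        }
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>(); // guarded by lock
    private int stateTableVersion = 0;                                  // guarded by lock
    private final Map<String, Timer> timers = new ConcurrentHashMap<>();

    void register(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            Timer t = new Timer(key, namespace, timestamp, stateTableVersion);
            timers.put(t.id(), t);
        } finally {
            lock.readLock().unlock();
        }
    }

    void delete(String key, String namespace, long timestamp) {
        lock.readLock().lock();
        try {
            Timer t = timers.get(key + "+" + namespace + "+" + timestamp);
            if (t == null || t.deleteVersion != -1) return;
            t.deleteVersion = stateTableVersion; // logical delete
            // First deletion site: no running snapshot, so remove physically.
            if (snapshotVersions.isEmpty()) timers.remove(t.id(), t);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Synchronous part: bump the version, record it, shallow-copy the timers.
    Snapshot startSnapshot() {
        lock.writeLock().lock();
        try {
            stateTableVersion++;
            snapshotVersions.add(stateTableVersion);
            return new Snapshot(stateTableVersion, new ArrayList<>(timers.values()));
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Asynchronous part: may run on another thread; call once per snapshot.
    List<String> writeSnapshot(Snapshot snapshot) {
        List<String> out = new ArrayList<>();
        for (Timer t : snapshot.timers) {
            int del = t.deleteVersion;
            boolean live = del == -1 && t.createVersion < snapshot.version;
            boolean deletedAfterStart = del != -1 && del >= snapshot.version;
            if (live || deletedAfterStart) {
                out.add(t.id());
            } else if (del != -1 && del < minRunningSnapshotVersion()) {
                // Second deletion site: no running snapshot needs this timer.
                timers.remove(t.id(), t);
            }
        }
        lock.writeLock().lock();
        try {
            snapshotVersions.remove(snapshot.version);
        } finally {
            lock.writeLock().unlock();
        }
        return out;
    }

    private int minRunningSnapshotVersion() {
        lock.readLock().lock();
        try {
            return snapshotVersions.first();
        } finally {
            lock.readLock().unlock();
        }
    }

    int physicalSize() { return timers.size(); }
}
```

In this sketch a timer deleted while a snapshot is running still appears in that snapshot, and is only physically removed when a later snapshot iterates over it.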



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16480049#comment-16480049
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang closed the pull request at:

https://github.com/apache/flink/pull/5908




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16479183#comment-16479183
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5908
  
@makeyang can you also please close this PR?




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478440#comment-16478440
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on the issue:

https://github.com/apache/flink/pull/6019
  
@StefanRRichter & @sihuazhou thank you both.




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478441#comment-16478441
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang closed the pull request at:

https://github.com/apache/flink/pull/6019




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477365#comment-16477365
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6019
  
Yes, the only reason I did not start with this is that I first wanted to 
wait for the completion and merge of the RocksDB timer service to have a 
complete picture.




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477360#comment-16477360
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6019
  
@StefanRRichter I definitely agree with your point! Treating timers as
keyed state is a beautiful way to go!




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477353#comment-16477353
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6019
  
@sihuazhou my plan was to integrate the timer service more closely with the 
keyed state backends, starting from the point that we are merging the PR for 
timers in RocksDB. I think timers should just be considered as keyed state and 
eventually become part of the keyed state backend's snapshot.

@makeyang with the above comment, that essentially means that with the work
on the RocksDB timer service we are planning a larger rewrite of how
snapshotting for timers works anyway. I also have concrete plans and work for
doing this for the heap timer service, including an efficient way to
delete timers. All this unfortunately does not match too well with this PR. I
suggest that this can be closed because we will address this issue for the next
release in a somewhat more holistic way.




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477009#comment-16477009
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6019
  
I wonder whether you could introduce a `HeapState` scoped to a `key group` to
support the timer service. That way the timer service would be backed by the
keyed state backend, which looks like a beautiful thing.
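
To make the suggestion above concrete, here is a rough Java sketch of timers stored per key group, so that each key group can be snapshotted on its own like other keyed state. Everything in it is an assumption for illustration: `KeyGroupScopedTimers` and its methods are hypothetical names, not Flink APIs, and the key-group assignment is a plain hash modulo rather than whatever assignment Flink actually uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical sketch; names are illustrative, not Flink APIs.
class KeyGroupScopedTimers {

    static final class Timer implements Comparable<Timer> {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        // Order by timestamp first so the earliest timer comes first.
        @Override
        public int compareTo(Timer other) {
            int c = Long.compare(timestamp, other.timestamp);
            return c != 0 ? c : key.compareTo(other.key);
        }
    }

    // One ordered timer set per key group.
    private final List<TreeSet<Timer>> timersByKeyGroup = new ArrayList<>();

    KeyGroupScopedTimers(int numKeyGroups) {
        for (int i = 0; i < numKeyGroups; i++) {
            timersByKeyGroup.add(new TreeSet<>());
        }
    }

    // Simplified assignment (an assumption): hash of the key modulo the group count.
    int keyGroupOf(String key) {
        return Math.floorMod(key.hashCode(), timersByKeyGroup.size());
    }

    void register(String key, long timestamp) {
        timersByKeyGroup.get(keyGroupOf(key)).add(new Timer(key, timestamp));
    }

    boolean delete(String key, long timestamp) {
        return timersByKeyGroup.get(keyGroupOf(key)).remove(new Timer(key, timestamp));
    }

    // Copies the timers of a single key group, in timestamp order.
    List<Timer> snapshotKeyGroup(int keyGroup) {
        return new ArrayList<>(timersByKeyGroup.get(keyGroup));
    }
}
```

Because each key group owns its timers, a snapshot or a rescaling operation only has to touch the groups it is responsible for.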


> async checkpoints for timer service
> ---
>
> Key: FLINK-9182
> URL: https://issues.apache.org/jira/browse/FLINK-9182
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: makeyang
>Assignee: makeyang
>Priority: Minor
> Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
>  ## as the number of 'InternalTimer' objects increases, checkpoints become 
> slower and slower
>  # improvement design
>  ## maintain a stateTableVersion and a snapshotVersions set in 
> InternalTimeServiceManager; both play exactly the same role as they do in 
> CopyOnWriteStateTable. one more thing: a read-write lock, which protects 
> snapshotVersions and stateTableVersion
>  ## for each InternalTimer, add 2 more properties, create version and delete 
> version, besides the 3 existing properties: timestamp, key and namespace. each 
> time a timer is registered in the timer service, it is created with 
> stateTableVersion as its create version, while its delete version is -1. each 
> time a timer is deleted in the timer service, it is only marked as deleted by 
> setting its delete version to stateTableVersion; it is not physically removed 
> from the timer service.
>  ## each time timers are snapshotted, InternalTimeServiceManager increases 
> its stateTableVersion and adds this stateTableVersion to snapshotVersions. 
> these 2 operations are protected by the write lock of 
> InternalTimeServiceManager. the resulting stateTableVersion is taken as the 
> snapshot version of this snapshot
>  ## shallow copy  tuples
>  ## then use another thread to asynchronously snapshot everything: the key 
> serializer, the namespace serializer and the timers. a timer that is not 
> deleted (delete version is -1) and whose create version is less than the 
> snapshot version is serialized. a timer whose delete version is not -1 and 
> is greater than or equal to the snapshot version is also serialized. any 
> other timer is not serialized by this snapshot.
>  ## when everything is serialized, remove the snapshot version from 
> snapshotVersions. this still happens in the other thread and is guarded by 
> the write lock.
>  ## last thing: physical deletion of timers. there are 2 places where timers 
> are physically deleted. first, each time a timer is deleted in the timer 
> service, it is marked as deleted by setting its delete version to 
> stateTableVersion; after this, check whether snapshotVersions is empty (which 
> means there is no running snapshot) and, if so, delete the timer. second, 
> while a snapshot iterates over the timers: when a timer's delete version is 
> less than the min value of snapshotVersions, the timer is deleted and no 
> running snapshot needs to keep it.
>  ## some more additions: processingTimeTimers and eventTimeTimers for each 
> group used to be a HashSet and are now a ConcurrentHashMap with 
> key+namespace+timestamp as the hash key.
>  # related mail list thread
>  ## 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # github pull request
>  ## //coming soon
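
The two version rules in the quoted description (which timers a snapshot serializes, and when a logically deleted timer may be physically removed) can be written as two small predicates. This is only a sketch of the rules as stated in the issue text; the class and method names are hypothetical and none of this is Flink code.

```java
import java.util.SortedSet;

// Hypothetical helper that restates the version rules from the issue description.
final class VersionedTimerSketch {

    /** Delete version of a timer that has not been deleted. */
    static final int NOT_DELETED = -1;

    /**
     * A snapshot with version snapshotVersion serializes a timer if
     * (a) the timer is live and was created before the snapshot started, or
     * (b) the timer was deleted at or after the snapshot started.
     */
    static boolean includedInSnapshot(int createVersion, int deleteVersion, int snapshotVersion) {
        if (deleteVersion == NOT_DELETED) {
            return createVersion < snapshotVersion;
        }
        return deleteVersion >= snapshotVersion;
    }

    /**
     * A logically deleted timer can be physically removed if no snapshot is
     * running, or if it was deleted before the oldest running snapshot started.
     */
    static boolean canPhysicallyDelete(int deleteVersion, SortedSet<Integer> snapshotVersions) {
        if (deleteVersion == NOT_DELETED) {
            return false;
        }
        return snapshotVersions.isEmpty() || deleteVersion < snapshotVersions.first();
    }
}
```

Note that rule (b), as written in the description, does not check the create version, so the sketch does not check it either.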





[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476884#comment-16476884
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

GitHub user makeyang opened a pull request:

https://github.com/apache/flink/pull/6019

[FLINK-9182]async checkpoints for timer service

## What is the purpose of the change

This change makes checkpoints of the timer service asynchronous.
The whole idea is based on the discussion in the previous PR for FLINK-9182: 
https://github.com/apache/flink/pull/5908

## Brief change log

In the sync part: make a flat copy of the internal array of the priority queue.
In the async part: build the key groups and write the timers for each key group.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/makeyang/flink FLINK-9182-version2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6019.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6019


commit 82799922203bd6cb959c11336f71aee4def431d7
Author: makeyang 
Date:   2018-05-16T05:44:16Z

[FLINK-9182]async checkpoints for timer service
The whole idea is based on the discussion on GitHub: 
https://github.com/apache/flink/pull/5908
The idea was proposed by StefanRRichter as follows:
"Second, I would probably suggest a simpler model for the async snapshots. 
You dropped the idea of making flat copies, but I wonder if this was premature. 
I can see that calling set.toArray(...) per keygroup could (maybe) turn out a 
bit slow because it has to potentially iterate and flatten linked entries. 
However, with async snapshots, we could get rid of the key-group partitioning 
of sets, and instead do a flat copy of the internal array of the priority 
queue. This would translate to just a single memcopy call internally, which is 
very efficient. In the async part, we can still partition the timers by 
key-group in a similar way as the copy-on-write state table does. This would 
avoid slowing down the event processing path (in fact improving it by unifying 
the sets) and also keep the approach very straightforward and less invasive."
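
The approach quoted above has two phases: a cheap synchronous flat copy of the queue's backing array, followed by asynchronous partitioning by key group. A minimal sketch of those two phases follows. The `Timer` class and `keyGroupOf` are hypothetical stand-ins (Flink has its own timer type and its own key-group hashing), and the sketch assumes the queue keeps its elements in the first `size` slots of the array.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: illustrates "flat copy in sync, partition in async".
final class FlatCopySnapshotSketch {

    /** Hypothetical stand-in for a timer; not Flink's InternalTimer. */
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical key-group assignment; an assumption made for this example. */
    static int keyGroupOf(String key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    /** Synchronous part: a single array copy of the occupied slots. */
    static Timer[] flatCopy(Timer[] queueArray, int size) {
        return Arrays.copyOf(queueArray, size);
    }

    /** Asynchronous part: partition the copied timers by key group. */
    static List<List<Timer>> partitionByKeyGroup(Timer[] snapshot, int numKeyGroups) {
        List<List<Timer>> groups = new ArrayList<>(numKeyGroups);
        for (int i = 0; i < numKeyGroups; i++) {
            groups.add(new ArrayList<>());
        }
        for (Timer timer : snapshot) {
            groups.get(keyGroupOf(timer.key, numKeyGroups)).add(timer);
        }
        return groups;
    }
}
```

The copy is shallow, so it only works if the timer objects are not mutated after the copy is taken; the immutable `Timer` fields in the sketch reflect that assumption.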





[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16471429#comment-16471429
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on the issue:

https://github.com/apache/flink/pull/5908
  
@StefanRRichter I definitely like your idea of making a flat copy of the 
priority queue in the sync part and then handling the key-group partitioning in 
the async part. So I'll drop my solution, try to implement yours, and open a new PR.




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470209#comment-16470209
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r187297365
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -395,36 +402,102 @@ public final OperatorSnapshotFutures 
snapshotState(long checkpointId, long times
 *
 * @param context context that provides information and means required 
for taking a snapshot
 */
-   public void snapshotState(StateSnapshotContext context) throws 
Exception {
+   public void snapshotState(StateSnapshotContext context, 
OperatorSnapshotFutures snapshotInProgress) throws Exception {
if (getKeyedStateBackend() != null) {
KeyedStateCheckpointOutputStream out;
-
+   OperatorStateCheckpointOutputStream metaOut;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed 
operator state stream for " +
getOperatorName() + '.', exception);
}
-
try {
-   KeyGroupsList allKeyGroups = 
out.getKeyGroupList();
-   for (int keyGroupIdx : allKeyGroups) {
-   out.startNewKeyGroup(keyGroupIdx);
-
-   
timeServiceManager.snapshotStateForKeyGroup(
-   new 
DataOutputViewStreamWrapper(out), keyGroupIdx);
-   }
+   metaOut = 
context.getRawKeyedOperatorStateMetaOutput();
} catch (Exception exception) {
-   throw new Exception("Could not write timer 
service of " + getOperatorName() +
-   " to checkpoint state stream.", 
exception);
-   } finally {
-   try {
-   out.close();
-   } catch (Exception closeException) {
-   LOG.warn("Could not close raw keyed 
operator state stream for {}. This " +
-   "might have prevented deleting 
some state data.", getOperatorName(), closeException);
-   }
+   throw new Exception("Could not open raw 
operator state stream for " +
+   getOperatorName() + '.', exception);
}
+   final Tuple4, Integer, TreeSet> ret = 
timeServiceManager.startOneSnapshotState();
+   final int currentSnapshotVersion = ret.f0;
+   final Map 
timerServices = ret.f1;
+   final Integer stateTableVersion = ret.f2;
+   final TreeSet snapshotVersions = ret.f3;
+   LOG.info("snapshotVersions after calling 
startOneSnapshotState:" + snapshotVersions.toString());
+   Callable snapshotTimerCallable = new 
Callable() {
+   @Override
+   public Boolean call() {
+   try {
+   KeyGroupsList allKeyGroups = 
out.getKeyGroupList();
+   metaOut.startNewPartition();
+   DataOutputViewStreamWrapper 
metaWrapper = new DataOutputViewStreamWrapper(metaOut);
+   
metaWrapper.writeInt(stateTableVersion);
+   if (snapshotVersions.size() > 
0) {
+   
metaWrapper.writeInt(snapshotVersions.size());
+   for (Integer i : 
snapshotVersions) {
+   
metaWrapper.writeInt(i);
+   }
+   }
+   else {
+   metaWrapper.writeInt(0);
+   }
+   int keyGroupCount = 
allKeyGroups.getNumberOfKeyGroups();

[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470206#comment-16470206
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r187296892
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 ---
@@ -97,6 +105,32 @@ public String toString() {
'}';
}
 
+   public String buildHashKey() {
+   return this.hashKey;
+   }
+
+   public static String buildHashKey(String key, String namespace, long 
timestamp) {
+   return key + ":" + namespace + ":" + timestamp;
+   }
+
+   public void markDelete(int deleteVersion) {
+   synchronized (this.deleteVersion) {
--- End diff --

I agree with you that this synchronization can be replaced by volatile.
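
A minimal sketch of the volatile variant, assuming the delete version is a plain `int` that is written once by the task thread and read by the snapshot thread. The class name and field layout are hypothetical; the diff above does not show how `deleteVersion` is declared.

```java
// Sketch only: a volatile int instead of a synchronized block.
final class VolatileDeleteVersionSketch {

    /** Delete version of a timer that has not been deleted. */
    static final int NOT_DELETED = -1;

    // volatile makes the write in markDelete visible to other threads;
    // a single int write is atomic, so no lock is needed for it.
    private volatile int deleteVersion = NOT_DELETED;

    void markDelete(int version) {
        this.deleteVersion = version;
    }

    int getDeleteVersion() {
        return deleteVersion;
    }

    boolean isDeleted() {
        return deleteVersion != NOT_DELETED;
    }
}
```

volatile only covers single reads and writes. A check-then-act sequence such as "mark as deleted only if not yet deleted" would still need a lock or an atomic compare-and-set.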




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470189#comment-16470189
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r187294255
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -49,13 +50,13 @@
/**
 * Processing time timers that are currently in-flight.
 */
-   private final Set>[] processingTimeTimersByKeyGroup;
+   private final Map>[] 
processingTimeTimersByKeyGroup;
--- End diff --

@StefanRRichter 
I switched from Set to Map because in the 
deleteProcessingTimeTimer/deleteEventTimeTimer methods I have to check whether 
a timer with the current key, namespace and time exists, while I don't have 
its version info.
If I used Set as the interface, I would have to iterate over the Set to fetch 
the exact timer object, which is less convenient than a Map lookup.
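
The point about Set versus Map can be illustrated as follows: a Set can answer "is it contained?" but cannot hand back the stored instance, whereas a Map keyed by the composite string can. In this sketch `buildHashKey` mirrors the method shown in the InternalTimer diff; everything else (class names, fields, `markDeleted`) is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: look up the stored timer by key, namespace and timestamp.
final class TimerMapLookupSketch {

    /** Hypothetical stand-in for a versioned timer; not Flink's InternalTimer. */
    static final class Timer {
        final String key;
        final String namespace;
        final long timestamp;
        final int createVersion;
        int deleteVersion = -1; // -1 means "not deleted"

        Timer(String key, String namespace, long timestamp, int createVersion) {
            this.key = key;
            this.namespace = namespace;
            this.timestamp = timestamp;
            this.createVersion = createVersion;
        }
    }

    static String buildHashKey(String key, String namespace, long timestamp) {
        return key + ":" + namespace + ":" + timestamp;
    }

    private final Map<String, Timer> timers = new HashMap<>();

    void register(Timer timer) {
        timers.put(buildHashKey(timer.key, timer.namespace, timer.timestamp), timer);
    }

    /** Marks the stored timer as deleted; the caller needs no version info to find it. */
    boolean markDeleted(String key, String namespace, long timestamp, int currentVersion) {
        Timer stored = timers.get(buildHashKey(key, namespace, timestamp));
        if (stored == null || stored.deleteVersion != -1) {
            return false;
        }
        stored.deleteVersion = currentVersion;
        return true;
    }
}
```

One caveat of a concatenated string key: it is ambiguous when the key or namespace itself contains the separator, for example key `a:b` with namespace `c` and key `a` with namespace `b:c` produce the same hash key.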




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16470191#comment-16470191
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r187294295
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 ---
@@ -39,11 +40,18 @@
private final long timestamp;
private final K key;
private final N namespace;
+   private final String hashKey;
--- End diff --

same as above




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16469988#comment-16469988
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user makeyang commented on the issue:

https://github.com/apache/flink/pull/5908
  
@StefanRRichter sorry for the late answer. I had surgery a few days ago and 
am back now.
I'll take a close look at your comments and then reply.




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462236#comment-16462236
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r185749819
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, 
long time) {
 
@Override
public void registerEventTimeTimer(N namespace, long time) {
-   InternalTimer timer = new InternalTimer<>(time, (K) 
keyContext.getCurrentKey(), namespace);
-   Set> timerSet = 
getEventTimeTimerSetForTimer(timer);
-   if (timerSet.add(timer)) {
+   InternalTimer timer = new InternalTimer<>(time, (K) 
keyContext.getCurrentKey(), namespace,
+   
this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
+   Map> timerMap = 
getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
+   InternalTimer prev = timerMap.put(timer.buildHashKey(), 
timer);
+   if (prev == null) {
--- End diff --

What happens if we find a `prev != null` that was marked as deleted? Looks 
like no timer will be inserted even though it should.
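
One way to handle the case raised here is to treat a previous entry that is only logically deleted the same as an absent entry. The sketch below shows that decision in isolation; it is not the PR's code, all names are hypothetical, and it ignores the priority queue.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: re-registering a timer whose previous entry is marked deleted.
final class ReRegisterSketch {

    static final int NOT_DELETED = -1;

    /** Hypothetical stand-in for a versioned timer. */
    static final class Timer {
        final long timestamp;
        final int createVersion;
        int deleteVersion = NOT_DELETED;

        Timer(long timestamp, int createVersion) {
            this.timestamp = timestamp;
            this.createVersion = createVersion;
        }
    }

    private final Map<String, Timer> timers = new HashMap<>();

    /** Returns true if a new live timer was inserted. */
    boolean register(String hashKey, long timestamp, int stateTableVersion) {
        Timer prev = timers.get(hashKey);
        if (prev != null && prev.deleteVersion == NOT_DELETED) {
            return false; // a live timer already exists, nothing to insert
        }
        // prev is absent or only logically deleted: insert a fresh timer.
        timers.put(hashKey, new Timer(timestamp, stateTableVersion));
        return true;
    }

    /** Marks the stored timer as deleted without removing it. */
    boolean markDeleted(String hashKey, int stateTableVersion) {
        Timer stored = timers.get(hashKey);
        if (stored == null || stored.deleteVersion != NOT_DELETED) {
            return false;
        }
        stored.deleteVersion = stateTableVersion;
        return true;
    }
}
```

Overwriting the deleted entry drops it from the map, so a running snapshot that still needs that entry could no longer see it. A complete solution would have to keep both entries reachable, which this sketch does not attempt.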


> async checkpoints for timer service
> ---
>
> Key: FLINK-9182
> URL: https://issues.apache.org/jira/browse/FLINK-9182
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: makeyang
>Assignee: makeyang
>Priority: Minor
> Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
>  ## as the number of 'InternalTimer' objects increases, checkpoints become 
> slower and slower
>  # improvement design
>  ## maintain a stateTableVersion and snapshotVersions in 
> InternalTimeServiceManager, both playing exactly the same role as their 
> counterparts in CopyOnWriteStateTable. one more thing: a read-write lock, 
> which is used to protect snapshotVersions and stateTableVersion
>  ## for each InternalTimer, add 2 more properties, create version and delete 
> version, beside the 3 existing properties: timestamp, key and namespace. each 
> time a timer is registered in the timer service, it is created with 
> stateTableVersion as its create version, while its delete version is -1. each 
> time a timer is deleted in the timer service, it is marked as deleted by 
> giving it a delete version equal to stateTableVersion, without physically 
> deleting it from the timer service.
>  ## each time timers are about to be snapshotted, InternalTimeServiceManager 
> increases its stateTableVersion and adds this stateTableVersion to 
> snapshotVersions. these 2 operations are protected by the write lock of 
> InternalTimeServiceManager. the current stateTableVersion is taken as the 
> snapshot version of this snapshot
>  ## shallow copy  tuples
>  ## then use another thread to asynchronously snapshot the whole thing: 
> key serializer, namespace serializer and timers. a timer that is not 
> deleted (delete version is -1) and whose create version is less than the 
> snapshot version is serialized. a timer whose delete version is not -1 and is 
> greater than or equal to the snapshot version is serialized. any other timer 
> is not serialized by this snapshot.
>  ## when everything is serialized, remove the snapshot version from 
> snapshotVersions; this still happens in the other thread and is guarded by 
> the write lock.
>  ## last thing: physical deletion of timers. there are 2 places where timers 
> are physically deleted. first, each time a timer is deleted in the timer 
> service, it is marked as deleted by giving it a delete version equal to 
> stateTableVersion, without physically deleting it; after this, check whether 
> the size of snapshotVersions is 0 (which means there is no running snapshot) 
> and, if so, physically delete the timer. the other place is the iteration 
> over timers during a snapshot: when a timer's delete version is less than the 
> minimum value of snapshotVersions, the timer is deleted and no running 
> snapshot needs to keep it.
>  ## some more additions: processingTimeTimers and eventTimeTimers for each 
> group used to be a HashSet and are now changed to a ConcurrentHashMap with 
> key+namespace+timestamp as its hash key.
>  # related mail list thread
>  ## 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # github pull request
>  ## //coming soon
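
The versioning rules in the issue description above can be sketched as two small predicates. This is a literal transcription of the rules as worded, not code from the PR; the class and method names are hypothetical. The description does not say whether the second clause should also check the create version, so the sketch does not.

```java
import java.util.TreeSet;

// Hypothetical helper; names are illustrative and do not come from the PR.
class TimerVersionRules {
    static final int NOT_DELETED = -1;

    // A timer is serialized by a snapshot if it is live and was created before the
    // snapshot version, or if it was deleted at or after the snapshot version.
    static boolean includedInSnapshot(int createVersion, int deleteVersion, int snapshotVersion) {
        if (deleteVersion == NOT_DELETED) {
            return createVersion < snapshotVersion;
        }
        return deleteVersion >= snapshotVersion;
    }

    // A logically deleted timer can be physically removed when no snapshot is running,
    // or when its delete version is below the oldest running snapshot version.
    static boolean canPhysicallyDelete(int deleteVersion, TreeSet<Integer> snapshotVersions) {
        if (deleteVersion == NOT_DELETED) {
            return false;
        }
        return snapshotVersions.isEmpty() || deleteVersion < snapshotVersions.first();
    }
}
```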



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462232#comment-16462232
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r185749320
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, long time) {
 
@Override
public void registerEventTimeTimer(N namespace, long time) {
-   InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
-   Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
-   if (timerSet.add(timer)) {
+   InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace,
+   this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
+   Map<String, InternalTimer<K, N>> timerMap = getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
+   InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), timer);
+   if (prev == null) {
eventTimeTimersQueue.add(timer);
}
}
 
@Override
public void deleteProcessingTimeTimer(N namespace, long time) {
-   InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
-   Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
-   if (timerSet.remove(timer)) {
+   Map<String, InternalTimer<K, N>> timerMap = getProcessingTimeTimerSetForTimer((K) keyContext.getCurrentKey());
+   String key = InternalTimer.buildHashKey(keyContext.getCurrentKey().toString(), namespace.toString(), time);
+   InternalTimer<K, N> timer = timerMap.get(key);
+   if (timer != null) {
+   timer.markDelete(this.knInternalTimeServiceManager.getStateTableVersion().intValue());
processingTimeTimersQueue.remove(timer);
}
+   this.knInternalTimeServiceManager.getReadLock().lock();
+   try {
+   if (this.knInternalTimeServiceManager.getSnapshotVersions().size() == 0) {
+   timerMap.remove(key);
--- End diff --

If the remove happens while a snapshot is ongoing, it looks like it could take 
a very long time (until the timer triggers) before the timer is truly removed. 
This could potentially accumulate a lot of deleted timers.
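
A small single-threaded sketch of how this accumulation could happen, assuming physical removal is attempted only at delete time, as in the hunk above. `TombstoneSketch` and its methods are hypothetical and much simpler than the PR; there is no locking and the map stores only the delete version:

```java
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: hash key -> delete version (-1 means live).
class TombstoneSketch {
    final Map<String, Integer> timerMap = new ConcurrentHashMap<>();
    final TreeSet<Integer> snapshotVersions = new TreeSet<>();
    int stateTableVersion = 0;

    void register(String key) {
        timerMap.put(key, -1);
    }

    void delete(String key) {
        if (!timerMap.containsKey(key)) {
            return;
        }
        timerMap.put(key, stateTableVersion); // logical delete
        if (snapshotVersions.isEmpty()) {
            timerMap.remove(key); // physical delete only if no snapshot is running
        }
    }

    int startSnapshot() {
        stateTableVersion++;
        snapshotVersions.add(stateTableVersion);
        return stateTableVersion;
    }

    void finishSnapshot(int version) {
        snapshotVersions.remove(Integer.valueOf(version));
    }

    // Number of entries that are logically deleted but still held in the map.
    long tombstones() {
        return timerMap.values().stream().filter(v -> v != -1).count();
    }
}
```

In this model, every timer deleted during a snapshot stays in the map after the snapshot finishes, until something else removes it.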



[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462214#comment-16462214
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5908
  
Maybe let me add some more. First, about introducing a separate new state 
handle. Our long term plan is actually to integrate timers more closely with 
the backends, so that we can also have a timer service in RocksDB. In general, 
I would aim to reduce the number of handles (e.g. timers could then also be 
considered managed keyed state) instead of adding more. Second, I would 
probably suggest a simpler model for the async snapshots. You dropped the idea 
of making flat copies, but I wonder if this was premature. I can see that 
calling `set.toArray(...)` per key-group could (maybe) turn out a bit slow 
because it has to potentially iterate and flatten linked entries. However, with 
async snapshots, we could get rid of the key-group partitioning of sets, and 
instead do a flat copy of the internal array of the priority queue. This would 
translate to just a single memcopy call internally, which is very efficient. In 
the async part, we can still partition the timers by key-group in a similar way 
as the copy-on-write state table does. This would avoid slowing down the event 
processing path (in fact improving it by unifying the sets) and also keep the 
approach very straightforward and less invasive.
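
A rough sketch of this two-phase idea, under stated assumptions: the queue's backing array is directly accessible, and `keyGroupOf` is a simplified, hypothetical assignment (Flink's actual key-group assignment is different). All names are illustrative:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FlatCopySnapshotSketch {
    // Hypothetical minimal timer: only what is needed to assign a key group.
    static final class Timer {
        final long timestamp;
        final Object key;

        Timer(long timestamp, Object key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    // Synchronous part: a single array copy of the queue's backing array.
    static Timer[] flatCopy(Timer[] queueArray, int size) {
        return Arrays.copyOf(queueArray, size);
    }

    // Simplified, hypothetical key-group assignment.
    static int keyGroupOf(Object key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    // Asynchronous part: partition the copied timers by key group before writing them.
    static List<List<Timer>> partitionByKeyGroup(Timer[] snapshot, int numKeyGroups) {
        List<List<Timer>> groups = new ArrayList<>(numKeyGroups);
        for (int i = 0; i < numKeyGroups; i++) {
            groups.add(new ArrayList<>());
        }
        for (Timer t : snapshot) {
            groups.get(keyGroupOf(t.key, numKeyGroups)).add(t);
        }
        return groups;
    }
}
```

Only `flatCopy` would run on the event processing thread; later changes to the live array do not affect the copy, so partitioning and serialization can run in the background.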




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462199#comment-16462199
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r185729732
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 ---
@@ -39,11 +40,18 @@
private final long timestamp;
private final K key;
private final N namespace;
+   private final String hashKey;
--- End diff --

Why do we need this string key and not just use the normal hash code? If 
the answer is because you want to use it in `ConcurrentHashMap`, there are 
other ways to accomplish this.
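
One such way, sketched under the assumption that `equals` and `hashCode` are defined over timestamp, key, and namespace: the object itself can then serve as the key (or set element) in a `ConcurrentHashMap`, with no derived string. `TimerKeySketch` and `TimerKeyDemo` are hypothetical names, not classes from the PR:

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical identity class: equality over (timestamp, key, namespace).
final class TimerKeySketch<K, N> {
    private final long timestamp;
    private final K key;
    private final N namespace;

    TimerKeySketch(long timestamp, K key, N namespace) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TimerKeySketch)) {
            return false;
        }
        TimerKeySketch<?, ?> other = (TimerKeySketch<?, ?>) o;
        return timestamp == other.timestamp
            && Objects.equals(key, other.key)
            && Objects.equals(namespace, other.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, namespace);
    }
}

class TimerKeyDemo {
    // Looks up an entry using a different but equal instance.
    static boolean lookupByEqualInstance() {
        Map<TimerKeySketch<String, String>, String> timers = new ConcurrentHashMap<>();
        timers.put(new TimerKeySketch<String, String>(42L, "user-1", "window-a"), "registered");
        return timers.containsKey(new TimerKeySketch<String, String>(42L, "user-1", "window-a"));
    }
}
```

Field-wise equality also avoids an ambiguity of a string key built as `key + ":" + namespace + ":" + timestamp`: key `a:b` with namespace `c` and key `a` with namespace `b:c` produce the same string, but remain distinct here.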




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462200#comment-16462200
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r185734334
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
 *
 * @param context context that provides information and means required for taking a snapshot
 */
-   public void snapshotState(StateSnapshotContext context) throws Exception {
+   public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception {
if (getKeyedStateBackend() != null) {
KeyedStateCheckpointOutputStream out;
-
+   OperatorStateCheckpointOutputStream metaOut;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed operator state stream for " +
getOperatorName() + '.', exception);
}
-
try {
-   KeyGroupsList allKeyGroups = out.getKeyGroupList();
-   for (int keyGroupIdx : allKeyGroups) {
-   out.startNewKeyGroup(keyGroupIdx);
-
-   timeServiceManager.snapshotStateForKeyGroup(
-   new DataOutputViewStreamWrapper(out), keyGroupIdx);
-   }
+   metaOut = context.getRawKeyedOperatorStateMetaOutput();
} catch (Exception exception) {
-   throw new Exception("Could not write timer service of " + getOperatorName() +
-   " to checkpoint state stream.", exception);
-   } finally {
-   try {
-   out.close();
-   } catch (Exception closeException) {
-   LOG.warn("Could not close raw keyed operator state stream for {}. This " +
-   "might have prevented deleting some state data.", getOperatorName(), closeException);
-   }
+   throw new Exception("Could not open raw operator state stream for " +
+   getOperatorName() + '.', exception);
}
+   final Tuple4, Integer, TreeSet> ret = timeServiceManager.startOneSnapshotState();
+   final int currentSnapshotVersion = ret.f0;
+   final Map timerServices = ret.f1;
+   final Integer stateTableVersion = ret.f2;
+   final TreeSet snapshotVersions = ret.f3;
+   LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
+   Callable snapshotTimerCallable = new Callable() {
+   @Override
+   public Boolean call() {
+   try {
+   KeyGroupsList allKeyGroups = out.getKeyGroupList();
+   metaOut.startNewPartition();
+   DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
+   metaWrapper.writeInt(stateTableVersion);
+   if (snapshotVersions.size() > 0) {
+   metaWrapper.writeInt(snapshotVersions.size());
+   for (Integer i : snapshotVersions) {
+   metaWrapper.writeInt(i);
+   }
+   }
+   else {
+   metaWrapper.writeInt(0);
+   }
+   int keyGroupCount = 

[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462197#comment-16462197
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r185729269
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
 ---
@@ -49,13 +50,13 @@
/**
 * Processing time timers that are currently in-flight.
 */
-   private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
+   private final Map<String, InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
--- End diff --

It seems that you switched from a set to a map because you want to use 
`ConcurrentHashMap`, but you can simply have a set that is backed by 
`ConcurrentHashMap`, either from `Collections.newSetFromMap(...)` or 
`ConcurrentHashMap.newKeySet()`.
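
For reference, both options look roughly like this (a minimal sketch; the wrapper class and method names are made up, while the two JDK calls are standard, with `ConcurrentHashMap.newKeySet()` available from Java 8):

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentSetSketch {
    // Option 1: a key-set view backed by a new ConcurrentHashMap (Java 8+).
    static <T> Set<T> viaNewKeySet() {
        return ConcurrentHashMap.newKeySet();
    }

    // Option 2: wrap an empty ConcurrentHashMap as a Set.
    static <T> Set<T> viaNewSetFromMap() {
        return Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>());
    }
}
```

Either set keeps the `add`/`remove` return values that the original `Set`-based code relied on (`timerSet.add(timer)`, `timerSet.remove(timer)`), provided the element type has suitable `equals` and `hashCode`.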




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462198#comment-16462198
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5908#discussion_r185730033
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java
 ---
@@ -97,6 +105,32 @@ public String toString() {
'}';
}
 
+   public String buildHashKey() {
+   return this.hashKey;
+   }
+
+   public static String buildHashKey(String key, String namespace, long timestamp) {
+   return key + ":" + namespace + ":" + timestamp;
+   }
+
+   public void markDelete(int deleteVersion) {
+   synchronized (this.deleteVersion) {
--- End diff --

What is the use of this synchronization on a non-final field that cannot be 
achieved with a volatile field?
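
A sketch of the volatile alternative, assuming the delete version is a single value written by `markDelete` and read elsewhere; the diff does not show the field's declaration, so the class below is hypothetical. Synchronizing on a non-final field is fragile because the lock object changes whenever the field is reassigned, so two threads can end up locking different objects.

```java
// Hypothetical replacement for the synchronized block: a volatile primitive field.
class VolatileDeleteVersionSketch {
    // -1 means "not deleted", as in the issue description.
    private volatile int deleteVersion = -1;

    // A single volatile write is atomic and becomes visible to subsequent reads in other threads.
    void markDelete(int version) {
        this.deleteVersion = version;
    }

    int getDeleteVersion() {
        return deleteVersion;
    }

    boolean isDeleted() {
        return deleteVersion != -1;
    }
}
```

This covers plain publication of the value; a compound check-then-act on the field would still need something stronger, such as an atomic compare-and-set.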




[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-04-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451976#comment-16451976
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

GitHub user makeyang opened a pull request:

https://github.com/apache/flink/pull/5908

[FLINK-9182]async checkpoints for timer service

## What is the purpose of the change
This PR is WIP; the unit tests marked as TODO still need to be finished.
It is opened to collect feedback for a proposed solution for FLINK-9182

## Brief change log
1. add one more state called rawkeyedstatemeta, which helps to store the 
version info of the timer service and the timer size
2. make timer state snapshot async

## Does this pull request potentially affect one of the following parts:
Dependencies (does it add or upgrade a dependency): (yes / **no**)
The public API, i.e., is any changed class annotated with 
@Public(Evolving): (yes / **no**)
The serializers: (yes / **no** / don't know)
The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
The S3 file system connector: (yes / **no** / don't know)

## Documentation
Does this pull request introduce a new feature? (yes / **no**)
If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/makeyang/flink FLINK-9182

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5908.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5908


commit 7eca3bebd2b92ffb53a2058d10df966ffd3d4875
Author: makeyang 
Date:   2018-04-25T09:24:26Z

[FLINK-9182]async checkpoints for timer service
there are some unit tests that still need to be fixed, which I marked as TODO.
the package has passed the integration test on my test env, so please take a 
look at the code to verify the initial thoughts first



