[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230306#comment-17230306 ]

Xiang Gao commented on FLINK-19300:
-----------------------------------

Created a PR. It seems that, in the case of a non-versioned payload, we push back the bytes we have already read. If we use DataInputView.readFully() and catch EOFException, we might not know the correct number of bytes to push back. So I did something similar to DataInputView.readFully(), but kept track of the number of bytes read so that we can push them back when necessary.

> Timer loss after restoring from savepoint
> -----------------------------------------
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.8.0
> Reporter: Xiang Gao
> Assignee: Xiang Gao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> While using heap-based timers, we are seeing occasional timer loss after
> restoring a program from a savepoint, especially when using remote savepoint
> storage (S3).
> After some investigation, the issue seems to be related to [this line in
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
> When checking the VERSIONED_IDENTIFIER, the input stream does not
> guarantee that the byte array is filled, which causes timers to be dropped
> for the affected key group.
> We should keep reading until the expected number of bytes has actually been
> read or the end of the stream has been reached.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
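A minimal sketch of the approach described in this comment, under some assumptions: it uses a plain java.io.InputStream wrapped in a PushbackInputStream instead of Flink's DataInputView, and the names VersionPrefixSketch, MAGIC, readUpTo, consumeMarker and oneByteAtATime are hypothetical placeholders, not the code in the PR. The idea is to loop like readFully(), but stop quietly at end of stream and remember how many bytes actually arrived, so that only those bytes are pushed back when the prefix turns out not to be the version identifier.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class VersionPrefixSketch {
    // Hypothetical 4-byte marker standing in for VERSIONED_IDENTIFIER.
    static final byte[] MAGIC = {(byte) 0xF1, (byte) 0xCD, (byte) 0x85, (byte) 0x9F};

    // Like readFully(), but stops at end of stream and returns how many bytes it actually read.
    static int readUpTo(InputStream in, byte[] buf) throws IOException {
        int total = 0;
        while (total < buf.length) {
            int n = in.read(buf, total, buf.length - total);
            if (n < 0) {
                break; // end of stream reached before the buffer was full
            }
            total += n;
        }
        return total;
    }

    // Consumes the marker if present; otherwise pushes back exactly the bytes that were read.
    static boolean consumeMarker(PushbackInputStream in) throws IOException {
        byte[] tmp = new byte[MAGIC.length];
        int read = readUpTo(in, tmp);
        if (read == MAGIC.length && Arrays.equals(tmp, MAGIC)) {
            return true;
        }
        in.unread(tmp, 0, read); // never push back bytes that did not come from the stream
        return false;
    }

    // Hypothetical stream returning one byte per read call, emulating short reads from remote storage.
    static InputStream oneByteAtATime(byte[] data) {
        return new FilterInputStream(new ByteArrayInputStream(data)) {
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                return super.read(b, off, Math.min(len, 1));
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] versioned = {(byte) 0xF1, (byte) 0xCD, (byte) 0x85, (byte) 0x9F, 0, 7};
        PushbackInputStream a = new PushbackInputStream(oneByteAtATime(versioned), MAGIC.length);
        System.out.println(consumeMarker(a) + " " + new DataInputStream(a).readShort()); // true 7

        byte[] legacyShort = {0, 7}; // non-versioned payload shorter than the marker
        PushbackInputStream b = new PushbackInputStream(oneByteAtATime(legacyShort), MAGIC.length);
        System.out.println(consumeMarker(b) + " " + new DataInputStream(b).readShort()); // false 7
    }
}
```

The key design point is the unread(tmp, 0, read) call: pushing back all of tmp after a short read would inject zero bytes that never came from the stream, which is the kind of corruption this issue is about.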
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230163#comment-17230163 ]

Xiang Gao commented on FLINK-19300:
-----------------------------------

Hi [~tzulitai],

Yes, your assumption about the behavior of the issue is correct: the InternalTimerServiceSerializationProxy silently skips the timers instead of throwing an error. I will submit a PR as soon as possible.

Thanks,
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229796#comment-17229796 ]

Tzu-Li (Gordon) Tai commented on FLINK-19300:
---------------------------------------------

Just a comment on the severity of the issue: it looks like timer loss is only possible if the key groups somehow contain a {{0}} at the very beginning of the stream. This seems to be the only case that would lead the {{InternalTimerServiceSerializationProxy}} to silently skip the rest of the reads.
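To illustrate the distinction between a silent skip and a failed restore, the toy reader below models a count-prefixed list of timers. The layout (an int count followed by long timestamps) and the names SilentSkipModel and readTimers are hypothetical simplifications, not the actual format read by InternalTimerServiceSerializationProxy. In this model a leading 0 makes the reader return an empty list without complaint, whereas a non-zero garbage count tends to run past the end of the data and raise an EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SilentSkipModel {
    // Toy layout (hypothetical, not Flink's real format): an int count followed by `count` long timestamps.
    static List<Long> readTimers(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int count = in.readInt();
        if (count < 0) {
            throw new IOException("Corrupt count: " + count);
        }
        List<Long> timers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            timers.add(in.readLong()); // throws EOFException if the data runs out
        }
        return timers; // any bytes after the last timer are silently ignored
    }

    public static void main(String[] args) throws IOException {
        // Leading 0: the loop body never runs, so the timer that follows is skipped without any error.
        byte[] zeroCount = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 42};
        System.out.println(readTimers(zeroCount)); // []

        // Garbage non-zero count: the reader runs past the end and fails loudly.
        byte[] garbageCount = {0, 0, 0, 9, 0, 0, 0, 0, 0, 0, 0, 42};
        try {
            readTimers(garbageCount);
        } catch (EOFException e) {
            System.out.println("EOFException: restore fails instead of losing timers");
        }
    }
}
```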
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229793#comment-17229793 ]

Tzu-Li (Gordon) Tai commented on FLINK-19300:
---------------------------------------------

As for the priority of this issue: the bug appears to have been present for quite a while across many versions, but I'd suggest listing it as a blocker for 1.12.0 and 1.11.3.

[~xianggao] Please do submit a PR for this; I'll try to review it as soon as possible.
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229790#comment-17229790 ]

Tzu-Li (Gordon) Tai commented on FLINK-19300:
---------------------------------------------

This looks like a real issue: the typical {{read}} vs. {{readFully}} problem. This read path should only be hit when users combine a RocksDB backend with heap-based timers (using the heap backend should be fine; it would not run into this).

[~xianggao] To help me understand the full problem: in your scenarios, I'm assuming the timer loss occurs because the {{InternalTimerServiceSerializationProxy}} somehow silently skips reading the timers, rather than throwing an {{IOException}} due to incorrect read attempts (which would eventually fail the restore instead of losing timers). Could you confirm whether my assumption is correct?

As for the fix, I would suggest reusing the {{java.io.DataInput#readFully}} method instead of re-implementing it:
{code}
byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
inputView.readFully(tmp);
{code}
You can catch {{EOFException}} to determine whether the end of the stream has been reached.
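A small, self-contained demonstration of the read vs. readFully difference, assuming a hypothetical ChunkedStream that hands out at most two bytes per call (standing in for a remote store such as S3). java.io.DataInputStream is used here in place of Flink's DataInputViewStreamWrapper; the class and helper names are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadVsReadFully {
    // Hypothetical stream that hands out at most 2 bytes per read call, as a remote store may.
    static final class ChunkedStream extends FilterInputStream {
        ChunkedStream(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, 2));
        }
    }

    static InputStream source(byte[] data) {
        return new ChunkedStream(new ByteArrayInputStream(data));
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5, 6};
        byte[] tmp = new byte[4];

        // read() may legally return fewer bytes than requested, leaving the rest of tmp untouched.
        int n = source(data).read(tmp);
        System.out.println(n); // 2

        // readFully() keeps calling read() until the buffer is full.
        new DataInputStream(source(data)).readFully(tmp);
        System.out.println(tmp[3]); // 4

        // On a stream that is too short, readFully() throws EOFException instead of returning quietly.
        try {
            new DataInputStream(source(new byte[] {1, 2})).readFully(tmp);
        } catch (EOFException e) {
            System.out.println("EOF");
        }
    }
}
```

Note that readFully() signals a too-short stream only through EOFException; it does not report how many bytes it had already consumed before failing.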
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229782#comment-17229782 ]

david weinstein commented on FLINK-19300:
-----------------------------------------

We're using Flink version 1.8.
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229780#comment-17229780 ]

Robert Metzger commented on FLINK-19300:
----------------------------------------

Thanks a lot for reporting this. Which Flink version are you using?
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203504#comment-17203504 ]

Xiang Gao commented on FLINK-19300:
-----------------------------------

[~klion26] Thanks for taking a look!

I'm not sure about the criteria for "blocker" issues. This issue seems to exist only with heap-based timers, but losing timers is quite dangerous. I wonder what our process is for patching this kind of issue. May I submit a PR for this?
[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint
[ https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199331#comment-17199331 ]

Congxian Qiu(klion26) commented on FLINK-19300:
-----------------------------------------------

[~xianggao] Thanks for reporting the issue. I think your analysis is right: {{InputStream#read}} does not guarantee that the byte array is fully filled. As this may lose timers, I'm not sure whether it needs to be a blocker.