[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-11-11 Thread Xiang Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230306#comment-17230306
 ] 

Xiang Gao commented on FLINK-19300:
---

Created a PR. Seems like in case of non-versioned payload, we would push back 
those read bytes. We might not know the correct number of bytes to push back if 
we use DataInputView.readFully() while catching EOF. Did something similar to 
DataInputView.readFully(), but keep the number of bytes so that we can push 
back when necessary.

> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0
>Reporter: Xiang Gao
>Assignee: Xiang Gao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-11-11 Thread Xiang Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17230163#comment-17230163
 ] 

Xiang Gao commented on FLINK-19300:
---

Hi [~tzulitai], 

Yeah, your assumption is right on the behavior of the issue. The 
InternalTimerServiceSerializationProxy will silently skip timers instead of 
throwing error. 

Will submit a PR asap.

Thanks,

> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Affects Versions: 1.8.0
>Reporter: Xiang Gao
>Assignee: Xiang Gao
>Priority: Blocker
> Fix For: 1.12.0, 1.11.3
>
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-11-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229796#comment-17229796
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-19300:
-

Just a comment on the severity of the issue:

It looks like timer loss is only possible if somehow, the key groups contain a 
{{0}} at the very beginning of the stream. This seems to be the only possible 
case that would lead to the {{InternalTimerServiceSerializationProxy}} silently 
skipping the rest of the reads.

> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Xiang Gao
>Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-11-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229793#comment-17229793
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-19300:
-

As for priority of this issue:

The bug looks like it's been there for quite a while already across many 
versions, but I'd suggest to list it as a blocker for 1.12.0 and 1.11.3.

[~xianggao] Please do submit a PR for this, I'll try to review it as soon as 
possible.

> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Xiang Gao
>Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-11-10 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229790#comment-17229790
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-19300:
-

This looks like a real issue, the typical {{read}} v.s. {{readFully}} problem.

This read path should only occur for the case where users are using RocksDB 
backends + heap-based timers (using the heap backend should be fine, would not 
bump into this).

[~xianggao] to help me understand the full problem: in your scenarios, I'm 
assuming that the timer losses are caused by somehow the 
{{InternalTimerServiceSerializationProxy}} silently skips reading the timers, 
instead of some {{IOException}} due to incorrect read attempts (and eventually 
fails the restore, instead of a timer loss). Could you clarify if my assumption 
is correct?

As for the fix, I would suggest to try to reuse the 
`java.io.DataInput#readFully` method instead or re-implementing it:
{code}
byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];

DataInputView inputView = new DataInputViewStreamWrapper(inputStream);
inputView.readFully(tmp);
{code}

You can catch {{EOFException}} to determine if end of stream is reached.



> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Xiang Gao
>Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-11-10 Thread david weinstein (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229782#comment-17229782
 ] 

david weinstein commented on FLINK-19300:
-

We're using Flink version 1.8.

> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Xiang Gao
>Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-11-10 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229780#comment-17229780
 ] 

Robert Metzger commented on FLINK-19300:


Thanks a lot for reporting this. Which Flink version are you using?

> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Xiang Gao
>Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-09-28 Thread Xiang Gao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17203504#comment-17203504
 ] 

Xiang Gao commented on FLINK-19300:
---

[~klion26] Thanks for taking a look! I'm not sure about the criteria for 
"blocker" issues. This issue seems to only exist in heap based timers, but 
losing timer is a bit dangerous.

Wonder what's our process on patching this kind of issue. May I submit a PR for 
this?

> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Xiang Gao
>Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-19300) Timer loss after restoring from savepoint

2020-09-21 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17199331#comment-17199331
 ] 

Congxian Qiu(klion26) commented on FLINK-19300:
---

[~xianggao] thanks for reporting the issue, I think you analysis is right, 
{{InputStream#read}} can't guarantee to fully read the byte array.

As this may loss the timer I'm not sure this need to be a blocker or not?

> Timer loss after restoring from savepoint
> -
>
> Key: FLINK-19300
> URL: https://issues.apache.org/jira/browse/FLINK-19300
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / State Backends
>Reporter: Xiang Gao
>Priority: Critical
>
> While using heap-based timers, we are seeing occasional timer loss after 
> restoring program from savepoint, especially when using a remote savepoint 
> storage (s3). 
> After some investigation, the issue seems to be related to [this line in 
> deserialization|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java#L65].
>  When trying to check the VERSIONED_IDENTIFIER, the input stream may not 
> guarantee filling the byte array, causing timers to be dropped for the 
> affected key group.
> Should keep reading until expected number of bytes are actually read or if 
> end of the stream has been reached. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)