Tzu-Li (Gordon) Tai created FLINK-20189:
-------------------------------------------

             Summary: Restored feedback events may be silently dropped if per 
key-group header bytes were not fully read
                 Key: FLINK-20189
                 URL: https://issues.apache.org/jira/browse/FLINK-20189
             Project: Flink
          Issue Type: Task
          Components: Stateful Functions
    Affects Versions: statefun-2.2.1
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Tzu-Li (Gordon) Tai
             Fix For: statefun-2.3.0, statefun-2.2.2


The attempt to read the per key-group header bytes here does not guarantee that
the header bytes are fully read:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L163

What could happen is the following:
* Say the input stream actually has the header bytes written in there
* Fewer than {{HEADER_BYTES.length}} bytes were read into the read buffer at
the referenced code line above.
* The {{if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES))}} check
would be true, because the partially filled byte array does not equal the
expected header bytes.
* We would mistakenly conclude that the header bytes are not in the input
stream and push the read bytes back, i.e. the header bytes would not be
skipped, and subsequent reads would see the header bytes first.
* Most importantly, since the header bytes are not skipped in this case, the
{{STATEFUN_VERSION}} (which is {{0}}) would be incorrectly read by
{{KeyGroupStream.readFrom(...)}} as the number of feedback elements to read:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java#L57
* The end result in this scenario is that some checkpointed feedback events
would be silently dropped.
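
To make the scenario above concrete, here is a minimal, self-contained Java
sketch. It assumes a made-up 4-byte {{HEADER_BYTES}} value and a hypothetical
{{OneByteAtATimeInputStream}} wrapper that, as the {{InputStream}} contract
permits, returns at most one byte per read call. The {{naiveSkipHeader}} method
only mirrors the check quoted above; it is not the actual
{{UnboundedFeedbackLogger}} code.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class ShortReadDemo {

  // Hypothetical header value; the real HEADER_BYTES constant may differ.
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

  // Hypothetical stream that honours the InputStream contract but returns
  // at most one byte per read(byte[], int, int) call.
  static final class OneByteAtATimeInputStream extends FilterInputStream {
    OneByteAtATimeInputStream(InputStream in) {
      super(in);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return super.read(b, off, Math.min(len, 1));
    }
  }

  // Mirrors the check described in the issue; returns true if the header
  // bytes were treated as present and therefore consumed.
  static boolean naiveSkipHeader(PushbackInputStream in) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    int bytesRead = in.read(header); // may legally be < header.length
    if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES)) {
      // Header considered absent: push back whatever was read.
      in.unread(header, 0, bytesRead);
      return false;
    }
    return bytesRead > 0;
  }

  public static void main(String[] args) throws IOException {
    // The stream really does start with the header, followed by a payload byte.
    byte[] data = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 42};
    PushbackInputStream in = new PushbackInputStream(
        new OneByteAtATimeInputStream(new ByteArrayInputStream(data)), HEADER_BYTES.length);
    boolean skipped = naiveSkipHeader(in);
    // Prints "header skipped: false, next byte: 0xCA": the header was not skipped.
    System.out.printf("header skipped: %b, next byte: 0x%02X%n", skipped, in.read());
  }
}
```

With a plain {{ByteArrayInputStream}} (which fills the whole buffer) the same
check does recognise the header, which is why the problem depends on the
concrete {{InputStream}} implementation.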

Although it is hard to say how likely this is to happen in practice, and it
also depends on the actual {{InputStream}} implementation, the general contract
of {{InputStream#read(byte[])}} allows it to return fewer bytes than the buffer
length, so this is definitely something that should be addressed.
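
One possible way to address this, sketched under the same assumptions
(hypothetical {{HEADER_BYTES}} value, hypothetical helper names {{readUpTo}} and
{{skipHeaderIfPresent}}), is to keep reading until either the header buffer is
full or the end of the stream is reached, and only then compare against
{{HEADER_BYTES}}. This is an illustration, not the actual fix.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class FullHeaderReadSketch {

  // Hypothetical header value; the real HEADER_BYTES constant may differ.
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

  // Reads up to buffer.length bytes, looping over short reads. Returns the
  // number of bytes read, which is < buffer.length only at end of stream.
  static int readUpTo(InputStream in, byte[] buffer) throws IOException {
    int total = 0;
    while (total < buffer.length) {
      int n = in.read(buffer, total, buffer.length - total);
      if (n == -1) {
        break; // end of stream: fewer bytes than requested are available
      }
      total += n;
    }
    return total;
  }

  // Consumes the header if it is fully present; otherwise restores the bytes.
  static boolean skipHeaderIfPresent(PushbackInputStream in) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    int bytesRead = readUpTo(in, header);
    if (bytesRead == HEADER_BYTES.length && Arrays.equals(header, HEADER_BYTES)) {
      return true; // header fully read and matched: leave it consumed
    }
    if (bytesRead > 0) {
      in.unread(header, 0, bytesRead); // not a header: push the bytes back
    }
    return false;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 42};
    // Hypothetical stream that returns at most one byte per read call.
    InputStream oneByteAtATime = new FilterInputStream(new ByteArrayInputStream(data)) {
      @Override
      public int read(byte[] b, int off, int len) throws IOException {
        return super.read(b, off, Math.min(len, 1));
      }
    };
    PushbackInputStream in = new PushbackInputStream(oneByteAtATime, HEADER_BYTES.length);
    boolean skipped = skipHeaderIfPresent(in);
    System.out.printf("header skipped: %b, next byte: %d%n", skipped, in.read());
  }
}
```

On Java 9+, {{InputStream#readNBytes(byte[], int, int)}} has the same
read-until-full-or-end-of-stream semantics and could replace the hand-written
loop; the explicit loop keeps the sketch Java 8 compatible.
{{DataInputStream#readFully}} is not a drop-in replacement here, because it
throws {{EOFException}} when a stream without the header is shorter than
{{HEADER_BYTES.length}}.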




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
