[ 
https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798808#comment-17798808
 ] 

Ruibin Xing commented on FLINK-33863:
-------------------------------------

[~Yanfei Lei] Hi, I will try to illustrate this problem with an example:

 
{code:java}
       | Snappy Header 1 | State 1 | Snappy Header 2 | State 2 | Snappy Header 
3 | State 3 | 
       ^                 ^         ^                                            
 ^         ^
offset a                 b         c                                            
 d         e    

{code}
This is the layout of a snapshot of compressed operator states.

If we try to restore it in a sequence of State 1, 3, 2 instead of State 1, 2, 3:
 # We will start with offset a.
 # Snappy will verify the header 1 and everything will be ok.
 # We will seek to offset b(from the OperatorStateHandle) and restoring the 
states until we reach offset C.
 # Now we are restoring State 3,  we will verify the snappy header 2 instead of 
3.
 # Then we will seek to offset d and eventually reached offset e.
 # Then we are going to restoring State 2 and when trying to verify the header, 
an EOF error is thrown.

So there are two problems if we don't sort states by offsets before restoring 
them:
 # In step 4, we try to restoring State 3, instead the header of State 2 is 
verified.
 # There is currently no simple way to seek to the correct header position.

 

 

 

> Compressed Operator state restore failed
> ----------------------------------------
>
>                 Key: FLINK-33863
>                 URL: https://issues.apache.org/jira/browse/FLINK-33863
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Ruibin Xing
>            Assignee: Ruibin Xing
>            Priority: Major
>              Labels: pull-request-available
>
> We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot 
> Compression and used multiple operator states and broadcast states in an 
> operator. When recovering Operator State from a Savepoint, the following 
> error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF 
> while reading stream header."
> After researching, I believe the error is due to Flink 1.18.0's support for 
> Snapshot Compression on Operator State (see 
> https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a 
> Savepoint, SnappyFramedInputStream adds a header to the beginning of the 
> data. When recovering Operator State from a Savepoint, 
> SnappyFramedInputStream verifies the header from the beginning of the data.
> Currently, when recovering Operator State with Snapshot Compression enabled, 
> the logic is as follows:
> For each OperatorStateHandle:
> 1. Verify if the current Savepoint stream's offset is the Snappy header.
> 2. Seek to the state's start offset.
> 3. Read the state's data and finally seek to the state's end offset.
> (See: 
> [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172]
>  )
> Furthermore, when there are multiple Operator States, they are not sorted 
> according to the Operator State's offset. The broadcast states will always be 
> written to the end of the savepoint. However when reading from savepoint, 
> there are no guarantee that broadcast states will be read at last.
> Therefore, if the Operator States are out of order and the final offset is 
> recovered first, the Savepoint stream will be seeked to the end, resulting in 
> an EOF error.
> I propose a solution: sort the OperatorStateHandle by offset and then recover 
> the Operator State in order. After testing, this approach resolves the issue.
> I will submit a PR. This is my first time contributing code, so any help is 
> really appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to