[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798808#comment-17798808 ]
Ruibin Xing commented on FLINK-33863:
-------------------------------------

[~Yanfei Lei] Hi, I will try to illustrate this problem with an example:

{code:java}
| Snappy Header 1 | State 1 | Snappy Header 2 | State 2 | Snappy Header 3 | State 3 |
^                 ^         ^                                             ^         ^
a                 b         c                                             d         e
{code}

This is the layout of a snapshot of compressed operator states, where a to e mark stream offsets. If we restore the states in the order State 1, 3, 2 instead of State 1, 2, 3:
# We start at offset a.
# Snappy verifies Snappy Header 1, and everything is fine.
# We seek to offset b (taken from the OperatorStateHandle) and restore State 1 until we reach offset c.
# Now we are restoring State 3, but Snappy verifies Snappy Header 2 instead of Snappy Header 3.
# Then we seek to offset d and eventually reach offset e.
# Then we restore State 2, and when Snappy tries to verify its header, an EOF error is thrown because the stream is already at its end.

So there are two problems if we don't sort the states by offset before restoring them:
# In step 4, we are restoring State 3, but the header of State 2 is verified instead.
# There is currently no simple way to seek to the correct header position.

> Compressed Operator state restore failed
> ----------------------------------------
>
>                 Key: FLINK-33863
>                 URL: https://issues.apache.org/jira/browse/FLINK-33863
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Ruibin Xing
>            Assignee: Ruibin Xing
>            Priority: Major
>              Labels: pull-request-available
>
> We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot
> Compression and used multiple operator states and broadcast states in one
> operator. When recovering Operator State from a Savepoint, the following
> error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF
> while reading stream header."
> After investigating, I believe the error is caused by Flink 1.18.0's support
> for Snapshot Compression of Operator State (see
> https://issues.apache.org/jira/browse/FLINK-30113).
> When writing a Savepoint, SnappyFramedOutputStream adds a header to the
> beginning of the compressed data. When recovering Operator State from a
> Savepoint, SnappyFramedInputStream verifies that header at the beginning of
> the data.
> Currently, when recovering Operator State with Snapshot Compression enabled,
> the logic is as follows:
> For each OperatorStateHandle:
> 1. Verify the Snappy header at the current offset of the Savepoint stream.
> 2. Seek to the state's start offset.
> 3. Read the state's data and finally seek to the state's end offset.
> (See:
> [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172])
> Furthermore, when there are multiple Operator States, they are not sorted by
> their offsets. The broadcast states are always written to the end of the
> Savepoint, but when reading from the Savepoint, there is no guarantee that
> the broadcast states will be read last.
> Therefore, if the Operator States are out of order and the state at the final
> offset is recovered first, the Savepoint stream is left at its end, and the
> next header verification fails with an EOF error.
> I propose a solution: sort the OperatorStateHandles by offset and then recover
> the Operator States in that order. In my testing, this approach resolves the
> issue.
> I will submit a PR. This is my first time contributing code, so any help is
> really appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
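The restore sequence from the comment and the issue can be reproduced with a small, self-contained Java simulation. This is a sketch under stated assumptions, not Flink's code: HEADER, StateHandle, SeekableBytes, write, restore and restoreSortedByOffset are hypothetical names, the three-byte "HDR" marker stands in for the Snappy stream header, and StateHandle stands in for the offsets an OperatorStateHandle records. The restore loop follows the three steps listed in the issue (verify the header at the current position, seek to the start offset, read to the end offset), and restoreSortedByOffset applies the proposed fix of sorting by offset first.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of the restore loop described in FLINK-33863; not Flink's actual code.
final class OffsetOrderDemo {

    // Hypothetical stand-in for the Snappy stream header: every compressed state starts with the same bytes.
    static final byte[] HEADER = "HDR".getBytes(StandardCharsets.US_ASCII);

    // Hypothetical stand-in for the offsets an OperatorStateHandle records for one state's payload.
    record StateHandle(String name, int startOffset, int endOffset) {}

    // Minimal seekable reader over a byte array.
    static final class SeekableBytes {
        private final byte[] data;
        private int pos;

        SeekableBytes(byte[] data) { this.data = data; }

        void seek(int newPos) { pos = newPos; }

        byte[] read(int n) throws EOFException {
            if (pos + n > data.length) {
                throw new EOFException("EOF at offset " + pos);
            }
            byte[] out = Arrays.copyOfRange(data, pos, pos + n);
            pos += n;
            return out;
        }
    }

    // Writes | HDR | state 1 | HDR | state 2 | ... and records each payload's start/end offsets.
    static byte[] write(List<String> states, List<StateHandle> handles) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < states.size(); i++) {
            out.writeBytes(HEADER);
            int start = out.size();
            out.writeBytes(states.get(i).getBytes(StandardCharsets.US_ASCII));
            handles.add(new StateHandle("State " + (i + 1), start, out.size()));
        }
        return out.toByteArray();
    }

    // For each handle: verify the header at the CURRENT position, seek to the
    // start offset, then read up to the end offset (the reader stays there).
    static List<String> restore(byte[] data, List<StateHandle> order) throws IOException {
        SeekableBytes in = new SeekableBytes(data);
        List<String> restored = new ArrayList<>();
        for (StateHandle h : order) {
            // Only checks that some header is here, not that it belongs to h.
            if (!Arrays.equals(in.read(HEADER.length), HEADER)) {
                throw new IOException("Invalid header before " + h.name());
            }
            in.seek(h.startOffset());
            byte[] payload = in.read(h.endOffset() - h.startOffset());
            restored.add(new String(payload, StandardCharsets.US_ASCII));
        }
        return restored;
    }

    // Proposed fix: restore the handles in ascending offset order.
    static List<String> restoreSortedByOffset(byte[] data, List<StateHandle> handles)
            throws IOException {
        List<StateHandle> sorted = new ArrayList<>(handles);
        sorted.sort(Comparator.comparingInt(StateHandle::startOffset));
        return restore(data, sorted);
    }

    public static void main(String[] args) throws IOException {
        List<StateHandle> handles = new ArrayList<>();
        byte[] data = write(List.of("one", "two", "three"), handles);
        // Restore order State 1, State 3, State 2, as in the comment's example.
        List<StateHandle> outOfOrder = List.of(handles.get(0), handles.get(2), handles.get(1));
        try {
            restore(data, outOfOrder);
        } catch (EOFException e) {
            // prints: unsorted restore failed: EOF at offset 20
            System.out.println("unsorted restore failed: " + e.getMessage());
        }
        // prints: sorted restore: [one, two, three]
        System.out.println("sorted restore: " + restoreSortedByOffset(data, outOfOrder));
    }
}
```

Restoring in the order State 1, State 3, State 2 fails with an EOFException when the next header is read at the end of the stream, while the sorted restore returns all three states. Because every header has the same bytes in this sketch, verifying the second header while restoring State 3 passes silently, which is why the failure only surfaces later as an EOF.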