[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800974#comment-17800974 ]
Martijn Visser commented on FLINK-33863:
----------------------------------------

Shouldn't this also be backported to 1.18?

> Compressed Operator state restore failed
> ----------------------------------------
>
> Key: FLINK-33863
> URL: https://issues.apache.org/jira/browse/FLINK-33863
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.18.0
> Reporter: Ruibin Xing
> Assignee: Ruibin Xing
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
>
> We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot
> Compression and used multiple operator states and broadcast states in a
> single operator. When recovering Operator State from a Savepoint, the
> following error occurred: "org.xerial.snappy.SnappyFramedInputStream:
> encountered EOF while reading stream header."
>
> After researching, I believe the error is due to Flink 1.18.0's support for
> Snapshot Compression on Operator State (see
> https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a
> Savepoint, SnappyFramedOutputStream adds a header to the beginning of the
> data. When recovering Operator State from a Savepoint,
> SnappyFramedInputStream verifies the header at the beginning of the data.
>
> Currently, when recovering Operator State with Snapshot Compression enabled,
> the logic is as follows:
>
> For each OperatorStateHandle:
> 1. Verify that the data at the Savepoint stream's current offset is the
>    Snappy header.
> 2. Seek to the state's start offset.
> 3. Read the state's data, leaving the stream at the state's end offset.
>
> (See:
> [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172]
> )
>
> Furthermore, when there are multiple Operator States, they are not sorted
> by offset. Broadcast states are always written at the end of the savepoint.
> However, when reading from a savepoint, there is no guarantee that broadcast
> states will be read last. Therefore, if the Operator States are out of order
> and the state at the final offset is recovered first, the Savepoint stream is
> left at the end, and the next header check fails with an EOF error.
>
> I propose a solution: sort the OperatorStateHandles by offset and then
> recover the Operator States in that order. After testing, this approach
> resolves the issue.
>
> I will submit a PR. This is my first time contributing code, so any help is
> really appreciated.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
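
The proposed fix in the quoted issue (restore the handles in ascending offset order) can be sketched roughly as follows. This is a simplified model, not Flink code: `OffsetOrderedRestore`, `Handle`, `sortByStartOffset` and `isForwardOnly` are hypothetical names introduced for illustration, and the byte offsets are made up. It only checks that the position left by the previous state never lies past the next state's start, which is the condition the out-of-order restore violates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical illustration only; this is not Flink's OperatorStateHandle API. */
final class OffsetOrderedRestore {

    /** One state's byte range inside the savepoint stream (hypothetical type). */
    static final class Handle {
        final String name;
        final long startOffset;
        final long endOffset;

        Handle(String name, long startOffset, long endOffset) {
            this.name = name;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    /** Returns a copy of the handles sorted by ascending start offset. */
    static List<Handle> sortByStartOffset(List<Handle> handles) {
        List<Handle> sorted = new ArrayList<>(handles);
        sorted.sort(Comparator.comparingLong((Handle h) -> h.startOffset));
        return sorted;
    }

    /**
     * Returns true if, restoring in the given order, the stream position left
     * by the previous state is never past the start of the next state.
     */
    static boolean isForwardOnly(List<Handle> order) {
        long position = 0;
        for (Handle h : order) {
            if (h.startOffset < position) {
                return false; // the stream was already moved past this state
            }
            position = h.endOffset; // reading leaves the stream at the end offset
        }
        return true;
    }

    public static void main(String[] args) {
        // Assumed layout: the broadcast state is written last but listed first.
        List<Handle> unordered = Arrays.asList(
                new Handle("broadcast-state", 200, 300),
                new Handle("list-state-a", 0, 100),
                new Handle("list-state-b", 100, 200));

        System.out.println(isForwardOnly(unordered));                    // false
        System.out.println(isForwardOnly(sortByStartOffset(unordered))); // true
    }
}
```

Sorting a copy rather than the input list keeps the caller's collection unchanged; whether the actual fix sorts in place or copies is not stated in the issue.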