Ruibin Xing created FLINK-33863:
-----------------------------------
Summary: Compressed Operator state restore failed
Key: FLINK-33863
URL: https://issues.apache.org/jira/browse/FLINK-33863
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.18.0
Reporter: Ruibin Xing
We encountered an issue with Flink 1.18.0. Our job has Snapshot Compression
enabled and uses multiple Operator States in a single operator. When restoring
Operator State from a Savepoint, the following error occurred:
"org.xerial.snappy.SnappyFramedInputStream: encountered EOF while reading
stream header."
After some investigation, I believe the error is related to the Snapshot
Compression support for Operator State added in Flink 1.18.0 (see
https://issues.apache.org/jira/browse/FLINK-30113 ). When a Savepoint is
written, SnappyFramedOutputStream writes a stream header at the beginning of
the compressed data. When Operator State is restored from a Savepoint,
SnappyFramedInputStream reads and verifies that header at the position where
it starts reading.
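To illustrate why a missing header surfaces as an EOF error, here is a minimal
Java sketch. It uses java.util.zip's GZIPOutputStream and GZIPInputStream as a
stand-in for SnappyFramedOutputStream and SnappyFramedInputStream: both write a
header on output, and GZIPInputStream verifies it as soon as the stream is
opened (which the reported error message suggests SnappyFramedInputStream does
as well). The class and method names are hypothetical; this is not Flink code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Stand-in demo (assumption for illustration): GZIP streams play the role of
// SnappyFramedOutputStream / SnappyFramedInputStream.
class HeaderDemo {
    // Compresses a payload; GZIPOutputStream writes its header before any data.
    static byte[] compress(String payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    // Opening GZIPInputStream at `offset` reads and verifies the header immediately.
    static String decompress(byte[] data, int offset) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(data, offset, data.length - offset))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    // Returns true if opening a decompressing stream at `offset` hits EOF while
    // reading the header, the analogue of the reported Snappy error.
    static boolean headerEofAt(byte[] data, int offset) throws IOException {
        try (GZIPInputStream ignored = new GZIPInputStream(
                new ByteArrayInputStream(data, offset, data.length - offset))) {
            return false;
        } catch (EOFException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = compress("operator state");
        System.out.println(decompress(data, 0));            // operator state
        System.out.println(headerEofAt(data, data.length)); // true
    }
}
```

Opening the input stream at the very end of the data has no header bytes to
read, so the constructor itself fails before any state is deserialized.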
Currently, when restoring Operator State with Snapshot Compression enabled,
the logic is as follows:
For each OperatorStateHandle:
1. Verify that the Snappy header is present at the current offset of the
Savepoint stream.
2. Seek to the state's start offset.
3. Read the state's data, then seek to the state's end offset.
(See:
https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172
)
Furthermore, when there are multiple Operator States, they are not restored in
order of their offsets. If the states are out of order and the one with the
largest offset is restored first, the Savepoint stream is left positioned at
its end after step 3, so the header check in step 1 for the next state reads
nothing and fails with the EOF error above.
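The restore steps and the ordering problem can be modeled with a small,
hypothetical Java sketch. Each state is written as an independently compressed
segment (GZIP again standing in for Snappy framing), and restore() follows
steps 1-3 above: check for a header at the current position, seek to the
state's start offset, read the state, then seek to its end offset. This is a
simplified model of the described logic under those assumptions, not the
actual OperatorStateRestoreOperation code; Segment, write() and restore() are
illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical model of the described restore loop; GZIP stands in for Snappy framing.
class RestoreLoopDemo {
    // Start (inclusive) and end (exclusive) offsets of one state's segment.
    record Segment(String name, int start, int end) {}

    // Writes each state as its own compressed segment (each with a header)
    // and appends its offsets to offsetsOut in write order.
    static byte[] write(Map<String, String> states, List<Segment> offsetsOut) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        for (Map.Entry<String, String> e : states.entrySet()) {
            int start = file.size();
            ByteArrayOutputStream segment = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(segment)) {
                gz.write(e.getValue().getBytes(StandardCharsets.UTF_8));
            }
            segment.writeTo(file);
            offsetsOut.add(new Segment(e.getKey(), start, file.size()));
        }
        return file.toByteArray();
    }

    // Restores the states in the given order, following steps 1-3.
    static Map<String, String> restore(byte[] file, List<Segment> order) throws IOException {
        Map<String, String> restored = new LinkedHashMap<>();
        int position = 0; // current offset of the simulated Savepoint stream
        for (Segment s : order) {
            // Step 1: opening the decompressing stream at the current position
            // reads the header; at the end of the data this throws EOFException.
            try (GZIPInputStream header = new GZIPInputStream(
                    new ByteArrayInputStream(file, position, file.length - position))) {
                // Header present; nothing else is read here.
            }
            // Step 2: seek to the state's start offset.
            position = s.start();
            // Step 3: read the state's data, then seek to its end offset.
            try (GZIPInputStream in = new GZIPInputStream(
                    new ByteArrayInputStream(file, s.start(), s.end() - s.start()))) {
                restored.put(s.name(), new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
            position = s.end();
        }
        return restored;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> states = new LinkedHashMap<>();
        states.put("a", "state-a");
        states.put("b", "state-b");
        List<Segment> offsets = new ArrayList<>();
        byte[] file = write(states, offsets);

        // In offset order: every header check lands on a segment start.
        System.out.println(restore(file, offsets)); // {a=state-a, b=state-b}

        // Last segment first: the position ends up at file.length.
        List<Segment> reversed = new ArrayList<>(offsets);
        Collections.reverse(reversed);
        try {
            restore(file, reversed);
        } catch (EOFException e) {
            System.out.println("EOF while reading header");
        }
    }
}
```

In offset order, the end of one segment is the start of the next, so every
header check succeeds; restoring the last segment first leaves the position at
the end of the data and the next check fails.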
I propose sorting the OperatorStateHandles by offset and then restoring the
Operator States in that order. I have tested this approach and it resolves the
issue. I will submit a PR. This is my first time contributing code, so any help
would be greatly appreciated.
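The core of the proposed fix is an ordering step before restore. A minimal
Java sketch, assuming a hypothetical map from state name to partition offsets
in which every array is non-empty and its first element is the state's start
offset (restoreOrder() is an illustrative name, not a Flink method):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed fix: restore states in ascending offset
// order so the stream only ever moves forward.
class SortByOffsetDemo {
    // Assumption: each offsets array is non-empty and element 0 is the start offset.
    static List<String> restoreOrder(Map<String, long[]> nameToOffsets) {
        List<Map.Entry<String, long[]>> entries = new ArrayList<>(nameToOffsets.entrySet());
        entries.sort(Comparator.comparingLong(e -> e.getValue()[0]));
        List<String> order = new ArrayList<>();
        for (Map.Entry<String, long[]> e : entries) {
            order.add(e.getKey());
        }
        return order;
    }

    public static void main(String[] args) {
        // HashMap iteration order is unspecified, like an unsorted state map.
        Map<String, long[]> offsets = new HashMap<>();
        offsets.put("late", new long[] {120L, 180L});
        offsets.put("early", new long[] {0L, 40L});
        offsets.put("middle", new long[] {60L});
        System.out.println(restoreOrder(offsets)); // [early, middle, late]
    }
}
```

Sorting by start offset means the end offset of each restored state is at or
before the start of the next one, so the header check never runs past the end.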
--
This message was sent by Atlassian Jira
(v8.20.10#820010)