[jira] [Updated] (FLINK-33863) Compressed Operator state restore failed

2024-01-02 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-33863:
---
Fix Version/s: 1.18.2

> Compressed Operator state restore failed
> 
>
> Key: FLINK-33863
> URL: https://issues.apache.org/jira/browse/FLINK-33863
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.18.0
> Reporter: Ruibin Xing
> Assignee: Ruibin Xing
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot 
> Compression and used multiple operator states and broadcast states in an 
> operator. When recovering Operator State from a Savepoint, the following 
> error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF 
> while reading stream header."
> After researching, I believe the error is due to Flink 1.18.0's support for 
> Snapshot Compression on Operator State (see 
> https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a 
> Savepoint, SnappyFramedOutputStream adds a header to the beginning of the 
> data. When recovering Operator State from a Savepoint, 
> SnappyFramedInputStream verifies the header from the beginning of the data.
> Currently, when recovering Operator State with Snapshot Compression enabled, 
> the logic is as follows:
> For each OperatorStateHandle:
> 1. Verify that the Snappy header is at the Savepoint stream's current offset.
> 2. Seek to the state's start offset.
> 3. Read the state's data and finally seek to the state's end offset.
> (See: 
> [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172]
>  )
> Furthermore, when there are multiple Operator States, they are not sorted 
> by offset. Broadcast states are always written at the end of the savepoint. 
> However, when reading from a savepoint, there is no guarantee that broadcast 
> states will be read last.
> Therefore, if the Operator States are out of order and the state with the 
> final offset is recovered first, the Savepoint stream is left at its end, and 
> the next header check fails with an EOF error.
> I propose a solution: sort the OperatorStateHandle by offset and then recover 
> the Operator State in order. After testing, this approach resolves the issue.
> I will submit a PR. This is my first time contributing code, so any help is 
> really appreciated.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33863) Compressed Operator state restore failed

2023-12-16 Thread Ruibin Xing (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruibin Xing updated FLINK-33863:

Description: 
We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot 
Compression and used multiple operator states and broadcast states in an 
operator. When recovering Operator State from a Savepoint, the following error 
occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF while 
reading stream header."

After researching, I believe the error is due to Flink 1.18.0's support for 
Snapshot Compression on Operator State (see 
https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a Savepoint, 
SnappyFramedOutputStream adds a header to the beginning of the data. When 
recovering Operator State from a Savepoint, SnappyFramedInputStream verifies 
the header from the beginning of the data.

Currently, when recovering Operator State with Snapshot Compression enabled, 
the logic is as follows:
For each OperatorStateHandle:
1. Verify that the Snappy header is at the Savepoint stream's current offset.
2. Seek to the state's start offset.
3. Read the state's data and finally seek to the state's end offset.
(See: 
[https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172]
 )
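
The three steps above can be simulated in a few lines of Python. This is only a sketch under simplifying assumptions, not the Flink code: write_savepoint and restore are hypothetical helpers, each state is assumed to be written as a header followed by its payload, and the recorded offsets are assumed to point at the payload. The header value is the stream identifier of the Snappy framing format, used here purely as a stand-in.

```python
import io

# Stream identifier of the Snappy framing format, used only as a stand-in header.
HEADER = b"\xff\x06\x00\x00sNaPpY"


def write_savepoint(states):
    """Write each (name, payload) as HEADER + payload; return bytes and payload offsets."""
    out = io.BytesIO()
    offsets = {}
    for name, payload in states:
        out.write(HEADER)
        start = out.tell()
        out.write(payload)
        offsets[name] = (start, out.tell())
    return out.getvalue(), offsets


def restore(data, offsets, order):
    """Restore states in the given order, following the three steps (simplified)."""
    stream = io.BytesIO(data)
    restored = {}
    for name in order:
        start, end = offsets[name]
        # Step 1: expect a header at the stream's *current* position.
        header = stream.read(len(HEADER))
        if len(header) < len(HEADER):
            raise EOFError("encountered EOF while reading stream header")
        if header != HEADER:
            raise ValueError("invalid stream header")
        # Step 2: seek to the state's start offset.
        stream.seek(start)
        # Step 3: read the state's data; the stream now sits at the end offset.
        restored[name] = stream.read(end - start)
    return restored


data, offsets = write_savepoint([("list-state", b"aaa"), ("broadcast-state", b"bb")])
print(restore(data, offsets, ["list-state", "broadcast-state"]))
try:
    # Reading the last-written state first leaves the stream at EOF.
    restore(data, offsets, ["broadcast-state", "list-state"])
except EOFError as err:
    print("restore failed:", err)
```

In this model the in-order restore succeeds, while restoring the last-written state first makes the next header check hit the end of the stream.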

Furthermore, when there are multiple Operator States, they are not sorted 
by offset. Broadcast states are always written at the end of the savepoint. 
However, when reading from a savepoint, there is no guarantee that broadcast 
states will be read last.

Therefore, if the Operator States are out of order and the state with the 
final offset is recovered first, the Savepoint stream is left at its end, and 
the next header check fails with an EOF error.

I propose a solution: sort the OperatorStateHandle by offset and then recover 
the Operator State in order. After testing, this approach resolves the issue.
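
The ordering idea can be sketched as follows. This is an illustration in Python, not the submitted patch: StateEntry, in_offset_order and reads_forward_only are hypothetical names, and the offsets are made-up values.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StateEntry:
    """Hypothetical stand-in for one operator state's location in the savepoint."""
    name: str
    start: int
    end: int


def in_offset_order(entries):
    """Return the entries sorted by start offset, i.e. in the order they were written."""
    return sorted(entries, key=lambda entry: entry.start)


def reads_forward_only(entries):
    """True if restoring in this order never requires seeking backwards."""
    return all(prev.end <= cur.start for prev, cur in zip(entries, entries[1:]))


entries = [
    StateEntry("broadcast-state", 40, 60),  # written last, but listed first
    StateEntry("list-state-a", 0, 20),
    StateEntry("list-state-b", 20, 40),
]
ordered = in_offset_order(entries)
print([entry.name for entry in ordered])
print(reads_forward_only(entries), reads_forward_only(ordered))
```

Once the entries are sorted by start offset, the restore loop only ever moves forward through the stream, so the state written last is also read last.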

I will submit a PR. This is my first time contributing code, so any help is 
really appreciated.

  was:
We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot 
Compression and used multiple Operator States in an operator. When recovering 
Operator State from a Savepoint, the following error occurred: 
"org.xerial.snappy.SnappyFramedInputStream: encountered EOF while reading 
stream header."

After researching, I believe the error is due to Flink 1.18.0's support for 
Snapshot Compression on Operator State (see 
https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a Savepoint, 
SnappyFramedOutputStream adds a header to the beginning of the data. When 
recovering Operator State from a Savepoint, SnappyFramedInputStream verifies 
the header from the beginning of the data.

Currently, when recovering Operator State with Snapshot Compression enabled, 
the logic is as follows:
For each OperatorStateHandle:
1. Verify that the Snappy header is at the Savepoint stream's current offset.
2. Seek to the state's start offset.
3. Read the state's data and finally seek to the state's end offset.
(See: 
https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172
 )

Furthermore, when there are multiple Operator States, they are not sorted 
by offset. Therefore, if the Operator States are out of order and the state 
with the final offset is recovered first, the Savepoint stream is left at its 
end, and the next header check fails with an EOF error.

I propose a solution: sort the OperatorStateHandle by offset and then recover 
the Operator State in order. After testing, this approach resolves the issue.

I will submit a PR. This is my first time contributing code, so any help is 
really appreciated.


> Compressed Operator state restore failed
> 
>
> Key: FLINK-33863
> URL: https://issues.apache.org/jira/browse/FLINK-33863
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.18.0
> Reporter: Ruibin Xing
> Priority: Major
> Labels: pull-request-available
>

[jira] [Updated] (FLINK-33863) Compressed Operator state restore failed

2023-12-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33863:
---
Labels: pull-request-available  (was: )

> Compressed Operator state restore failed
> 
>
> Key: FLINK-33863
> URL: https://issues.apache.org/jira/browse/FLINK-33863
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.18.0
> Reporter: Ruibin Xing
> Priority: Major
> Labels: pull-request-available
>
> We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot 
> Compression and used multiple Operator States in an operator. When recovering 
> Operator State from a Savepoint, the following error occurred: 
> "org.xerial.snappy.SnappyFramedInputStream: encountered EOF while reading 
> stream header."
> After researching, I believe the error is due to Flink 1.18.0's support for 
> Snapshot Compression on Operator State (see 
> https://issues.apache.org/jira/browse/FLINK-30113 ). When writing a 
> Savepoint, SnappyFramedOutputStream adds a header to the beginning of the 
> data. When recovering Operator State from a Savepoint, 
> SnappyFramedInputStream verifies the header from the beginning of the data.
> Currently, when recovering Operator State with Snapshot Compression enabled, 
> the logic is as follows:
> For each OperatorStateHandle:
> 1. Verify that the Snappy header is at the Savepoint stream's current offset.
> 2. Seek to the state's start offset.
> 3. Read the state's data and finally seek to the state's end offset.
> (See: 
> https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172
>  )
> Furthermore, when there are multiple Operator States, they are not sorted 
> by offset. Therefore, if the Operator States are out of order and the state 
> with the final offset is recovered first, the Savepoint stream is left at 
> its end, and the next header check fails with an EOF error.
> I propose a solution: sort the OperatorStateHandle by offset and then recover 
> the Operator State in order. After testing, this approach resolves the issue.
> I will submit a PR. This is my first time contributing code, so any help is 
> really appreciated.


