[jira] [Commented] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801730#comment-17801730 ]

Martijn Visser commented on FLINK-33863:

Merged via apache/flink:release-1.18 f9383e6780ae8beb995d9bbd58a8484d19900f55

> Compressed Operator state restore failed
>
> Key: FLINK-33863
> URL: https://issues.apache.org/jira/browse/FLINK-33863
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.18.0
> Reporter: Ruibin Xing
> Assignee: Ruibin Xing
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.19.0
>
>
> We encountered an issue when using Flink 1.18.0. Our job enabled Snapshot
> Compression and used multiple operator states and broadcast states in one
> operator. When recovering Operator State from a Savepoint, the following
> error occurred: "org.xerial.snappy.SnappyFramedInputStream: encountered EOF
> while reading stream header."
>
> After researching, I believe the error is caused by Flink 1.18.0's support for
> Snapshot Compression on Operator State (see
> https://issues.apache.org/jira/browse/FLINK-30113). When writing a
> Savepoint, SnappyFramedOutputStream adds a header to the beginning of the
> data. When recovering Operator State from a Savepoint,
> SnappyFramedInputStream verifies the header at the beginning of the data.
>
> Currently, when recovering Operator State with Snapshot Compression enabled,
> the logic is as follows. For each OperatorStateHandle:
> 1. Verify that the data at the current Savepoint stream offset is a Snappy header.
> 2. Seek to the state's start offset.
> 3. Read the state's data and finally seek to the state's end offset.
> (See:
> [https://github.com/apache/flink/blob/ef2b626d67147797e992ec3b338bafdb4e5ab1c7/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java#L172]
> )
>
> Furthermore, when there are multiple Operator States, they are not sorted
> by offset. The broadcast states are always written to the end of the
> savepoint. However, when reading from the savepoint, there is no guarantee
> that the broadcast states will be read last. Therefore, if the Operator
> States are out of order and the state at the final offset is recovered
> first, the Savepoint stream is left positioned at its end, resulting in an
> EOF error.
>
> I propose a solution: sort the OperatorStateHandle by offset and then recover
> the Operator State in order. After testing, this approach resolves the issue.
> I will submit a PR. This is my first time contributing code, so any help is
> really appreciated.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
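The reordering proposed in the description (restore states in ascending offset order) can be sketched in Java as follows. This is a hedged, simplified model, not the change merged into Flink: `SortByOffsetSketch` and its `StateSegment` class are hypothetical stand-ins for the per-state offsets an `OperatorStateHandle` records, and the offsets in `main` are made up. Sorting by start offset reproduces the write order, so the compressed stream is only ever read forward; `isForwardOnly` checks exactly that property.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class SortByOffsetSketch {
    // Hypothetical stand-in for one named state's location [start, end) in the stream.
    static final class StateSegment {
        final String name;
        final long startOffset;
        final long endOffset;

        StateSegment(String name, long startOffset, long endOffset) {
            this.name = name;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // Returns a copy of the segments in ascending start-offset order (the write
    // order); the input list is left unchanged.
    static List<StateSegment> restoreOrder(List<StateSegment> segments) {
        List<StateSegment> sorted = new ArrayList<>(segments);
        sorted.sort(Comparator.comparingLong((StateSegment s) -> s.startOffset));
        return sorted;
    }

    // True if restoring in this order never has to move backwards in the stream.
    static boolean isForwardOnly(List<StateSegment> order) {
        long pos = 0;
        for (StateSegment s : order) {
            if (s.startOffset < pos) {
                return false;
            }
            pos = s.endOffset;
        }
        return true;
    }

    public static void main(String[] args) {
        // The broadcast state is written last, but may be listed first.
        List<StateSegment> listed = Arrays.asList(
                new StateSegment("broadcast-state", 200, 300),
                new StateSegment("list-state-a", 0, 120),
                new StateSegment("list-state-b", 120, 200));
        System.out.println("listed order forward-only: " + isForwardOnly(listed));
        List<StateSegment> sorted = restoreOrder(listed);
        System.out.println("sorted order forward-only: " + isForwardOnly(sorted));
        for (StateSegment s : sorted) {
            System.out.println(s.name + " [" + s.startOffset + ", " + s.endOffset + ")");
        }
    }
}
```

With these example offsets, the listed order fails the check because the broadcast state (written last) comes first, while the sorted order passes.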
[jira] [Commented] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17801086#comment-17801086 ]

Ruibin Xing commented on FLINK-33863:

[~martijnvisser] I think so. I have created a backport PR for this: https://github.com/apache/flink/pull/24008
[jira] [Commented] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800974#comment-17800974 ]

Martijn Visser commented on FLINK-33863:

Shouldn't this also be backported to 1.18?
[jira] [Commented] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17800702#comment-17800702 ]

Yanfei Lei commented on FLINK-33863:

Merged via [d415d93bbf9620ba985136469107edd8c6e31cc6|https://github.com/apache/flink/commit/d415d93bbf9620ba985136469107edd8c6e31cc6]
[jira] [Commented] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798808#comment-17798808 ]

Ruibin Xing commented on FLINK-33863:

[~Yanfei Lei] Hi, I will try to illustrate this problem with an example:
{code:java}
       | Snappy Header 1 | State 1 | Snappy Header 2 | State 2 | Snappy Header 3 | State 3 |
       ^                 ^         ^                                             ^         ^
offset a                 b         c                                             d         e
{code}
This is the layout of a snapshot of compressed operator states. If we try to restore it in the sequence State 1, 3, 2 instead of State 1, 2, 3:
# We start at offset a.
# Snappy verifies Header 1, and everything is OK.
# We seek to offset b (taken from the OperatorStateHandle) and restore State 1 until we reach offset c.
# Now we restore State 3, but the stream is at offset c, so Snappy verifies Header 2 instead of Header 3.
# Then we seek to offset d and read State 3 until we reach offset e, the end of the stream.
# Then we restore State 2, and when Snappy tries to verify the header at offset e, an EOF error is thrown.

So there are two problems if we don't sort states by offset before restoring them:
# In step 4, we try to restore State 3, but the header of State 2 is verified instead.
# There is currently no simple way to seek to the correct header position.
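The failing sequence in the example above can be traced with a small Java model. This is a hedged sketch, not Flink code: `HeaderCheckTrace`, the byte offsets, and the 4-byte header size are hypothetical, chosen only so that offsets a to e line up with the diagram. Following the issue description, it assumes the decompressing stream checks for a Snappy header at whatever position the underlying stream currently has, before seeking to the state's start offset.

```java
import java.util.ArrayList;
import java.util.List;

final class HeaderCheckTrace {
    // Hypothetical layout with arbitrary sizes (4-byte headers, 16-byte states):
    // a=0 [H1] b=4 [S1] c=20 [H2] 24 [S2] 40 [H3] d=44 [S3] e=60 (end of stream)
    static final long STREAM_END = 60;
    static final long[] HEADER_START = {0, 20, 40}; // header preceding state i+1
    static final long[] STATE_START = {4, 24, 44};  // offsets recorded in the handle
    static final long[] STATE_END = {20, 40, 60};

    // What the decompressor would find at a given stream position.
    static String headerAt(long pos) {
        if (pos >= STREAM_END) {
            return "EOF";
        }
        for (int h = 0; h < HEADER_START.length; h++) {
            if (HEADER_START[h] == pos) {
                return "Header " + (h + 1);
            }
        }
        return "no header";
    }

    // Replays the unsorted restore loop for the given order of (1-based) states.
    static List<String> restore(int... order) {
        List<String> log = new ArrayList<>();
        long pos = 0; // the stream starts at offset a
        for (int id : order) {
            int i = id - 1;
            // Step 1: the header is checked at the CURRENT position.
            String seen = headerAt(pos);
            log.add("State " + id + ": header check at " + pos + " -> " + seen);
            if (seen.equals("EOF")) {
                break; // the real reader fails here with an EOF error
            }
            pos = STATE_START[i]; // Step 2: seek to the state's start offset
            pos = STATE_END[i];   // Step 3: read the state up to its end offset
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println("Order 1, 3, 2: " + restore(1, 3, 2));
        System.out.println("Order 1, 2, 3: " + restore(1, 2, 3));
    }
}
```

In this model, restoring in the order 1, 3, 2 reports Header 2 for State 3 and then EOF for State 2, matching steps 4 and 6 of the example, while the order 1, 2, 3 sees Headers 1, 2 and 3 in turn.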
[jira] [Commented] (FLINK-33863) Compressed Operator state restore failed
[ https://issues.apache.org/jira/browse/FLINK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798802#comment-17798802 ]

Yanfei Lei commented on FLINK-33863:

Is it because once the file stream has reached EOF, we can't use seek() to go back to an earlier position?