[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192368#comment-17192368 ]

Seth Wiesman commented on FLINK-14942:
--

Merged into master: 597f5027c5b0277a80448f988c11f314449d270f

> State Processing API: add an option to make deep copy
> -
>
> Key: FLINK-14942
> URL: https://issues.apache.org/jira/browse/FLINK-14942
> Project: Flink
> Issue Type: Improvement
> Components: API / State Processor
> Affects Versions: 1.11.0
> Reporter: Jun Qin
> Assignee: Jun Qin
> Priority: Blocker
> Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> Currently, when a new savepoint is created based on a source savepoint, the new savepoint contains references to the source savepoint. The [State Processing API doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html] says:
> bq. Note: When basing a new savepoint on existing state, the state processor api makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state and one cannot be deleted without corrupting the other!
> This JIRA requests an option to make a deep copy (instead of a shallow copy) so that the new savepoint is self-contained.

-- 
This message was sent by Atlassian Jira
(v8.3.4#803005)
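The following plain-Java sketch makes the shallow/deep distinction concrete. It is not Flink code and not the change merged for this issue. It assumes a hypothetical model in which a savepoint's metadata is just a map from operator uid to the file holding that operator's state. The names `SavepointCopySketch`, `shallowCopy`, `deepCopy` and `my-operator-uid` are illustrative only. Deleting the source directory leaves the shallow copy with dangling references, while the deep copy still resolves.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical model, not Flink's API: a savepoint's metadata maps each operator uid
 * to the file that holds that operator's state.
 */
public class SavepointCopySketch {

    /** Shallow copy: the new metadata keeps pointing at the source savepoint's files. */
    public static Map<String, Path> shallowCopy(Map<String, Path> source) {
        return new HashMap<>(source);
    }

    /** Deep copy: copy each state file into targetDir and point at the copy instead. */
    public static Map<String, Path> deepCopy(Map<String, Path> source, Path targetDir) throws IOException {
        Files.createDirectories(targetDir);
        Map<String, Path> copied = new HashMap<>();
        for (Map.Entry<String, Path> entry : source.entrySet()) {
            Path target = targetDir.resolve(entry.getValue().getFileName());
            Files.copy(entry.getValue(), target);
            copied.put(entry.getKey(), target);
        }
        return copied;
    }

    /** True if every state file referenced by the metadata still exists. */
    public static boolean allReadable(Map<String, Path> metadata) {
        return metadata.values().stream().allMatch(Files::exists);
    }

    /** Deletes a directory tree, children before parents. */
    public static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    /** Builds both copies, deletes the source savepoint, returns {shallowReadable, deepReadable}. */
    public static boolean[] demo() {
        try {
            Path sourceDir = Files.createTempDirectory("source-savepoint");
            Path stateFile = Files.write(sourceDir.resolve("operator-state"), new byte[] {1, 2, 3});
            Map<String, Path> source = Map.of("my-operator-uid", stateFile);

            Map<String, Path> shallow = shallowCopy(source);
            Map<String, Path> deep = deepCopy(source, Files.createTempDirectory("new-savepoint"));

            deleteRecursively(sourceDir); // simulate deleting the source savepoint
            return new boolean[] {allReadable(shallow), allReadable(deep)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("shallow copy still readable: " + result[0]); // false
        System.out.println("deep copy still readable: " + result[1]);    // true
    }
}
```

The cost of the deep copy shows up in `deepCopy`: every untouched operator's state is duplicated on disk. That duplication is what allows the new savepoint to be deleted or moved independently of the source.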
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192239#comment-17192239 ]

Seth Wiesman commented on FLINK-14942:
--

I agree; I will only merge this into 1.12.
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191931#comment-17191931 ]

Zhu Zhu commented on FLINK-14942:
-

[~qinjunjerry], as far as I know, the shallow copy is not new in 1.11. It has been there for quite a long time (at least since 1.9; see https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#modifying-savepoints). So I agree with [~rmetzger] that it would be better to treat this as a new feature and add it in 1.12. At the very least, it does not need to block 1.11.2, which is a bugfix release. I will remove 1.11.x from the fix versions for now. We can add it back if further discussion reaches consensus on backporting it to 1.11.
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191506#comment-17191506 ]

Jun Qin commented on FLINK-14942:
-

I am not sure. On the other hand, the current functionality is broken. [~sjwiesman], what do you think?
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191503#comment-17191503 ]

Robert Metzger commented on FLINK-14942:

It seems that this is not really a bugfix but a new feature. Can we downgrade the priority to "Critical" and add it only as a new feature to 1.12 (not to 1.11.2)? For system stability, we usually do not add new features to minor releases.
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191481#comment-17191481 ]

Zhu Zhu commented on FLINK-14942:
-

Hi [~sjwiesman] [~qinjunjerry], I'd like to confirm whether we need to wait for this improvement before creating the 1.11.2 RC1. I'm asking because the deadline is later today, and this item is an improvement.
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17189579#comment-17189579 ]

Jun Qin commented on FLINK-14942:
-

Created PR: https://github.com/apache/flink/pull/13309
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176294#comment-17176294 ]

Jun Qin commented on FLINK-14942:
-

I have double-checked. When you create a new savepoint from an existing savepoint, some operators' state may be left untouched (i.e., neither removed nor modified). For those operators, the SavepointMetadata of the new savepoint contains the same path as the original metadata, which in Flink 1.11 is a relative path. When you try to restore a job from the new savepoint and load the state of those untouched operators, it fails with:

{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: java.io.FileNotFoundException: /path/to/new/savepoint/a623a314-9dd1-4d60-bc8f-d56816f55f03 (No such file or directory)
	at java.base/java.io.FileInputStream.open0(Native Method)
	at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
	at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
	at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
	at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:118)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:124)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 15 more
{code}

For those untouched operators, one potential solution, I think, is to use the read/write API of the State Processor API library to re-create their state in the new savepoint. [~sjwiesman], what do you think?
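The failure above comes down to path resolution. The plain-Java sketch below is a hypothetical illustration, not Flink code. It assumes, as the comment describes, that the metadata stores a relative file name and that a restore resolves that name against the directory containing the metadata. `RelativeHandleSketch` and the `/path/to/source/savepoint` directory are made up. Copying the relative reference unchanged into the new savepoint makes the restore look for the file under the new directory, where it was never written.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical illustration, not Flink code: a state-file reference stored as a
 * relative name is resolved against the directory that holds the metadata file.
 */
public class RelativeHandleSketch {

    /** Absolute references are used as-is; relative ones are resolved against savepointDir. */
    public static Path resolveAgainst(Path savepointDir, String reference) {
        Path ref = Paths.get(reference);
        return ref.isAbsolute() ? ref : savepointDir.resolve(ref);
    }

    public static void main(String[] args) {
        Path sourceSavepoint = Paths.get("/path/to/source/savepoint"); // hypothetical location
        Path newSavepoint = Paths.get("/path/to/new/savepoint");
        // The untouched operator's reference is copied unchanged into the new metadata.
        String reference = "a623a314-9dd1-4d60-bc8f-d56816f55f03";

        // Where the state file was actually written:
        System.out.println(resolveAgainst(sourceSavepoint, reference));
        // Where a restore from the new savepoint looks for it (the FileNotFoundException path):
        System.out.println(resolveAgainst(newSavepoint, reference));
    }
}
```

Under this assumption, a deep copy fixes the failure because the copied file actually exists at the resolved location. Rewriting the reference to an absolute path into the old savepoint would also avoid the exception, but it would reintroduce the shared-state coupling this issue is about.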
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175636#comment-17175636 ]

Seth Wiesman commented on FLINK-14942:
--

It's not a question of enabling it; the feature hasn't been implemented yet. I'm actually still not entirely sure how best to do it. I'll start thinking through some ideas, but I'm not sure when I'll have bandwidth. If you have any ideas, please let me know.
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173767#comment-17173767 ]

Congxian Qiu(klion26) commented on FLINK-14942:
---

Now that savepoints are relocatable, I think we need to make deep copy the default for the State Processor API. Relocatable savepoints assume that all the data and metadata are in the same directory.
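The invariant described in this comment can be stated as a check: every state file referenced by a savepoint's metadata must lie inside the savepoint directory. The plain-Java helper below is hypothetical; `SelfContainedCheck` is not part of Flink, and the `/savepoints/...` paths are made up. A shallow copy that still points into `/savepoints/old/...` fails the check, while a deep copy whose files sit next to the new metadata passes.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

/** Hypothetical helper, not part of Flink: checks that a savepoint directory is self-contained. */
public class SelfContainedCheck {

    /**
     * Returns true if every referenced state file resolves to a location inside
     * savepointDir. Relative references are resolved against savepointDir, and
     * normalize() removes "." and ".." segments before the prefix check.
     */
    public static boolean isSelfContained(Path savepointDir, List<Path> referencedFiles) {
        Path root = savepointDir.toAbsolutePath().normalize();
        for (Path file : referencedFiles) {
            Path resolved = root.resolve(file).normalize();
            if (!resolved.startsWith(root)) {
                return false; // this file lives outside the savepoint directory
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Path newSavepoint = Paths.get("/savepoints/new");
        // Shallow copy: the untouched operator's state still lives in the old savepoint.
        List<Path> shallow = List.of(
                Paths.get("/savepoints/new/rewritten-operator"),
                Paths.get("/savepoints/old/untouched-operator"));
        // Deep copy: the untouched operator's file was copied next to the new metadata.
        List<Path> deep = List.of(
                Paths.get("rewritten-operator"),
                Paths.get("untouched-operator"));

        System.out.println(isSelfContained(newSavepoint, shallow)); // false
        System.out.println(isSelfContained(newSavepoint, deep));    // true
    }
}
```

`Path.startsWith` compares whole path components rather than raw string prefixes. As a result, a sibling directory such as `/savepoints/new-2` is not mistaken for a location inside `/savepoints/new`.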
[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy
[ https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994758#comment-16994758 ]

Seth Wiesman commented on FLINK-14942:
--

I've moved this from Bug to Improvement. Yes, this is definitely something that needs to be resolved and (at least in my head) is on the roadmap. I'm not sure when I will have the bandwidth to pick it up, but if someone else wants to work on it, I'm happy to talk through ideas.