[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-09-08 Thread Seth Wiesman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192368#comment-17192368
 ] 

Seth Wiesman commented on FLINK-14942:
--

merged in master: 597f5027c5b0277a80448f988c11f314449d270f

> State Processing API: add an option to make deep copy
> -
>
> Key: FLINK-14942
> URL: https://issues.apache.org/jira/browse/FLINK-14942
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.11.0
>Reporter: Jun Qin
>Assignee: Jun Qin
>Priority: Blocker
>  Labels: pull-request-available, usability
> Fix For: 1.12.0
>
>
> Currently, when a new savepoint is created based on a source savepoint, the 
> new savepoint contains references to the source savepoint. Here is what the 
> [State Processing API 
> doc|https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html]
>  says: 
> bq. Note: When basing a new savepoint on existing state, the state processor 
> api makes a shallow copy of the pointers to the existing operators. This 
> means that both savepoints share state and one cannot be deleted without 
> corrupting the other!
> This JIRA is to request an option to make a deep copy (instead of a shallow 
> copy) so that the new savepoint is self-contained. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-09-08 Thread Seth Wiesman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192239#comment-17192239
 ] 

Seth Wiesman commented on FLINK-14942:
--

I agree, will only merge this into 1.12. 



[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-09-07 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191931#comment-17191931
 ] 

Zhu Zhu commented on FLINK-14942:
-

[~qinjunjerry] From what I know, shallow copy is not new in 1.11. It has been 
there for quite a long time (at least since 1.9, see 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#modifying-savepoints).
So I agree with [~rmetzger] that it would be better to treat it as a new 
feature and add it in 1.12. 
Or at least it does not need to be a blocker for 1.11.2, which is a bugfix 
release.
I will remove 1.11.x from the fix versions for now. We can add it back if 
further discussion reaches consensus to backport it to 1.11.

> State Processing API: add an option to make deep copy
> -
>
> Key: FLINK-14942
> URL: https://issues.apache.org/jira/browse/FLINK-14942
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.11.0
>Reporter: Jun Qin
>Assignee: Jun Qin
>Priority: Blocker
>  Labels: pull-request-available, usability
> Fix For: 1.12.0, 1.11.2


[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-09-07 Thread Jun Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191506#comment-17191506
 ] 

Jun Qin commented on FLINK-14942:
-

I am not sure... On the other hand, the current functionality is broken... 
[~sjwiesman] What do you think?



[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-09-07 Thread Robert Metzger (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191503#comment-17191503
 ] 

Robert Metzger commented on FLINK-14942:


It seems that this is not really a bugfix, but a new feature? Can we downgrade 
the priority to "Critical" and add it only as a new feature to 1.12 (not adding 
it to 1.11.2)? For the sake of system stability, we usually do not add new 
features to minor releases.



[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-09-06 Thread Zhu Zhu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17191481#comment-17191481
 ] 

Zhu Zhu commented on FLINK-14942:
-

Hi [~sjwiesman] [~qinjunjerry], just to confirm: do we need to wait for this 
improvement before creating 1.11.2 RC1?
I'm asking because the deadline is later today, and this item is an 
improvement.



[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-09-02 Thread Jun Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17189579#comment-17189579
 ] 

Jun Qin commented on FLINK-14942:
-

Created PR: https://github.com/apache/flink/pull/13309

> State Processing API: add an option to make deep copy
> -
>
> Key: FLINK-14942
> URL: https://issues.apache.org/jira/browse/FLINK-14942
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Reporter: Jun Qin
>Assignee: Jun Qin
>Priority: Major
>  Labels: usability
> Fix For: 1.12.0


[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-08-12 Thread Jun Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17176294#comment-17176294
 ] 

Jun Qin commented on FLINK-14942:
-

I have double-checked: when you create a new savepoint from an existing 
savepoint, if the state for an operator in the existing savepoint is not 
touched (i.e., neither removed nor modified), the SavepointMetadata of the new 
savepoint will contain the same path as the original metadata, which is a 
relative path in Flink 1.11. When you try to restore a job from the new 
savepoint and load the state of those untouched operators, it will fail with:
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_33e978864e75b8b6137396c7b1f7711d_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
	... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: java.io.FileNotFoundException: /path/to/new/savepoint/a623a314-9dd1-4d60-bc8f-d56816f55f03 (No such file or directory)
	at java.base/java.io.FileInputStream.open0(Native Method)
	at java.base/java.io.FileInputStream.open(FileInputStream.java:219)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
	at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
	at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
	at org.apache.flink.runtime.state.KeyGroupsStateHandle.openInputStream(KeyGroupsStateHandle.java:118)
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:124)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 15 more
{code}
For those untouched operators, I think a potential solution is to use the 
read/write API of the State Processing API library to re-create their state in 
the new savepoint. [~sjwiesman], what do you think? 
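
To make the failure mode concrete, here is a minimal sketch that models it with plain directories. It is not Flink code: the class name, the method names, and the file name "state-file" are all hypothetical, and the only assumption taken from the comment above is that the new savepoint's metadata records the same relative path as the source savepoint.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Toy model: plain directories stand in for savepoints. Nothing here is Flink code. */
final class RelativeHandleSketch {

    /** A restore can only read the state file if it exists under the savepoint being restored. */
    static boolean isRestorable(Path savepointDir, String relativeName) {
        return Files.exists(savepointDir.resolve(relativeName));
    }

    /** Deep copy: physically copy the referenced file next to the new savepoint's metadata. */
    static void deepCopy(Path sourceDir, Path targetDir, String relativeName) {
        try {
            Files.copy(sourceDir.resolve(relativeName), targetDir.resolve(relativeName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns {restorable after shallow copy, restorable after deep copy}. */
    static boolean[] run() {
        try {
            Path source = Files.createTempDirectory("source-savepoint");
            Path target = Files.createTempDirectory("new-savepoint");
            String name = "state-file"; // hypothetical file name
            Files.write(source.resolve(name), new byte[] {1, 2, 3});

            // Shallow copy: the new metadata records the same relative name, but no bytes move,
            // so resolving the name against the new directory finds nothing.
            boolean shallow = isRestorable(target, name);

            deepCopy(source, target, name);
            boolean deep = isRestorable(target, name);
            return new boolean[] {shallow, deep};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("restorable after shallow copy: " + result[0]);
        System.out.println("restorable after deep copy: " + result[1]);
    }
}
```

In this model the shallow case is the analogue of the FileNotFoundException in the trace: the relative name is resolved under the new savepoint directory, where the file was never written.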



[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-08-11 Thread Seth Wiesman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175636#comment-17175636
 ] 

Seth Wiesman commented on FLINK-14942:
--

It's not a question of enabling it, the feature hasn't been implemented. I'm 
actually still not entirely sure how best to do it. I'll start to think through 
some ideas but I'm not sure when I'll have bandwidth. If you have any ideas 
please let me know. 



[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2020-08-09 Thread Congxian Qiu(klion26) (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17173767#comment-17173767
 ] 

Congxian Qiu(klion26) commented on FLINK-14942:
---

Now that savepoints are relocatable, I think we need to make deep copy the 
default for the State Processing API, because we assume that all the data and 
metadata are in the same directory.
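
The "same directory" assumption can be phrased as a simple invariant. The sketch below is only an illustration under that assumption: the class and method names are hypothetical, it is not part of Flink, and it performs a purely lexical path check without touching the file system.

```java
import java.nio.file.Path;
import java.util.List;

/** Hypothetical helper, not a Flink API: checks the "everything lives in one directory" invariant. */
final class SelfContainedCheck {

    /** True if every reference, resolved against the savepoint directory, stays inside it. */
    static boolean isSelfContained(Path savepointDir, List<String> references) {
        Path root = savepointDir.toAbsolutePath().normalize();
        for (String ref : references) {
            // normalize() collapses ".." segments so that escapes out of the directory are visible.
            Path resolved = root.resolve(ref).normalize();
            if (!resolved.startsWith(root)) {
                return false;
            }
        }
        return true;
    }
}
```

A savepoint whose references all pass this check can be moved as a unit; a reference such as "../old/b" points outside the directory and would break after relocation or after the other savepoint is deleted.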



[jira] [Commented] (FLINK-14942) State Processing API: add an option to make deep copy

2019-12-12 Thread Seth Wiesman (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994758#comment-16994758
 ] 

Seth Wiesman commented on FLINK-14942:
--

I've moved this from bug to improvement.

Yeah, this is definitely something that needs to be resolved and (at least in 
my head) is on the roadmap. I'm not sure when I will have the bandwidth to pick 
it up but if someone else wants to work on it I'm happy to talk about ideas.

> State Processing API: add an option to make deep copy
> -
>
> Key: FLINK-14942
> URL: https://issues.apache.org/jira/browse/FLINK-14942
> Project: Flink
>  Issue Type: Improvement
>  Components: API / State Processor
>Affects Versions: 1.11.0
>Reporter: Jun Qin
>Priority: Major