[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566718#comment-17566718 ]

Hangxiang Yu commented on FLINK-23143:
--

Sure, it's public now.

> Support state migration
> ---
>
> Key: FLINK-23143
> URL: https://issues.apache.org/jira/browse/FLINK-23143
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / State Backends
> Reporter: Roman Khachatryan
> Assignee: Hangxiang Yu
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.16.0
>
>
> ChangelogKeyedStateBackend.getOrCreateKeyedState is currently used during
> recovery; on 1st user access, it neither updates metadata nor migrates state
> (as opposed to other backends).
>
> The proposed solution is to
> # wrap serializers (and maybe other objects) in getOrCreateKeyedState
> # store wrapping objects in a new map keyed by state name
> # pass wrapped objects to delegatedBackend.createInternalState
> # on 1st user access, look up the wrapper and upgrade its wrapped serializer
> This should be done for both KV/PQ states.
>
> See also [https://github.com/apache/flink/pull/15420#discussion_r656934791]
>
> cc: [~yunta]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
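The four steps of the proposed solution can be sketched as follows. This is a minimal, self-contained model under stated assumptions, not Flink code: SerializerWrapperSketch, Serializer, UpgradableSerializer, getOrCreateOnRecovery and onUserAccess are hypothetical names, and serializers are reduced to a version string plus encode/decode functions. It only shows the wrapper bookkeeping (the same wrapper object stays with the nested backend while its delegate is swapped); migrating already-stored data is deliberately left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of the proposed solution; none of these types are Flink classes. */
class SerializerWrapperSketch {

    /** Minimal serializer stand-in (not Flink's TypeSerializer). */
    static final class Serializer<T> {
        final String version;
        final Function<T, String> writer;
        final Function<String, T> reader;

        Serializer(String version, Function<T, String> writer, Function<String, T> reader) {
            this.version = version;
            this.writer = writer;
            this.reader = reader;
        }
    }

    /** Step 1: the wrapper handed to the delegated backend (step 3); its delegate can be swapped. */
    static final class UpgradableSerializer<T> {
        private Serializer<T> delegate;

        UpgradableSerializer(Serializer<T> initial) {
            this.delegate = initial;
        }

        String serialize(T value) {
            return delegate.writer.apply(value);
        }

        T deserialize(String data) {
            return delegate.reader.apply(data);
        }

        void upgrade(Serializer<T> newSerializer) {
            this.delegate = newSerializer;
        }
    }

    /** Step 2: wrappers kept in a map keyed by state name. */
    private final Map<String, UpgradableSerializer<?>> wrappers = new HashMap<>();
    /** States that user code has already accessed since recovery. */
    private final Set<String> accessed = new HashSet<>();

    /** Recovery path: create (or reuse) the wrapper for a restored state. */
    @SuppressWarnings("unchecked")
    <T> UpgradableSerializer<T> getOrCreateOnRecovery(String stateName, Serializer<T> restored) {
        return (UpgradableSerializer<T>)
                wrappers.computeIfAbsent(stateName, n -> new UpgradableSerializer<>(restored));
    }

    /** Step 4: on the first user access, look up the wrapper and upgrade its serializer. */
    @SuppressWarnings("unchecked")
    <T> UpgradableSerializer<T> onUserAccess(String stateName, Serializer<T> requested) {
        UpgradableSerializer<T> wrapper =
                (UpgradableSerializer<T>)
                        wrappers.computeIfAbsent(stateName, n -> new UpgradableSerializer<>(requested));
        if (accessed.add(stateName) && !wrapper.delegate.version.equals(requested.version)) {
            // Only the serializer is swapped; migrating already-stored data is not modelled here.
            wrapper.upgrade(requested);
        }
        return wrapper;
    }
}
```

Because the nested backend only ever holds the wrapper, no second state object is created when the serializer changes; whether the real wrappers should also cover TTL or other metadata is left open, as in the issue text.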
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17566676#comment-17566676 ]

Martijn Visser commented on FLINK-23143:

Can we make sure that any design/Google doc is also made public or published on the Wiki?
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17539537#comment-17539537 ]

Hangxiang Yu commented on FLINK-23143:
--

I have updated the PR.

> Does that mean postponing state migration until user access? I think ideally,
> state should be migrated before modifying it (i.e. on reading metadata records
> from changelog); otherwise, there might be data loss or exception when
> serializing state changes in RocksDB. WDYT?

Currently, state migration may happen not only on user access but also, as you said, before the state is modified (considering that the materialized part may not be included in the snapshot).

> Besides that, what about updating TTL? If we return existing state, then TTL
> settings won't be updated, right?

Currently, ChangelogStateFactory is disposed when restore finishes. So there is no state cache in ChangelogKeyedStateBackend, and the TTL settings of all states will be updated.

> ChangelogBackend metaInfo and e.g. RocksDBBackend metaInfo don't have to be
> the same; and the former shouldn't know how to create metaInfo for the latter.

Sure, I agree. So I have changed the methods (createInternalState and create) to take an extra parameter. But I am not sure whether there is a better solution for the interface change.
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534310#comment-17534310 ]

Roman Khachatryan commented on FLINK-23143:
---

> The added methods we implement just mimic what we do on restore (from
> RocksDB or hashmap).

Yes, but we currently call them at most once on restore.

> They check whether a state already exists before adding a new one, and the
> same check also applies on user access.

In the PR (prototype), there is no such check, right? Do you mean it should be added?
Does that mean postponing state migration until user access? I think ideally, state should be migrated before modifying it (i.e. on reading metadata records from changelog); otherwise, there might be data loss or exception when serializing state changes in RocksDB. WDYT?

Besides that, what about updating TTL? If we return existing state, then TTL settings won't be updated, right?

> Maybe we could introduce a new, more universal class to wrap this
> information, WDYT?

In my opinion, the problem with RegisteredKeyValueStateBackendMetaInfo is not its type, but rather the fact that it should be created by the (nested) state backend instead of being injected from the outside. ChangelogBackend metaInfo and e.g. RocksDBBackend metaInfo don't have to be the same; and the former shouldn't know how to create metaInfo for the latter.

It looks like the [design document, and considering exposing an upgrade method|https://issues.apache.org/jira/browse/FLINK-23143?focusedCommentId=17529581&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17529581] would be helpful after all.
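To illustrate "migrate on reading metadata records from changelog", here is a minimal sketch under stated assumptions; it is not the actual ChangelogBackendLogApplier. EagerMigrationApplierSketch, Format, LogEntry and apply are hypothetical names, states hold Integer values stored as strings (standing in for RocksDB bytes), and a format change is detected by comparing version strings.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Flink code): while replaying the changelog, a metadata entry
 * that announces a new serializer migrates the already-restored values immediately,
 * so later value entries are never mixed with data in the old format.
 */
class EagerMigrationApplierSketch {

    /** Serialization format of one state, identified by a version string. */
    static final class Format {
        final String version;
        final Function<Integer, String> encode;
        final Function<String, Integer> decode;

        Format(String version, Function<Integer, String> encode, Function<String, Integer> decode) {
            this.version = version;
            this.encode = encode;
            this.decode = decode;
        }
    }

    /** One changelog entry: a metadata entry (format != null) or a value update. */
    static final class LogEntry {
        final String state;
        final Format format;
        final String key;
        final Integer value;

        private LogEntry(String state, Format format, String key, Integer value) {
            this.state = state;
            this.format = format;
            this.key = key;
            this.value = value;
        }

        static LogEntry metadata(String state, Format format) {
            return new LogEntry(state, format, null, null);
        }

        static LogEntry put(String state, String key, int value) {
            return new LogEntry(state, null, key, value);
        }
    }

    private final Map<String, Format> formats = new HashMap<>();
    /** Serialized values per state, as a RocksDB-like backend would store them. */
    final Map<String, Map<String, String>> stored = new HashMap<>();

    void apply(List<LogEntry> changelog) {
        for (LogEntry entry : changelog) {
            if (entry.format != null) {
                Format previous = formats.put(entry.state, entry.format);
                Map<String, String> values = stored.computeIfAbsent(entry.state, s -> new HashMap<>());
                if (previous != null && !previous.version.equals(entry.format.version)) {
                    // Migrate existing values before any later entry for this state is applied.
                    values.replaceAll((k, bytes) -> entry.format.encode.apply(previous.decode.apply(bytes)));
                }
            } else {
                Format format = formats.get(entry.state);
                if (format == null) {
                    throw new IllegalStateException("No metadata for state " + entry.state);
                }
                stored.get(entry.state).put(entry.key, format.encode.apply(entry.value));
            }
        }
    }
}
```

Postponing the migration until user access would instead leave old-format entries next to new-format ones written by later changelog records, which is the data-loss/exception risk raised above.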
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534149#comment-17534149 ]

Hangxiang Yu commented on FLINK-23143:
--

> I like the simplicity of your solution, but I think the concern I expressed
> above (about creation of multiple state objects) is valid.

The added methods we implement just mimic what we do on restore (from RocksDB or hashmap). They check whether a state already exists before adding a new one, and the same check also applies on user access. (We don't implement them via createInternalState.) So I don't think it's a problem?

> Besides that, RegisteredKeyValueStateBackendMetaInfo seems to be an implementation
> detail of a state backend, so it shouldn't be exposed.

You are right. Maybe we could introduce a new, more universal class to wrap this information, WDYT?

> Regarding the key serializer upgrade, I think it's [not
> supported|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/]
> currently and therefore is out of the scope of this ticket.

I agree. But we haven't implemented the key serializer compatibility check that the RocksDB and heap backends have. What I described above about the key serializer is the difficulty of implementing that compatibility check. Things still work without it. Do you think it doesn't matter?
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17534026#comment-17534026 ]

Roman Khachatryan commented on FLINK-23143:
---

Thanks a lot for sharing the prototype.
I like the simplicity of your solution, but I think the concern I expressed above (about creation of multiple state objects) is valid.
Besides that, RegisteredKeyValueStateBackendMetaInfo seems to be an implementation detail of a state backend, so it shouldn't be exposed.

Regarding the key serializer upgrade, I think it's [not supported|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/] currently and therefore is out of the scope of this ticket.
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17533756#comment-17533756 ]

Hangxiang Yu commented on FLINK-23143:
--

[~roman] Sure, I just pushed my draft, as you can see.

Another problem is the key serializer:
# If the materialized part is empty, the key serializer will not be updated or checked for compatibility. But the key serializer is used to deserialize state in the non-materialized part, which will cause an exception if the key serializer has changed.
# For the non-materialized part, we may need to store its key serializer when taking a snapshot, and read it before building the delegated keyed state backend and the changelog keyed state backend. But with this solution, we may need to change the process of {{ChangelogBackendRestoreOperation#restore}}.

Do you have any better ideas about it? Thanks a lot!
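Option 2 above (store the key serializer with the non-materialized part and read it before building the backends) could look roughly like this. It is a sketch under heavy simplification: KeySerializerCheckSketch, NonMaterializedPart, check and validateBeforeRestore are hypothetical, a serializer is identified by a plain string id, and "compatible" simply means "same id", since key serializer evolution is not supported anyway.

```java
/**
 * Hypothetical sketch (not Flink API): the non-materialized changelog part records the
 * key serializer it was written with, and restore checks it before any backend is built.
 */
class KeySerializerCheckSketch {

    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    /** Hypothetical handle for the non-materialized part, carrying its key serializer id. */
    static final class NonMaterializedPart {
        final String keySerializerId;

        NonMaterializedPart(String keySerializerId) {
            this.keySerializerId = keySerializerId;
        }
    }

    /** Simplified check: ids must match, since key serializer evolution is not supported. */
    static Compatibility check(String previousKeySerializerId, String newKeySerializerId) {
        return previousKeySerializerId.equals(newKeySerializerId)
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
    }

    /**
     * Runs before building the delegated and changelog backends. It uses the id stored with
     * the non-materialized part, so the check still happens when the materialized part is empty.
     */
    static void validateBeforeRestore(NonMaterializedPart part, String newKeySerializerId) {
        if (check(part.keySerializerId, newKeySerializerId) == Compatibility.INCOMPATIBLE) {
            throw new IllegalStateException(
                    "Key serializer changed from " + part.keySerializerId + " to " + newKeySerializerId);
        }
    }
}
```

Failing fast here replaces a later, harder-to-diagnose exception while deserializing keys from the non-materialized part; the cost, as noted above, is touching the restore sequence.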
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529581#comment-17529581 ]

Roman Khachatryan commented on FLINK-23143:
---

Do you mind sharing your POC?

IIRC, getOrCreateKeyedState of a nested backend is not called by the changelog backend. My concern was that calling createInternalState() twice creates two state objects (e.g. HeapValueState).

Another concern is that besides the serializers, the state itself also needs to be migrated (in the case of RocksDB). So maybe it makes sense to pull HeapKeyedStateBackend.tryRegisterStateTable and RocksDBKeyedStateBackend.tryRegisterKvStateInformation up into AbstractKeyedStateBackend (or some interface) and call it directly from ChangelogKeyedStateBackend (the delegation approach wouldn't be needed in this case). Or extract only the migration part, e.g. upgrade(StateDescriptor).

Besides that, updating the TTL policy should also be taken into account.

Do you mind creating a short design doc describing what needs to be upgraded, and one or more options for how to do it?
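The "extract only the migration part, e.g. upgrade(StateDescriptor)" option can be illustrated with a small model. Everything here is hypothetical (UpgradeHookSketch, Descriptor, InternalState, UpgradableBackend, NestedBackend); the point is only that each state object is created once and later changes to the serializer version and TTL are applied to that same object. For a RocksDB-like backend, the stored bytes would also be migrated inside upgrade(), which is only marked by a comment here.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of an upgrade hook on the nested backend; not Flink classes. */
class UpgradeHookSketch {

    /** Stand-in for a StateDescriptor: name, serializer version and optional TTL (null = none). */
    static final class Descriptor {
        final String name;
        final String serializerVersion;
        final Duration ttl;

        Descriptor(String name, String serializerVersion, Duration ttl) {
            this.name = name;
            this.serializerVersion = serializerVersion;
            this.ttl = ttl;
        }
    }

    /** Stand-in for an internal state object such as HeapValueState. */
    static final class InternalState {
        Descriptor meta;

        InternalState(Descriptor meta) {
            this.meta = meta;
        }
    }

    /** Hypothetical interface a nested backend could expose to the changelog backend. */
    interface UpgradableBackend {
        InternalState getOrCreate(Descriptor desc);

        void upgrade(Descriptor desc);
    }

    static final class NestedBackend implements UpgradableBackend {
        private final Map<String, InternalState> states = new HashMap<>();
        /** Number of state objects ever created, to show no duplicates appear. */
        int created = 0;

        @Override
        public InternalState getOrCreate(Descriptor desc) {
            return states.computeIfAbsent(desc.name, n -> {
                created++;
                return new InternalState(desc);
            });
        }

        @Override
        public void upgrade(Descriptor desc) {
            InternalState state = states.get(desc.name);
            if (state == null) {
                throw new IllegalStateException("Unknown state: " + desc.name);
            }
            // A RocksDB-like backend would also re-serialize the stored values here.
            state.meta = desc; // serializer version and TTL updated in place
        }
    }
}
```

In this shape the changelog backend would call getOrCreate on recovery and upgrade on first user access, so neither createInternalState() twice nor a separate wrapper map would be needed.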
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529258#comment-17529258 ]

Hangxiang Yu commented on FLINK-23143:
--

Maybe I missed something. What do you mean by "store wrapping objects in a new map keyed by state name"? In that solution, if we don't add a new state, could we avoid updating _RegisteredKeyValueStateBackendMetaInfo_, which uses an _EagerlyRegisteredStateSerializerProvider_ that doesn't support registering a new serializer?

I mean adding _getOrCreateKeyedState(RegisteredKeyValueStateBackendMetaInfo metaInfo, StateDescriptor stateDesc)_ to _KeyedStateBackend_, so that Changelog could use this method instead of _getOrCreateKeyedState(TypeSerializer namespaceSerializer, StateDescriptor stateDescriptor)_ during recovery, because the former method will register a _RegisteredKeyValueStateBackendMetaInfo_ that uses a _LazilyRegisteredStateSerializerProvider_.

The proposal is simple and works, but it adds extra methods to _KeyedStateBackend_, and other state backends have to implement them. If your solution could solve the problem I mentioned above, maybe it would be better.
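The difference between the eagerly and lazily registered serializer providers mentioned above can be modelled in a few lines. This is a simplified sketch of the behaviour described in the comment, not Flink's StateSerializerProvider API: SerializerProviderSketch, Provider, EagerProvider, LazyProvider and registerNewSerializer are hypothetical, and serializers are represented by version strings.

```java
/** Hypothetical, simplified model of eager vs. lazy serializer providers; not Flink's API. */
class SerializerProviderSketch {

    abstract static class Provider {
        protected String current;

        String currentSerializer() {
            return current;
        }

        /** Registers the serializer requested by user code; returns true if migration is needed. */
        abstract boolean registerNewSerializer(String newSerializer);
    }

    /** Built from a serializer known up front (e.g. on the recovery path); cannot take a new one. */
    static final class EagerProvider extends Provider {
        EagerProvider(String serializer) {
            this.current = serializer;
        }

        @Override
        boolean registerNewSerializer(String newSerializer) {
            throw new UnsupportedOperationException("Serializer was already registered eagerly");
        }
    }

    /** Built from the restored serializer snapshot; accepts exactly one new serializer later. */
    static final class LazyProvider extends Provider {
        private final String restoredSnapshot;
        private boolean registered;

        LazyProvider(String restoredSnapshot) {
            this.restoredSnapshot = restoredSnapshot;
            this.current = restoredSnapshot;
        }

        @Override
        boolean registerNewSerializer(String newSerializer) {
            if (registered) {
                throw new IllegalStateException("A new serializer was already registered");
            }
            registered = true;
            current = newSerializer;
            // State written with the restored serializer needs migration if the serializer changed.
            return !restoredSnapshot.equals(newSerializer);
        }
    }
}
```

Creating the restored meta info with a lazy provider, as proposed, is what leaves room for the first user access to register the new serializer and trigger migration.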
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525953#comment-17525953 ]

Roman Khachatryan commented on FLINK-23143:
---

Hi, [~masteryhx].
I meant an approach similar to [FunctionDelegationHelper|https://github.com/apache/flink/blob/cc6d1a27bafe38e76d6f9f0d480fdcb7e017be3a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java] from ChangelogKeyedStateBackend. But instead of a function, it would hold and update the serializer. No state (like ValueState) should be added. It's already used in FLINK-23252 IIUC.

I didn't fully understand your proposal; could you share your POC?
[jira] [Commented] (FLINK-23143) Support state migration
[ https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524708#comment-17524708 ]

Hangxiang Yu commented on FLINK-23143:
--

Hi, [~roman]. IIUC, your solution requires creating and maintaining a new state in _ChangelogKeyedStateBackend_? I'm afraid it may not work well when switching changelog from enabled to disabled.

Another solution is to make the changelog part use the correct _RegisteredKeyValueStateBackendMetaInfo/RegisteredPriorityQueueStateBackendMetaInfo_ during recovery. (We already create _RegisteredKeyValueStateBackendMetaInfo/RegisteredPriorityQueueStateBackendMetaInfo_ from the _StateMetaInfoSnapshot_ in _ChangelogBackendLogApplier#restoreKvMetaData_ and _ChangelogBackendLogApplier#restorePqMetaData_, but don't use them.) However, two new methods may need to be introduced in _KeyedStateBackend_, e.g. _create(RegisteredPriorityQueueStateBackendMetaInfo metaInfo)_.

I have tried a simple POC and it works. But it may not be a good idea to introduce new methods in _KeyedStateBackend_. WDYT?