[jira] [Commented] (FLINK-23143) Support state migration

2022-07-14 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566718#comment-17566718
 ] 

Hangxiang Yu commented on FLINK-23143:
--

Sure, it's public now.

> Support state migration
> ---
>
> Key: FLINK-23143
> URL: https://issues.apache.org/jira/browse/FLINK-23143
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Roman Khachatryan
>Assignee: Hangxiang Yu
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>
> ChangelogKeyedStateBackend.getOrCreateKeyedState is currently used during 
> recovery; on 1st user access, it doesn't update metadata nor migrate state 
> (as opposed to other backends).
>  
> The proposed solution is to
>  # wrap serializers (and maybe other objects) in getOrCreateKeyedState
>  # store wrapping objects in a new map keyed by state name
>  # pass wrapped objects to delegatedBackend.createInternalState
>  # on 1st user access, lookup wrapper and upgrade its wrapped serializer
> This should be done for both KV/PQ states.
>  
> See also [https://github.com/apache/flink/pull/15420#discussion_r656934791]
>  
> cc: [~yunta]
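
The four steps in the issue description above can be sketched as follows. This is only a minimal illustration under stated assumptions, not Flink code: `ToySerializer`, `UpgradableSerializer` and `SerializerWrapperRegistry` are hypothetical names, and a real implementation would also have to migrate the stored bytes and update the backend's meta info.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a serializer; not Flink's TypeSerializer. */
interface ToySerializer<T> {
    String encode(T value);
}

/** Step 1: a wrapper whose delegate can be replaced after state creation. */
final class UpgradableSerializer<T> implements ToySerializer<T> {
    private ToySerializer<T> delegate;

    UpgradableSerializer(ToySerializer<T> initial) {
        this.delegate = initial;
    }

    void upgrade(ToySerializer<T> newer) {
        this.delegate = newer;
    }

    @Override
    public String encode(T value) {
        return delegate.encode(value);
    }
}

/** Step 2: one wrapper per state name. */
final class SerializerWrapperRegistry {
    private final Map<String, UpgradableSerializer<?>> wrappers = new HashMap<>();

    /** Steps 1-3: wrap the restored serializer; the caller passes the wrapper on. */
    <T> UpgradableSerializer<T> wrapOnRestore(String stateName, ToySerializer<T> restored) {
        UpgradableSerializer<T> wrapper = new UpgradableSerializer<>(restored);
        wrappers.put(stateName, wrapper);
        return wrapper;
    }

    /** Step 4: on first user access, swap in the user's serializer. */
    @SuppressWarnings("unchecked")
    <T> boolean upgradeOnUserAccess(String stateName, ToySerializer<T> userSerializer) {
        UpgradableSerializer<T> wrapper = (UpgradableSerializer<T>) wrappers.get(stateName);
        if (wrapper == null) {
            return false; // state was not restored, nothing to upgrade
        }
        wrapper.upgrade(userSerializer);
        return true;
    }
}
```

Because the state object created during recovery holds the wrapper rather than the restored serializer, swapping the delegate takes effect without creating a second state object.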



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-23143) Support state migration

2022-07-14 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17566676#comment-17566676
 ] 

Martijn Visser commented on FLINK-23143:


Can we make sure that any design/Google doc is also made public or published on 
the Wiki?



[jira] [Commented] (FLINK-23143) Support state migration

2022-05-19 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17539537#comment-17539537
 ] 

Hangxiang Yu commented on FLINK-23143:
--

I have updated the PR.

> Does that mean postponing state migration until user access?
> I think ideally, state should be migrated before modifying it (i.e. on reading 
> metadata records from changelog); otherwise, there might be data loss or 
> exception when serializing state changes in RocksDB. WDYT?

Currently, state migration may happen not only on user access, but also 
before the state is modified, as you said (considering that the materialized 
part may not be included in the snapshot).

> Besides that, what about updating TTL? If we return existing state, then TTL 
> settings won't be updated, right?

Currently, ChangelogStateFactory is disposed when restore finishes, so 
there is no state cache for ChangelogKeyedStateBackend and the TTL settings of 
all states will be updated.



> ChangelogBackend metaInfo and e.g. RocksDBBackend metaInfo don't have to be 
> the same; and the former shouldn't know how to create metaInfo for the latter.

Sure, I agree. So I have changed the methods (createInternalState and create) to 
take an extra parameter. But I am not sure whether there is a better way to 
handle the interface change.



[jira] [Commented] (FLINK-23143) Support state migration

2022-05-10 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534310#comment-17534310
 ] 

Roman Khachatryan commented on FLINK-23143:
---

> The added methods we implement just mock what we do on restore (from 
> RocksDB or HashMap). 
Yes, but we call them at most once on restore currently.

> They will check before adding new state. And the check will also work on 
> user access.
In the PR (prototype), there is no such check, right? Do you mean it should 
be added?

Does that mean postponing state migration until user access?
I think ideally, state should be migrated before modifying it (i.e. on reading 
metadata records from changelog); otherwise, there might be data loss or 
exception when serializing state changes in RocksDB. WDYT?

Besides that, what about updating TTL? If we return existing state, then TTL 
settings won't be updated, right?

> Maybe we could introduce a new, more universal class to wrap this 
> information, WDYT?
In my opinion, the problem with RegisteredKeyValueStateBackendMetaInfo is not 
its type; but rather the fact that it should be created by the (nested) state 
backend, instead of being injected from the outside. 
ChangelogBackend metaInfo and e.g. RocksDBBackend metaInfo don't have to be the 
same; and the former shouldn't know how to create metaInfo for the latter.

It looks like the [design document and considering exposing an upgrade 
method|https://issues.apache.org/jira/browse/FLINK-23143?focusedCommentId=17529581=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17529581]
 would be helpful after all.



[jira] [Commented] (FLINK-23143) Support state migration

2022-05-09 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534149#comment-17534149
 ] 

Hangxiang Yu commented on FLINK-23143:
--

> I like the simplicity of your solution, but I think the concern I expressed 
> above (about creation of multiple state objects) is valid.
The added methods we implement just mock what we do on restore (from RocksDB 
or HashMap). 
They will check before adding new state. And the check will also work on user 
access.
(We don't implement them via createInternalState.)
So I think it's not a problem?

> Besides that, RegisteredKeyValueStateBackendMetaInfo seems to be an 
> implementation detail of a state backend, so it shouldn't be exposed.
You are right. Maybe we could introduce a new, more universal class to wrap 
this information, WDYT?

> Regarding the key serializer upgrade, I think it's [not 
> supported|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/]
>  currently and therefore is out of the scope of this ticket.
I agree. But we haven't implemented the compatibility check for the key 
serializer, like the one in the RocksDB and heap backends. 
What I described above about the key serializer is the difficulty of 
implementing that compatibility check.
It will also work even if we lack it. Do you think it doesn't matter?



[jira] [Commented] (FLINK-23143) Support state migration

2022-05-09 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17534026#comment-17534026
 ] 

Roman Khachatryan commented on FLINK-23143:
---

Thanks a lot for sharing the prototype.
I like the simplicity of your solution, but I think the concern I expressed 
above (about creation of multiple state objects) is valid.
Besides that, RegisteredKeyValueStateBackendMetaInfo seems to be an 
implementation detail of a state backend, so it shouldn't be exposed.

Regarding the key serializer upgrade, I think it's [not 
supported|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/]
 currently and therefore is out of the scope of this ticket.



[jira] [Commented] (FLINK-23143) Support state migration

2022-05-09 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17533756#comment-17533756
 ] 

Hangxiang Yu commented on FLINK-23143:
--

[~roman] 
Sure, I just pushed my draft, as you can see.

Another problem is the key serializer:
 # If the materialized part is empty, the key serializer will not be 
updated or checked for compatibility. But the key serializer will be used to 
deserialize state in the non-materialized part, which will cause an exception 
if the key serializer has changed.
 # For the non-materialized part, we may need to store its key serializer 
in the snapshot and read it before building the delegated keyed 
state backend and the changelog keyed state backend. But with that solution, we 
may need to change the process of {{ChangelogBackendRestoreOperation#restore}}.

Do you have any better ideas about it? Thanks a lot!



[jira] [Commented] (FLINK-23143) Support state migration

2022-04-28 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17529581#comment-17529581
 ] 

Roman Khachatryan commented on FLINK-23143:
---

Do you mind sharing your POC?

IIRC, getOrCreateKeyedState of a nested backend is not called by the changelog 
backend.

 
My concern was that calling createInternalState() twice creates two state 
objects (e.g. HeapValueState).

Another concern is that besides serializers, the state itself also needs to 
be migrated (in the case of RocksDB). So maybe it makes sense to pull 
HeapKeyedStateBackend.tryRegisterStateTable and 
RocksDBKeyedStateBackend.tryRegisterKvStateInformation into 
AbstractKeyedStateBackend (or some interface) and call it directly from 
ChangelogKeyedStateBackend (the delegation approach wouldn't be needed in this 
case). Or extract only the migration part, e.g. upgrade(StateDescriptor).

Besides that, updating the TTL policy should also be taken into account.

Do you mind creating a short design doc describing what needs to be upgraded, 
and one or more options of how to do it?

 



[jira] [Commented] (FLINK-23143) Support state migration

2022-04-28 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17529258#comment-17529258
 ] 

Hangxiang Yu commented on FLINK-23143:
--

Maybe I missed something. What do you mean by "store wrapping objects in 
a new map keyed by state name"?

In that solution, if we don't add a new state, could we avoid updating a 
_RegisteredKeyValueStateBackendMetaInfo_ which uses an 
_EagerlyRegisteredStateSerializerProvider_ that doesn't support registering a 
new serializer?

I mean adding

_getOrCreateKeyedState(RegisteredKeyValueStateBackendMetaInfo metaInfo, StateDescriptor stateDesc)_

to _KeyedStateBackend_ so that Changelog could use this method instead of 

_getOrCreateKeyedState(TypeSerializer namespaceSerializer, StateDescriptor stateDescriptor)_

during recovery, because the former method will register a 
_RegisteredKeyValueStateBackendMetaInfo_ which uses a 
_LazilyRegisteredStateSerializerProvider_.
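
The difference between the two kinds of provider can be illustrated with a simplified model. `ToySerializerProvider` and its methods are hypothetical and only mirror the behavior described above (an eagerly registered provider rejects a later registration, a lazily registered one accepts it); serializers are represented by plain strings.

```java
/** Simplified model of the two provider kinds; not Flink's classes. */
abstract class ToySerializerProvider {
    /** Name of the currently registered serializer, or null if none yet. */
    protected String registered;

    abstract void registerNewSerializer(String newSerializer);

    String current() {
        return registered;
    }

    /** Built from a restored snapshot: the new serializer arrives later. */
    static ToySerializerProvider lazy(String restoredSnapshot) {
        return new Lazy(restoredSnapshot);
    }

    /** Built directly from a serializer: nothing can be registered later. */
    static ToySerializerProvider eager(String serializer) {
        return new Eager(serializer);
    }

    private static final class Lazy extends ToySerializerProvider {
        // Kept for a compatibility check in a fuller model; unused here.
        private final String restoredSnapshot;

        Lazy(String restoredSnapshot) {
            this.restoredSnapshot = restoredSnapshot;
        }

        @Override
        void registerNewSerializer(String newSerializer) {
            if (registered != null) {
                throw new UnsupportedOperationException("a serializer is already registered");
            }
            registered = newSerializer;
        }
    }

    private static final class Eager extends ToySerializerProvider {
        Eager(String serializer) {
            this.registered = serializer;
        }

        @Override
        void registerNewSerializer(String newSerializer) {
            throw new UnsupportedOperationException("serializer was registered eagerly");
        }
    }
}
```

In this model, meta info created during recovery with the eager variant cannot accept the user's serializer on first access, whereas meta info built from a snapshot with the lazy variant can.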

 

The proposal is simple and works, but it adds an extra method to 
_KeyedStateBackend_ that other state backends have to implement. 

I think if your solution could solve the question I mentioned above, it may 
be better.



[jira] [Commented] (FLINK-23143) Support state migration

2022-04-21 Thread Roman Khachatryan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17525953#comment-17525953
 ] 

Roman Khachatryan commented on FLINK-23143:
---

Hi, [~masteryhx] .

I meant an approach similar to 
[FunctionDelegationHelper|https://github.com/apache/flink/blob/cc6d1a27bafe38e76d6f9f0d480fdcb7e017be3a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/restore/FunctionDelegationHelper.java]
 from ChangelogKeyedStateBackend. But instead of a function, it would hold and 
update the Serializer. No state (like ValueState) should be added. It's already 
used in FLINK-23252 IIUC.

 

I didn't fully understand your proposal; could you share your POC?



[jira] [Commented] (FLINK-23143) Support state migration

2022-04-19 Thread Hangxiang Yu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524708#comment-17524708
 ] 

Hangxiang Yu commented on FLINK-23143:
--

Hi, [~roman].
IIUC, your solution requires creating and maintaining a new state in 
_ChangelogKeyedStateBackend_?
It may not work well when switching the changelog from enabled to 
disabled?

Another solution is to make the changelog part use the correct 
_RegisteredKeyValueStateBackendMetaInfo/RegisteredPriorityQueueStateBackendMetaInfo_
 during recovery.

(We create 
_RegisteredKeyValueStateBackendMetaInfo/RegisteredPriorityQueueStateBackendMetaInfo_
 from _StateMetaInfoSnapshot_ in 
_ChangelogBackendLogApplier#restoreKvMetaData_ and 
_ChangelogBackendLogApplier#restorePqMetaData_, but don't use them.)

But this may require introducing two new methods in 
_KeyedStateBackend_, e.g. _create(RegisteredPriorityQueueStateBackendMetaInfo metaInfo)_.

I have tried a simple POC and it works. But it may not be a good idea to 
introduce new methods in _KeyedStateBackend_.
WDYT?
